Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class ScyllaUtils
- {
- private const string itemMapStmt = "FROM product_data WHERE client_id = ? PER PARTITION LIMIT 1";
- private const string itemSelectStatement = "FROM item WHERE item_id = ? PER PARTITION LIMIT 1";
- private const string itemProStmt = "FROM process_holder WHERE item_id = ? AND process_id = ? PER PARTITION LIMIT 1";
- private readonly IResponseMapper rpm;
- private readonly Cluster cluster;
- private readonly ISession session;
- private readonly IMapper mapper;
- public ScyllaUtils(string endpoints, IResponseMapper responseMapper)
- {
- rpm = responseMapper;
- var poolOptions = new PoolingOptions()
- .SetMaxConnectionsPerHost(HostDistance.Remote, 20)
- .SetMaxConnectionsPerHost(HostDistance.Local, 20)
- .SetMaxRequestsPerConnection(1000);
- cluster = Cluster.Builder()
- .AddContactPoints(endpoints)
- .WithApplicationName("Test")
- .WithReconnectionPolicy(new ConstantReconnectionPolicy(100L))
- .WithLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("dc5")))
- .WithPoolingOptions(poolOptions)
- .WithQueryOptions(new QueryOptions()
- .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
- .SetSerialConsistencyLevel(ConsistencyLevel.LocalSerial))
- .WithCompression(CompressionType.LZ4)
- .WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(6000))
- .Build();
- var keyspace = Environment.GetEnvironmentVariable("keyspace_name");
- session = cluster.Connect(keyspace);
- session.UserDefinedTypes.Define(
- UdtMap.For<ASHelper>("as")
- .Map(a => a.Url, "url").Map(a => a.Id, "id"),
- UdtMap.For<MonthlyProcess>("monthly_process")
- );
- mapper = new Mapper(session);
- }
- /**
- *
- * Below method gets data from all 3 different tables by making multiple async calls.
- *
- */
- public async Task<IList<Item>> GetAsync(IList<int> clientIds, int processId, int proc, Kyte kt)
- {
- var clientMaps = await ProcessCassQueries(clientIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemMapPoco>(itemMapStmt, batch), "GetPIMValue");
- if (clientMaps == null || clientMaps.Count <= 0)
- {
- return null;
- }
- var itemIds = clientMaps.SelectMany(x => x.ItemIds).Where(y => y != null).ToList();
- var itemsTask = ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemPoco>(itemSelectStmt, batch), "GetAsync");
- var itemProTask = ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemProPoco>(itemProStmt, batch, processId), "GetIPValue");
- var items = await itemsTask;
- if (items.Count <= 0)
- {
- return null;
- }
- var itmDictionary = items.ToDictionary(dto => dto.ItemId, dto => rpm.MapToItem(dto, proc));
- var itmProDict = itemIds.ToDictionary<int, int, ItemProPoco>(id => id, id => null);
- var holder = new List<int>();
- var itemPrices = await itemProTask;
- itemPrices.ForEach(i => { if (i != null) itmProDict[i.ItemId] = i; });
- foreach (var ip in itmProDict) if (ip.Value == null) holder.Add(ip.Key);
- if (holder.Count > 0)
- {
- var ipHolder = await ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemProPoco>(itemProStmt, batch, kt.Pid), "GetIPValue");
- ipHolder.ToList().ForEach(ipf => { if (ipf != null) itmProDict[ipf.ItemId] = ipf; });
- }
- return itmDictionary.Select(kvp =>
- {
- itmProDict.TryGetValue(kvp.Key, out var ip);
- if (kvp.Value != null)
- {
- rpm.convert(ip, kvp.Value);
- return kvp.Value;
- }
- return null;
- }).Where(s => s != null).ToList();
- }
- /**
- *
- * Below method does multiple async calls on each table for their corresponding id's.
- *
- */
- private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
- {
- var requestTasks = ids.Select(id => ProcessCassQuery(ct => mapperFunc(ct, id), msg));
- return (await Task.WhenAll(requestTasks)).Where(e => e != null).ToList();
- }
- private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
- {
- return requestExecuter(CancellationToken.None);
- }
- }
Add Comment
Please, Sign In to add comment