TechGeek

Untitled

May 22nd, 2020
177
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.61 KB | None | 0 0
  1. public class ScyllaUtils
  2. {
  3.     private const string itemMapStmt = "FROM product_data WHERE client_id = ? PER PARTITION LIMIT 1";
  4.     private const string itemSelectStatement = "FROM item WHERE item_id = ? PER PARTITION LIMIT 1";
  5.     private const string itemProStmt = "FROM process_holder WHERE item_id = ? AND process_id = ? PER PARTITION LIMIT 1";
  6.  
  7.     private readonly IResponseMapper rpm;
  8.     private readonly Cluster cluster;
  9.     private readonly ISession session;
  10.     private readonly IMapper mapper;
  11.  
  12.     public ScyllaUtils(string endpoints, IResponseMapper responseMapper)
  13.     {
  14.         rpm = responseMapper;
  15.      
  16.         var poolOptions = new PoolingOptions()
  17.             .SetMaxConnectionsPerHost(HostDistance.Remote, 20)
  18.             .SetMaxConnectionsPerHost(HostDistance.Local, 20)
  19.             .SetMaxRequestsPerConnection(1000);
  20.  
  21.         cluster = Cluster.Builder()
  22.             .AddContactPoints(endpoints)
  23.             .WithApplicationName("Test")
  24.             .WithReconnectionPolicy(new ConstantReconnectionPolicy(100L))
  25.             .WithLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("dc5")))
  26.             .WithPoolingOptions(poolOptions)
  27.             .WithQueryOptions(new QueryOptions()
  28.                 .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
  29.                 .SetSerialConsistencyLevel(ConsistencyLevel.LocalSerial))
  30.             .WithCompression(CompressionType.LZ4)
  31.             .WithSocketOptions(new SocketOptions().SetReadTimeoutMillis(6000))
  32.             .Build();
  33.  
  34.         var keyspace = Environment.GetEnvironmentVariable("keyspace_name");
  35.         session = cluster.Connect(keyspace);
  36.         session.UserDefinedTypes.Define(
  37.             UdtMap.For<ASHelper>("as")
  38.                 .Map(a => a.Url, "url").Map(a => a.Id, "id"),
  39.             UdtMap.For<MonthlyProcess>("monthly_process")
  40.         );
  41.  
  42.         mapper = new Mapper(session);
  43.     }
  44.  
  45.     /**
  46.      *
  47.      * Below method gets data from all 3 different tables by making multiple async calls.
  48.      *
  49.      */
  50.     public async Task<IList<Item>> GetAsync(IList<int> clientIds, int processId, int proc, Kyte kt)
  51.     {
  52.         var clientMaps = await ProcessCassQueries(clientIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemMapPoco>(itemMapStmt, batch), "GetPIMValue");
  53.  
  54.         if (clientMaps == null || clientMaps.Count <= 0)
  55.         {
  56.             return null;
  57.         }
  58.  
  59.         var itemIds = clientMaps.SelectMany(x => x.ItemIds).Where(y => y != null).ToList();
  60.  
  61.         var itemsTask = ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemPoco>(itemSelectStmt, batch), "GetAsync");
  62.  
  63.         var itemProTask = ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemProPoco>(itemProStmt, batch, processId), "GetIPValue");
  64.  
  65.         var items = await itemsTask;
  66.         if (items.Count <= 0)
  67.         {
  68.             return null;
  69.         }
  70.  
  71.         var itmDictionary = items.ToDictionary(dto => dto.ItemId, dto => rpm.MapToItem(dto, proc));
  72.         var itmProDict = itemIds.ToDictionary<int, int, ItemProPoco>(id => id, id => null);
  73.         var holder = new List<int>();
  74.  
  75.         var itemPrices = await itemProTask;
  76.         itemPrices.ForEach(i => { if (i != null) itmProDict[i.ItemId] = i; });
  77.         foreach (var ip in itmProDict) if (ip.Value == null) holder.Add(ip.Key);
  78.  
  79.         if (holder.Count > 0)
  80.         {
  81.             var ipHolder = await ProcessCassQueries(itemIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemProPoco>(itemProStmt, batch, kt.Pid), "GetIPValue");
  82.             ipHolder.ToList().ForEach(ipf => { if (ipf != null) itmProDict[ipf.ItemId] = ipf; });
  83.         }
  84.  
  85.         return itmDictionary.Select(kvp =>
  86.         {
  87.             itmProDict.TryGetValue(kvp.Key, out var ip);
  88.  
  89.             if (kvp.Value != null)
  90.             {
  91.                 rpm.convert(ip, kvp.Value);
  92.                 return kvp.Value;
  93.             }
  94.  
  95.             return null;
  96.         }).Where(s => s != null).ToList();
  97.     }
  98.  
  99.     /**
  100.      *
  101.      * Below method does multiple async calls on each table for their corresponding id's.
  102.      *
  103.      */
  104.     private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
  105.     {
  106.         var requestTasks = ids.Select(id => ProcessCassQuery(ct => mapperFunc(ct, id), msg));
  107.         return (await Task.WhenAll(requestTasks)).Where(e => e != null).ToList();
  108.     }
  109.  
  110.     private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
  111.     {
  112.         return requestExecuter(CancellationToken.None);
  113.     }
  114. }
Add Comment
Please, Sign In to add comment