Advertisement
aviral2809

Untitled

Nov 17th, 2024
28
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.22 KB | None | 0 0
  1. from threading import Lock, RLock
  2. from typing import Any, Dict, Optional, TypeVar
  3. import threading
  4. from collections import defaultdict
  5. import time
  6. from concurrent.futures import ThreadPoolExecutor
  7. import queue
  8.  
  9. K = TypeVar('K')
  10. V = TypeVar('V')
  11.  
  12. class StripedLock:
  13.     """
  14.    Implements lock striping for more granular concurrency control.
  15.    Inspired by ConcurrentHashMap's segment locking in Java.
  16.    """
  17.     def __init__(self, num_stripes: int = 32):
  18.         self.num_stripes = num_stripes
  19.         self.locks = [RLock() for _ in range(num_stripes)]
  20.    
  21.     def get_lock(self, key: Any) -> Lock:
  22.         # Use hash of key to determine stripe
  23.         stripe = hash(key) % self.num_stripes
  24.         return self.locks[stripe]
  25.  
  26. class ConcurrentKVStore:
  27.     """
  28.    High-performance concurrent key-value store optimized for both reads and writes.
  29.    Features:
  30.    - Lock striping for fine-grained concurrency control
  31.    - Read-write separation using double buffering for bulk operations
  32.    - Configurable read/write optimization
  33.    - Lock-free reads for read-heavy workloads
  34.    """
  35.     def __init__(self, read_optimized: bool = True):
  36.         self.read_optimized = read_optimized
  37.         self.data: Dict[K, V] = {}
  38.         self.striped_locks = StripedLock()
  39.         self.stats = defaultdict(int)
  40.         self.stats_lock = Lock()
  41.        
  42.         # For read-optimization using double-buffering
  43.         self.read_buffer: Dict[K, V] = {}
  44.         self.buffer_lock = Lock()
  45.         self.last_sync = time.time()
  46.         self.sync_interval = 0.1  # 100ms
  47.        
  48.         # Request queues for async processing
  49.         self.write_queue = queue.Queue()
  50.         self.read_queue = queue.Queue()
  51.        
  52.         # Start background workers
  53.         self.start_background_workers()
  54.  
  55.     def start_background_workers(self):
  56.         """Initialize background workers for async operations"""
  57.         self.executor = ThreadPoolExecutor(max_workers=4)
  58.         self.executor.submit(self._write_worker)
  59.         self.executor.submit(self._read_worker)
  60.         if self.read_optimized:
  61.             self.executor.submit(self._buffer_sync_worker)
  62.  
  63.     def _write_worker(self):
  64.         """Background worker for processing write requests"""
  65.         while True:
  66.             try:
  67.                 key, value, result_future = self.write_queue.get()
  68.                 try:
  69.                     self.put_sync(key, value)
  70.                     result_future.set_result(True)
  71.                 except Exception as e:
  72.                     result_future.set_exception(e)
  73.                 self.write_queue.task_done()
  74.             except Exception:
  75.                 continue
  76.  
  77.     def _read_worker(self):
  78.         """Background worker for processing read requests"""
  79.         while True:
  80.             try:
  81.                 key, result_future = self.read_queue.get()
  82.                 try:
  83.                     value = self.get_sync(key)
  84.                     result_future.set_result(value)
  85.                 except Exception as e:
  86.                     result_future.set_exception(e)
  87.                 self.read_queue.task_done()
  88.             except Exception:
  89.                 continue
  90.  
  91.     def _buffer_sync_worker(self):
  92.         """Background worker for syncing read buffer"""
  93.         while True:
  94.             time.sleep(self.sync_interval)
  95.             current_time = time.time()
  96.             if current_time - self.last_sync >= self.sync_interval:
  97.                 with self.buffer_lock:
  98.                     self.read_buffer = self.data.copy()
  99.                 self.last_sync = current_time
  100.  
  101.     def put_sync(self, key: K, value: V) -> None:
  102.         """Synchronous put operation with fine-grained locking"""
  103.         with self.striped_locks.get_lock(key):
  104.             self.data[key] = value
  105.             with self.stats_lock:
  106.                 self.stats['writes'] += 1
  107.  
  108.     def get_sync(self, key: K) -> Optional[V]:
  109.         """Synchronous get operation with optimized reading strategy"""
  110.         with self.stats_lock:
  111.             self.stats['reads'] += 1
  112.            
  113.         if self.read_optimized:
  114.             # Try reading from buffer first (lock-free)
  115.             value = self.read_buffer.get(key)
  116.             if value is not None:
  117.                 return value
  118.        
  119.         # Fall back to main storage with fine-grained locking
  120.         with self.striped_locks.get_lock(key):
  121.             return self.data.get(key)
  122.  
  123.     async def put(self, key: K, value: V) -> None:
  124.         """Asynchronous put operation"""
  125.         future = self.executor.submit(self.put_sync, key, value)
  126.         return await future
  127.  
  128.     async def get(self, key: K) -> Optional[V]:
  129.         """Asynchronous get operation"""
  130.         future = self.executor.submit(self.get_sync, key)
  131.         return await future
  132.  
  133.     def get_stats(self) -> Dict[str, int]:
  134.         """Get current statistics"""
  135.         with self.stats_lock:
  136.             return dict(self.stats)
  137.  
  138. # Example usage and testing
  139. def run_concurrent_test():
  140.     """Test concurrent operations on the KV store"""
  141.     store = ConcurrentKVStore(read_optimized=True)
  142.     num_threads = 4
  143.     operations_per_thread = 10000
  144.    
  145.     def writer_task():
  146.         for i in range(operations_per_thread):
  147.             key = f"key_{threading.get_ident()}_{i}"
  148.             store.put_sync(key, i)
  149.    
  150.     def reader_task():
  151.         for i in range(operations_per_thread):
  152.             key = f"key_{threading.get_ident()}_{i}"
  153.             store.get_sync(key)
  154.    
  155.     threads = []
  156.     start_time = time.time()
  157.    
  158.     # Create reader and writer threads
  159.     for _ in range(num_threads):
  160.         threads.append(threading.Thread(target=writer_task))
  161.         threads.append(threading.Thread(target=reader_task))
  162.    
  163.     # Start all threads
  164.     for thread in threads:
  165.         thread.start()
  166.    
  167.     # Wait for all threads to complete
  168.     for thread in threads:
  169.         thread.join()
  170.    
  171.     end_time = time.time()
  172.     duration = end_time - start_time
  173.    
  174.     stats = store.get_stats()
  175.     print(f"Test completed in {duration:.2f} seconds")
  176.     print(f"Total operations: {stats['reads'] + stats['writes']}")
  177.     print(f"Operations per second: {(stats['reads'] + stats['writes']) / duration:.2f}")
  178.    
# Script entry point: run the benchmark only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_concurrent_test()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement