Advertisement
EelcoHoogendoorn

pycuda and gevent

Oct 9th, 2012
235
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.72 KB | None | 0 0
  1.  
  2. """
  3. this snippet demonstrates cooperative yielding to put a pythonic face on cuda streams,
  4. and their hacky C-style state-machine bullshit interface.
  5.  
  6. One context per device in one process is usually a sensible design pattern,
  7. but while one can do multiple processes and contexts per device, this has a rather
  8. high overhead on both host and device; but more importantly, it does not allow for the mempool
  9. class to work its magic when farming out embarrassingly parallel kernels.
  10.  
  11. In theory, threads should be a good match to cuda streams;
  12. However, the cuda api is not threadsafe.
  13.  
  14. Cooperative yielding solves this. By simply yielding at set times, like after all async calls,
  15. we keep our GPU well queued with work, while the driver is just dealing with serial code,
  16. for all it knows.
  17.  
  18. So far it only demonstrates how to handle workflows which are embarrassingly parallel,
  19. but that is the use case I encounter most often
  20. But with a little more tinkering, a general forking mechanism could be implemented;
  21. or something yet more elegant
  22.  
  23. """
  24.  
  25. import numpy as np
  26. import gevent
  27. from time import sleep, clock
  28.  
  29. import pycuda.driver as drv
  30. drv.init()
  31. from pycuda import gpuarray
  32.  
  33.  
  34.  
  35.  
  36. class Context(object):
  37.     """
  38.    context wrapping object
  39.    wraps a context, associated mempool, and exposes allocators acting on that mempool
  40.    """
  41.     def __init__(self, device=0):
  42.         from pycuda.tools import DeviceMemoryPool
  43.  
  44.         self.device = drv.Device(device)
  45.         self.context = self.device.make_context()
  46.         self.memory_pool = DeviceMemoryPool()
  47.  
  48.     def empty(self, shape, dtype):
  49.         return gpuarray.GPUArray(shape, dtype, allocator = self.memory_pool.allocate)
  50.     def empty_like(self, arr):
  51.         return gpuarray.empty_like(arr)
  52.  
  53.     def array(self, obj, stream = None):
  54.         """copy constructor"""
  55.         if isinstance(obj, gpuarray.GPUArray):
  56.             arr = obj._new_like_me()
  57.             return arr
  58.  
  59.  
  60.  
  61.  
  62. class GreenStream(gevent.Greenlet):
  63.     """
  64.    abstract class
  65.    associates a greenlet with a backend-specific stream/queue object
  66.    can be used to provide a backend neutral interface to the combination of both
  67.    """
  68.     def __init__(self, context):
  69.         super(GreenStream, self).__init__()
  70.         self.context = context
  71.         self.queue = drv.Stream()
  72.  
  73.     def Yield(self):
  74.         """canonical yield in gevent"""
  75.         gevent.sleep()
  76.  
  77.     def _run(self):
  78.         raise NotImplementedError()
  79.  
  80.  
  81.  
  82. class CustomStream(GreenStream):
  83.  
  84.     def __init__(self, context, index):
  85.         super(CustomStream, self).__init__(context)
  86.         self.index = index
  87.  
  88.     def _run(self):
  89.         """
  90.        method to put body code; analogous to thread/process
  91.        """
  92.         print 'stream start', self.index
  93.         print clock()
  94.  
  95.         shape = 10,10,8
  96.         shape = 1000,1000,8
  97.         arr = self.context.empty(shape, np.float32)
  98.         out = self.context.array(arr)
  99.  
  100.         #submit an async command on this stream and yield
  101.         arr.fill(self.index, self.queue)
  102.         print 'fill kernel enqueued in stream', self.index
  103.         print clock()
  104.         self.Yield()
  105.  
  106.         #we are free to issue a bunch of commands before yielding
  107.         for i in range(3):
  108.             arr._axpbz(self.index, 0, out, self.queue)
  109.             print 'multiply kernel enqueued in stream', self.index
  110.             print clock()
  111.         self.Yield()
  112.  
  113.         #typically, most streams are not done yet by the time we get here, which proves interleaving is going on
  114.         print 'stream', self.index, 'done',self.queue.is_done()
  115.         print 'stream end', self.index
  116.         print clock()
  117.  
  118.  
  119.  
  120.  
  121.  
  122. context = Context(device = 0)
  123.  
  124. pool = [CustomStream(context, i) for i in xrange(10)]
  125. for s in pool: s.start()
  126.  
  127. #main loop
  128. while not all(s.ready() for s in pool):
  129.     gevent.sleep()
  130.  
  131.  
  132.  
  133.  
  134. """
  135. deprecated, non OOP interface
  136. """
  137.  
  138.  
  139. ##def greenstream(index):
  140. ##    print 'stream start', index
  141. ##    stream = drv.Stream()
  142. ##
  143. ##    shape = 1000,1000,8
  144. ##
  145. ##    arr = gpuarray.GPUArray(shape, np.float32, allocator = memory_pool.allocate)
  146. ##    out = arr._new_like_me()
  147. ##
  148. ##    arr.fill(index, stream)
  149. ##    gevent.sleep()
  150. ####    print arr
  151. ##    arr._axpbz(index, 0, out, stream)
  152. ##    print 'kernel enqueued in stream', index
  153. ##    gevent.sleep()
  154. ##    #uncomment to serialize
  155. ####    stream.synchronize()
  156. ##    #typically, most streams are not done yet by the time we get here, which proves interleaving is going on
  157. ##    print 'stream', index, 'done',stream.is_done()
  158. ####    print out
  159. ##    print 'stream end', index
  160.  
  161.  
  162. ##quit()
  163. ##pool = [gevent.spawn(greenstream, i) for i in range(10)]
  164. ##
  165. ###main loop
  166. ##while not all(s.ready() for s in pool):
  167. ##    gevent.sleep()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement