Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""
This snippet demonstrates cooperative yielding as a way to put a Pythonic face on
CUDA streams and their clunky, C-style state-machine interface.

One context per device per process is usually a sensible design. One can also run
multiple processes, each with its own context on the same device, but this has a
rather high overhead on both host and device. More importantly, it prevents the
memory-pool class from working its magic when farming out embarrassingly parallel
kernels, since each process then has its own separate pool.

In theory, threads should be a good match for CUDA streams; however, the CUDA API
is not thread-safe. Cooperative yielding solves this: by yielding at set points,
such as after every asynchronous call, we keep the GPU well queued with work,
while, as far as the driver knows, it is only ever dealing with serial code.

So far this only demonstrates how to handle embarrassingly parallel workflows,
which is the use case I encounter most often. With a little more tinkering, a
general forking mechanism could be implemented, or something more elegant still.
"""
- import numpy as np
- import gevent
- from time import sleep, clock
- import pycuda.driver as drv
- drv.init()
- from pycuda import gpuarray
class Context(object):
    """
    Context-wrapping object.

    Wraps a CUDA context on one device together with a DeviceMemoryPool, and
    exposes allocators that draw their device memory from that pool.
    """

    def __init__(self, device=0):
        """
        device -- ordinal of the CUDA device to create the context on.
        """
        from pycuda.tools import DeviceMemoryPool
        self.device = drv.Device(device)
        # make_context() also makes the new context current for this thread;
        # the owner is responsible for popping it when done.
        self.context = self.device.make_context()
        self.memory_pool = DeviceMemoryPool()

    def empty(self, shape, dtype):
        """Return an uninitialized GPUArray of the given shape/dtype, allocated from the pool."""
        return gpuarray.GPUArray(shape, dtype, allocator=self.memory_pool.allocate)

    def empty_like(self, arr):
        """
        Return an uninitialized GPUArray with arr's shape and dtype, allocated from the pool.

        gpuarray.empty_like would reuse arr's own allocator, silently bypassing
        this context's pool whenever arr was not allocated from it.
        NOTE(review): the result is always C-ordered.
        """
        return self.empty(arr.shape, arr.dtype)

    def array(self, obj, stream=None):
        """
        Copy constructor: return a new pool-backed GPUArray holding a copy of obj.

        obj    -- a GPUArray (copied device-to-device) or anything np.asarray
                  accepts (uploaded host-to-device).
        stream -- optional drv.Stream; if given, the copy is enqueued on it
                  asynchronously, otherwise it is issued synchronously.
        """
        allocate = self.memory_pool.allocate
        if isinstance(obj, gpuarray.GPUArray):
            # Keep obj's strides so a raw byte copy preserves the element layout.
            # NOTE(review): assumes obj is contiguous (C or F order); a strided
            # view would need an element-wise copy instead.
            arr = gpuarray.GPUArray(obj.shape, obj.dtype, allocator=allocate,
                                    strides=obj.strides)
            if obj.size:  # zero-size GPUArrays may have no device buffer at all
                if stream is None:
                    drv.memcpy_dtod(arr.gpudata, obj.gpudata, obj.nbytes)
                else:
                    drv.memcpy_dtod_async(arr.gpudata, obj.gpudata, obj.nbytes, stream)
            return arr
        # Host data: make it C-contiguous (0-d inputs stay 0-d) before uploading.
        host = np.asarray(obj, order='C')
        if stream is None:
            return gpuarray.to_gpu(host, allocator=allocate)
        return gpuarray.to_gpu_async(host, allocator=allocate, stream=stream)
class GreenStream(gevent.Greenlet):
    """
    Abstract base that pairs a greenlet with a backend-specific stream/queue.

    It offers a backend-neutral interface to that combination: subclasses put
    their body in ``_run``, enqueue asynchronous work on ``self.queue`` and
    call ``Yield`` so the other greenlets get a turn to issue theirs.
    """

    def __init__(self, context):
        super(GreenStream, self).__init__()
        self.context = context
        # The stream this greenlet enqueues its asynchronous work on.
        self.queue = drv.Stream()

    def Yield(self):
        """Hand control to the other greenlets (the canonical gevent yield)."""
        gevent.sleep(0)

    def _run(self):
        """Body of the stream; concrete subclasses must provide it."""
        raise NotImplementedError
class CustomStream(GreenStream):
    """
    Demo GreenStream: fills a buffer with its index, then scales it a few
    times, yielding to the other greenlets after each batch of async work.
    """

    def __init__(self, context, index):
        """
        context -- Context whose memory pool the buffers are allocated from.
        index   -- identifier of this stream; also used as the fill/scale value.
        """
        super(CustomStream, self).__init__(context)
        self.index = index

    def _run(self):
        """
        Method to put body code in; analogous to the target of a thread/process.
        """
        # Wall-clock timer: time.clock() reports CPU time on Unix under
        # Python 2 (and is gone in Python 3.8+), hiding time spent waiting.
        from timeit import default_timer as now

        print('stream start %s' % self.index)
        print(now())
        shape = 1000, 1000, 8
        arr = self.context.empty(shape, np.float32)
        # Output buffer only: its contents are overwritten by _axpbz below.
        out = self.context.empty_like(arr)
        # Submit an async command on this stream and yield.
        arr.fill(self.index, self.queue)
        print('fill kernel enqueued in stream %s' % self.index)
        print(now())
        self.Yield()
        # We are free to issue a bunch of commands before yielding.
        for _ in range(3):
            # out = index * arr + 0, enqueued on this greenlet's stream
            arr._axpbz(self.index, 0, out, self.queue)
        print('multiply kernel enqueued in stream %s' % self.index)
        print(now())
        self.Yield()
        # Typically most streams are not done yet by the time we get here,
        # which proves interleaving is going on.
        print('stream %s done %s' % (self.index, self.queue.is_done()))
        # arr/out return to the shared memory pool when this method returns, and
        # the pool may hand them to another allocation. Wait cooperatively
        # (other greenlets keep running) until this stream no longer uses them.
        while not self.queue.is_done():
            self.Yield()
        print('stream end %s' % self.index)
        print(now())
# Demo driver: one shared context, ten green streams interleaving their work.
context = Context(device=0)
pool = [CustomStream(context, i) for i in range(10)]
for s in pool:
    s.start()
try:
    # Main loop: let the greenlets run until every stream has finished.
    gevent.joinall(pool)
finally:
    # make_context() left the context current, and PyCUDA reports an error and
    # aborts at interpreter exit if a context is still on the stack. Let the
    # GPU finish, release the pooled blocks, then pop the context.
    context.context.synchronize()
    context.memory_pool.free_held()
    context.context.pop()

# An earlier non-OOP version of this demo spawned a plain function per stream
# with gevent.spawn; the GreenStream classes supersede it.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement