"""Simple test suite for ngx_http_wsgi_module.
THIS SOFTWARE IS UNDER MIT LICENSE.
Copyright (c) 2010 Perillo Manlio (manlio.perillo@gmail.com)
Read the LICENSE file for more information.
"""
from greenlet import getcurrent, greenlet
class middleware(object):
    """WSGI middleware that emulates the ``write`` callable with greenlets.

    The wrapped application runs in a child greenlet.  Every call to the
    ``write`` callable switches back to the parent greenlet and hands it
    the data, so the data can be yielded from the middleware's response
    iterator in the order the application produced it.

    NOTE: ``write_allowed`` is stored on the middleware instance, so this
    state is shared by every request served through the same instance.
    """

    def __init__(self, application):
        """Store the WSGI *application* to wrap."""
        self.app = application

    def __call__(self, environ, start_response):
        """Run the wrapped application and yield its response body.

        Yields every buffer the application passes to the ``write``
        callable first, then every item of the iterable the application
        returns.  If that iterable has a ``close()`` method it is called
        once iteration ends, whether normally, with an error, or because
        this generator was closed early.
        """
        def _start_response(status, headers, exc_info=None):
            # Forward exc_info only when it was supplied, so a caller
            # providing a two-argument start_response keeps working.
            if exc_info is None:
                start_response(status, headers)
            else:
                start_response(status, headers, exc_info)
            return self.write

        # Initialize greenlet and state
        gp = greenlet(self.app)
        self.write_allowed = True

        r = gp.switch(environ, _start_response)
        # While the application greenlet is still alive, control came back
        # here through write(), so ``r`` is data to send.  Checking
        # ``gp.dead`` instead of the type of ``r`` keeps this correct for
        # any buffer type the application writes.
        while not gp.dead:
            yield r
            r = gp.switch()

        # The application greenlet finished: ``r`` is the object it
        # returned.  From now on the write callable must not be used.
        self.write_allowed = False
        try:
            for buf in r:
                yield buf
        finally:
            # Middleware is responsible for closing the application's
            # iterable when it provides a close() method.
            close = getattr(r, 'close', None)
            if close is not None:
                close()

    def write(self, buf):
        """Hand *buf* to the parent greenlet so it can be yielded.

        Suspends the calling (application) greenlet until the parent
        switches back into it.

        Raises ValueError when ``write_allowed`` is false, i.e. once the
        middleware has started iterating the application's returned
        iterable.
        """
        if not self.write_allowed:
            raise ValueError(
                'write callable called from application iterator')
        getcurrent().parent.switch(buf)
# Number of times each short chunk is repeated to build one line of
# test output in the applications below.
BUFSIZE = 128
# Number of write() calls each test application makes.
N = 50
@middleware
def application(environ, start_response):
    """Test WSGI application that sends its body through ``write``.

    Calls the write callable ``N`` times, each time with a zero-padded
    counter repeated ``BUFSIZE`` times and terminated by a newline, then
    returns a one-item list as the response iterable.
    """
    write = start_response('200 OK', [('Content-Type', 'text/plain')])
    for index in range(N):
        chunk = '%02d' % index
        write(chunk * BUFSIZE + '\n')
    return ['Test write greenlet']
@middleware
def application_generator(environ, start_response):
    """Test WSGI application mixing ``write`` calls with a generator body.

    Calls the write callable ``N`` times, then returns a generator that
    yields a banner line followed by one line for each letter from 'a'
    to 'z', each letter repeated ``BUFSIZE`` times.
    """
    response_headers = [('Content-Type', 'text/plain')]
    write = start_response('200 OK', response_headers)

    def body_lines():
        # Uncomment to call the write callable from inside the iterator:
        # write('foo')
        yield 'Test write greenlet generator\n'
        first, last = ord('a'), ord('z')
        for code in range(first, last + 1):
            letter = chr(code)
            yield letter * BUFSIZE + '\n'

    for index in range(N):
        chunk = '%02d' % index
        write(chunk * BUFSIZE + '\n')
    return body_lines()