- #!/usr/bin/env python
- from concurrent.futures import ThreadPoolExecutor
- import asyncio
- import os
# Shared executor that verify_chain hands its blocking file reads to, so
# they run off the event-loop thread. No max_workers is given, so the pool
# size is the library default.
io_thread_pool = ThreadPoolExecutor()
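@asyncio.coroutine
def verify_chain(*paths, loop=None):
    """Verify a PEM certificate chain with ``certtool --verify-chain``.

    The files named by *paths* are read in the order given, each followed
    by a newline, and the concatenation is written to certtool's standard
    input. The callers in this file pass the certificate under test first
    and the CA certificate last.

    Args:
        *paths: PEM files that make up the chain.
        loop: Event loop whose executor performs the file reads; defaults
            to ``asyncio.get_event_loop()``.

    Raises:
        AssertionError: certtool exited with a non-zero status.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    parts = []
    for path in paths:
        with open(path, 'rb') as pem:
            # The blocking read runs in the thread pool, not on the loop.
            data = yield from loop.run_in_executor(io_thread_pool, pem.read)
        parts.append(data)
        parts.append(b'\n')
    chain = b''.join(parts)
    p = yield from asyncio.create_subprocess_exec(
        'certtool', '--verify-chain', stdin=asyncio.subprocess.PIPE)
    yield from p.communicate(chain)
    # The exit status is the only verdict this function looks at. It is
    # checked with an explicit raise because a bare `assert` is compiled
    # out under `python -O`, which would let a failed verification pass
    # unnoticed. AssertionError is kept so existing callers see the same
    # exception type as before.
    if p.returncode != 0:
        raise AssertionError(
            'certtool --verify-chain exited with status %r' % (p.returncode,))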
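@asyncio.coroutine
def ensure_private_key(path):
    """Generate a private key at *path* unless a file already exists there.

    Runs ``certtool --generate-privkey --outfile <path>`` and waits for it.

    Args:
        path: Where the private key is (or will be) stored.

    Raises:
        AssertionError: certtool exited with a non-zero status.
    """
    if os.path.exists(path):
        # Existence is the only test made here: an empty or truncated file
        # left behind by an interrupted run is accepted as a key.
        return
    # NOTE(review): the mode of the new key file is whatever certtool and
    # the process umask produce -- confirm it is not group/world readable.
    p = yield from asyncio.create_subprocess_exec(
        'certtool', '--generate-privkey', '--outfile', path)
    yield from p.wait()
    # Explicit raise instead of `assert`, which `python -O` strips; a failed
    # key generation must not be silently ignored.
    if p.returncode != 0:
        raise AssertionError(
            'certtool --generate-privkey exited with status %r'
            % (p.returncode,))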
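@asyncio.coroutine
def ensure_self_signed_certificate(certpath, keypath, template):
    """Create a self-signed certificate at *certpath* if none exists.

    The private key at *keypath* is ensured first, even when the
    certificate file is already present.

    Args:
        certpath: Output path of the certificate.
        keypath: Private key used to sign the certificate.
        template: certtool template file passed via ``--template``.

    Raises:
        AssertionError: certtool exited with a non-zero status.
    """
    yield from ensure_private_key(keypath)
    if os.path.exists(certpath):
        # Only existence is checked; the file is not parsed, nor is it
        # matched against the key at keypath.
        return
    p = yield from asyncio.create_subprocess_exec(
        'certtool', '--generate-self-signed', '--outfile', certpath,
        '--load-privkey', keypath, '--template', template)
    yield from p.wait()
    # Explicit raise instead of `assert`, which `python -O` strips.
    if p.returncode != 0:
        raise AssertionError(
            'certtool --generate-self-signed exited with status %r'
            % (p.returncode,))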
@asyncio.coroutine
def ensure_ca_certificate(capem, cakey):
    """Make sure the CA private key and its self-signed certificate exist.

    Delegates to ensure_self_signed_certificate with the fixed template
    file 'ca.template'.

    Args:
        capem: Path of the CA certificate.
        cakey: Path of the CA private key.
    """
    ca_template = 'ca.template'
    yield from ensure_self_signed_certificate(capem, cakey, ca_template)
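@asyncio.coroutine
def ensure_certificate(certpath, keypath, capem, cakey, template):
    """Create a CA-signed certificate at *certpath* if none exists.

    The private key at *keypath* is ensured first, even when the
    certificate file is already present.

    Args:
        certpath: Output path of the certificate.
        keypath: Private key the certificate is issued for.
        capem: CA certificate passed via ``--load-ca-certificate``.
        cakey: CA private key passed via ``--load-ca-privkey``.
        template: certtool template file passed via ``--template``.

    Raises:
        AssertionError: certtool exited with a non-zero status.
    """
    yield from ensure_private_key(keypath)
    if os.path.exists(certpath):
        # Only existence is checked; an existing file is not verified to
        # have been issued by the CA at capem.
        return
    p = yield from asyncio.create_subprocess_exec(
        'certtool', '--generate-certificate',
        '--outfile', certpath, '--load-privkey', keypath,
        '--load-ca-certificate', capem, '--load-ca-privkey', cakey,
        '--template', template)
    yield from p.wait()
    # Explicit raise instead of `assert`, which `python -O` strips.
    if p.returncode != 0:
        raise AssertionError(
            'certtool --generate-certificate exited with status %r'
            % (p.returncode,))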
@asyncio.coroutine
def main():
    """Build a CA plus server and client certificates, then verify them.

    All paths are relative, so every key, certificate and template is
    looked up in the current working directory. The CA is ensured first;
    the two leaf certificates are then issued concurrently, and finally
    the three chains are verified concurrently.

    NOTE: ``@asyncio.coroutine`` was removed in Python 3.11, so running
    this module there requires porting it to ``async def``/``await``.
    """
    ca_cert, ca_key = 'ca.pem', 'ca.key'
    yield from ensure_ca_certificate(ca_cert, ca_key)

    issue_jobs = [
        ensure_certificate(
            'server.pem', 'server.key', ca_cert, ca_key, 'server.template'),
        ensure_certificate(
            'client.pem', 'client.key', ca_cert, ca_key, 'client.template'),
    ]
    yield from asyncio.gather(*issue_jobs)

    verify_jobs = [
        verify_chain(ca_cert),
        verify_chain('server.pem', ca_cert),
        verify_chain('client.pem', ca_cert),
    ]
    yield from asyncio.gather(*verify_jobs)
if __name__ == '__main__':
    # Script entry point: run main() to completion on the default loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Close the loop even when main() raised, e.g. on a failed
        # certtool step.
        loop.close()