Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import ssl
async def wholething():
    """Run one TLS ping/pong round trip between a local server and client.

    Starts a TLS server on 127.0.0.1 using an ephemeral port, connects a
    client to it, sends ``ping`` and prints the ``pong`` reply.  The server
    is shut down before the coroutine returns, whether or not the exchange
    succeeded.

    Raises:
        OSError: if the certificate/key files cannot be read or the
            sockets cannot be opened (propagated from ``ssl``/``asyncio``).
    """

    async def handle_connection(reader, writer):
        """Read one line from the peer, reply with ``pong`` and hang up."""
        addr = writer.get_extra_info('peername')
        try:
            data = await reader.readline()
            print(f'Server received {data!r} from {addr}')
            writer.write(b'pong\n')
            await writer.drain()
        finally:
            # Always release the connection, even if the read/write failed,
            # and wait so the TLS shutdown actually completes.
            writer.close()
            await writer.wait_closed()
        print('Server done')

    async def tcp_echo_client():
        """Connect to the server, send ``ping`` and print the reply."""
        # SECURITY: certificate and hostname verification are deliberately
        # disabled because the server uses a self-signed "snakeoil"
        # certificate.  Never use such a context against a real service.
        # check_hostname must be cleared before verify_mode can be relaxed.
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        client_context.check_hostname = False
        client_context.verify_mode = ssl.CERT_NONE
        reader, writer = await asyncio.open_connection(
            'localhost', port, ssl=client_context,
        )
        try:
            writer.write(b'ping\n')
            await writer.drain()
            data = await reader.readline()
            print(f'Client received {data!r} from server')
        finally:
            writer.close()
            await writer.wait_closed()
        print('Client done')

    server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    server_context.load_cert_chain('/tmp/ssl/snakeoil.crt', '/tmp/ssl/snakeoil.key')
    # port=0 asks the OS for a free ephemeral port; the real one is read back
    # from the listening socket below.
    server = await asyncio.start_server(
        handle_connection, '127.0.0.1', 0, ssl=server_context,
    )
    try:
        port = server.sockets[0].getsockname()[1]
        print(f'Serving on {server.sockets[0].getsockname()}')
        await tcp_echo_client()
    finally:
        # Stop listening and release the socket instead of leaking it.
        server.close()
        await server.wait_closed()
Add Comment
Please, Sign In to add comment