Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import cffi
# cffi (ABI mode) bindings for the subset of OpenSSL used by this module.
ffi = cffi.FFI()
ffi.cdef("""
typedef struct bio_st BIO;
typedef struct bio_method_st BIO_METHOD;
typedef struct env_md_st EVP_MD;
typedef struct ssl_ctx_st SSL_CTX;
typedef struct ssl_method_st SSL_METHOD;
typedef struct ssl_st SSL;
typedef struct x509_st X509;
typedef struct x509_store_ctx_st X509_STORE_CTX;
BIO_METHOD *BIO_s_mem(void);
BIO *BIO_new(BIO_METHOD *type);
int BIO_free(BIO *a);
size_t BIO_ctrl_pending(BIO *b);
int BIO_read(BIO *b, void *data, int len);
int BIO_write(BIO *b, const void *data, int len);
const SSL_METHOD *DTLSv1_method(void);
SSL_CTX *SSL_CTX_new(const SSL_METHOD *meth);
void SSL_CTX_free(SSL_CTX *);
long SSL_CTX_ctrl(SSL_CTX *ctx, int cmd, long larg, void *parg);
int SSL_CTX_set_cipher_list(SSL_CTX *, const char *str);
int SSL_CTX_set_tlsext_use_srtp(SSL_CTX *ctx, const char *profiles);
void SSL_CTX_set_verify(SSL_CTX *ctx, int mode,
                        int (*verify_callback)(int, X509_STORE_CTX *));
int SSL_CTX_use_certificate_file(SSL_CTX *ctx, const char *file, int type);
int SSL_CTX_use_PrivateKey_file(SSL_CTX *ctx, const char *file, int type);
SSL *SSL_new(SSL_CTX *ctx);
void SSL_free(SSL *ssl);
int SSL_do_handshake(SSL *s);
int SSL_export_keying_material(SSL *s, unsigned char *out, size_t olen,
                               const char *label, size_t llen,
                               const unsigned char *context,
                               size_t contextlen, int use_context);
X509 *SSL_get_certificate(const SSL *ssl);
int SSL_get_error(const SSL *s, int ret_code);
X509 *SSL_get_peer_certificate(const SSL *ssl);
void SSL_set_accept_state(SSL *s);
void SSL_set_bio(SSL *s, BIO *rbio, BIO *wbio);
void SSL_set_connect_state(SSL *s);
const EVP_MD *EVP_get_digestbyname(const char *name);
int X509_digest(const X509 *data, const EVP_MD *type,
                unsigned char *md, unsigned int *len);
""")

# The shared object is looked up by its exact soname, so this module only
# works where OpenSSL 1.1 is installed under that name.
lib = ffi.dlopen('libssl.so.1.1')

# Values mirrored from the OpenSSL headers (cffi ABI mode cannot read macros).
# EVP_MAX_MD_SIZE is 64 in OpenSSL 1.1; the previous value of 36 dates from
# older releases and is too small for digests such as SHA-512.
EVP_MAX_MD_SIZE = 64
SSL_CTRL_GET_READ_AHEAD = 40
SSL_CTRL_SET_READ_AHEAD = 41
SSL_ERROR_WANT_READ = 2
SSL_ERROR_WANT_WRITE = 3
SSL_FILETYPE_PEM = 1
SSL_VERIFY_PEER = 1
SSL_VERIFY_FAIL_IF_NO_PEER_CERT = 2

# Sizes in bytes of one SRTP master key and one master salt, as used when
# slicing the exported keying material.
SRTP_KEY_LEN = 16
SRTP_SALT_LEN = 14
def certificate_digest(x509):
    """Return the SHA-256 fingerprint of a certificate.

    :param x509: cffi ``X509 *`` pointer to the certificate to hash.
    :returns: the digest as upper-case hex octets joined by colons,
        e.g. ``"AB:CD:..."``.
    :raises ValueError: if *x509* is missing/NULL, the SHA256 digest method
        is unavailable, or OpenSSL fails to compute the digest.
    """
    # Passing a NULL certificate on to X509_digest would hand OpenSSL a
    # null pointer, so reject it here.
    if x509 is None or x509 == ffi.NULL:
        raise ValueError("No certificate to digest")

    digest = lib.EVP_get_digestbyname(b'SHA256')
    if digest == ffi.NULL:
        raise ValueError("No such digest method")

    result_buffer = ffi.new("unsigned char[]", EVP_MAX_MD_SIZE)
    result_length = ffi.new("unsigned int[]", 1)
    result_length[0] = len(result_buffer)

    # Raise instead of assert: assertions are stripped under ``python -O``.
    if lib.X509_digest(x509, digest, result_buffer, result_length) != 1:
        raise ValueError("Could not compute certificate digest")

    # Copy only the bytes OpenSSL reported as written.
    raw = ffi.buffer(result_buffer, result_length[0])[:]
    return ":".join("%02X" % octet for octet in raw)
def get_srtp_key_salt(dest, src, idx):
    """Copy the key and salt selected by *idx* from *src* into *dest*.

    *src* is read as two keys of ``SRTP_KEY_LEN`` bytes followed by two salts
    of ``SRTP_SALT_LEN`` bytes; *idx* selects which key/salt pair to take.
    *dest* receives the key first, immediately followed by the salt.

    Both slice bounds are always given explicitly, so the function also works
    on cffi arrays, which do not accept open-ended slices.
    """
    key_from = idx * SRTP_KEY_LEN
    salt_from = 2 * SRTP_KEY_LEN + idx * SRTP_SALT_LEN
    salt_to = SRTP_KEY_LEN + SRTP_SALT_LEN

    # key goes to the front of dest
    dest[0:SRTP_KEY_LEN] = src[key_from:key_from + SRTP_KEY_LEN]
    # salt follows directly after the key
    dest[SRTP_KEY_LEN:salt_to] = src[salt_from:salt_from + SRTP_SALT_LEN]
@ffi.callback('int(int, X509_STORE_CTX *)')
def verify_callback(x, y):
    """Certificate verification hook that accepts every certificate.

    Registered with ``SSL_CTX_set_verify``; both arguments are ignored and
    the callback always returns 1. The peer is instead authenticated by the
    fingerprint comparison in ``DtlsSrtpSession.connect``.
    """
    return 1
class DtlsSrtpContext:
    """Owns an OpenSSL ``SSL_CTX`` set up for DTLS with the SRTP extension.

    The context loads ``bogus-client.crt`` / ``bogus-client.key`` from the
    current working directory. Call :meth:`close` to release it.
    """

    def __init__(self):
        """Create and configure the context.

        :raises RuntimeError: if OpenSSL cannot allocate the context.

        Configuration failures (certificate, key, ciphers, SRTP profile) are
        reported on stdout and do not abort construction.
        """
        # Set first so close() is safe even if allocation fails below.
        self.ctx = None
        ctx = lib.SSL_CTX_new(lib.DTLSv1_method())
        if ctx == ffi.NULL:
            raise RuntimeError("SSL could not create context")
        self.ctx = ctx

        # Require a peer certificate; verify_callback accepts whatever is sent.
        lib.SSL_CTX_set_verify(self.ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
                               verify_callback)
        if not lib.SSL_CTX_use_certificate_file(self.ctx, b'bogus-client.crt', SSL_FILETYPE_PEM):
            print("SSL could not use certificate")
        if not lib.SSL_CTX_use_PrivateKey_file(self.ctx, b'bogus-client.key', SSL_FILETYPE_PEM):
            print("SSL could not use private key")
        if not lib.SSL_CTX_set_cipher_list(self.ctx, b'HIGH:!CAMELLIA:!aNULL'):
            print("SSL could not set cipher list")
        # Unlike most OpenSSL calls, this one returns 0 on success.
        if lib.SSL_CTX_set_tlsext_use_srtp(self.ctx, b'SRTP_AES128_CM_SHA1_80'):
            print("SSL could not enable SRTP extension")
        # SSL_CTRL_SET_READ_AHEAD returns the previous read-ahead setting, not
        # a status code, so the result is deliberately not treated as an error.
        lib.SSL_CTX_ctrl(self.ctx, SSL_CTRL_SET_READ_AHEAD, 1, ffi.NULL)

    def close(self):
        """Free the underlying ``SSL_CTX``; safe to call more than once."""
        if self.ctx is not None:
            lib.SSL_CTX_free(self.ctx)
            # Forget the pointer so a second close() cannot double-free it.
            self.ctx = None
class DtlsSrtpSession:
    """One DTLS association used to derive SRTP keys.

    OpenSSL reads from and writes to a pair of memory BIOs; :meth:`connect`
    shuttles their contents to and from ``transport``, which must provide
    awaitable ``recv()`` and ``send(data)`` methods.

    Set ``remote_fingerprint`` before calling :meth:`connect`. After a
    successful handshake ``srtp_tx_key`` and ``srtp_rx_key`` each hold a key
    followed by its salt.
    """

    def __init__(self, context, is_server, transport):
        """Create the SSL object and attach its memory BIOs.

        :param context: a ``DtlsSrtpContext`` supplying the ``SSL_CTX``.
        :param is_server: True to accept the handshake, False to initiate it.
        :param transport: object with awaitable ``recv()`` / ``send(data)``.
        :raises Exception: if OpenSSL cannot allocate the SSL object.
        """
        self.encrypted = False
        self.is_server = is_server
        self.remote_fingerprint = None
        self.transport = transport
        self.srtp_tx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN)
        self.srtp_rx_key = ffi.new('char[]', SRTP_KEY_LEN + SRTP_SALT_LEN)

        # Set first so close() is safe even if allocation fails below.
        self.ssl = None
        ssl = lib.SSL_new(context.ctx)
        if ssl == ffi.NULL:
            raise Exception('DTLS could not create SSL object')
        self.ssl = ssl

        # The SSL object takes ownership of both BIOs; SSL_free releases them.
        self.read_bio = lib.BIO_new(lib.BIO_s_mem())
        self.write_bio = lib.BIO_new(lib.BIO_s_mem())
        lib.SSL_set_bio(self.ssl, self.read_bio, self.write_bio)

        if self.is_server:
            lib.SSL_set_accept_state(self.ssl)
        else:
            lib.SSL_set_connect_state(self.ssl)

    @property
    def local_fingerprint(self):
        """SHA-256 fingerprint of the local certificate.

        :raises Exception: if no local certificate is configured.
        """
        x509 = lib.SSL_get_certificate(self.ssl)
        if x509 == ffi.NULL:
            raise Exception('DTLS local certificate is not set')
        return certificate_digest(x509)

    async def connect(self):
        """Run the DTLS handshake and derive the SRTP keys.

        :raises Exception: if the handshake fails, the peer certificate is
            missing or does not match ``remote_fingerprint``, or the keying
            material cannot be exported.
        """
        while not self.encrypted:
            result = lib.SSL_do_handshake(self.ssl)

            # Send whatever this step produced *before* waiting for input,
            # and also after the final step, otherwise the peer never sees
            # our flight and both sides stall.
            await self._flush_write_bio()

            if result > 0:
                self.encrypted = True
                break

            error = lib.SSL_get_error(self.ssl, result)
            if error == SSL_ERROR_WANT_READ:
                data = await self.transport.recv()
                lib.BIO_write(self.read_bio, data, len(data))
            elif error != SSL_ERROR_WANT_WRITE:
                raise Exception('DTLS handshake failed (error %d)' % error)

        # check remote fingerprint
        # NOTE(review): SSL_get_peer_certificate returns a new reference that
        # should be released with X509_free, which this module's cdef does
        # not declare; the reference is currently leaked.
        x509 = lib.SSL_get_peer_certificate(self.ssl)
        if x509 == ffi.NULL:
            raise Exception('DTLS peer did not provide a certificate')
        remote_fingerprint = certificate_digest(x509)
        # A missing expected fingerprint is treated as a mismatch (fail closed).
        if (self.remote_fingerprint is None
                or remote_fingerprint != self.remote_fingerprint.upper()):
            raise Exception('DTLS fingerprint does not match')

        # generate keying material
        buf = ffi.new("char[]", 2 * (SRTP_KEY_LEN + SRTP_SALT_LEN))
        extractor = b'EXTRACTOR-dtls_srtp'
        # Success is exactly 1; failures may be reported as 0 or -1.
        if lib.SSL_export_keying_material(self.ssl, buf, len(buf),
                                          extractor, len(extractor),
                                          ffi.NULL, 0, 0) != 1:
            raise Exception('DTLS could not extract SRTP keying material')

        # Index 0 is used for client-to-server traffic, index 1 for the reverse.
        if self.is_server:
            get_srtp_key_salt(self.srtp_tx_key, buf, 1)
            get_srtp_key_salt(self.srtp_rx_key, buf, 0)
        else:
            get_srtp_key_salt(self.srtp_tx_key, buf, 0)
            get_srtp_key_salt(self.srtp_rx_key, buf, 1)

        print('DTLS handshake complete')

    async def _flush_write_bio(self):
        """Send any bytes OpenSSL has queued in the write BIO."""
        pending = lib.BIO_ctrl_pending(self.write_bio)
        if pending > 0:
            buf = ffi.new("char[]", pending)
            count = lib.BIO_read(self.write_bio, buf, len(buf))
            if count > 0:
                # Forward only the bytes BIO_read actually returned.
                await self.transport.send(ffi.buffer(buf, count)[:])

    def close(self):
        """Free the SSL object and its BIOs; safe to call more than once."""
        if self.ssl is not None:
            lib.SSL_free(self.ssl)
            # The BIOs were freed together with the SSL object.
            self.ssl = None
            self.read_bio = None
            self.write_bio = None
if __name__ == '__main__':
    # Demo: print the fingerprint of the local certificate.
    context = DtlsSrtpContext()
    try:
        session = DtlsSrtpSession(context, is_server=True, transport=None)
        try:
            print(session.local_fingerprint)
        finally:
            # Release the SSL object even if reading the fingerprint fails.
            session.close()
    finally:
        context.close()
Add Comment
Please, Sign In to add comment