def getContext(self):
    """Build the SSL context for this object and return it.

    The context is created by _create_ssl_context() from the key,
    certificate, CA and PKCS12 settings stored on the instance. When CA
    certificates are configured (taken from the PKCS12 bundle or from a
    separate CA file) peer verification is switched on, using
    self._verify_callback as the verification callback; if
    self._enforce_cert is set, a peer that presents no certificate is
    rejected as well.
    """
    context = _create_ssl_context(
        key_filename=self._key_filename,
        cert_filename=self._cert_filename,
        verify_ca_filename=self._verify_ca_filename,
        p12_filename=self._p12_filename,
        verify_ca_from_p12=self._verify_ca_from_p12,
        key_pass=self._key_pass,
        p12_pass=self._p12_pass)

    # Without any CA source there is nothing to verify peers against.
    has_ca = self._verify_ca_from_p12 or self._verify_ca_filename is not None
    if not has_ca:
        return context

    verify_flags = SSL.VERIFY_PEER
    if self._enforce_cert:
        verify_flags = verify_flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    context.set_verify(verify_flags, self._verify_callback)
    return context
def _verify_callback(self, connection, x509, errnum, errdepth, ok):
if not ok:
log.warning("ssl-context", "Invalid certificate: %s",
x509.get_subject())
return False
return True
def _create_ssl_context(key_filename=None,
cert_filename=None,
verify_ca_filename=None,
p12_filename=None,
verify_ca_from_p12=False,
key_pass=None, p12_pass=None):
ctx = SSL.Context(SSL.SSLv3_METHOD)
if p12_filename is not None:
with open(p12_filename) as f:
try:
p12 = crypto.load_pkcs12(f.read(), p12_pass or "")
except crypto.Error as e:
raise SecurityError("Invalid PKCS12 or passphrase for %s: %s"
% (p12_filename, e), cause=e), \
None, sys.exc_info()[2]
ctx.use_certificate(p12.get_certificate())
ctx.use_privatekey(p12.get_privatekey())
if verify_ca_from_p12:
#FIXME: is there no way to set the chain directly ?
with tempfile.NamedTemporaryFile() as f:
certs = p12.get_ca_certificates()
write_certificates(f, *certs)
f.flush()
ctx.load_verify_locations(f.name)
elif cert_filename is not None and key_filename is not None:
try:
with open(cert_filename) as f:
ft = crypto.FILETYPE_PEM
cert = crypto.load_certificate(ft, f.read())
except IOError as e:
raise SecurityError("Certificate file access error for %s: %s"
% (cert_filename, e), cause=e), \
None, sys.exc_info()[2]
except crypto.Error as e:
raise SecurityError("Invalid certificate %s: %s"
% (cert_filename, e), cause=e), \
None, sys.exc_info()[2]
try:
with open(key_filename) as f:
ft = crypto.FILETYPE_PEM
key = crypto.load_privatekey(ft, f.read(), key_pass or "")
except IOError as e:
raise SecurityError("Private key file error for %s: %s"
% (cert_filename, e), cause=e), \
None, sys.exc_info()[2]
except crypto.Error as e:
raise SecurityError("Invalid private key or passphrase for %s: %s"
% (key_filename, e), cause=e), \
None, sys.exc_info()[2]
ctx.use_certificate(cert)
ctx.use_privatekey(key)
try:
ctx.check_privatekey()
except crypto.Error as e:
raise SecurityError("Certificate and private key files do not "
"match; certificate: %s; private key: %s"
% (cert_filename, key_filename, ), cause=e), \
None, sys.exc_info()[2]
if not verify_ca_from_p12 and verify_ca_filename is not None:
ctx.load_verify_locations(verify_ca_filename)
return ctx