Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007 Python Software
- # Foundation; All Rights Reserved
- """A HTTPSConnection/Handler with additional proxy and cert validation features.
- In particular, monkey patches in Python r74203 to provide support for CONNECT
- proxies and adds SSL cert validation if the ssl module is present.
- """
- __author__ = "{frew,nick.johnson}@google.com (Fred Wulff and Nick Johnson)"
- import base64
- import httplib
- import logging
- import socket
- from urllib import splitpasswd
- from urllib import splittype
- from urllib import splituser
- import urllib2
class InvalidCertificateException(httplib.HTTPException):
  """Error signalling that a host presented an unacceptable SSL certificate.

  Attributes:
    host: The hostname the connection was made to.
    cert: The SSL certificate (as a dictionary) the host returned.
    reason: User readable description of what was wrong.
  """

  def __init__(self, host, cert, reason):
    """Records the details of the failed validation.

    Args:
      host: The hostname the connection was made to.
      cert: The SSL certificate (as a dictionary) the host returned.
      reason: user readable error reason.
    """
    super(InvalidCertificateException, self).__init__()
    self.host, self.cert, self.reason = host, cert, reason

  def __str__(self):
    """Returns a message naming the host, the reason and the certificate."""
    template = ("Host %s returned an invalid certificate (%s): %s\n"
                "To learn more, see "
                "http://code.google.com/appengine/kb/general.html#rpcssl")
    # Note the order: the reason goes in the parentheses, the cert comes last.
    details = (self.host, self.reason, self.cert)
    return template % details
# Certificate validation needs the ssl module, which is not available on every
# interpreter this file supports; record whether the import worked.
try:
  import ssl
except ImportError:
  _CAN_VALIDATE_CERTS = False
else:
  _CAN_VALIDATE_CERTS = True


def can_validate_certs():
  """Return True if we have the SSL package and can validate certificates."""
  return _CAN_VALIDATE_CERTS


# Re-export SSLError so clients don't have to do their own checking for ssl's
# existence. It is None when the ssl module could not be imported.
if not can_validate_certs():
  SSLError = None
else:
  SSLError = ssl.SSLError
def create_fancy_connection(tunnel_host=None, key_file=None,
                            cert_file=None, ca_certs=None,
                            proxy_authorization=None):
  """Builds an HTTPSConnection subclass with proxy/SSL settings baked in.

  Args:
    tunnel_host: Optional "host[:port]" of the real destination. When set, the
      connection issues an HTTP CONNECT for it before starting SSL.
    key_file: Optional key file, handed to the SSL layer as `keyfile`.
    cert_file: Optional certificate file, handed to the SSL layer as
      `certfile`.
    ca_certs: Optional CA bundle. When set and the ssl module is available,
      the peer certificate is required and its hostname is checked.
    proxy_authorization: Optional value of the Proxy-Authorization header sent
      along with the CONNECT request.

  Returns:
    The PresetProxyHTTPSConnection class (not an instance).
  """

  # This abomination brought to you by the fact that
  # the HTTPHandler creates the connection instance in the middle
  # of do_open so we need to add the tunnel host to the class.
  class PresetProxyHTTPSConnection(httplib.HTTPSConnection):
    """An HTTPS connection that uses a proxy defined by the enclosing scope."""

    def __init__(self, *args, **kwargs):
      httplib.HTTPSConnection.__init__(self, *args, **kwargs)

      self._tunnel_host = tunnel_host
      if tunnel_host:
        logging.debug("Creating preset proxy https conn: %s", tunnel_host)

      self.key_file = key_file
      self.cert_file = cert_file
      self.ca_certs = ca_certs
      # cert_reqs is only defined when the ssl module exists; connect() only
      # reads it on that same path.
      if can_validate_certs():
        if self.ca_certs:
          self.cert_reqs = ssl.CERT_REQUIRED
        else:
          self.cert_reqs = ssl.CERT_NONE

    def _get_hostport(self, host, port):
      """Splits "host[:port]" into a (host, port) tuple.

      Python 2.7.7rc1 (hg r90728:568041fd8090), 3.4.1 and 3.5 rename
      _set_hostport to _get_hostport and change its functionality. The
      Python 2.7.7rc1 version of this method is included here for
      compatibility with earlier versions of Python. Without this, HTTPS over
      HTTP CONNECT proxies cannot be used.

      This method may be removed if compatibility with Python <2.7.7rc1 is not
      required.

      Python bug: http://bugs.python.org/issue7776

      Args:
        host: Hostname, optionally followed by ":port"; IPv6 literals are
          written in square brackets.
        port: Explicit port, or None to take it from `host` (falling back to
          self.default_port).

      Returns:
        A (host, port) tuple.

      Raises:
        httplib.InvalidURL: if the port embedded in `host` is not numeric.
      """
      if port is None:
        i = host.rfind(":")
        j = host.rfind("]")  # ipv6 addresses have [...]
        if i > j:
          try:
            port = int(host[i+1:])
          except ValueError:
            if host[i+1:] == "":  # http://foo.com:/ == http://foo.com/
              port = self.default_port
            else:
              raise httplib.InvalidURL("nonnumeric port: '%s'" % host[i+1:])
          host = host[:i]
        else:
          port = self.default_port
        if host and host[0] == "[" and host[-1] == "]":
          host = host[1:-1]
      return (host, port)

    def _tunnel(self):
      """Sends a CONNECT for the tunnel host over the already-open socket.

      Afterwards self.host / self.port describe the tunnel destination.

      Raises:
        socket.error: if the proxy answers with anything other than 200.
      """
      self.host, self.port = self._get_hostport(self._tunnel_host, None)
      logging.info("Connecting through tunnel to: %s:%d",
                   self.host, self.port)

      self.send("CONNECT %s:%d HTTP/1.0\r\n" % (self.host, self.port))

      if proxy_authorization:
        self.send("Proxy-Authorization: %s\r\n" % proxy_authorization)

      # blank line
      self.send("\r\n")

      response = self.response_class(self.sock, strict=self.strict,
                                     method=self._method)
      # pylint: disable=protected-access
      (_, code, message) = response._read_status()

      if code != 200:
        self.close()
        raise socket.error("Tunnel connection failed: %d %s" %
                           (code, message.strip()))

      # Skip the rest of the proxy's response headers. An empty read means the
      # proxy closed the connection; stop then too, otherwise readline() keeps
      # returning "" and this loop never terminates.
      while True:
        line = response.fp.readline()
        if not line or line in ("\r\n", "\n"):
          break

    def _get_valid_hosts_for_cert(self, cert):
      """Returns a list of valid host globs for an SSL certificate.

      Args:
        cert: A dictionary representing an SSL certificate.
      Returns:
        list: A list of valid host globs.
      """
      if "subjectAltName" in cert:
        return [x[1] for x in cert["subjectAltName"] if x[0].lower() == "dns"]
      else:
        # Return a list of commonName fields
        return [x[0][1] for x in cert["subject"]
                if x[0][0].lower() == "commonname"]

    def _validate_certificate_hostname(self, cert, hostname):
      """Perform RFC2818/6125 validation against a cert and hostname.

      Names are compared case-insensitively.

      Args:
        cert: A dictionary representing an SSL certificate.
        hostname: The hostname to test.
      Returns:
        bool: Whether or not the hostname is valid for this certificate.
      """
      hostname = hostname.lower()
      for pattern in self._get_valid_hosts_for_cert(cert):
        pattern = pattern.lower()
        # Wildcards are only valid when the * exists at the end of the last
        # (left-most) label, and there are at least 3 labels in the expression.
        if ("*." in pattern and pattern.count("*") == 1 and
            pattern.count(".") > 1 and "." in hostname):
          left_expected, right_expected = pattern.split("*.")
          left_hostname, right_hostname = hostname.split(".", 1)
          if (left_hostname.startswith(left_expected) and
              right_expected == right_hostname):
            return True
        elif pattern == hostname:
          return True
      return False

    def connect(self):
      """Opens the socket, tunnels if configured, then starts SSL.

      Raises:
        InvalidCertificateException: if a certificate is required and it does
          not match the host being connected to.
      """
      # TODO(frew): When we drop support for <2.6 (in the far distant future),
      # change this to socket.create_connection.
      self.sock = _create_connection((self.host, self.port))

      if self._tunnel_host:
        self._tunnel()

      # ssl and FakeSocket got deprecated. Try for the new hotness of wrap_ssl,
      # with fallback. Note: Since can_validate_certs() just checks for the
      # ssl module, it's equivalent to attempting to import ssl from
      # the function, but doesn't require a dynamic import, which doesn't
      # play nicely with dev_appserver.
      if can_validate_certs():
        self.sock = ssl.wrap_socket(self.sock,
                                    keyfile=self.key_file,
                                    certfile=self.cert_file,
                                    ca_certs=self.ca_certs,
                                    cert_reqs=self.cert_reqs)

        if self.cert_reqs & ssl.CERT_REQUIRED:
          cert = self.sock.getpeercert()
          # NOTE(review): maxsplit=0 means this never actually splits, so the
          # whole of self.host is what gets validated -- confirm that is the
          # intent before changing it (a real split would break IPv6 hosts).
          hostname = self.host.split(":", 0)[0]
          if not self._validate_certificate_hostname(cert, hostname):
            # Do not leave a socket to an unverified peer open.
            self.close()
            raise InvalidCertificateException(hostname, cert,
                                              "hostname mismatch")
      else:
        ssl_socket = socket.ssl(self.sock,
                                keyfile=self.key_file,
                                certfile=self.cert_file)
        self.sock = httplib.FakeSocket(self.sock, ssl_socket)

  return PresetProxyHTTPSConnection
- # Here to end of _create_connection copied wholesale from Python 2.6"s socket.py
- _GLOBAL_DEFAULT_TIMEOUT = object()
- def _create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT):
- """Connect to *address* and return the socket object.
- Convenience function. Connect to *address* (a 2-tuple ``(host,
- port)``) and return the socket object. Passing the optional
- *timeout* parameter will set the timeout on the socket instance
- before attempting to connect. If no *timeout* is supplied, the
- global default timeout setting returned by :func:`getdefaulttimeout`
- is used.
- """
- msg = "getaddrinfo returns an empty list"
- host, port = address
- for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
- af, socktype, proto, canonname, sa = res
- sock = None
- try:
- sock = socket.socket(af, socktype, proto)
- if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
- sock.settimeout(timeout)
- sock.connect(sa)
- return sock
- except socket.error, msg:
- if sock is not None:
- sock.close()
- raise socket.error, msg
class FancyRequest(urllib2.Request):
  """A request that allows the use of a CONNECT proxy.

  On top of the regular urllib2.Request state it carries the tunnel
  destination and the SSL key / certificate / CA settings.
  """

  def __init__(self, *args, **kwargs):
    """Accepts the same arguments as urllib2.Request."""
    urllib2.Request.__init__(self, *args, **kwargs)
    self._tunnel_host = None
    self._key_file = self._cert_file = self._ca_certs = None

  def set_proxy(self, host, type):
    """Routes the request via `host`, keeping https requests typed as https.

    Args:
      host: The proxy's host[:port].
      type: The proxy's scheme.
    """
    keep_https = self.get_type() == "https" and not self._tunnel_host
    if keep_https:
      # Remember the real destination before the base class swaps in the
      # proxy's host.
      self._tunnel_host = self.get_host()

    urllib2.Request.set_proxy(self, host, type)

    if keep_https:
      # Put the https type back after the base call: the original type has to
      # be preserved for tunneling.
      self.type = "https"

  def set_ssl_info(self, key_file=None, cert_file=None, ca_certs=None):
    """Stores the SSL settings to use for this request's connection.

    Args:
      key_file: Optional key file.
      cert_file: Optional certificate file.
      ca_certs: Optional CA bundle.
    """
    self._key_file, self._cert_file, self._ca_certs = (
        key_file, cert_file, ca_certs)
class FancyProxyHandler(urllib2.ProxyHandler):
  """A ProxyHandler that works with CONNECT-enabled proxies."""

  # Taken from /usr/lib/python2.5/urllib2.py
  def _parse_proxy(self, proxy):
    """Return (scheme, user, password, host/port) given a URL or an authority.

    The first three items of the returned tuple may be None.

    A bare authority, with or without a port and userinfo (assumed to be
    username:password):

    >>> _parse_proxy('proxy.example.com')
    (None, None, None, 'proxy.example.com')
    >>> _parse_proxy('proxy.example.com:3128')
    (None, None, None, 'proxy.example.com:3128')
    >>> _parse_proxy('joe:password@proxy.example.com:3128')
    (None, 'joe', 'password', 'proxy.example.com:3128')

    A full URL; everything after the authority is ignored, and the trailing
    '/' is optional:

    >>> _parse_proxy('http://proxy.example.com:3128/')
    ('http', None, None, 'proxy.example.com:3128')
    >>> _parse_proxy('ftp://joe:password@proxy.example.com/rubbish:3128')
    ('ftp', 'joe', 'password', 'proxy.example.com')
    >>> _parse_proxy('http://joe:password@proxy.example.com')
    ('http', 'joe', 'password', 'proxy.example.com')

    A URL must have an authority (host:port) component, which according to
    RFC 3986 means two slashes after the scheme:

    >>> _parse_proxy('file:/ftp.example.com/')
    Traceback (most recent call last):
    ValueError: proxy URL with no authority: 'file:/ftp.example.com/'
    """
    scheme, remainder = splittype(proxy)
    if remainder.startswith("/"):
      # URL
      if not remainder.startswith("//"):
        raise ValueError("proxy URL with no authority: %r" % proxy)
      # We have an authority, so for RFC 3986-compliant URLs (by ss 3.
      # and 3.3.), path is empty or starts with '/'
      path_start = remainder.find("/", 2)
      if path_start == -1:
        authority = remainder[2:]
      else:
        authority = remainder[2:path_start]
    else:
      # Bare authority: whatever splittype took for a scheme was not one.
      scheme = None
      authority = proxy

    userinfo, hostport = splituser(authority)
    if userinfo is None:
      user = password = None
    else:
      user, password = splitpasswd(userinfo)
    return scheme, user, password, hostport

  def proxy_open(self, req, proxy, type):
    """Points `req` at `proxy`, deferring https requests to the HTTPS handler.

    Args:
      req: The request being opened.
      proxy: Proxy URL or authority, as understood by _parse_proxy.
      type: The scheme this handler method was registered for.

    Returns:
      None for requests that were originally https, otherwise whatever
      urllib2.ProxyHandler.proxy_open returns.
    """
    # This block is copied wholesale from Python2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
      proxy_type = orig_type

    if user and password:
      unquoted = (urllib2.unquote(user), urllib2.unquote(password))
      creds = base64.b64encode("%s:%s" % unquoted).strip()
      # Later calls overwrite earlier calls for the same header
      req.add_header("Proxy-authorization", "Basic " + creds)

    req.set_proxy(urllib2.unquote(hostport), proxy_type)

    # This condition is the change
    if orig_type != "https":
      return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
    return None
- class FancyHTTPSHandler(urllib2.HTTPSHandler):
- """An HTTPSHandler that works with CONNECT-enabled proxies."""
- def do_open(self, http_class, req, *args, **kwargs):
- proxy_authorization = None
- for header in req.headers:
- if header.lower() == "proxy-authorization":
- proxy_authorization = req.headers[header]
- break
- # Intentionally very specific so as to opt for false negatives
- # rather than false positives.
- try:
- return urllib2.HTTPSHandler.do_open(
- self,
- create_fancy_connection(req._tunnel_host,
- req._key_file,
- req._cert_file,
- req._ca_certs,
- proxy_authorization),
- req, *args, **kwargs)
- except urllib2.URLError, url_error:
- try:
- import ssl
- if (type(url_error.reason) == ssl.SSLError and
- url_error.reason.args[0] == 1):
- # Display the reason to the user. Need to use args for python2.5
- # compat.
- raise InvalidCertificateException(req.host, "",
- url_error.reason.args[1])
- except ImportError:
- pass
- raise url_error
# We have to implement this so that we persist the tunneling behavior
# through redirects.
class FancyRedirectHandler(urllib2.HTTPRedirectHandler):
  """A redirect handler that persists CONNECT-enabled proxy information."""

  def redirect_request(self, req, *args, **kwargs):
    """Builds the redirected request and carries tunnel/SSL settings over.

    Args:
      req: The request that was redirected.
      *args: Passed through to HTTPRedirectHandler.redirect_request.
      **kwargs: Passed through to HTTPRedirectHandler.redirect_request.

    Returns:
      The base class's result, with _tunnel_host, proxy and SSL attributes
      filled in when it is a urllib2.Request.
    """
    new_req = urllib2.HTTPRedirectHandler.redirect_request(
        self, req, *args, **kwargs)
    if not isinstance(new_req, urllib2.Request):
      # Nothing to decorate.
      return new_req

    # Same thing as in our set_proxy implementation, but in this case
    # we've only got a Request to work with, so it was this or copy
    # everything over piecemeal.
    #
    # Note that we do not persist tunneling behavior from an http request
    # to an https request, because an http request does not set _tunnel_host.
    #
    # Also note that in Python < 2.6, you will get an error in
    # FancyHTTPSHandler.do_open() on an https urllib2.Request that uses an http
    # proxy, since the proxy type will be set to http instead of https.
    # (FancyRequest, and urllib2.Request in Python >= 2.6 set the proxy type to
    # https.) Such a urllib2.Request could result from this redirect
    # if you are redirecting from an http request (since an http request
    # does not have _tunnel_host set, and thus you will not set the proxy
    # in the code below), and if you have defined a proxy for https in, say,
    # FancyProxyHandler, and that proxy has type http.
    if hasattr(req, "_tunnel_host") and new_req.get_type() == "https":
      if req._tunnel_host:
        # req is proxied, so copy the proxy info.
        new_req._tunnel_host = new_req.get_host()
        new_req.set_proxy(req.host, "https")
      else:
        # req is not proxied, so just make sure _tunnel_host is defined.
        new_req._tunnel_host = None
        new_req.type = "https"

    if hasattr(req, "_key_file"):
      # Copy the auxiliary data in case this or any further redirect is https
      new_req._key_file = req._key_file
      new_req._cert_file = req._cert_file
      new_req._ca_certs = req._ca_certs

    return new_req
Add Comment
Please, Sign In to add comment