Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class ClientXMPP(sleekxmpp.ClientXMPP):
    """SleekXMPP client patched to support SASL DIGEST-MD5 authentication.

    A hacked version of the base class which supports digest-md5
    authentication (for facebook).  A ``<challenge/>`` handler is installed
    at construction time and ``_handle_sasl_auth`` is overridden so that
    DIGEST-MD5 is attempted when neither PLAIN nor ANONYMOUS applies.

    Relies on module-level names defined elsewhere in the file: ``base64``,
    ``logger``, ``md5`` and ``md5digest``.
    """

    _SASL_NS = 'urn:ietf:params:xml:ns:xmpp-sasl'

    def __init__(self, *args, **kwargs):
        """Initialise the base client and listen for SASL challenges.

        All arguments are forwarded unchanged to
        ``sleekxmpp.ClientXMPP.__init__``.
        """
        sleekxmpp.ClientXMPP.__init__(self, *args, **kwargs)
        self.add_handler("<challenge xmlns='%s' />" % self._SASL_NS,
                         self._handle_sasl_digest_md5_auth,
                         instream=True)

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def _register_sasl_result_handlers(self):
        """Register the ``<success/>`` and ``<failure/>`` SASL handlers.

        NOTE(review): this runs once per feature negotiation and once per
        challenge, exactly as the original code did, so the same handlers
        may end up registered several times -- confirm whether the stream
        de-duplicates handlers by name.
        """
        self.add_handler("<success xmlns='%s' />" % self._SASL_NS,
                         self._handle_auth_success,
                         name='SASL Sucess',
                         instream=True)
        self.add_handler("<failure xmlns='%s' />" % self._SASL_NS,
                         self._handle_auth_fail,
                         name='SASL Failure',
                         instream=True)

    @staticmethod
    def _utf8(value):
        """Return *value* as a byte string, UTF-8 encoding text if needed."""
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    @staticmethod
    def _b64encode_text(data):
        """Base64-encode *data* on a single line and return a native ``str``.

        ``base64.b64encode`` is used instead of ``base64.encodestring``
        because the latter inserts a newline after every 76 output
        characters (and no longer exists on Python 3.9+).
        """
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        encoded = base64.b64encode(data)
        if not isinstance(encoded, str):  # Python 3 returns bytes
            encoded = encoded.decode('ascii')
        return encoded

    @staticmethod
    def _parse_digest_challenge(text):
        """Parse a DIGEST-MD5 directive list into a ``dict``.

        Values may be bare tokens or double-quoted strings; commas inside a
        quoted value (e.g. ``qop="auth,auth-int"``) do not split the
        directive.  Quotes are removed from the returned values.  When a
        directive is repeated the last occurrence wins.
        """
        import re

        pairs = re.findall(
            r'([A-Za-z0-9_-]+)\s*=\s*(?:"([^"]*)"|([^,]*))', text)
        return dict((key, quoted or bare.strip())
                    for key, quoted, bare in pairs)

    # ------------------------------------------------------------------
    # SASL negotiation
    # ------------------------------------------------------------------
    def _handle_sasl_auth(self, xml):
        """Choose a SASL mechanism from the advertised ones and start auth.

        Mechanisms are tried in the order PLAIN (needs a JID user part),
        ANONYMOUS (only without a user part), then DIGEST-MD5.  If none
        applies an error is logged and the client disconnects.

        :param xml: the ``<mechanisms/>`` element from the stream features.
        :returns: ``False`` if STARTTLS is still listed in ``self.features``
                  (authentication is not attempted), ``True`` otherwise.
        """
        if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features:
            return False

        logger.debug("Starting SASL Auth")
        self._register_sasl_result_handlers()

        sasl_mechs = xml.findall('{%s}mechanism' % self._SASL_NS)
        if not sasl_mechs:
            return True

        self.features.extend("sasl:%s" % mech.text for mech in sasl_mechs)

        if 'sasl:PLAIN' in self.features and self.boundjid.user:
            # Encode explicitly: bytes(text) fails for non-ASCII text on
            # Python 2 and for any str on Python 3.
            user = self._utf8(self.boundjid.user)
            password = self._utf8(self.password)
            auth = base64.b64encode(
                b'\x00' + user + b'\x00' + password).decode('utf-8')
            self.send("<auth xmlns='%s' mechanism='PLAIN'>%s</auth>"
                      % (self._SASL_NS, auth))
        elif 'sasl:ANONYMOUS' in self.features and not self.boundjid.user:
            self.send("<auth xmlns='%s' mechanism='%s' />"
                      % (self._SASL_NS, 'ANONYMOUS'))
        elif 'sasl:DIGEST-MD5' in self.features:
            self.send("<auth xmlns='%s' mechanism='DIGEST-MD5'/>"
                      % self._SASL_NS)
        else:
            logger.error("no appropriate login method.")
            self.disconnect()
        return True

    def _handle_sasl_digest_md5_auth(self, xml):
        """Answer a DIGEST-MD5 ``<challenge/>`` from the server.

        A challenge carrying ``rspauth`` is acknowledged with an empty
        ``<response/>``.  Otherwise the digest response is computed from
        the JID user/host, the password, the server nonce and a fresh
        client nonce, and sent base64-encoded.  If the challenge has no
        nonce, or does not offer ``qop=auth``, ``_handle_auth_fail`` is
        invoked instead.

        :param xml: the ``<challenge/>`` element; its text is the
                    base64-encoded directive list.
        """
        import os

        self._register_sasl_result_handlers()

        # A challenge element without text is treated as an empty challenge
        # and ends up in the failure branch below.
        decoded = base64.b64decode(xml.text or '')
        if not isinstance(decoded, str):  # Python 3 returns bytes
            decoded = decoded.decode('utf-8')
        challenge = self._parse_digest_challenge(decoded)
        logger.debug("MD5 auth challenge: %s", challenge)

        # succesfully authenticated, send response
        if challenge.get('rspauth'):
            self.send("<response xmlns='%s'/>" % self._SASL_NS)
            return

        # TODO: use realm if supplied by server; the JID host is used as
        #       the realm for now.
        nonce = challenge.get('nonce')
        # qop defaults to "auth" when the server does not send it.  Only
        # "auth" is implemented here, so it must be among the offers.
        offered_qop = [option.strip() for option in
                       (challenge.get('qop') or 'auth').split(',')]
        if not nonce or 'auth' not in offered_qop:
            logger.error("error during digest-md5 authentication. "
                         "challenge missing critical information. "
                         "(challenge: %s)", decoded)
            self._handle_auth_fail(xml)
            return

        # TODO: charset can be either UTF-8 or if not present use ISO
        #       8859-1 defaulting for UTF-8 for now.

        # The cnonce must be unpredictable and used for this request only,
        # so take it from the OS random source rather than random.random().
        cnonce = self._b64encode_text(os.urandom(16))

        user = self.boundjid.user
        host = self.boundjid.host

        # md5() and md5digest() are defined elsewhere in the file --
        # presumably the raw and the hex MD5 digest respectively; confirm.
        a1 = b"%s:%s:%s" % (md5("%s:%s:%s" % (user, host, self.password)),
                            nonce.encode("UTF-8"),
                            cnonce.encode("UTF-8"))
        a2 = "AUTHENTICATE:xmpp/%s" % host
        resp_hash = md5digest("%s:%s:00000001:%s:auth:%s"
                              % (md5digest(a1), nonce, cnonce,
                                 md5digest(a2)))

        # The hash above is computed for qop=auth, so that is what is
        # announced, whatever else the server may have offered.
        response = ",".join([
            'charset=utf-8',
            'username="%s"' % user,
            'realm="%s"' % host,
            'nonce="%s"' % nonce,
            'nc=00000001',
            'cnonce="%s"' % cnonce,
            'digest-uri="xmpp/%s"' % host,
            'response=%s' % resp_hash,
            'qop=auth',
        ])
        self.send("<response xmlns='%s'>%s</response>"
                  % (self._SASL_NS, self._b64encode_text(response)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement