Guest User

Untitled

a guest
Mar 22nd, 2018
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.17 KB | None | 0 0
  1. import asyncio
  2. import hashlib
  3. import os
  4. import re
  5. import secrets
  6. from time import time
  7.  
  8. from multidict import CIMultiDict
  9. from devtools import debug
  10.  
  11.  
  12. HOST = 'sip.soho66.co.uk'
  13. PORT = 8060
  14. CONN = HOST, PORT
  15. USERNAME = os.environ['SIP_USERNAME']
  16. PASSWORD = os.environ['SIP_PASSWORD']
  17. URI = f'sip:{HOST}:{PORT}'
  18. AUTH_METHOD = 'REGISTER'
  19. BRANCH = secrets.token_hex()[:16].upper()
  20.  
  21.  
  22. def md5digest(*args):
  23. return hashlib.md5(':'.join(args).encode()).hexdigest()
  24.  
  25.  
  26. def parse_digest(header):
  27. params = {}
  28. for arg in header[7:].split(','):
  29. k, v = arg.strip().split('=', 1)
  30. if '="' in arg:
  31. v = v[1:-1]
  32. params[k] = v
  33. return params
  34.  
  35.  
  36. RESPONSE_DECODE = re.compile(r'SIP/2.0 (?P<status_code>[0-9]{3}) (?P<status_message>.+)')
  37. REQUEST_DECODE = re.compile(r'(?P<method>[A-Za-z]+) (?P<to_uri>.+) SIP/2.0')
  38. NUMBER = re.compile(r'sip:(\d+)@')
  39.  
  40.  
  41. def parse_headers(raw_headers):
  42. headers = CIMultiDict()
  43. decoded_headers = raw_headers.decode().split('\r\n')
  44. for line in decoded_headers[1:]:
  45. k, v = line.split(': ', 1)
  46. if k in headers:
  47. o = headers.setdefault(k, [])
  48. if not isinstance(o, list):
  49. o = [o]
  50. o.append(v)
  51. headers[k] = o
  52. else:
  53. headers[k] = v
  54.  
  55. for regex in (REQUEST_DECODE, RESPONSE_DECODE):
  56. m = regex.match(decoded_headers[0])
  57. if m:
  58. return m.groupdict(), headers
  59. debug(raw_headers, headers)
  60. raise RuntimeError('unable to decode response')
  61.  
  62.  
  63. class EchoClientProtocol:
  64. def __init__(self):
  65. self.transport = None
  66. self.auth_attempt = 0
  67. self.local_ip = None
  68. self.cseq = 1
  69. self.call_id = secrets.token_hex()[:10]
  70. self.last_invitation = 0
  71.  
  72. def connection_made(self, transport):
  73. self.transport = transport
  74. self.local_ip, _ = transport.get_extra_info('sockname')
  75. print(f'connection established')
  76. self.send(f"""\
  77. {AUTH_METHOD} sip:{HOST}:{PORT} SIP/2.0
  78. Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}
  79. From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498
  80. To: <sip:{USERNAME}@{HOST}:{PORT}>
  81. Call-ID: {self.call_id}
  82. CSeq: {self.cseq} {AUTH_METHOD}
  83. Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>
  84. Max-Forwards: 70
  85. User-Agent: TutorCruncher Address Book
  86. Expires: 60
  87. Content-Length: 0""")
  88.  
  89. def datagram_received(self, data, addr):
  90. # print('datagram:', data)
  91. try:
  92. self.process_response(data, addr)
  93. except Exception:
  94. debug(data)
  95. debug(data.decode())
  96. raise
  97.  
  98. def process_response(self, raw_data, addr):
  99. if raw_data == b'\x00\x00\x00\x00':
  100. return
  101. headers, data = raw_data.split(b'\r\n\r\n', 1)
  102. status, headers = parse_headers(headers)
  103. status_code = int(status.get('status_code', 0))
  104. if status_code == 401 and self.auth_attempt < 3:
  105. self.auth_attempt += 1
  106. auth = headers['WWW-Authenticate']
  107. params = parse_digest(auth)
  108. realm, nonce = params['realm'], params['nonce']
  109. ha1 = md5digest(USERNAME, realm, PASSWORD)
  110. ha2 = md5digest(AUTH_METHOD, URI)
  111. self.send(f"""\
  112. {AUTH_METHOD} sip:{HOST}:{PORT} SIP/2.0
  113. Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}
  114. From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498
  115. To: <sip:{USERNAME}@{HOST}:{PORT}>
  116. Call-ID: {self.call_id}
  117. CSeq: {self.cseq} {AUTH_METHOD}
  118. Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>
  119. Authorization: Digest username="{USERNAME}", realm="{realm}", nonce="{nonce}", uri="{URI}", \
  120. response="{md5digest(ha1, nonce, ha2)}", algorithm=MD5
  121. Max-Forwards: 70
  122. User-Agent: TutorCruncher Address Book
  123. Expires: 60
  124. Content-Length: 0""")
  125. elif status_code == 200:
  126. print('authenticated successfully')
  127. elif status.get('method') == 'OPTIONS':
  128. # don't care
  129. pass
  130. elif status.get('method') == 'INVITE':
  131. n = time()
  132. if (n - self.last_invitation) > 1:
  133. self.process_invite(headers)
  134. self.last_invitation = n
  135. else:
  136. debug(status, dict(headers))
  137. try:
  138. data_text = data.decode()
  139. except UnicodeDecodeError:
  140. print(data)
  141. else:
  142. print(data_text)
  143.  
  144. def send(self, data):
  145. data = (data.strip('\n ') + '\n\n').replace('\n', '\r\n').encode()
  146. self.transport.sendto(data)
  147. self.cseq += 1
  148.  
  149. def error_received(self, exc):
  150. print('Error received:', exc)
  151.  
  152. def process_invite(self, headers):
  153. m = NUMBER.search(headers['From'])
  154. if m:
  155. number = m.groups()[0]
  156. else:
  157. number = 'unknown'
  158. debug('no number found', dict(headers))
  159. country = headers.get('X-Brand', None)
  160. print(f'incoming call from {number}{" ({})".format(country) if country else ""}')
  161.  
  162.  
  163. loop = asyncio.get_event_loop()
  164. connect = loop.create_datagram_endpoint(lambda: EchoClientProtocol(), remote_addr=CONN)
  165. transport, protocol = loop.run_until_complete(connect)
  166. try:
  167. loop.run_forever()
  168. except KeyboardInterrupt:
  169. pass
  170. finally:
  171. transport.close()
  172. loop.close()
Add Comment
Please, Sign In to add comment