Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import hashlib
- import os
- import re
- import secrets
- from time import time
- from multidict import CIMultiDict
- from devtools import debug
# Address of the SIP server this client talks to.
HOST = 'sip.soho66.co.uk'
PORT = 8060
CONN = HOST, PORT

# Credentials are read from the environment; a missing variable raises
# KeyError as soon as the module is loaded.
USERNAME = os.environ['SIP_USERNAME']
PASSWORD = os.environ['SIP_PASSWORD']

# Request-URI, also used as the digest "uri" parameter.
URI = f'sip:{HOST}:{PORT}'
AUTH_METHOD = 'REGISTER'

# Via branch parameter: 16 random upper-case hex characters prefixed with the
# "z9hG4bK" magic cookie that RFC 3261 requires at the start of every branch ID.
BRANCH = 'z9hG4bK' + secrets.token_hex()[:16].upper()
def md5digest(*args):
    """Return the hexadecimal MD5 digest of the string arguments joined by ':'."""
    joined = ':'.join(args)
    hasher = hashlib.md5()
    hasher.update(joined.encode())
    return hasher.hexdigest()
# Leading authentication scheme token of a digest challenge.
_DIGEST_SCHEME = re.compile(r'\s*Digest\s+', re.IGNORECASE)
# One ``key=value`` parameter; the value is either a quoted string (which may
# itself contain commas) or a bare token ending at the next comma/whitespace.
_DIGEST_PARAM = re.compile(r'([A-Za-z0-9_-]+)\s*=\s*(?:"([^"]*)"|([^\s,]*))')


def parse_digest(header):
    """Parse the parameters of a ``Digest`` challenge header value.

    :param header: header value such as ``Digest realm="x", nonce="y", algorithm=MD5``
    :return: dict mapping each parameter name to its value, with the
        surrounding double quotes of quoted values removed

    Parameters are located with a regex rather than by splitting on ``,`` so
    that quoted values containing commas (e.g. ``qop="auth,auth-int"``) are
    kept intact. A header with no parameters yields an empty dict.
    """
    scheme = _DIGEST_SCHEME.match(header)
    rest = header[scheme.end():] if scheme else header
    params = {}
    for m in _DIGEST_PARAM.finditer(rest):
        key, quoted, bare = m.groups()
        params[key] = quoted if quoted is not None else bare
    return params
# Status line of a SIP response, e.g. "SIP/2.0 401 Unauthorized".
# The dot in the version is escaped so that it only matches a literal ".".
RESPONSE_DECODE = re.compile(r'SIP/2\.0 (?P<status_code>[0-9]{3}) (?P<status_message>.+)')
# Request line of a SIP request, e.g. "INVITE sip:123@host SIP/2.0".
REQUEST_DECODE = re.compile(r'(?P<method>[A-Za-z]+) (?P<to_uri>.+) SIP/2\.0')
# All-digit user part of a SIP URI, e.g. the "0123" in "sip:0123@host".
NUMBER = re.compile(r'sip:(\d+)@')
def parse_headers(raw_headers):
    """Decode the start line and header fields of a SIP message.

    :param raw_headers: bytes of the message up to (not including) the blank
        line that separates headers from the body
    :return: ``(start_line_groups, headers)`` where ``start_line_groups`` is
        the ``groupdict()`` of whichever of ``REQUEST_DECODE`` /
        ``RESPONSE_DECODE`` matched the first line, and ``headers`` is a
        ``CIMultiDict``; a header that occurs more than once is stored as a
        list of its values in order of appearance
    :raises RuntimeError: if the first line is neither a request line nor a
        status line
    """
    lines = raw_headers.decode().split('\r\n')
    headers = CIMultiDict()
    for line in lines[1:]:
        # Split on the first colon only and trim whitespace ourselves, so that
        # "Name:value" (no space) and empty-valued headers are accepted too.
        name, colon, value = line.partition(':')
        name = name.strip()
        if not colon or not name:
            # Blank or malformed line: skip it instead of aborting the whole message.
            continue
        value = value.strip()
        if name in headers:
            previous = headers[name]
            values = previous if isinstance(previous, list) else [previous]
            values.append(value)
            headers[name] = values
        else:
            headers[name] = value

    # Try the request pattern first, then the response pattern.
    for regex in (REQUEST_DECODE, RESPONSE_DECODE):
        m = regex.match(lines[0])
        if m:
            return m.groupdict(), headers

    debug(raw_headers, headers)
    raise RuntimeError('unable to decode response')
class EchoClientProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that registers with the SIP server and reports incoming calls.

    On connection it sends a REGISTER request; a 401 response is answered with
    a digest-authenticated REGISTER (at most three times). Incoming INVITE
    requests are reported on stdout via ``process_invite``.

    Subclassing ``asyncio.DatagramProtocol`` supplies the default no-op
    callbacks (notably ``connection_lost``) that asyncio may invoke.
    """

    def __init__(self):
        self.transport = None
        self.auth_attempt = 0  # number of authenticated REGISTERs sent so far
        self.local_ip = None
        self.cseq = 1  # incremented after every request sent
        self.call_id = secrets.token_hex()[:10]
        self.last_invitation = 0  # time() of the most recent INVITE seen

    def connection_made(self, transport):
        """Remember the transport and local address, then send the first REGISTER."""
        self.transport = transport
        # Index instead of unpacking: IPv6 socket names have four elements.
        self.local_ip = transport.get_extra_info('sockname')[0]
        print('connection established')
        self.send(self._register_request())

    def _register_request(self, authorization=None):
        """Build the REGISTER request text, optionally with an Authorization header value."""
        lines = [
            f'{AUTH_METHOD} sip:{HOST}:{PORT} SIP/2.0',
            f'Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}',
            f'From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498',
            f'To: <sip:{USERNAME}@{HOST}:{PORT}>',
            f'Call-ID: {self.call_id}',
            f'CSeq: {self.cseq} {AUTH_METHOD}',
            f'Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>',
        ]
        if authorization is not None:
            lines.append(f'Authorization: {authorization}')
        lines += [
            'Max-Forwards: 70',
            'User-Agent: TutorCruncher Address Book',
            'Expires: 60',
            'Content-Length: 0',
        ]
        return '\n'.join(lines)

    def datagram_received(self, data, addr):
        """Process one datagram; on failure dump it for debugging and re-raise."""
        try:
            self.process_response(data, addr)
        except Exception:
            debug(data)
            # errors='replace' so an undecodable packet cannot mask the original error.
            debug(data.decode(errors='replace'))
            raise

    def process_response(self, raw_data, addr):
        """Dispatch a received SIP message based on its status code or method.

        :param raw_data: the raw datagram bytes
        :param addr: sender address (unused)
        """
        # Keep-alive datagrams made only of NUL / CR / LF bytes carry no message.
        if not raw_data.strip(b'\x00\r\n'):
            return

        # partition() tolerates messages that have no blank line / body.
        head, _, data = raw_data.partition(b'\r\n\r\n')
        status, headers = parse_headers(head)
        status_code = int(status.get('status_code', 0))
        method = status.get('method')

        if status_code == 401 and self.auth_attempt < 3:
            self.auth_attempt += 1
            params = parse_digest(headers['WWW-Authenticate'])
            realm, nonce = params['realm'], params['nonce']
            ha1 = md5digest(USERNAME, realm, PASSWORD)
            ha2 = md5digest(AUTH_METHOD, URI)
            authorization = (
                f'Digest username="{USERNAME}", realm="{realm}", nonce="{nonce}", uri="{URI}", '
                f'response="{md5digest(ha1, nonce, ha2)}", algorithm=MD5'
            )
            self.send(self._register_request(authorization))
        elif status_code == 200:
            print('authenticated successfully')
        elif method == 'OPTIONS':
            # don't care
            pass
        elif method == 'INVITE':
            now = time()
            if (now - self.last_invitation) > 1:
                self.process_invite(headers)
            # NOTE(review): the source's indentation was lost; this update is
            # assumed to run for every INVITE, not only for reported ones - confirm.
            self.last_invitation = now
        else:
            debug(status, dict(headers))
            # NOTE(review): the source's indentation was lost; the body dump is
            # assumed to belong to this fallback branch only - confirm.
            try:
                data_text = data.decode()
            except UnicodeDecodeError:
                print(data)
            else:
                print(data_text)

    def send(self, data):
        """Send a request and increment CSeq.

        Leading/trailing newlines and spaces are stripped, a terminating blank
        line is appended and every ``\\n`` is converted to ``\\r\\n``.
        """
        payload = (data.strip('\n ') + '\n\n').replace('\n', '\r\n').encode()
        self.transport.sendto(payload)
        self.cseq += 1

    def error_received(self, exc):
        """Report an error passed in by the transport."""
        print('Error received:', exc)

    def process_invite(self, headers):
        """Print the caller's number (and the X-Brand header, if present) for an INVITE."""
        # A missing From header is reported as an unknown caller rather than raising KeyError.
        m = NUMBER.search(headers.get('From') or '')
        if m:
            number = m.group(1)
        else:
            number = 'unknown'
            debug('no number found', dict(headers))
        country = headers.get('X-Brand', None)
        suffix = f' ({country})' if country else ''
        print(f'incoming call from {number}{suffix}')
# Create the loop explicitly: calling asyncio.get_event_loop() when no loop is
# running is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
connect = loop.create_datagram_endpoint(EchoClientProtocol, remote_addr=CONN)
try:
    transport, protocol = loop.run_until_complete(connect)
    try:
        # Serve until interrupted with Ctrl-C.
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        transport.close()
finally:
    # Close the loop even if the endpoint could not be created.
    loop.close()
Add Comment
Please, Sign In to add comment