Advertisement
Guest User

Untitled

a guest
Nov 2nd, 2017
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.69 KB | None | 0 0
  1. import binascii
  2. import logging
  3. import random
  4. import socket
  5.  
  6. __version__ = '0.1.0'
  7.  
  8. log = logging.getLogger("pystun")
  9.  
  10. STUN_SERVERS = (
  11.     'stun.stunprotocol.org',
  12.     # 's1.taraba.net',
  13.     # 'stun.ekiga.net',
  14.     # 'stun.ideasip.com',
  15.     # 'stun.voiparound.com',
  16.     # 'stun.voipbuster.com',
  17.     # 'stun.voipstunt.com',
  18.     # 'stun.voxgratia.org'
  19. )
  20.  
  21. stun_servers_list = STUN_SERVERS
  22.  
  23. DEFAULTS = {
  24.     'stun_port': 3478,
  25.     'source_ip': '0.0.0.0',
  26.     'source_port': 54320
  27. }
  28.  
  29. # stun attributes
  30. MappedAddress = '0001'
  31. ResponseAddress = '0002'
  32. ChangeRequest = '0003'
  33. SourceAddress = '0004'
  34. ChangedAddress = '0005'
  35. Username = '0006'
  36. Password = '0007'
  37. MessageIntegrity = '0008'
  38. ErrorCode = '0009'
  39. UnknownAttribute = '000A'
  40. ReflectedFrom = '000B'
  41. XorOnly = '0021'
  42. XorMappedAddress = '8020'
  43. ServerName = '8022'
  44. SecondaryAddress = '8050'  # Non standard extension
  45.  
  46. # types for a stun message
  47. BindRequestMsg = '0001'
  48. BindResponseMsg = '0101'
  49. BindErrorResponseMsg = '0111'
  50. SharedSecretRequestMsg = '0002'
  51. SharedSecretResponseMsg = '0102'
  52. SharedSecretErrorResponseMsg = '0112'
  53.  
  54. dictAttrToVal = {'MappedAddress': MappedAddress,
  55.                  'ResponseAddress': ResponseAddress,
  56.                  'ChangeRequest': ChangeRequest,
  57.                  'SourceAddress': SourceAddress,
  58.                  'ChangedAddress': ChangedAddress,
  59.                  'Username': Username,
  60.                  'Password': Password,
  61.                  'MessageIntegrity': MessageIntegrity,
  62.                  'ErrorCode': ErrorCode,
  63.                  'UnknownAttribute': UnknownAttribute,
  64.                  'ReflectedFrom': ReflectedFrom,
  65.                  'XorOnly': XorOnly,
  66.                  'XorMappedAddress': XorMappedAddress,
  67.                  'ServerName': ServerName,
  68.                  'SecondaryAddress': SecondaryAddress}
  69.  
  70. dictMsgTypeToVal = {
  71.     'BindRequestMsg': BindRequestMsg,
  72.     'BindResponseMsg': BindResponseMsg,
  73.     'BindErrorResponseMsg': BindErrorResponseMsg,
  74.     'SharedSecretRequestMsg': SharedSecretRequestMsg,
  75.     'SharedSecretResponseMsg': SharedSecretResponseMsg,
  76.     'SharedSecretErrorResponseMsg': SharedSecretErrorResponseMsg}
  77.  
  78. dictValToMsgType = {}
  79.  
  80. dictValToAttr = {}
  81.  
  82. Blocked = "Blocked"
  83. OpenInternet = "Open Internet"
  84. FullCone = "Full Cone"
  85. SymmetricUDPFirewall = "Symmetric UDP Firewall"
  86. RestricNAT = "Restric NAT"
  87. RestricPortNAT = "Restric Port NAT"
  88. SymmetricNAT = "Symmetric NAT"
  89. ChangedAddressError = "Meet an error, when do Test1 on Changed IP and Port"
  90.  
  91.  
  92. def _initialize():
  93.     items = dictAttrToVal.items()
  94.     for i in range(len(items)):
  95.         dictValToAttr.update({items[i][1]: items[i][0]})
  96.     items = dictMsgTypeToVal.items()
  97.     for i in range(len(items)):
  98.         dictValToMsgType.update({items[i][1]: items[i][0]})
  99.  
  100.  
  101. def gen_tran_id():
  102.     a = ''.join(random.choice('0123456789ABCDEF') for i in range(32))
  103.     # return binascii.a2b_hex(a)
  104.     return a
  105.  
  106.  
  107. def stun_test(sock, host, port, source_ip, source_port, send_data=""):
  108.     retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None,
  109.               'SourceIP': None, 'SourcePort': None, 'ChangedIP': None,
  110.               'ChangedPort': None}
  111.     str_len = "%#04d" % (len(send_data) / 2)
  112.     tranid = gen_tran_id()
  113.     str_data = ''.join([BindRequestMsg, str_len, tranid, send_data])
  114.     data = binascii.a2b_hex(str_data)
  115.     recvCorr = False
  116.     while not recvCorr:
  117.         recieved = False
  118.         count = 3
  119.         while not recieved:
  120.             log.debug("sendto: %s", (host, port))
  121.             try:
  122.                 sock.sendto(data, (host, port))
  123.             except socket.gaierror:
  124.                 retVal['Resp'] = False
  125.                 return retVal
  126.             try:
  127.                 buf, addr = sock.recvfrom(2048)
  128.                 log.debug("recvfrom: %s", addr)
  129.                 recieved = True
  130.             except Exception:
  131.                 recieved = False
  132.                 if count > 0:
  133.                     count -= 1
  134.                 else:
  135.                     retVal['Resp'] = False
  136.                     return retVal
  137.         msgtype = binascii.b2a_hex(buf[0:2])
  138.         bind_resp_msg = dictValToMsgType[msgtype] == "BindResponseMsg"
  139.         tranid_match = tranid.upper() == binascii.b2a_hex(buf[4:20]).upper()
  140.         if bind_resp_msg and tranid_match:
  141.             recvCorr = True
  142.             retVal['Resp'] = True
  143.             len_message = int(binascii.b2a_hex(buf[2:4]), 16)
  144.             len_remain = len_message
  145.             base = 20
  146.             while len_remain:
  147.                 attr_type = binascii.b2a_hex(buf[base:(base + 2)])
  148.                 attr_len = int(binascii.b2a_hex(buf[(base + 2):(base + 4)]), 16)
  149.                 if attr_type == MappedAddress:
  150.                     port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
  151.                     ip = ".".join([
  152.                         str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
  153.                         str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
  154.                         str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
  155.                         str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))
  156.                     ])
  157.                     retVal['ExternalIP'] = ip
  158.                     retVal['ExternalPort'] = port
  159.                 if attr_type == SourceAddress:
  160.                     port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
  161.                     ip = ".".join([
  162.                         str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
  163.                         str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
  164.                         str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
  165.                         str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))
  166.                     ])
  167.                     retVal['SourceIP'] = ip
  168.                     retVal['SourcePort'] = port
  169.                 if attr_type == ChangedAddress:
  170.                     port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
  171.                     ip = ".".join([
  172.                         str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
  173.                         str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
  174.                         str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
  175.                         str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))
  176.                     ])
  177.                     retVal['ChangedIP'] = ip
  178.                     retVal['ChangedPort'] = port
  179.                 # if attr_type == ServerName:
  180.                     # serverName = buf[(base+4):(base+4+attr_len)]
  181.                 base = base + 4 + attr_len
  182.                 len_remain = len_remain - (4 + attr_len)
  183.     # s.close()
  184.     return retVal
  185.  
  186.  
  187. def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478):
  188.     _initialize()
  189.     port = stun_port
  190.     log.debug("Do Test1")
  191.     resp = False
  192.     if stun_host:
  193.         ret = stun_test(s, stun_host, port, source_ip, source_port)
  194.         resp = ret['Resp']
  195.     else:
  196.         for stun_host in stun_servers_list:
  197.             log.debug('Trying STUN host: %s', stun_host)
  198.             ret = stun_test(s, stun_host, port, source_ip, source_port)
  199.             resp = ret['Resp']
  200.             if resp:
  201.                 break
  202.     if not resp:
  203.         return Blocked, ret
  204.     log.debug("Result: %s", ret)
  205.     exIP = ret['ExternalIP']
  206.     exPort = ret['ExternalPort']
  207.     changedIP = ret['ChangedIP']
  208.     changedPort = ret['ChangedPort']
  209.     if ret['ExternalIP'] == source_ip:
  210.         changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
  211.         ret = stun_test(s, stun_host, port, source_ip, source_port,
  212.                         changeRequest)
  213.         if ret['Resp']:
  214.             typ = OpenInternet
  215.         else:
  216.             typ = SymmetricUDPFirewall
  217.     else:
  218.         changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
  219.         log.debug("Do Test2")
  220.         ret = stun_test(s, stun_host, port, source_ip, source_port,
  221.                         changeRequest)
  222.         log.debug("Result: %s", ret)
  223.         if ret['Resp']:
  224.             typ = FullCone
  225.         else:
  226.             log.debug("Do Test1")
  227.             ret = stun_test(s, changedIP, changedPort, source_ip, source_port)
  228.             log.debug("Result: %s", ret)
  229.             if not ret['Resp']:
  230.                 typ = ChangedAddressError
  231.             else:
  232.                 if exIP == ret['ExternalIP'] and exPort == ret['ExternalPort']:
  233.                     changePortRequest = ''.join([ChangeRequest, '0004',
  234.                                                  "00000002"])
  235.                     log.debug("Do Test3")
  236.                     ret = stun_test(s, changedIP, port, source_ip, source_port,
  237.                                     changePortRequest)
  238.                     log.debug("Result: %s", ret)
  239.                     if ret['Resp']:
  240.                         typ = RestricNAT
  241.                     else:
  242.                         typ = RestricPortNAT
  243.                 else:
  244.                     typ = SymmetricNAT
  245.     return typ, ret
  246.  
  247.  
  248. def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None,
  249.                 stun_port=3478):
  250.     s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  251.     s.settimeout(2)
  252.     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  253.     s.bind((source_ip, source_port))
  254.     nat_type, nat = get_nat_type(s, source_ip, source_port,
  255.                                  stun_host=stun_host, stun_port=stun_port)
  256.     external_ip = nat['ExternalIP']
  257.     external_port = nat['ExternalPort']
  258.     s.close()
  259.     return (nat_type, external_ip, external_port)
  260.  
  261.  
  262.  
  263. nat_type, external_ip, external_port = get_ip_info()
  264. print('NAT Type:', nat_type)
  265. print('External IP:', external_ip)
  266. print('External Port:', external_port)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement