Advertisement
Guest User

A Nonstandard A-record Only Local DNS Cacher

a guest
Jan 4th, 2019
684
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 28.60 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3.  
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Lesser General Public License as published
  6. # by the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12. # GNU Lesser General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Lesser General Public License
  15. # along with this program.  If not, see <https://www.gnu.org/licenses/>.
  16.  
  17. # Acknowledgment
  18. # This program uses the following open source projects:
  19. # BitString, by Scott Griffiths, released under MIT License, Version 2.0
  20. # Pony ORM, by Pony ORM, LLC, released under Apache License, Version 2.0
  21. # Requests, by Kenneth Reitz, released under Apache License, Version 2.0
  22.  
  23. from socket import socket, AF_INET, SOCK_DGRAM
  24. import queue
  25. from threading import Thread
  26. import requests
  27. from bitstring import BitArray, BitStream
  28. import pony.orm
  29. from time import sleep, time
  30. from collections import deque
  31.  
# Address and UDP port the server socket is bound to in main()
SERVER_IP = '127.0.0.1'
SERVER_PORT = 53

# Upstream DNS-over-HTTPS endpoint; queried with an 'application/dns-json' Accept header
DOH_URL = 'https://dns.quad9.net/dns-query'

# Length of one cache period; main() refuses to start unless it is larger than 0.
# Also determines the TTL placed in positive answers (half a period, in seconds).
PERIOD = 0.5  # In days

# SQLite file holding cached records (positive cache)
POSITIVE_CACHE_FILENAME = 'dns_pcache.sqlite'
# SQLite file holding NXDOMAIN names (negative cache); emptied on every start and period change
NEGATIVE_CACHE_FILENAME = 'dns_ncache.sqlite'
# Text file holding the Unix time (integer seconds) of the next period change
SCHEDULE_FILE_FILENAME = 'cache_update.schedule'
  42.  
  43.  
  44. def main():
  45.     if PERIOD <= 0:
  46.         raise ValueError('PERIOD should be larger than 0.')
  47.     # Load records cache database
  48.     records_cache = pony.orm.Database()
  49.  
  50.     class RecordsCache(records_cache.Entity):
  51.         full_domain = pony.orm.Required(str)
  52.         query_type = pony.orm.Required(str)
  53.         data = pony.orm.Required(str)
  54.         used_in_current_period = pony.orm.Required(bool)
  55.         used_in_previous_period = pony.orm.Required(bool)
  56.         pony.orm.PrimaryKey(full_domain, query_type)
  57.     records_cache.bind(provider='sqlite', filename=POSITIVE_CACHE_FILENAME, create_db=True)
  58.     records_cache.generate_mapping(create_tables=True)
  59.  
  60.     # Create negative cache database
  61.     negative_cache = pony.orm.Database()
  62.  
  63.     class NegativeCache(negative_cache.Entity):
  64.         full_domain = pony.orm.PrimaryKey(str)
  65.     negative_cache.bind(provider='sqlite', filename=NEGATIVE_CACHE_FILENAME, create_db=True)
  66.     negative_cache.generate_mapping(create_tables=True)
  67.  
  68.     with pony.orm.db_session:
  69.         NegativeCache.select().delete(bulk=True)  # Delete all negative caches from last session
  70.  
  71.     try:
  72.         schedule_file = open(file=SCHEDULE_FILE_FILENAME, mode='r+')
  73.         schedule = schedule_file.read().rstrip()
  74.         try:
  75.             schedule = int(schedule)
  76.         except ValueError as e:
  77.             print(e)
  78.             schedule = int(time() + PERIOD * 86400)
  79.             schedule_file.seek(0)  # Filetype will go wrong without this line, I don't know why
  80.             schedule_file.truncate()
  81.             schedule_file.writelines(str(schedule)+'\n')
  82.     except FileNotFoundError:
  83.         schedule_file = open(file=SCHEDULE_FILE_FILENAME, mode='w+')
  84.         schedule = int(time() + PERIOD * 86400)
  85.         schedule_file.writelines(str(schedule)+'\n')
  86.     finally:
  87.         schedule_file.close()
  88.  
  89.     Thread(target=timer_job, args=(schedule, RecordsCache, NegativeCache,)).start()  # Timer thread
  90.  
  91.     task_queue = queue.Queue()
  92.  
  93.     server_socket = socket(AF_INET, SOCK_DGRAM)
  94.     server_socket.bind((SERVER_IP, SERVER_PORT))
  95.  
  96.     for i in range(4):
  97.         Thread(target=worker, args=(task_queue, RecordsCache, NegativeCache, server_socket,)).start()  # Worker threads
  98.  
  99.     while True:
  100.         query_data, client_address = server_socket.recvfrom(512)
  101.         task_package = (query_data, client_address)
  102.         task_queue.put(task_package)
  103.  
  104.  
  105. def worker(task_queue, RecordsCache, NegativeCache, server_socket):
  106.     requests_session = requests.Session()  # Maybe responsiveness can be improved by using a session
  107.     while True:
  108.         query_data, client_address = task_queue.get(block=True)  # Using block=False will result in high cpu usage
  109.         try:
  110.             full_domain, query_type, id, rd, cd, question_section = read_query_stream(BitStream(query_data))
  111.             print('Received query:', full_domain, query_type, "from", client_address)
  112.             if query_type != 'A':
  113.                 if query_type == 'ALL':
  114.                     query_type = 'A'
  115.                 else:
  116.                     raise NotImplementedError('Received query for {} record. The current server supports A records only.'.format(query_type))
  117.         except BaseException as e:
  118.             print(e)
  119.             task_queue.task_done()
  120.             continue  # Skip following procedures, go to next iteration
  121.         try:
  122.             cache_status, cached_data = fetch_cached_data(RecordsCache, NegativeCache, full_domain, query_type)
  123.         except BaseException as e:
  124.             print(e)
  125.             task_queue.task_done()
  126.             continue  # Skip following procedures, go to next iteration
  127.  
  128.         status_to_respond = ''
  129.         address_data = None
  130.         if cache_status == 'positive':  # There is a cache and it is a valid record
  131.             address_data = cached_data
  132.             response = construct_positive_response(id, rd, cd, question_section, address_data)
  133.             status_to_respond = 'NOERROR'
  134.         elif cache_status == 'nxdomain':
  135.             response = construct_nxdomain_response(id, rd, cd, question_section)
  136.             status_to_respond = 'NXDOMAIN'
  137.         elif cache_status == 'nocache':
  138.             try:
  139.                 remote_status, remote_data = fetch_remote_data(requests_session, full_domain, query_type)
  140.             except BaseException as e:
  141.                 # Return server failure
  142.                 print('Failed to get address from remote:', e)
  143.                 response = construct_servfail_response(id, rd, cd, question_section)
  144.                 status_to_respond = 'SERVFAIL'
  145.             else:
  146.                 if remote_status == 'noerror':  # Successfully retrieved an address from remote
  147.                     address_data = remote_data
  148.                     Thread(target=error_output_beautifier, args=(cache_remote_positive_answer, (RecordsCache, full_domain, query_type, remote_data,),)).start()
  149.                     response = construct_positive_response(id, rd, cd, question_section, address_data)
  150.                     status_to_respond = 'NOERROR'
  151.                 elif remote_status == 'nodata':
  152.                     response = construct_nodata_response(id, rd, cd, question_section)
  153.                     status_to_respond = 'NODATA'
  154.                 elif remote_status == 'nxdomain':
  155.                     Thread(target=error_output_beautifier, args=(cache_remote_nxdomain_answer, (NegativeCache, full_domain,),)).start()
  156.                     response = construct_nxdomain_response(id, rd, cd, question_section)
  157.                     status_to_respond = 'NXDOMAIN'
  158.                 elif remote_status == 'servfail':
  159.                     response = construct_servfail_response(id, rd, cd, question_section)
  160.                     status_to_respond = 'SERVFAIL'
  161.                 else:
  162.                     raise NotImplementedError('Unrecognized remote_status.')
  163.         else:
  164.             raise Exception('Invalid cache_status.')
  165.  
  166.         print('Responding message:', full_domain, query_type, str(address_data), status_to_respond)
  167.         binary_response = response.tobytes()
  168.         server_socket.sendto(binary_response, client_address)
  169.         task_queue.task_done()
  170.  
  171.         # Return no error with cached record or remotely fetched record
  172.         # Return no data when having no cache and remotely fetched data is empty
  173.         # Return no such domain when there are negative caches or remote server says so
  174.         # Return server failure when both having no cache and failing to fetch from remote
  175.  
  176.  
  177. def read_query_stream(query_stream):      # query_stream should be a BitStream
  178.     # Read header
  179.     id, qr, opcode, aa, tc, rd, ra, z, ad, cd, rcode, qdcount, ancount, nscount, arcount = query_stream.readlist(
  180.         'bits:16, bool, uint:4, bool, bool, bits:1, bool, bits:1, bool, bits:1, uint:4, uint:16, uint:16, uint:16, uint:16')
  181.  
  182.     # Initial checks on whether the query is legitimate or supported
  183.     if qr:
  184.         raise ValueError('Value of QR should have been 0 as it should be a query.')
  185.     if opcode != 0:
  186.         raise NotImplementedError('Non-standard queries are not supported.')
  187.     if qdcount != 1:
  188.         raise ValueError('The current server only supports query with single question.')
  189.     if ancount != 0:
  190.         raise ValueError('There should be no answer records in query.')
  191.     if nscount != 0:
  192.         raise ValueError('There should be no authority records in query.')
  193.     if aa:
  194.         raise ValueError('Unexpected AA flag being set.')
  195.     if ra:
  196.         raise ValueError('Unexpected RA flag being set.')
  197.  
  198.     # Read question
  199.     questions_start_pos = query_stream.pos  # Used in copying the question section
  200.     domain_labels = []
  201.     while True:
  202.         label_flag = query_stream.read('bits:2')
  203.         if label_flag.bin == '00':
  204.             label_length = query_stream.read('uint:6')
  205.             if label_length > 0:
  206.                 label = query_stream.read('bytes:' + str(label_length))
  207.                 domain_labels.append(label)
  208.             else:
  209.                 break
  210.         else:
  211.             raise ValueError('Question name label flag unrecognized.')
  212.     full_domain = '.'.join([str(label, 'utf-8') for label in domain_labels])
  213.     if not full_domain.endswith('.'):
  214.         full_domain = full_domain + '.'
  215.     full_domain = full_domain.lower()  # Convert to lower case
  216.     query_type_value = query_stream.read('uint:16')
  217.     record_type_dict = {1: 'A', 2: 'NS', 3: 'MD', 4: 'MF', 5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG',
  218.                         9: 'MR', 10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO', 15: 'MX',
  219.                         16: 'TXT', 17: 'RP', 18: 'AFSDB', 19: 'X25', 20: 'ISDN', 21: 'RT', 22: 'NSAP',
  220.                         23: 'NSAP-PTR', 24: 'SIG', 25: 'KEY', 26: 'PX', 27: 'GPOS', 28: 'AAAA',
  221.                         29: 'LOC', 30: 'NXT', 31: 'EID', 32: 'NIMLOC', 33: 'SRV', 34: 'ATMA',
  222.                         35: 'NAPTR', 36: 'KX', 37: 'CERT', 38: 'A6', 39: 'DNAME', 40: 'SINK',
  223.                         41: 'OPT', 42: 'APL', 43: 'DS', 44: 'SSHFP', 45: 'IPSECKEY', 46: 'RRSIG',
  224.                         47: 'NSEC', 48: 'DNSKEY', 49: 'DHCID', 50: 'NSEC3', 51: 'NSEC3PARAM',
  225.                         52: 'TLSA', 53: 'SMIMEA', 55: 'HIP', 59: 'CDS', 60: 'CDSKEY',
  226.                         61: 'OPENGPGKEY', 99: 'SPF', 100: 'UINFO', 101: 'UID', 102: 'GID',
  227.                         103: 'UNSPEC', 249: 'TKEY', 250: 'TSIG', 251: 'IXFR', 252: 'AXFR',
  228.                         253: 'MAILB', 254: 'MAILA', 255: 'ALL', 256: 'URI', 257: 'CAA', 32768: 'TA',
  229.                         32769: 'DLV'}
  230.     query_type = record_type_dict[query_type_value]
  231.     query_class_value = query_stream.read('uint:16')
  232.     if query_class_value != 1:
  233.         raise NotImplementedError('The current server supports Internet class only.')
  234.     questions_end_pos = query_stream.pos
  235.     question_section = query_stream[questions_start_pos:questions_end_pos]
  236.  
  237.     # Ignore other sections
  238.     return full_domain, query_type, id, rd, cd, question_section
  239.  
  240.  
  241. def fetch_cached_data(RecordsCache, NegativeCache, full_domain, query_type):
  242.     with pony.orm.db_session:
  243.         cached_record = RecordsCache.get(full_domain=full_domain, query_type=query_type)
  244.         if cached_record:
  245.             cached_record.used_in_current_period = True
  246.             cached_data = cached_record.data
  247.             return 'positive', cached_data
  248.         if NegativeCache.exists(full_domain=full_domain):
  249.             return 'nxdomain', None
  250.         return 'nocache', None
  251.  
  252.  
  253. def fetch_remote_data(requests_session, full_domain, query_type):
  254.     result = requests_session.get(DOH_URL, params={'name': full_domain, 'type': query_type},
  255.                                   headers={'Accept': 'application/dns-json'}, timeout=10)
  256.     result_json = result.json()
  257.     status_code = int(result_json['Status'])  # 0 = no error, 1 = format error, 2 = server failure,
  258.                                               # 3 = no such domain, 4 = not implemented, 5 = refused
  259.     if status_code == 0:  # Received NOERROR
  260.         try:
  261.             answers = result_json['Answer']
  262.             for answer in answers:
  263.                 if query_type == 'A':
  264.                     if answer['type'] == 1:
  265.                         return 'noerror', answer['data']
  266.         except KeyError:
  267.             pass
  268.         # If there is no data in Answer section or no answer section
  269.         return 'nodata', None
  270.     elif status_code == 3:
  271.         return 'nxdomain', None
  272.     elif status_code == 2:
  273.         return 'servfail', None
  274.     else:
  275.         raise NotImplementedError('Unsupported status code received.')
  276.  
  277.  
  278. def cache_remote_positive_answer(RecordsCache, full_domain, query_type, remote_data):
  279.     with pony.orm.db_session:
  280.         # Automatic handling by ponyorm
  281.         RecordsCache(full_domain=full_domain, query_type=query_type, data=remote_data,
  282.                      used_in_current_period=True,
  283.                      used_in_previous_period=False)
  284.     print('Cached record:', full_domain, query_type, remote_data)
  285.  
  286.  
  287. def cache_remote_nxdomain_answer(NegativeCache, full_domain):
  288.     with pony.orm.db_session:
  289.         # Automatic handling by ponyorm
  290.         NegativeCache(full_domain=full_domain)
  291.     print('Cached NXDOMAIN:', full_domain)
  292.  
  293.  
  294. def construct_positive_response(id, rd, cd, question_section, address_data):
  295.     # See https://tools.ietf.org/html/rfc1035#section-4.1.1
  296.     # Also https://tools.ietf.org/html/rfc6895
  297.     response = BitArray()
  298.  
  299.     # Header section begins
  300.     header = BitArray()
  301.  
  302.     id = id                                # 16bits ID identifier
  303.     qr = BitArray('uint:1=1')              # 1bit specifying if it's query(0) or response(1)
  304.     opcode = BitArray('uint:4=0')          # 4bits specifying query kind. 0 = standard
  305.     aa = BitArray('uint:1=0')              # 1bit specifying if response is authorative(1)
  306.     tc = BitArray('uint:1=0')              # 1bit specifying if message is truncated(1)
  307.     rd = rd                                # Copy from query, 1bit specifying if recursion is desired(1) by sender
  308.     ra = BitArray('uint:1=0')              # 1bit specifying if recursion is available(1) from server
  309.     z = BitArray('uint:1=0')               # 1bit reserved for future use, zero
  310.     ad = BitArray('uint:1=0')              # 1bit stating all records in answer and authority sections are authentic (DNSSEC)
  311.     cd = cd                                # Copy from query, 1bit requesting no signature validation by upstream servers (DNSSEC)
  312.     rcode = BitArray('uint:4=0')           # 4bits stating query status, 0 = no error, 1 = format error,
  313.                                            # 2 = server failure, 3 = name error, 4 = not implemented,
  314.                                            # 5 = refused, 6-15 = reserved
  315.     qdcount = BitArray(uint=1, length=16)  # unsigned 16bits int specifying number of entries in question section
  316.     ancount = BitArray(uint=1, length=16)  # unsigned 16bits int specifying number of records in answer section
  317.     nscount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in authority section
  318.     arcount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in additional record section
  319.  
  320.     header.append(id)
  321.     header.append(qr)
  322.     header.append(opcode)
  323.     header.append(aa)
  324.     header.append(tc)
  325.     header.append(rd)
  326.     header.append(ra)
  327.     header.append(z)
  328.     header.append(ad)
  329.     header.append(cd)
  330.     header.append(rcode)
  331.     header.append(qdcount)
  332.     header.append(ancount)
  333.     header.append(nscount)
  334.     header.append(arcount)
  335.     response.append(header)
  336.     # Header section ends
  337.  
  338.     # Question section, copied from query
  339.     response.append(question_section)
  340.  
  341.     # Answer section begins
  342.     ttl = abs(int(PERIOD * 86400 / 2))
  343.     offset_starting_point = 12  # It can be 12 only assuming query only has one question
  344.  
  345.     rr_part = BitArray()
  346.     # NAME
  347.     rr_part.append(BitArray('uint:2=3'))  # 2bits, '11', flag for compression
  348.     rr_part.append(BitArray(uint=offset_starting_point, length=14))  # 14bit, offset specified in compression
  349.     # TYPE
  350.     rr_part.append(BitArray('uint:16=1'))  # A record
  351.     # CLASS
  352.     rr_part.append(BitArray('uint:16=1'))  # Internet Class
  353.     # TTL
  354.     rr_part.append(BitArray(uint=ttl, length=32))  # TTL in seconds
  355.     # RDLENGTH, specifying length of data
  356.     rr_part.append(BitArray('uint:16=4'))  # IP address in A record requires 4 octets(bytes)
  357.     # RDATA
  358.     ipaddress = [BitArray(uint=int(part), length=8) for part in address_data.split('.')]  # Like socket.inet_aton()
  359.     for part in ipaddress:
  360.         rr_part.append(part)
  361.     response.append(rr_part)
  362.     # Answer section ends
  363.  
  364.     # Ignore other sections
  365.     return response
  366.  
  367.  
  368. def construct_nodata_response(id, rd, cd, question_section):
  369.     # See https://tools.ietf.org/html/rfc1035#section-4.1.1
  370.     # Also https://tools.ietf.org/html/rfc6895
  371.     response = BitArray()
  372.  
  373.     # Header section begins
  374.     header = BitArray()
  375.  
  376.     id = id                                # 16bits ID identifier
  377.     qr = BitArray('uint:1=1')              # 1bit specifying if it's query(0) or response(1)
  378.     opcode = BitArray('uint:4=0')          # 4bits specifying query kind. 0 = standard
  379.     aa = BitArray('uint:1=0')              # 1bit specifying if response is authorative(1)
  380.     tc = BitArray('uint:1=0')              # 1bit specifying if message is truncated(1)
  381.     rd = rd                                # Copy from query, 1bit specifying if recursion is desired(1) by sender
  382.     ra = BitArray('uint:1=0')              # 1bit specifying if recursion is available(1) from server
  383.     z = BitArray('uint:1=0')               # 1bit reserved for future use, zero
  384.     ad = BitArray('uint:1=0')              # 1bit stating all records in answer and authority sections are authentic (DNSSEC)
  385.     cd = cd                                # Copy from query, 1bit requesting no signature validation by upstream servers (DNSSEC)
  386.     rcode = BitArray('uint:4=0')           # 4bits stating query status, 0 = no error, 1 = format error,
  387.                                            # 2 = server failure, 3 = name error, 4 = not implemented,
  388.                                            # 5 = refused, 6-15 = reserved
  389.                                            # In NODATA scenario, it should be 0
  390.     qdcount = BitArray(uint=1, length=16)  # unsigned 16bits int specifying number of entries in question section
  391.     ancount = BitArray(uint=0, length=16)  # unsigned 16bits int specifying number of records in answer section. In NODATA scenario, it should be 0
  392.     nscount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in authority section
  393.     arcount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in additional record section
  394.  
  395.     header.append(id)
  396.     header.append(qr)
  397.     header.append(opcode)
  398.     header.append(aa)
  399.     header.append(tc)
  400.     header.append(rd)
  401.     header.append(ra)
  402.     header.append(z)
  403.     header.append(ad)
  404.     header.append(cd)
  405.     header.append(rcode)
  406.     header.append(qdcount)
  407.     header.append(ancount)
  408.     header.append(nscount)
  409.     header.append(arcount)
  410.     response.append(header)
  411.     # Header section ends
  412.  
  413.     # Question section, copied from query
  414.     response.append(question_section)
  415.  
  416.     # No Answer section in NODATA scenario
  417.  
  418.     # Ignore other sections
  419.     return response
  420.  
  421.  
  422. def construct_nxdomain_response(id, rd, cd, question_section):
  423.     # See https://tools.ietf.org/html/rfc1035#section-4.1.1
  424.     # Also https://tools.ietf.org/html/rfc6895
  425.     response = BitArray()
  426.  
  427.     # Header section begins
  428.     header = BitArray()
  429.  
  430.     id = id                                # 16bits ID identifier
  431.     qr = BitArray('uint:1=1')              # 1bit specifying if it's query(0) or response(1)
  432.     opcode = BitArray('uint:4=0')          # 4bits specifying query kind. 0 = standard
  433.     aa = BitArray('uint:1=0')              # 1bit specifying if response is authorative(1)
  434.     tc = BitArray('uint:1=0')              # 1bit specifying if message is truncated(1)
  435.     rd = rd                                # Copy from query, 1bit specifying if recursion is desired(1) by sender
  436.     ra = BitArray('uint:1=0')              # 1bit specifying if recursion is available(1) from server
  437.     z = BitArray('uint:1=0')               # 1bit reserved for future use, zero
  438.     ad = BitArray('uint:1=0')              # 1bit stating all records in answer and authority sections are authentic (DNSSEC)
  439.     cd = cd                                # Copy from query, 1bit requesting no signature validation by upstream servers (DNSSEC)
  440.     rcode = BitArray('uint:4=3')           # 4bits stating query status, 0 = no error, 1 = format error,
  441.                                            # 2 = server failure, 3 = name error, 4 = not implemented,
  442.                                            # 5 = refused, 6-15 = reserved
  443.                                            # In NXDOMAIN scenario, it should be 3
  444.     qdcount = BitArray(uint=1, length=16)  # unsigned 16bits int specifying number of entries in question section
  445.     ancount = BitArray(uint=0, length=16)  # unsigned 16bits int specifying number of records in answer section
  446.     nscount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in authority section
  447.     arcount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in additional record section
  448.  
  449.     header.append(id)
  450.     header.append(qr)
  451.     header.append(opcode)
  452.     header.append(aa)
  453.     header.append(tc)
  454.     header.append(rd)
  455.     header.append(ra)
  456.     header.append(z)
  457.     header.append(ad)
  458.     header.append(cd)
  459.     header.append(rcode)
  460.     header.append(qdcount)
  461.     header.append(ancount)
  462.     header.append(nscount)
  463.     header.append(arcount)
  464.     response.append(header)
  465.     # Header section ends
  466.  
  467.     # Question section, copied from query
  468.     response.append(question_section)
  469.  
  470.     # No Answer section in NXDOMAIN scenario
  471.  
  472.     # Ignore other sections
  473.     return response
  474.  
  475.  
  476. def construct_servfail_response(id, rd, cd, question_section):
  477.     # See https://tools.ietf.org/html/rfc1035#section-4.1.1
  478.     # Also https://tools.ietf.org/html/rfc6895
  479.     response = BitArray()
  480.  
  481.     # Header section begins
  482.     header = BitArray()
  483.  
  484.     id = id                                # 16bits ID identifier
  485.     qr = BitArray('uint:1=1')              # 1bit specifying if it's query(0) or response(1)
  486.     opcode = BitArray('uint:4=0')          # 4bits specifying query kind. 0 = standard
  487.     aa = BitArray('uint:1=0')              # 1bit specifying if response is authorative(1)
  488.     tc = BitArray('uint:1=0')              # 1bit specifying if message is truncated(1)
  489.     rd = rd                                # Copy from query, 1bit specifying if recursion is desired(1) by sender
  490.     ra = BitArray('uint:1=0')              # 1bit specifying if recursion is available(1) from server
  491.     z = BitArray('uint:1=0')               # 1bit reserved for future use, zero
  492.     ad = BitArray('uint:1=0')              # 1bit stating all records in answer and authority sections are authentic (DNSSEC)
  493.     cd = cd                                # Copy from query, 1bit requesting no signature validation by upstream servers (DNSSEC)
  494.     rcode = BitArray('uint:4=2')           # 4bits stating query status, 0 = no error, 1 = format error,
  495.                                            # 2 = server failure, 3 = name error, 4 = not implemented,
  496.                                            # 5 = refused, 6-15 = reserved
  497.                                            # In SERVFAIL scenario, it should be 2
  498.     qdcount = BitArray(uint=1, length=16)  # unsigned 16bits int specifying number of entries in question section
  499.     ancount = BitArray(uint=0, length=16)  # unsigned 16bits int specifying number of records in answer section
  500.     nscount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in authority section
  501.     arcount = BitArray('uint:16=0')        # unsigned 16bits int specifying number of records in additional record section
  502.  
  503.     header.append(id)
  504.     header.append(qr)
  505.     header.append(opcode)
  506.     header.append(aa)
  507.     header.append(tc)
  508.     header.append(rd)
  509.     header.append(ra)
  510.     header.append(z)
  511.     header.append(ad)
  512.     header.append(cd)
  513.     header.append(rcode)
  514.     header.append(qdcount)
  515.     header.append(ancount)
  516.     header.append(nscount)
  517.     header.append(arcount)
  518.     response.append(header)
  519.     # Header section ends
  520.  
  521.     # Question section, copied from query
  522.     response.append(question_section)
  523.  
  524.     # No Answer section in SERVFAIL scenario
  525.  
  526.     # Ignore other sections
  527.     return response
  528.  
  529.  
  530. def change_period(RecordsCache, NegativeCache):
  531.     print('Starting to change period.')
  532.     with pony.orm.db_session:
  533.         # Delete cached records unused in both current period and previous period
  534.         # Note: When next system boot is beyond a period, cached records in regular
  535.         #       use may be discarded inadvertently if a cycle of one period is used.
  536.         #       Consider the case where a system shutdowns right after a period change,
  537.         #       and the next time the system boots is beyond a period, in this case,
  538.         #       change_period() will be initiated, and all records will be discarded.
  539.         #       Therefore a cycle of two periods is used.
  540.         RecordsCache.select(lambda record: (not record.used_in_current_period) and (not record.used_in_previous_period)).delete(bulk=True)
  541.  
  542.         # Update used_in_current_period and used_in_previous_period
  543.         cached_records = RecordsCache.select()
  544.         for record in cached_records:
  545.             record.used_in_previous_period = record.used_in_current_period
  546.             record.used_in_current_period = False
  547.  
  548.         # Delete all negative caches
  549.         NegativeCache.select().delete(bulk=True)
  550.     print('Unused records discarded, negative cache deleted, and period columns updated in database.')
  551.     update_cache(RecordsCache, NegativeCache)
  552.  
  553.  
  554. def update_cache(RecordsCache, NegativeCache):
  555.     with pony.orm.db_session:
  556.         cached_records = RecordsCache.select()
  557.         records_to_update = deque((record.full_domain, record.query_type) for record in cached_records)
  558.  
  559.     requests_session = requests.Session()
  560.     while True:
  561.         try:
  562.             record = records_to_update.popleft()
  563.         except IndexError:
  564.             break
  565.         else:
  566.             full_domain, query_type = record[0], record[1]
  567.             try:
  568.                 remote_status, remote_data = fetch_remote_data(requests_session, full_domain, query_type)
  569.             except BaseException as e:
  570.                 print(e)
  571.                 print('Cache update failed, re-enqueue task:', full_domain, query_type)
  572.                 records_to_update.append(record)
  573.             else:
  574.                 if remote_status == 'noerror':
  575.                     try:
  576.                         with pony.orm.db_session:
  577.                             RecordsCache.get(full_domain=full_domain, query_type=query_type).set(data=remote_data)
  578.                     except BaseException as e:
  579.                         print(e)
  580.                     else:
  581.                         print('Updated cache:', full_domain, query_type, remote_data)
  582.                 else:
  583.                     print('Cache update failed, re-enqueue task:', full_domain, query_type)
  584.                     records_to_update.append(record)
  585.             sleep(5)
  586.  
  587.  
  588. def update_schedule_file(schedule):
  589.     try:
  590.         with open(file=SCHEDULE_FILE_FILENAME, mode='w+') as schedule_file:
  591.             schedule_file.writelines(str(schedule)+'\n')
  592.     except BaseException as e:
  593.         print(e)
  594.  
  595.  
  596. def timer_job(schedule, RecordsCache, NegativeCache):
  597.     print('Timer job started')
  598.     while True:
  599.         time_to_wait = int(schedule - time())
  600.         print('Seconds to next period change:', time_to_wait)
  601.         if time_to_wait >= 0:
  602.             sleep(time_to_wait)
  603.         change_period(RecordsCache, NegativeCache)
  604.         schedule = int(time() + PERIOD * 86400)
  605.         update_schedule_file(schedule)
  606.  
  607.  
  608. def error_output_beautifier(function, args):
  609.     try:
  610.         function(*args)
  611.     except BaseException as e:
  612.         print(e)
  613.  
  614.  
# Start the server only when the file is executed as a script, not when imported
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement