## certreader.py
#Python imports
import sys
import json
import asyncio
import logging
#Package imports
import kafka
import httpx
import datadog
#Project imports
# Placed further down: from utils.classes import StatusClass, CertClass
# Placed further down: from utils.constants import *

#Logging definition:
logging.basicConfig(
    level=int(DEBUG_LEVEL),#Change to DEBUG for more info
    format='%(asctime)s %(process)d %(name)-12s %(levelname)-8s %(message)s',
    filename='logs/reader.log',
    filemode='w' #This truncates the log on every restart so it does not grow too large
)

#Datadog initialization
options = {
    'host_name': DATADOG_HOSTNAME,
    'statsd_host': DATADOG_HOST,
    'statsd_port': DATADOG_PORT
}
datadog.initialize(**options)

## utils.classes
#Python imports
import json
import time
import logging
#Package imports
import kafka
#Project imports
from utils.constants import KAFKA_TOPIC, KAFKA_URL

#Class definitions
class StatusClass:
    def __init__(self):
        self.certs = []
        self.kafka_consumer = self.create_consumer()

    def create_consumer(self):
        while True:#infinite loop waiting until Kafka is online
            try:
                #We try to make the connection
                consumer = kafka.KafkaConsumer(
                    KAFKA_TOPIC,
                    bootstrap_servers=KAFKA_URL,
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    auto_commit_interval_ms=1000,
                    group_id='certreaders',
                    value_deserializer=lambda x: json.loads(x)#deserialize the JSON payload
                )
                #If this succeeds:
                logging.info("Begin reading Kafka Service in %s...", KAFKA_URL)
                return consumer

            except kafka.errors.NoBrokersAvailable:#If Kafka is not online yet
                time.sleep(1)#we wait one more second
                logging.info("Waiting for Kafka Service in %s...", KAFKA_URL)

#Cert class definition
class CertClass:
    def __init__(self, msg):
        self.msg = msg #from Calidog's certstream
        #Self-calculated
        self.domain = self.get_domain()
        self.san = self.get_san()
        #To Kafka
        self.send_status = None#Info from the Kafka server
        #AI predictions
        self.modeld_answer = None
        self.label = None
        self.label_prob = None#Filled in by the AI analysis

    #Basic JSON lookups:
    def get_domain(self):
        return self.msg['data']['leaf_cert']['subject']['CN']

    def url(self):
        url = self.domain.replace('*.', '')
        return 'https://' + url

    def get_san(self):
        return self.msg['data']['leaf_cert']['all_domains']

    def get_model_predict(self, predict):
        self.modeld_answer = predict
        self.label = predict.json()[0]['label']
        self.label_prob = predict.json()[0]['prob']

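## Usage sketch (not in the original paste): how CertClass consumes a
## certstream-style message. The nested dict below is a hypothetical,
## trimmed-down payload for illustration, not a real certificate.
from utils.classes import CertClass

sample_msg = {
    'data': {
        'leaf_cert': {
            'subject': {'CN': '*.example.com'},
            'all_domains': ['example.com', 'www.example.com'],
        }
    }
}

cert = CertClass(sample_msg)
print(cert.domain)  # *.example.com
print(cert.url())   # https://example.com (wildcard prefix stripped)
print(cert.san)     # ['example.com', 'www.example.com']
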
## utils.constants
#Python imports
import os
import logging

#Constants:
DEBUG_LEVEL = os.getenv('DEBUG_LEVEL', default=logging.DEBUG)
MAX_WORKERS = int(os.getenv('MAX_WORKERS', default=20))#env vars arrive as strings, so cast
SCREENING = os.getenv('PYTHON_SCREENING', default=True)
#Kafka
KAFKA_URL = os.getenv('KAFKA_CONNECT', default='localhost:9092')
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', default='CertStream')
#Datadog
DATADOG_HOST = os.getenv('DATADOG_HOST', default="localhost")
DATADOG_PORT = int(os.getenv('DATADOG_PORT', default=8125))#cast for the statsd client
DATADOG_HOSTNAME = os.getenv('HOSTNAME', default='DEVELOPMENT')
DATADOG_ENVIROMENT = os.getenv('DATADOG_ENV', default="environment:dev")
#AI model
MODELD_HOST = os.getenv('MODELD_HOST', default='http://localhost:7779')
MODELD_HEADER = {'Content-type': 'application/json'}
#Django
DJANGO_URL = os.getenv('DJANGO_URL', default='http://localhost:8000/apis/reg_cert')

#Phishing detection
THRESHOLD_PHISHING = 0.8
THRESHOLD_SUSPICIOUS = 0.6

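## Note (not in the original paste): THRESHOLD_PHISHING and THRESHOLD_SUSPICIOUS
## are defined but never used here; presumably they gate the model's label_prob
## somewhere downstream. A hypothetical sketch of that check (classify_cert is
## an assumed helper, not part of the original code):
def classify_cert(label_prob):
    if label_prob >= THRESHOLD_PHISHING:
        return 'phishing'
    elif label_prob >= THRESHOLD_SUSPICIOUS:
        return 'suspicious'
    return 'benign'
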
## certreader.py (continued)
#Definition of a very simple project
async def process_cert(status):
    logging.info('process_cert started...')
    #We instance one client for every worker
    client = httpx.AsyncClient()
    #We process the certs that are coming into the queue
    while True:
        try:
            if status.certs:
                cert = status.certs.pop(0)#we get the first cert
                #we ask for the analysis
                cert.get_model_predict(
                    await client.post(
                        MODELD_HOST,
                        json=[{'url': cert.url()}],
                        headers=MODELD_HEADER,
                        timeout=None,
                    )
                )
                #We send data to Datadog
                datadog.statsd.increment('certs_classified.increment', tags=[DATADOG_ENVIROMENT])
                #We print the analyzed cert
                if SCREENING:
                    print("{0} is {1}".format(cert.domain, cert.label))
            else:
                await asyncio.sleep(0)#yield so other tasks can run while the queue is empty

        except Exception:
            logging.error('fail sending batch', exc_info=sys.exc_info())

async def queue_reader(status):
    logging.info('queue_reader started...')
    #We start reading messages from Kafka:
    for msg in status.kafka_consumer:
        try:
            #For every message we instance the cert class
            cert = CertClass(msg.value)
            #We add it to the queue to be processed
            status.certs.append(cert)
            #We give up CPU time so the workers can process certs
            await asyncio.sleep(0)

        except Exception:
            logging.error('fail sending batch', exc_info=sys.exc_info())

def main():
    #Instance the program control class
    status = StatusClass()
    #We create all the workers
    for x in range(MAX_WORKERS):
        asyncio.create_task(process_cert(status))
        logging.info('%s worker created...', x)
    #We make the work generator start
    asyncio.run(queue_reader(status))

#Launch time!
if __name__ == "__main__":
    main()

## THE ERROR:
#   Traceback (most recent call last):
#     File "certreader.py", line 85, in <module>
#       main()
#     File "certreader.py", line 78, in main
#       asyncio.create_task(process_cert(status))
#     File "/usr/lib/python3.7/asyncio/tasks.py", line 350, in create_task
#       loop = events.get_running_loop()
#   RuntimeError: no running event loop
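
## THE FIX (a minimal sketch, not from the original paste):
## asyncio.create_task() (Python 3.7+) requires an already-running event loop,
## but main() calls it before asyncio.run() has started one. Making main()
## itself a coroutine and launching it with asyncio.run() moves the task
## creation inside the running loop. Names reused from the code above.
async def main():
    status = StatusClass()
    #Create the workers inside the running loop, where create_task is legal
    for x in range(MAX_WORKERS):
        asyncio.create_task(process_cert(status))
        logging.info('%s worker created...', x)
    #Drive the work generator on the same loop as the workers
    await queue_reader(status)

if __name__ == "__main__":
    asyncio.run(main())#asyncio.run starts the loop and drives main()

## Caveat: iterating the blocking KafkaConsumer inside queue_reader will still
## stall the loop while it waits for messages; running the consumer in a thread
## (e.g. via loop.run_in_executor) is a common workaround.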