#Python imports
import sys
import json
import asyncio
import logging
#Package imports
import kafka
import httpx
import datadog
#Project imports
# These are further down in this paste: from utils.classes import StatusClass, CertClass
# These are further down in this paste: from utils.constants import *
#Logging definition:
logging.basicConfig(
    level=int(DEBUG_LEVEL),  # Change to DEBUG for more info
    format='%(asctime)s %(process)d %(name)-12s %(levelname)-8s %(message)s',
    filename='logs/reader.log',
    filemode='w'  # This cleans the log on every restart so it does not grow too large
)
#Datadog initialization
options = {
    'host_name': DATADOG_HOSTNAME,
    'statsd_host': DATADOG_HOST,
    'statsd_port': DATADOG_PORT
}
datadog.initialize(**options)
## utils.classes
#Python imports
import json
import time
import logging
#Extra imports
import kafka
#Project imports
from utils.constants import KAFKA_TOPIC, KAFKA_URL
#Class definitions
class StatusClass:
    def __init__(self):
        self.certs = []
        self.kafka_consumer = self.create_consumer()

    def create_consumer(self):
        while True:  # Infinite loop waiting until Kafka is online
            try:
                #We try to make the connection
                consumer = kafka.KafkaConsumer(
                    KAFKA_TOPIC,
                    bootstrap_servers=KAFKA_URL,
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    auto_commit_interval_ms=1000,
                    group_id='certreaders',
                    value_deserializer=lambda x: json.loads(x)  # It decodes the JSON that is sent
                )
                #If this succeeds:
                logging.info("Begin reading Kafka Service in %s...", KAFKA_URL)
                return consumer
            except kafka.errors.NoBrokersAvailable:  # If Kafka is not online
                time.sleep(1)  # We wait one more second
                logging.info("Waiting for Kafka Service in %s...", KAFKA_URL)
#We define the Cert class
class CertClass():
    def __init__(self, msg):
        self.msg = msg  # From the Calidog CertStream feed
        #Self-calculated
        self.domain = self.get_domain()
        self.san = self.get_san()
        #To Kafka
        self.send_status = None  # Info from the Kafka server
        #IA predictions
        self.modeld_answer = None
        self.label = None
        self.label_prob = None  # To be filled in with the IA analysis

    #Basic JSON lookups:
    def get_domain(self):
        return self.msg['data']['leaf_cert']['subject']['CN']

    def url(self):
        url = self.domain.replace('*.', '')
        return 'https://' + url

    def get_san(self):
        return self.msg['data']['leaf_cert']['all_domains']

    def get_model_predict(self, predict):
        self.modeld_answer = predict
        self.label = predict.json()[0]['label']
        self.label_prob = predict.json()[0]['prob']
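
# A minimal usage sketch of CertClass, assuming a stripped-down dict with the
# same shape as a CertStream 'certificate_update' payload (placeholder values):
if __name__ == "__main__":
    sample_msg = {
        'data': {
            'leaf_cert': {
                'subject': {'CN': '*.example.com'},
                'all_domains': ['example.com', '*.example.com'],
            }
        }
    }
    demo_cert = CertClass(sample_msg)
    print(demo_cert.domain)  # '*.example.com'
    print(demo_cert.url())   # 'https://example.com' (the wildcard prefix is stripped)
    print(demo_cert.san)     # ['example.com', '*.example.com']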
## utils.constants
#Python imports
import os
import logging
#Constants:
DEBUG_LEVEL = os.getenv('DEBUG_LEVEL', default=logging.DEBUG)
MAX_WORKERS = int(os.getenv('MAX_WORKERS', default=20))  # Cast: env vars arrive as strings
SCREENING = os.getenv('PYTHON_SCREENING', default=True)
#Kafka
KAFKA_URL = os.getenv('KAFKA_CONNECT', default='localhost:9092')
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', default='CertStream')
#Datadog
DATADOG_HOST = os.getenv('DATADOG_HOST', default="localhost")
DATADOG_PORT = int(os.getenv('DATADOG_PORT', default=8125))  # Cast: env vars arrive as strings
DATADOG_HOSTNAME = os.getenv('HOSTNAME', default='DEVELOPMENT')
DATADOG_ENVIROMENT = os.getenv('DATADOG_ENV', default="environment:dev")
#IA model
MODELD_HOST = os.getenv('MODELD_HOST', default='http://localhost:7779')
MODELD_HEADER = {'Content-type': 'application/json'}
#Django
DJANGO_URL = os.getenv('DJANGO_URL', default='http://localhost:8000/apis/reg_cert')
#Phishing detection
THRESHOLD_PHISHING = 0.8
THRESHOLD_SUSPICIOUS = 0.6
## THIS IS certreader.py
#Definition of a very simple project
async def process_cert(status):
    logging.info('process_cert started...')
    #We instantiate one client per worker
    client = httpx.AsyncClient()
    #We process the certs that arrive on the queue
    while True:
        try:
            if status.certs:
                cert = status.certs.pop(0)  # We get the first cert
                #We ask for the analysis
                cert.get_model_predict(
                    await client.post(
                        MODELD_HOST,
                        json=[{'url': cert.url()}],
                        headers={'Content-type': 'application/json'},
                        timeout=None,
                    )
                )
                #We send data to Datadog
                datadog.statsd.increment('certs_classified.increment', tags=[DATADOG_ENVIROMENT])
                #We print the cert that was analyzed
                if SCREENING:
                    print("{0} is {1}".format(cert.domain, cert.label))
        except Exception:
            logging.error('fail sending batch', exc_info=sys.exc_info())

async def queue_reader(status):
    logging.info('queue_reader started...')
    #We start reading messages from Kafka:
    for msg in status.kafka_consumer:
        try:
            #For every message we instantiate the Cert class
            cert = CertClass(msg.value)
            #We add it to the queue to be processed
            status.certs.append(cert)
            #We yield CPU time so the certs get processed
            await asyncio.sleep(0)
        except Exception:
            logging.error('fail sending batch', exc_info=sys.exc_info())

def main():
    #Instantiate the program control class
    status = StatusClass()
    #We create all the workers
    for x in range(MAX_WORKERS):
        asyncio.create_task(process_cert(status))
        logging.info('%s worker created...', x)
    #We make the work generator start
    asyncio.run(queue_reader(status))

#Launch time!
if __name__ == "__main__":
    main()
## THE ERROR:
# Traceback (most recent call last):
#   File "certreader.py", line 85, in <module>
#     main()
#   File "certreader.py", line 78, in main
#     asyncio.create_task(process_cert(status))
#   File "/usr/lib/python3.7/asyncio/tasks.py", line 350, in create_task
#     loop = events.get_running_loop()
# RuntimeError: no running event loop
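
## A possible fix (sketch):
# asyncio.create_task() needs an already-running event loop, so the tasks have
# to be created inside a coroutine executed by asyncio.run(). A minimal sketch,
# assuming the rest of the code stays unchanged:
#
# async def main():
#     status = StatusClass()
#     # The loop is running here, so create_task() is legal
#     workers = [asyncio.create_task(process_cert(status)) for x in range(MAX_WORKERS)]
#     await queue_reader(status)
#
# if __name__ == "__main__":
#     asyncio.run(main())
#
# (The blocking KafkaConsumer iteration inside queue_reader is a separate issue
# and is left as-is here.)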