Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/local/bin/python3
# Tested under version 22.1.
# Disable circular logs under System > Settings > Logging.
# Requires dshield.ini; a sample is available at
# https://github.com/jullrich/dshieldpfsense/blob/master/dshield.sample
#
# cron: create /etc/cron.d/dshield containing the line
# 53 2,5,8,11,14,17,20,23 * * * root /usr/local/bin/dshield
import configparser
import ipaddress
import smtplib
import sys
from datetime import date, datetime, timedelta
from email.message import EmailMessage
from os import path

import pytz
# INI file holding the [dshield] settings read at start-up.
config_file = "/usr/local/etc/dshield.ini"

# Daily packet-filter logs are read from <filter_log_dir>/filter_YYYYMMDD.log.
filter_log_dir = "/var/log/filter"

# File holding the Unix timestamp of the previous run.
last_run_timestamp_file = "/var/run/dshield"

# Time zone name handed to pytz.timezone().
timezone_name = "US/Central"

# Recipient of the report email.
dshield_email_addr = "reports@dshield.org"

# SMTP relay used to send the report.
smtp_server_name = "localhost"
smtp_server_port = 25
def parse_record(log_line: str) -> dict:
    """Split one comma-separated packet-filter log record into named fields.

    Fields 0-8 are common to every record; the layout of the remaining
    fields depends on the IP version (field 8) and on the protocol number.
    When the module-level ``debug`` flag is set, every parsed field is
    printed.

    Args:
        log_line: The comma-separated record text.

    Returns:
        A dict of the parsed fields. 'src', 'dst', 'proto' and 'protoname'
        are always present; 'srcport' and 'dstport' are set for TCP, UDP and
        IPv4 ICMP; 'tcpflags' is set for TCP only.

    Raises:
        IndexError: The record has fewer fields than its layout requires.
        ValueError: A numeric field is not an integer, an IPv6 address is
            malformed, or the IP version is neither 4 nor 6.
    """
    log = {}
    log_fields = log_line.split(',')
    log['rulenr'] = int(log_fields[0])
    log['ridentifier'] = log_fields[3]
    log['interface'] = log_fields[4]
    log['reason'] = log_fields[5]
    log['action'] = log_fields[6]
    log['dir'] = log_fields[7]
    log['version'] = int(log_fields[8])

    if log['version'] == 4:
        log['tos'] = log_fields[9]
        log['ttl'] = int(log_fields[11])
        log['id'] = int(log_fields[12])
        log['offset'] = int(log_fields[13])
        log['ipflags'] = log_fields[14]
        log['proto'] = int(log_fields[15])
        log['protoname'] = log_fields[16].upper()
        log['length'] = int(log_fields[17])
        log['src'] = log_fields[18]
        log['dst'] = log_fields[19]
        if log['proto'] == 1:
            # ICMP has no ports; placeholders keep the report columns filled.
            log['srcport'] = '???'
            log['dstport'] = '???'
        # proto 6 = TCP
        # proto 17 = UDP
        if log['proto'] in (6, 17):
            log['srcport'] = int(log_fields[20])
            log['dstport'] = int(log_fields[21])
            log['datalen'] = int(log_fields[22])
            if log['proto'] == 6:
                log['tcpflags'] = log_fields[23]
                log['seq'] = log_fields[24]
                log['ack'] = log_fields[25]
                log['urp'] = int(log_fields[26])
                log['tcpoptions'] = log_fields[28]
        else:
            # NOTE(review): the original nesting of this branch is assumed to
            # pair with the TCP/UDP test above - confirm against the author's
            # copy of the script.
            log['options'] = log_fields[20]
    elif log['version'] == 6:
        log['class'] = log_fields[9]
        log['flow'] = log_fields[10]
        log['hoplimit'] = log_fields[11]
        log['protoname'] = log_fields[12]
        log['proto'] = int(log_fields[13])
        log['payload-length'] = int(log_fields[14])
        # Store IPv6 addresses in their fully expanded form.
        log['src'] = ipaddress.ip_address(log_fields[15]).exploded
        log['dst'] = ipaddress.ip_address(log_fields[16]).exploded
        if log['proto'] == 58:
            # replace 'IPV6-ICMP' with standard name
            log['protoname'] = 'ICMP'
        if log['proto'] in (6, 17):
            log['srcport'] = int(log_fields[17])
            log['dstport'] = int(log_fields[18])
            log['datalen'] = int(log_fields[19])
            if log['proto'] == 6:
                log['tcpflags'] = log_fields[20]
                # leaving seq as string as '2028535965:2028536024' is possible
                log['seq'] = log_fields[21]
                log['ack'] = log_fields[22]
                log['urp'] = int(log_fields[23])
                log['tcpoptions'] = log_fields[25]
    else:
        # Without this the caller would receive a record lacking 'src'/'dst'
        # and fail later with a KeyError instead of skipping the record.
        raise ValueError(f"unsupported IP version: {log['version']}")

    if debug:
        for field, value in log.items():
            if isinstance(value, datetime):
                print(f"{field}: {value.strftime('%Y-%m-%dT%H:%M:%S%z')}")
            elif isinstance(value, int):
                print(f"{field}: {value}")
            else:
                print(f"{field}: '{value}'")
    return log
def check_record(record: dict) -> list:
    """Collect the reasons why *record* should not be reported.

    Reads the module-level ``interfaces``, ``authorized_source_ip`` and
    ``debug`` settings.

    Args:
        record: A parsed log record with at least the keys 'interface',
            'src', 'dst' and 'protoname' ('dir' is read when the interface
            is a monitored one).

    Returns:
        A list of human-readable issue strings; an empty list means no
        issue was found.
    """
    problems = []

    # Interface must be monitored, and the traffic must not be outbound.
    iface = record['interface']
    if iface not in interfaces:
        problems.append(f"{iface} not in interface list")
    elif record['dir'] == 'out':
        problems.append(f"{iface} traffic is outbound")

    # Source must not be whitelisted and must be a globally routable address.
    source = ipaddress.ip_address(record['src'])
    if source in authorized_source_ip:
        problems.append(f"{record['src']} in authorized_source_ip")
    if not source.is_global:
        problems.append(f"src:{source.compressed} not valid IP")

    # Destination must be globally routable and not multicast.
    destination = ipaddress.ip_address(record['dst'])
    if destination.is_multicast or not destination.is_global:
        problems.append(f"dst:{destination.compressed} not valid IP")

    protocol = record['protoname']
    if protocol == 'IGMP':
        problems.append(f"skipping {protocol}")

    if debug and problems:
        print(f"Issues found: {problems}")
    return problems
def localize_datetime(log_date: str) -> datetime:
    """Turn a year-less 'Mon DD HH:MM:SS' timestamp into a localized datetime.

    The year is taken from the module-level ``today_year``; when the parsed
    month is later than ``today_month`` the previous year is used instead.
    The result is localized with the module-level ``local_tz``.

    Args:
        log_date: Timestamp text in ``%b %d %H:%M:%S`` form.

    Returns:
        The timestamp as returned by ``local_tz.localize()``.

    Raises:
        ValueError: The text does not match the format, or the day does not
            exist in the year that was chosen.
    """
    # Parse against a fixed leap year: without a year strptime() assumes
    # 1900, which is not a leap year, so "Feb 29" would always be rejected.
    parsed = datetime.strptime(f"2000 {log_date}", "%Y %b %d %H:%M:%S")
    year = today_year - 1 if today_month < parsed.month else today_year
    # replace() still raises ValueError for Feb 29 in a non-leap year.
    return local_tz.localize(parsed.replace(year=year))
def add_record_to_msg(record: dict) -> None:
    """Append one tab-separated report line for *record* to ``msg_body``.

    Reads the module-level ``uid`` and ``debug`` settings and extends the
    module-level ``msg_body`` string.

    Args:
        record: A parsed log record. 'date' must be a timezone-aware
            datetime; 'src', 'dst' and 'protoname' are required; 'srcport',
            'dstport' and 'tcpflags' default to an empty column.

    Raises:
        ValueError: 'date' is missing or has no UTC offset. Without an
            offset ``%z`` is empty and the colon insertion below would
            corrupt the seconds field instead of the time zone.
    """
    global msg_body

    stamp = record.get('date')
    if stamp is None or stamp.utcoffset() is None:
        raise ValueError("record 'date' must be a timezone-aware datetime")

    record_date = stamp.strftime('%Y-%m-%d %H:%M:%S %z')
    # add colon to TZ field as required for Dshield format
    email_date = f'{record_date[:-2]}:{record_date[-2:]}'
    src_port = record.get('srcport', '')
    dst_port = record.get('dstport', '')
    tcp_flags = record.get('tcpflags', '')
    msg_record = f"{email_date}\t{uid}\t1\t{record['src']}\t{src_port}\t{record['dst']}\t{dst_port}\t{record['protoname']}\t{tcp_flags}\n"
    if debug:
        print(f'msg+: {msg_record}', end='')
    msg_body += msg_record
def compose_email(body: str) -> EmailMessage:
    """Wrap *body* in the email that is sent to DShield.

    The subject carries the module-level ``uid`` and the current UTC offset
    of ``local_tz`` written as +HH:MM / -HH:MM. A CC header is added only
    when ``cc_address`` is not empty. With ``debug`` set, the finished
    message is printed.

    Args:
        body: The report text to use as the message content.

    Returns:
        The assembled EmailMessage.
    """
    local_tz_offset = datetime.now(local_tz).strftime('%z')
    header_values = [
        ('Subject', f'FORMAT DSHIELD USERID {uid} TZ {local_tz_offset[:-2]}:{local_tz_offset[-2:]} OPNsense 0.01'),
        ('To', dshield_email_addr),
        ('From', from_address),
    ]
    if cc_address != '':
        header_values.append(('CC', cc_address))

    email = EmailMessage()
    for header_name, header_value in header_values:
        email[header_name] = header_value
    email.set_content(body)

    if not debug:
        return email
    print_header('=', 80, 'EMAIL TO SEND')
    print(email.as_string())
    return email
def print_header(char: str, length: int, header: str = None) -> None:
    """Print a banner: a rule line, an optional title line, a rule line.

    Args:
        char: The fill character (or string) the rules are built from.
        length: How many times *char* is repeated in each rule line.
        header: Optional title, printed between the two rules with fill
            characters on both sides. When omitted only the two rule lines
            are printed.
    """
    rule = char * length
    print(rule)
    if header is not None:
        # Fill left over once the title and its reserved 4 columns are
        # taken out; clamped so an over-long title simply gets no fill.
        fill = max(length - len(header) - 4, 0)
        left = fill // 2
        # The right side takes whatever remains, so an odd amount is never
        # lost and no division by a zero-width spacer can occur.
        right = fill - left
        print(f"{char * left} {header} {char * right}")
    print(rule)
def read_timestamp(file_name: str) -> int:
    """Return the integer stored in *file_name*, or 0 when unavailable.

    0 is returned when the path is not an existing regular file or when its
    content cannot be converted with int(). With the module-level ``debug``
    flag set, the value is printed.

    Args:
        file_name: Path of the timestamp file.

    Returns:
        The stored timestamp, or 0.
    """
    last_run = 0
    # isfile() is already False for a path that does not exist.
    if path.isfile(file_name):
        try:
            with open(file_name, 'r') as handle:
                last_run = int(handle.read())
        except ValueError:
            # Unparseable content counts as "never run".
            last_run = 0
    if debug:
        print(f'Last run: {last_run}')
    return last_run
def write_timestamp(file_name: str) -> int:
    """Store the current Unix time (whole seconds) in *file_name*.

    Args:
        file_name: Path of the file to overwrite.

    Returns:
        The timestamp that was written.
    """
    now_seconds = int(datetime.now().timestamp())
    with open(file_name, 'w') as handle:
        handle.write(f'{now_seconds}')
    return now_seconds
# ---------------------------------------------------------------------------
# Script body: load the settings, collect the filter-log records written
# since the previous run, mail them to DShield and record this run's time.
# ---------------------------------------------------------------------------
config = configparser.ConfigParser()
if not config.read(config_file):
    # sys.exit(<str>) prints the message to stderr and exits with status 1,
    # so cron and other callers can tell that the run failed.
    sys.exit(f"Cannot open '{config_file}' file for reading. Exiting.")
if 'dshield' not in config:
    sys.exit(f"No [dshield] section in '{config_file}'. Exiting.")

run_started = datetime.now()
filter_file = run_started.strftime(f'{filter_log_dir}/filter_%Y%m%d.log')
if not path.isfile(filter_file):
    sys.exit(f"Cannot open '{filter_file}' for reading. Exiting.")

# get dshield.ini config parameters
settings = config['dshield']
debug = settings.getboolean('debug', False)
# Blank entries are dropped and surrounding spaces removed, so both an empty
# setting and a list written as "a, b" are handled.
interfaces = [
    name.strip()
    for name in settings.get('interfaces', '').strip('"\'').split(',')
    if name.strip()
]
try:
    authorized_source_ip = [
        ipaddress.ip_address(ip.strip())
        for ip in settings.get('authorized_source_ip', '').strip('"\'').split(',')
        if ip.strip()
    ]
except ValueError as err:
    sys.exit(f"Invalid authorized_source_ip in '{config_file}': {err}. Exiting.")
uid = settings.get('uid', '').strip('"\'')
from_address = settings.get('fromaddr', '').strip('"\'')
cc_address = settings.get('ccaddr', '').strip('"\'')

if debug:
    print_header('-', 80, 'PROGRAM PARAMETERS')
    print(f'Interfaces: {interfaces}')
    print(f'Authorized IPs: {authorized_source_ip}')
    print(f'UID: \'{uid}\'')
    print(f'fromaddr: \'{from_address}\'')
    print(f'ccaddr: \'{cc_address}\'')

local_tz = pytz.timezone(timezone_name)
last_run_timestamp = read_timestamp(last_run_timestamp_file)

# The previous run may have happened before midnight; in that case the tail
# of yesterday's log has not been reported yet and is read first. A first
# run (timestamp 0) only looks at today's file.
filter_files = [filter_file]
start_of_today = datetime.combine(run_started.date(), datetime.min.time()).timestamp()
if 0 < last_run_timestamp < start_of_today:
    previous_file = (run_started - timedelta(days=1)).strftime(
        f'{filter_log_dir}/filter_%Y%m%d.log')
    if path.isfile(previous_file):
        filter_files.insert(0, previous_file)

lines = 0
msg_body = ''
today_month = run_started.month
today_year = run_started.year
for current_file in filter_files:
    # errors='replace' keeps a stray undecodable byte from aborting the run.
    with open(current_file, 'r', errors='replace') as f:
        for filter_line in f:
            try:
                line_fields = filter_line.split()
                log_date = datetime.fromisoformat(line_fields[1])
            except (IndexError, ValueError):
                # Blank or truncated line, or a timestamp that is not ISO 8601.
                continue
            if log_date.timestamp() < last_run_timestamp:
                continue
            if debug:
                print_header('-', 80)
                print(f"stdin: {filter_line}", end='')
            try:
                log = parse_record(line_fields[8])
                log['date'] = log_date
                if check_record(log) == []:
                    add_record_to_msg(log)
                    lines += 1
            except (IndexError, KeyError, ValueError,
                    pytz.exceptions.AmbiguousTimeError) as err:
                # IndexError happens when there are missing fields sent to parse_record()
                # KeyError happens when a record lacks a field check_record() needs
                # ValueError happens when corrupt log causes int() in parse_record() to fail
                # pytz.exceptions.AmbiguousTimeError can happen when DST switches back to Standard Time
                if debug:
                    print(f'Skipping record: {err!r}')

if lines > 0:
    if debug:
        print(msg_body)
    # Leaving the with-block sends QUIT and closes the connection.
    with smtplib.SMTP(smtp_server_name, smtp_server_port) as smtp:
        smtp.send_message(compose_email(msg_body))

# Reached only when nothing above raised, so a failed send is retried on the
# next run.
write_timestamp(last_run_timestamp_file)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement