Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- __author__ = 'Imam Omar Mochtar <iomarmochtar@gmail.com>'
- """
- Parse zimbra audit file for blocking any massive failed authentication which indicating brute force attempt
- this script assume zimbra proxy and mailbox service in same server with original ip (oip) is logged
- the attempted IP will listed in AUDITWATCH chain. for applying block combine run this command for add it in INPUT filter
- # iptables -t filter -I INPUT -j AUDITWATCH
- """
- import re
- import sys
- import subprocess
- import smtplib
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.utils import COMMASPACE, formatdate
- from pprint import pprint
- ### VARIABLES BEGIN
- AUDIT_FILE = '/opt/zimbra/log/audit.log'
- CHAIN = 'AUDITWATCH'
- MAX_ATTEMPT = 20
- # define white list here
- WHITELIST = re.compile( '(%s)' % '|'.join( map(lambda x: x.replace('.', '\.'), [
- '192.100'
- ])))
- # list of blocked IP
- BLOCKED = set()
- # this var will hold parsing result
- FAILED_IP = {}
- ## SMTP
- REPORT_FROM = 'admin@mail.com'
- REPORT_TO = ['iomarmochtar@gmail.com']
- REPORT_SUBJECT = 'Authentication Attempt Report'
- REPORT_SMTP = '127.0.0.1'
- ### VARIABLES END
- def run_cmd(command):
- return subprocess.Popen(
- command, universal_newlines=True, shell=True,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
- # make sure chain are exists
- run_cmd("/sbin/iptables -nL {0} || /sbin/iptables -N {0}".format(CHAIN))
- # grab listed of blocked IP
- output, err = run_cmd("/sbin/iptables -nL {0}".format(CHAIN))
- for txt in output.splitlines():
- if txt.startswith('DROP'):
- brk = txt.split()
- if len(brk) != 5:
- continue
- BLOCKED.add(brk[3])
- continue
- oip_parser = re.compile(r'oip=(?P<oip>.*?);.*?for\s+\[(?P<username>.*?)\],\s+invalid password;$')
- # loop trough audit file to parse any failed authentication, capture OIP params
- for line in open(AUDIT_FILE, 'r').readlines():
- oip = oip_parser.search(line)
- if not oip:
- continue
- data = oip.groupdict()
- ip = data['oip']
- # if the IP was listed in existing block and ignore list then skip it
- if ip in BLOCKED or WHITELIST.search(ip):
- continue
- if not FAILED_IP.has_key(ip):
- FAILED_IP[ip] = {'users': set(), 'attempt': 0}
- FAILED_IP[ip]['users'].add(data['username'])
- FAILED_IP[ip]['attempt'] += 1
- # input IP in CHAIN if has reach maximum var
- is_report = False
- text = "<h3><b>List of auth attempt</b></h3> <br/><br/><br/>"
- f_cmd = "/sbin/iptables -I {0} -p tcp -s {1} -j DROP"
- for ip, data in FAILED_IP.items():
- if data['attempt'] <= MAX_ATTEMPT:
- continue
- is_report = True
- cmd = f_cmd.format(CHAIN, ip)
- print("IP {0} has reach max attempt {1} ({2})".format(ip, MAX_ATTEMPT, data['attempt']))
- run_cmd(cmd)
- text += "<b><u>{0} for {1} times</u></b> <br/><ul>".format(ip, data['attempt'])
- for user in data['users']:
- text += "<li>{0}</li>".format(user)
- text += "</ul>"
- if not is_report:
- sys.exit(0)
- print("sending report")
- msg = MIMEMultipart()
- msg['From'] = REPORT_FROM
- msg['To'] = COMMASPACE.join(REPORT_TO)
- msg['Date'] = formatdate(localtime=True)
- msg['Subject'] = REPORT_SUBJECT
- msg.attach(MIMEText(text, 'html'))
- msg.attach(MIMEText("Please see this email as HTML", 'plain'))
- smtp = smtplib.SMTP(REPORT_SMTP)
- smtp.sendmail(REPORT_FROM, REPORT_TO, msg.as_string())
- smtp.close()
Add Comment
Please, Sign In to add comment