Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """A scheduler job that starts a preconfigured Patrowl scan through the Patrowl HTTP API (GET request)."""
- import logging
- import requests
- import os
- import json
- import urllib3
- from ndscheduler import job
# Module-level logger shared by the Patrowl wrapper and the scheduler job.
logger = logging.getLogger(__name__)
class SSLException(Exception):
    """Error raised by ``Scanner.action`` when requests reports an SSL failure."""
class Scanner(object):
    """Small HTTP wrapper used to trigger scans on a Patrowl server.

    Every request is authenticated by sending ``api_keys`` verbatim as the
    ``Cookie`` header.
    """

    # Class-level fallbacks so the attributes exist even if __init__ is bypassed.
    url = ""
    api_keys = ""
    insecure = True
    timeout = None

    def __init__(self, url, api_keys='',
                 insecure=True, auto_logout=True, debug=True, timeout=None):
        """Store the connection settings.

        Args:
            url: Base URL of the Patrowl server, including the scheme.
            api_keys: Value sent as the ``Cookie`` header on every request.
            insecure: When true, TLS certificate verification is disabled
                and urllib3's InsecureRequestWarning is silenced.
            auto_logout: Accepted for interface compatibility; not used.
            debug: When true, the module logger level is set to INFO.
            timeout: Optional timeout (seconds) handed to requests. ``None``
                keeps requests' default of waiting indefinitely.
        """
        self.debug = debug
        self.url = url
        self.api_keys = api_keys
        self.insecure = insecure
        self.timeout = timeout

        if debug:
            logger.setLevel(logging.INFO)
        if insecure:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        logger.info("Patrowl wrapper initiated")

    def _build_url(self, action):
        """Join the base URL and ``action`` with exactly one slash between them."""
        return "{}/{}".format(self.url.rstrip("/"), action.lstrip("/"))

    def action(self, action, method, extra=None, files=None, json_req=True,
               download=False, private=False, retry=True):
        """Send one HTTP request to the server and return status and body.

        Args:
            action: Path of the endpoint, relative to the base URL. A leading
                slash is tolerated.
            method: HTTP verb passed to ``requests.request``.
            extra: Optional mapping that is JSON-encoded and sent as the
                request body. Defaults to an empty object.
            files: Optional ``files`` argument forwarded to requests.
            json_req, download, private, retry: Accepted for interface
                compatibility; not used.

        Returns:
            A dict with ``"status"`` (the status code as a string) and
            ``"body"`` (the decoded JSON response, or ``{}`` when the
            response text is empty).

        Raises:
            SSLException: requests reported an SSL error.
            Exception: requests could not connect to the server.
            Other errors raised by requests, or by JSON decoding of a
            non-JSON response, are not handled here.
        """
        # None defaults avoid sharing one mutable dict between calls.
        payload = json.dumps(dict(extra or {}))
        headers = {
            # Authentication always goes through the Cookie header.
            "Cookie": self.api_keys,
            # TODO: verify whether the session/token gets refreshed.
        }
        url = self._build_url(action)

        try:
            req = requests.request(method, url, data=payload, files=files,
                                   # Honour the constructor flag instead of
                                   # always disabling verification.
                                   verify=not self.insecure,
                                   headers=headers, timeout=self.timeout)
        except requests.exceptions.SSLError as ssl_error:
            raise SSLException('%s for %s.' % (ssl_error, url))
        except requests.exceptions.ConnectionError:
            raise Exception("Could not connect to %s.\nExiting!\n" % url)

        logger.info("Patrowl wrapper call action(%s/%s) returned with status %s",
                    method, url, req.status_code)
        return {
            "status": str(req.status_code),
            "body": req.json() if req.text else {}
        }

    def start_scan(self, scan_id=0):
        """Ask the server to run the scan definition ``scan_id``.

        Never raises: failures are logged and reported in the result.

        Returns:
            ``{"data": body}`` when the server answers with status 200,
            otherwise a dict with ``"error"``, ``"description"`` and
            ``"data"`` keys describing the failure.
        """
        action_url = "/scans/api/v1/defs/run/{}".format(scan_id)
        try:
            create_scan_result = self.action(action=action_url,
                                             method="GET",
                                             private=True,
                                             retry=False)
            if create_scan_result["status"] == "200":
                logger.info("Patrowl scan(id={}) successfully started".format(scan_id))
                return {"data": create_scan_result["body"]}

            logger.error("Patrowl scan(id={}) failed to start: action returned non-200 status response".format(scan_id))
            # NOTE(review): "Nessus" looks like a copy/paste leftover; the text
            # is kept unchanged in case consumers match on it.
            return {"error": "Error on start_scan",
                    "description": "Nessus request returned a non-200 status response",
                    "data": create_scan_result["body"]}
        except Exception as ex:
            # Boundary handler: the caller gets an error dict, not a traceback.
            logger.error("Error on start_scan")
            logger.exception(ex)
            return {"error": "Error on start_scan",
                    "description": "Exception",
                    "data": str(ex)}

    def objdump(self):
        """Debugging helper: print every attribute of the object and its value."""
        for attr in dir(self):
            print("obj.%s = %s" % (attr, getattr(self, attr)))
class CreateScanSchedulerJob(job.JobBase):
    """Scheduler job that starts a preconfigured Patrowl scan."""

    # NOTE(review): not referenced anywhere in the visible code; presumably
    # read by the scheduler framework - confirm before removing.
    TIMEOUT = 10

    @classmethod
    def meta_info(cls):
        """Return the job description: class path, notes and argument spec.

        The entries of ``arguments`` and ``example_arguments`` follow the
        positional order of ``run``: scan id, API key, server URL.
        """
        return {
            'job_class_string': '%s.%s' % (cls.__module__, cls.__name__),
            'notes': 'Start\'s a Patrowl preconfigured scan',
            'arguments': [
                # Scan Id
                {'type': 'int', 'description': 'Patrowl Scan ID'},
                # API key
                {'type': 'string', 'description': 'Patrowl API keys'},
                # Server url
                {'type': 'string', 'description': 'Patrowl server url'}
            ],
            # Order fixed to match run(): the key comes before the URL.
            'example_arguments': ('["10", "sessionid=123456789012345678901234567890AB", "http://patrowl.manager.local"]')
        }

    def run(self, scan_id, key, url, *args, **kwargs):
        """Start scan ``scan_id`` on the server at ``url``.

        Args:
            scan_id: Identifier of the scan definition to run.
            key: Value sent as the ``Cookie`` header for authentication.
            url: Base URL of the Patrowl server.

        Returns:
            The result dict produced by ``Scanner.start_scan``.
        """
        wrapper = Scanner(url=url, api_keys=key)
        return wrapper.start_scan(scan_id)
if __name__ == "__main__":
    # Manual smoke test against a local endpoint. The URL needs an explicit
    # scheme, otherwise requests cannot send it. A distinct variable name
    # avoids rebinding the imported ``job`` module.
    test_job = CreateScanSchedulerJob.create_test_instance()
    test_job.run(0, "", "http://0.0.0.0:1111")
Advertisement
Add Comment
Please, Sign In to add comment