- import requests
- import logging
- import sys
- import hashlib
- import os
- import ntpath
- import json
- import time
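# NOTE(review): an API token is embedded in source here. A credential that has
# been committed to a shared or public file should be treated as exposed:
# rotate it and supply the replacement through the environment or a secret
# store. The literal is kept only as a fallback so existing runs keep working.
TOKEN = os.environ.get('RL_A1000_TOKEN') or '3368d1e65134733d3a8bd70ef54aeaa1c49a0f14'
# Chunk size, in bytes, used when streaming a file through the hash function.
BUFFER_SIZE = 65538
# Base URL (scheme included) for the A1000 API calls below.
A1000_HOST = "https://a1000.reversinglabs.com"
# Bare host names; rl_titanium_cloud_alert_api() prepends "https://" itself.
TI_CLOUD_EMEA_APAC_HOST = "ticloud01.reversinglabs.com"
TI_CLOUD_US_HOST = "ticloud-cdn-api.reversinglabs.com"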
- ####################################################################
- # UTILS #
- ####################################################################
def path_leaf(path):
    """Return the last component of *path*, ignoring one trailing separator.

    ``ntpath`` is used, so both ``/`` and ``\\`` count as separators.
    """
    parent, leaf = ntpath.split(path)
    if leaf:
        return leaf
    # The path ended in a separator: the leaf is the last part of the parent.
    return ntpath.basename(parent)
- ####################################################################
- # A1000 services #
- ####################################################################
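def rl_a1000_upload(a1000_host, file, token):
    """Upload a local file to the A1000 ``/api/uploads/`` endpoint.

    Args:
        a1000_host: Base URL of the A1000 instance, scheme included.
        file: Path of the file to upload.
        token: API token sent in the ``Authorization: Token`` header.

    Returns:
        The response body as text, whatever the status code was; a non-2xx
        status is only logged as a warning.
    """
    # The form asks for "analysis": "cloud"; presumably the sample is then
    # processed off-host as well - confirm that is acceptable for the file.
    form = {"analysis": "cloud", "filename": path_leaf(file)}
    auth = {"Authorization": "Token %s" % token}
    # A context manager closes the file handle even when the request raises;
    # the bare open() used before left it to the garbage collector.
    with open(file, 'rb') as sample:
        # NOTE(review): no timeout is passed, so requests can block
        # indefinitely on an unresponsive host.
        r = requests.post(
            "%s%s" % (a1000_host, "/api/uploads/"),
            files={"file": sample},
            headers=auth,
            data=form,
        )
    if not 200 <= r.status_code < 300:
        logging.warning(
            "Request did not succeeded and returned code %s" % r.status_code
        )
    return r.text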
def rl_a1000_processing_status_sample(a1000_host, hash, token):
    """Query ``/api/samples/status/`` for one hash, filtered to ``processed``.

    The request URL is printed to stdout before the call. A non-2xx status is
    logged as a warning; the response body is returned as text either way.
    """
    url = "%s/api/samples/status/" % (a1000_host)
    print(url)
    auth_headers = {'Authorization' : 'Token %s' % token}
    response = requests.post(
        url,
        data={'hash_values' : [hash]},
        params={'status' : 'processed'},
        headers=auth_headers,
    )
    code = response.status_code
    if not 200 <= code < 300:
        logging.warning("Request did not succeeded and returned code %s" % code)
    return response.text
def rl_a1000_reanalyze(a1000_host, hash, token):
    """POST to ``/api/samples/<hash>/analyze/`` with ``analysis=cloud``.

    A non-2xx status is logged as a warning; the response body is returned as
    text either way.
    """
    endpoint = "%s/api/samples/%s/analyze/" % (a1000_host, hash)
    response = requests.post(
        endpoint,
        headers={"Authorization" : "Token %s" % token},
        data={"analysis": "cloud"},
    )
    code = response.status_code
    if not 200 <= code < 300:
        logging.warning(
            "Request did not succeeded and returned code %s" % code
        )
    return response.text
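def rl_a1000_list(a1000_host, hashes, token, verify_cert=True):
    """POST to ``/api/samples/list/`` for the given hash value(s).

    Args:
        a1000_host: Base URL of the A1000 instance, scheme included.
        hashes: Hash value placed, wrapped in a list, in ``hash_values``.
        token: API token sent in the ``Authorization: Token`` header.
        verify_cert: Passed to requests as ``verify``; leave it True so the
            server certificate is validated.

    Returns:
        The response body as text on a 2xx status. On any other status a
        warning is logged and the ``Response`` object itself is returned.
    """
    # NOTE(review): FIELDS is not defined anywhere in the visible source;
    # unless it is defined elsewhere this call raises NameError - confirm.
    data = {"hash_values": [hashes], "fields": FIELDS}
    headers = {"Authorization": "Token %s" % token}
    r = requests.post(
        "%s%s" % (a1000_host, "/api/samples/list/"),
        headers=headers,
        data=data,
        # Was ``verify=verify``: an undefined name, so the caller's TLS
        # verification choice never reached requests.
        verify=verify_cert,
    )
    if not 200 <= r.status_code < 300:
        logging.warning(
            "Request did not succeeded and returned code %s" % r.status_code
        )
        # NOTE(review): the sibling helpers return text in both cases; this
        # one hands back the Response on failure. Kept for compatibility.
        return r
    return r.text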
def rl_a1000_list_of_extracted_files_from_sample(a1000_host, hash, token):
    """GET ``/api/samples/<hash>/extracted-files/``.

    A non-2xx status is logged as a warning; the response body is returned as
    text either way.
    """
    endpoint = "%s%s%s%s" % (a1000_host, "/api/samples/", hash, "/extracted-files/")
    auth_headers = {"Authorization" : "Token %s" % token}
    response = requests.get(endpoint, headers=auth_headers)
    code = response.status_code
    if not 200 <= code < 300:
        logging.warning("Request did not succeeded and returned code %s" % code)
    return response.text
def rl_a1000_is_available(a1000_host, token, hash):
    """Return True when ``GET /api/samples/<hash>/`` answers with a 2xx status.

    Note the argument order: ``token`` comes before ``hash`` here, unlike the
    other A1000 helpers in this file.
    """
    endpoint = "%s%s%s/" % (a1000_host, "/api/samples/", hash)
    response = requests.get(endpoint, headers={'Authorization' : 'Token %s' % token})
    return 200 <= response.status_code < 300
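def rl_titanium_cloud_alert_api(tc_host, hashes, subscribe=True):
    """Subscribe to, or unsubscribe from, data-change alerts for SHA-1 hashes.

    Args:
        tc_host: Host name only; ``https://`` is prepended here.
        hashes: List of SHA-1 values placed in the bulk query.
        subscribe: True for the ``subscribe`` endpoint, False for
            ``unsubscribe``.

    Returns:
        The response body as text. A non-2xx status is logged as a warning.
    """
    action = "subscribe" if subscribe else "unsubscribe"
    # The original mixed a "%s" placeholder with str.format(), so the host was
    # never substituted and the request went to the literal host "%s".
    url = "https://%s/api/subscription/data_change/v1/bulk_query/%s/json" % (
        tc_host,
        action,
    )
    json_payload = {"rl": {"query": {"hash_type": "sha1", "hashes": hashes}}}
    # NOTE(review): no credentials accompany this request; confirm whether
    # the endpoint needs authentication and how it should be supplied.
    r = requests.post(url, json=json_payload)
    # The response used to be discarded, which hid failures from the caller.
    if not 200 <= r.status_code < 300:
        logging.warning(
            "Request did not succeeded and returned code %s" % r.status_code
        )
    return r.text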
- ############################################################################
- # HASH CALCULATOR #
- ############################################################################
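def calculate_sha1(file):
    """Return the hex SHA-1 digest of the file at *file*.

    The file is read in BUFFER_SIZE chunks so large samples are not loaded
    into memory at once. If the file cannot be opened, a message is printed
    and the process exits with status 1.

    SHA-1 serves here as the sample identifier used in the API calls; it is
    not collision-resistant and should not be relied on as an integrity check.
    """
    import sys

    sha1 = hashlib.sha1()
    try:
        f = open(file, 'rb')
    except IOError:
        # print() with one argument behaves the same on Python 2 and 3; the
        # print statement used before is a syntax error on Python 3.
        print('Could not read the file: %s' % file)
        sys.exit(1)
    with f:
        while True:
            data = f.read(BUFFER_SIZE)
            if not data:
                break
            sha1.update(data)
    return sha1.hexdigest()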
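if __name__ == '__main__':
    # Flow: hash the file, upload it (or request re-analysis when the A1000
    # already has it), wait until it is reported as processed, then print the
    # list of extracted files.
    if len(sys.argv) != 2:
        # print() with a single argument works on Python 2 and 3 alike.
        print('You must submit the path to the binary that will be examined!')
        sys.exit(1)
    file_path = sys.argv[1]
    if not os.path.exists(file_path):
        print('The submited path does not exist. Please check the path and try again.')
        sys.exit(1)
    logging.basicConfig(level=logging.INFO)
    # Named sample_hash rather than hash so the builtin is not shadowed.
    sample_hash = calculate_sha1(file_path)
    logging.info("Checking availability of sample...")
    available_on_a1000 = rl_a1000_is_available(A1000_HOST, TOKEN, sample_hash)
    if not available_on_a1000:
        logging.info('Sending the sample on A1000...')
        response = rl_a1000_upload(A1000_HOST, file_path, TOKEN)
        print(response)
    else:
        logging.info('Sending on reanalyze....')
        response = rl_a1000_reanalyze(A1000_HOST, sample_hash, TOKEN)
        print(response)
    # Poll until exactly one result is reported as processed. The wait now
    # sits between polls; before, the second poll fired immediately and the
    # loop slept once more after success.
    # NOTE(review): the loop has no upper bound, and a non-JSON or error body
    # makes json.loads / the 'results' lookup raise - consider a retry limit.
    while True:
        response = rl_a1000_processing_status_sample(A1000_HOST, sample_hash, TOKEN)
        json_response = json.loads(response)
        if len(json_response['results']) == 1:
            break
        time.sleep(5)
    logging.info('Sample is analyzed...')
    logging.info('Retreiving extracted files from the sample...')
    response = rl_a1000_list_of_extracted_files_from_sample(A1000_HOST, sample_hash, TOKEN)
    print(response)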