Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #! /usr/bin/env python
- """
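- LLM-masscan: look up a country's IP ranges by country name, scan them with masscan, import the XML report into Metasploit, and query the report with an LLM.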
- [source](https://stackoverflow.com/questions/74290268/how-to-get-ip-addresses-from-ip2location-database/78756361#78756361)
- [source](https://github.com/SadParad1se/snek-sploit)
- [source](https://stackoverflow.com/questions/36249744/interactive-shell-using-docker-compose)
- [source](https://blog.king-sabri.net/programming/controlling-metasploit-remotely-using-rpc-api)
- """
#@trace(logger)
def convert_to_ipv4(ip_string:str)->str:
    """Convert a decimal IP number into dotted-quad IPv4 notation.

    Example: ``"16777216"`` becomes ``"1.0.0.0"``.

    Args:
        ip_string: The address as a single decimal integer. Anything that
            ``int()`` accepts is allowed (a numeric string or an int).

    Returns:
        The address formatted as ``"a.b.c.d"``.

    Raises:
        ValueError: If the value is not an integer, or lies outside
            ``0 .. 2**32 - 1``. Out-of-range values are rejected instead of
            being silently turned into a wrong address.
    """
    # Local import keeps this helper self-contained.
    import ipaddress

    # IPv4Address range-checks the integer; its AddressValueError is a
    # ValueError subclass, so callers only need to catch ValueError.
    return str(ipaddress.IPv4Address(int(ip_string)))
@trace(logger)
def load_ipdb(csv: str = "vendor.D/IP2LOCATION-LITE-DB1.CSV") -> pd.DataFrame:
    """Load the IP-range CSV into a DataFrame.

    The file is read without a header row and its four columns are named
    ``from``, ``to``, ``cc`` and ``name``.

    Args:
        csv: Path of the CSV file to read.

    Returns:
        The loaded frame, with the ``from`` and ``to`` columns passed through
        ``convert_to_ipv4`` so they hold dotted-quad strings.
    """
    column_names = ['from', 'to', 'cc', 'name']
    frame = pd.read_csv(csv, header=None, names=column_names)
    # Both range bounds are stored as IP numbers; rewrite each as an IPv4 string.
    for bound in ('from', 'to'):
        frame[bound] = frame[bound].apply(convert_to_ipv4)
    return frame
#@trace(logger)
def extract_range(index:int, row)->str:
    """Format one row as a ``from-to`` range string.

    Args:
        index: Unused. Accepted so the function can be star-mapped over
            ``(index, row)`` pairs such as those yielded by
            ``DataFrame.iterrows()``.
        row: Mapping-like object providing the ``'from'`` and ``'to'`` keys.

    Returns:
        The two bounds joined by a hyphen, e.g. ``"1.0.0.0-1.0.0.255"``.
    """
    first, last = row['from'], row['to']
    return f'{first!s}-{last!s}'
#@trace(logger)
def extract_ranges(data:pd.DataFrame, name:str)->str:
    """Return every IP range of one country as a comma-separated string.

    The country is matched against the ``name`` column ignoring case and
    surrounding whitespace, so interactive input such as ``"germany "``
    still finds ``"Germany"``.

    Args:
        data: Frame with at least the ``from``, ``to`` and ``name`` columns.
        name: Country name to look up.

    Returns:
        ``"from-to"`` ranges joined by commas, in frame order. An empty
        string is returned when no row matches.
    """
    wanted = name.strip().casefold()
    # astype(str) keeps the .str accessor usable even if the column holds
    # non-string values (e.g. NaN for a missing name).
    known = data['name'].astype(str).str.strip().str.casefold()
    matches = data.loc[known == wanted]
    # Zipping the two columns avoids building a Series per row as iterrows() does.
    return ','.join(
        f'{start}-{end}' for start, end in zip(matches['from'], matches['to'])
    )
if __name__ == '__main__':
    # Interactive session:
    #   1. ask for a country name and look up its IP ranges,
    #   2. scan those ranges with masscan (XML report in a temporary file),
    #   3. import the report into Metasploit through the RPC console,
    #   4. index the report and answer free-form queries about it.
    # Typing one of `exit_words` at the query prompt returns to the country
    # prompt; typing one at the country prompt ends the session.
    exit_words = ('quit', 'exit', 'done')

    # Settings, overridable from the environment.
    model: str = os.environ.get('MODEL', '<redacted>')
    embed_model: str = os.environ.get('EMBED_MODEL', '<redacted>')
    request_timeout: int = int(os.environ.get('TIMEOUT', (60 * 60 * 24)))  # my systems are SLOW
    base_url: str = os.environ.get('HOST', 'http://<redacted>:11434')
    ports: str = os.environ.get('PORTS', '80,443')  # port list handed to masscan's -p

    Settings.llm = Ollama(
        model=model,
        request_timeout=request_timeout,
        base_url=base_url,
    )
    Settings.embed_model = OllamaEmbedding(
        model_name=embed_model,
        base_url=base_url,
        ollama_additional_kwargs={"mirostat": 0},
    )

    data: pd.DataFrame = load_ipdb()
    client: MetasploitClient = MetasploitClient(
        username='<redacted>', password='<redacted>',
        host="<redacted>", ssl=True, port=55553)

    try:
        console: ConsoleInfo = client.consoles.create()
        try:
            while True:
                name: str = input('country name: ')
                if name.lower() in exit_words:
                    logger.info('exiting')
                    break

                ranges: str = extract_ranges(data, name)
                logger.debug('ranges: %s', ranges)
                if not ranges:
                    logger.error('no ranges')
                    continue

                # fout receives masscan's XML report; it stays open (and so
                # stays on disk) until the query loop below is left.
                with NamedTemporaryFile(mode='r+', dir='/tmp/import/', suffix='.xml') as fout:
                    logger.debug('fout: %s', fout.name)

                    # fin holds the target ranges for masscan's -iL option.
                    with NamedTemporaryFile(mode='w') as fin:
                        logger.debug('fin: %s', fin.name)
                        logger.debug('writing ranges')
                        fin.write(ranges)
                        # masscan opens the file by name, so the buffered
                        # text has to reach the disk before it starts.
                        fin.flush()
                        logger.debug('wrote ranges')

                        cmd: List[str] = [
                            'sudo',
                            'masscan',
                            '-iL', fin.name,  # TODO testing (alternative: 'vendor.D/host.lst')
                            '-oX', fout.name,
                            '-p', ports,
                            '--banners', '--open-only',
                            '--rate', '0.1',
                            '--ping',
                        ]
                        logger.debug('cmd: %s', cmd)
                        try:
                            run = subprocess.run(
                                cmd, check=True, capture_output=True, text=True)
                        except subprocess.CalledProcessError as failure:
                            # A failed scan should not end the whole session:
                            # report what masscan said and ask again.
                            logger.error('masscan exited with status %s', failure.returncode)
                            logger.error('err: %s', failure.stderr)
                            continue

                    logger.info('out: %s', run.stdout)
                    # The scan succeeded, so its stderr is diagnostic output,
                    # not an error.
                    logger.info('err: %s', run.stderr)

                    fout.seek(0)
                    outx: str = fout.read()
                    logger.debug('out: %s', outx)

                    cmdstr: str = f'db_import {fout.name}'  # TODO escapism
                    logger.debug('cmd: %s', cmdstr)
                    out = console.execute(cmdstr)  # send to msfrpcd
                    logger.info('out: %s', out)

                    # TODO: filter the report down to banners matching a regex
                    # (e.g. 'Smart Home Manager') and write the reduced XML
                    # back to fout before indexing. Open question from the
                    # first attempt: the XML may not contain banners.

                    # NOTE(review): XMLReader was at one point obtained with
                    # download_loader('XMLReader'); confirm it is imported at
                    # file level.
                    xml_reader = XMLReader()
                    documents = xml_reader.load_data(file=fout.name)
                    index = VectorStoreIndex.from_documents(
                        documents, show_progress=True, use_async=True)
                    engine = index.as_query_engine(verbose=True)

                    while True:
                        query: str = input('query: ')
                        if query.lower() in exit_words:
                            logger.info('continuing')
                            break
                        out = engine.query(query)
                        logger.info('out: %s', out)
        finally:
            console.destroy()
    finally:
        client.logout()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement