Advertisement
N3ll4

@CipherSerpeant

Nov 26th, 2023 (edited)
1,756
0
Never
1
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.54 KB | Source Code | 0 0
  1. import argparse
  2. import threading
  3. import requests
  4.  
  5. class LoadTester:
  6.     def init(self, url, payload):
  7.         self.url = url
  8.         self.payload = payload
  9.         self.request_counter = 0
  10.         self.printed_msgs = []
  11.         self.lock = threading.Lock()
  12.         self.threads = 500  # Default number of threads
  13.  
  14.     def print_msg(self, msg):
  15.         with self.lock:
  16.             if msg not in self.printed_msgs:
  17.                 print(f"\n{msg} after {self.request_counter} requests")
  18.                 self.printed_msgs.append(msg)
  19.  
  20.     def handle_status_codes(self, status_code):
  21.         with self.lock:
  22.             self.request_counter += 1
  23.         print(f"\r{self.request_counter} requests have been sent", end="")
  24.  
  25.         if status_code == 429:
  26.             self.print_msg("You have been throttled")
  27.         elif status_code == 500:
  28.             self.print_msg("Status code 500 received")
  29.  
  30.     def send_get(self):
  31.         while True:
  32.             try:
  33.                 response = requests.get(self.url)
  34.                 self.handle_status_codes(response.status_code)
  35.             except requests.RequestException:
  36.                 pass
  37.  
  38.     def send_post(self):
  39.         while True:
  40.             try:
  41.                 response = requests.post(self.url, data=self.payload)
  42.                 self.handle_status_codes(response.status_code)
  43.             except requests.RequestException:
  44.                 pass
  45.  
  46.     def start(self):
  47.         threads = []
  48.         for _ in range(self.threads):
  49.             if self.url:
  50.                 if self.payload:
  51.                     thread = threading.Thread(target=self.send_post)
  52.                 else:
  53.                     thread = threading.Thread(target=self.send_get)
  54.                 threads.append(thread)
  55.                 thread.start()
  56.  
  57.         for thread in threads:
  58.             thread.join()  # Note: This will make the main thread wait indefinitely
  59.  
  60. if name == "main":
  61.     parser = argparse.ArgumentParser()
  62.     parser.add_argument("-g", "--get", help="Specify GET request. Usage: -g '<url>'")
  63.     parser.add_argument("-p", "--post", help="Specify POST request. Usage: -p '<url>'")
  64.     parser.add_argument("-d", "--data", help="Specify data payload for POST request")
  65.     args = parser.parse_args()
  66.  
  67.     if not args.get and not args.post:
  68.         parser.print_usage()
  69.     elif args.get and args.post:
  70.         print("You must specify either a GET (-g) or POST (-p) request.")
  71.     else:
  72.         url = args.get or args.post
  73.         load_tester = LoadTester(url, args.data)
  74.         load_tester.start()
Advertisement
Comments
  • c4pncrunch
    189 days
    # text 2.55 KB | 0 0
    1. import argparse
    2. import threading
    3. import requests
    4.  
    5. class LoadTester:
    6. def __init__(self, url, payload):
    7. self.url = url
    8. self.payload = payload
    9. self.request_counter = 0
    10. self.printed_msgs = []
    11. self.lock = threading.Lock()
    12. self.threads = 500 # Default number of threads
    13.  
    14. def print_msg(self, msg):
    15. with self.lock:
    16. if msg not in self.printed_msgs:
    17. print(f"\n{msg} after {self.request_counter} requests")
    18. self.printed_msgs.append(msg)
    19.  
    20. def handle_status_codes(self, status_code):
    21. with self.lock:
    22. self.request_counter += 1
    23. print(f"\r{self.request_counter} requests have been sent", end="")
    24.  
    25. if status_code == 429:
    26. self.print_msg("You have been throttled")
    27. elif status_code == 500:
    28. self.print_msg("Status code 500 received")
    29.  
    30. def send_get(self):
    31. while True:
    32. try:
    33. response = requests.get(self.url)
    34. self.handle_status_codes(response.status_code)
    35. except requests.RequestException:
    36. pass
    37.  
    38. def send_post(self):
    39. while True:
    40. try:
    41. response = requests.post(self.url, data=self.payload)
    42. self.handle_status_codes(response.status_code)
    43. except requests.RequestException:
    44. pass
    45.  
    46. def start(self):
    47. threads = []
    48. for _ in range(self.threads):
    49. if self.url:
    50. if self.payload:
    51. thread = threading.Thread(target=self.send_post)
    52. else:
    53. thread = threading.Thread(target=self.send_get)
    54. threads.append(thread)
    55. thread.start()
    56.  
    57. for thread in threads:
    58. thread.join() # Note: This will make the main thread wait indefinitely
    59.  
    60. if __name__ == "__main__":
    61. parser = argparse.ArgumentParser()
    62. parser.add_argument("-g", "--get", help="Specify GET request. Usage: -g '<url>'")
    63. parser.add_argument("-p", "--post", help="Specify POST request. Usage: -p '<url>'")
    64. parser.add_argument("-d", "--data", help="Specify data payload for POST request")
    65. args = parser.parse_args()
    66.  
    67. if not args.get and not args.post:
    68. parser.print_usage()
    69. elif args.get and args.post:
    70. print("You must specify either a GET (-g) or POST (-p) request.")
    71. else:
    72. url = args.get or args.post
    73. load_tester = LoadTester(url, args.data)
    74. load_tester.start()
    75.  
Add Comment
Please, Sign In to add comment
Advertisement