Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hashlib
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import pad, unpad
- from Crypto.Random import get_random_bytes
- from numba import cuda, uint32, uint64, uint8
- import numpy as np
- import argparse
- import os
- import time
- import multiprocessing
# Parameters
# BLOCK_SIZE is used in this file as the IV length, the padding block size,
# and the length (in bytes) to which the derived SHA-256 key is truncated.
BLOCK_SIZE = 16 # AES block size
- # Encrypt the hidden link using AES CBC mode with a random IV
def encrypt_link(link, key):
    """
    Encrypt a link with AES in CBC mode under a freshly generated random IV.

    :param link: The link (str) to protect; it is encoded to bytes and padded
        to the AES block size before encryption.
    :param key: The AES encryption key.
    :return: Bytes made of the IV followed by the ciphertext.
    """
    init_vector = get_random_bytes(BLOCK_SIZE)
    padded_plaintext = pad(link.encode(), BLOCK_SIZE)
    ciphertext = AES.new(key, AES.MODE_CBC, init_vector).encrypt(padded_plaintext)
    # The IV travels in front of the ciphertext so the decryptor can recover it.
    return init_vector + ciphertext
- # Decrypt the hidden link
def decrypt_link(encrypted_link, key):
    """
    Recover the plaintext link from an IV-prefixed AES-CBC ciphertext.

    :param encrypted_link: Bytes whose first BLOCK_SIZE bytes are the IV and
        whose remainder is the ciphertext.
    :param key: The AES decryption key.
    :return: The decrypted link as a string.
    """
    init_vector, ciphertext = encrypted_link[:BLOCK_SIZE], encrypted_link[BLOCK_SIZE:]
    padded_plaintext = AES.new(key, AES.MODE_CBC, init_vector).decrypt(ciphertext)
    # unpad() strips the block padding added at encryption time.
    return unpad(padded_plaintext, BLOCK_SIZE).decode()
- # GPU kernel for hash computation using DJB2 hash function
@cuda.jit
def find_nonce_kernel(d_challenge_bytes, challenge_length, nonce_start, target, result):
    """
    CUDA kernel: each thread tests one nonce against the DJB2 proof-of-work target.

    The hashed message is the challenge bytes followed by the 8 bytes of the
    nonce, least-significant byte first (the same layout the CPU worker builds
    with ``nonce.to_bytes(8, byteorder='little')``).

    :param d_challenge_bytes: Device array of challenge bytes.
    :param challenge_length: Number of challenge bytes to hash.
    :param nonce_start: First nonce of this batch; thread ``idx`` tests
        ``nonce_start + idx``.
    :param target: A nonce is accepted when its 32-bit hash is strictly below
        this value.
    :param result: Device array ``[nonce, flag]``. The first thread that flips
        ``flag`` (index 1) from 0 to 1 stores its nonce at index 0.
    """
    idx = cuda.grid(1)
    nonce = uint64(nonce_start) + uint64(idx)

    # Fold the bytes straight into the running hash. Staging them in a fixed
    # 64-byte local array would be overrun by challenges longer than 56 bytes.
    hash_value = uint32(5381)
    for i in range(challenge_length):
        c = uint32(d_challenge_bytes[i])
        hash_value = uint32(hash_value * uint32(33) + c)

    # Append the nonce, one byte at a time, little-endian.
    for i in range(8):  # 8 bytes for uint64 nonce
        shift_amount = uint64(8 * i)
        c = uint32((nonce >> shift_amount) & uint64(0xFF))
        hash_value = uint32(hash_value * uint32(33) + c)

    # Check if hash matches difficulty
    if hash_value < target:
        # Compare-and-swap on the flag so only one winning thread writes its nonce.
        old = cuda.atomic.cas(result, 1, 0, 1)
        if old == 0:
            result[0] = nonce  # Store nonce
- # Solve the challenge using GPU
def find_nonce_gpu(challenge, difficulty):
    """
    Solve the proof-of-work challenge on the GPU.

    Nonces are scanned in batches of ``threads_per_block * blocks_per_grid``
    starting at 0 until the kernel reports a nonce whose DJB2 hash is below
    ``0xFFFFFFFF >> difficulty``. A short benchmark is run first to print a
    hash-rate and time estimate.

    :param challenge: The challenge string (hexadecimal).
    :param difficulty: The difficulty level (number of bits); must be below 32.
    :return: The nonce (int) that solves the challenge, or None when the
        difficulty is unsupported or the user interrupts the search. If any
        other exception occurs, the result of ``find_nonce_cpu`` is returned.
    """
    try:
        # Check for maximum difficulty
        if difficulty >= 32:
            print("Error: Maximum difficulty level for this implementation is 31.")
            return None

        challenge_bytes = np.frombuffer(bytes.fromhex(challenge), dtype=np.uint8).copy()
        challenge_length = challenge_bytes.size
        result = np.zeros(2, dtype=np.uint64)  # result[0] = nonce, result[1] = flag

        # Transfer data to device once
        d_challenge_bytes = cuda.to_device(challenge_bytes)
        d_result = cuda.to_device(result)

        threads_per_block = 256
        blocks_per_grid = 1024
        batch_size = np.uint64(threads_per_block * blocks_per_grid)
        nonce_start = np.uint64(0)

        # With difficulty <= 31 the target is always at least 1, so no
        # separate zero-target check is needed.
        max_hash_value = 0xFFFFFFFF  # Use standard Python integer
        target = max_hash_value >> difficulty
        target_uint32 = np.uint32(target)  # Kernel compares 32-bit hashes

        # Calculate the expected number of hashes needed
        probability = target / float(max_hash_value + 1)
        expected_hashes = 1 / probability

        # Warm-up launch: the first call of a @cuda.jit kernel compiles it,
        # which would otherwise be counted as hashing time in the benchmark.
        find_nonce_kernel[blocks_per_grid, threads_per_block](
            d_challenge_bytes, challenge_length, nonce_start, target_uint32, d_result
        )
        cuda.synchronize()

        # Perform a benchmark to estimate hash rate
        print("Performing benchmark to estimate hash rate...")
        benchmark_batches = 10  # Number of batches to process during the benchmark
        start_time = time.time()
        for _ in range(benchmark_batches):
            find_nonce_kernel[blocks_per_grid, threads_per_block](
                d_challenge_bytes, challenge_length, nonce_start, target_uint32, d_result
            )
            cuda.synchronize()  # Ensure kernel execution is complete
            nonce_start += batch_size
        # Guard against a zero reading from a coarse clock.
        elapsed_time = max(time.time() - start_time, 1e-9)

        # Calculate hash rate
        total_hashes = float(batch_size) * benchmark_batches
        hash_rate = total_hashes / elapsed_time  # Hashes per second

        # Estimate total expected time
        estimated_total_time = expected_hashes / hash_rate
        print(f"Estimated hash rate: {hash_rate:.2f} hashes/second")
        print(f"Expected number of hashes to find solution: {expected_hashes:.0f}")
        print(f"Estimated time to complete: {estimated_total_time:.2f} seconds ({estimated_total_time/60:.2f} minutes)")

        # Reset nonce_start and the device result: the warm-up and benchmark
        # launches used the real target and may already have set the flag.
        nonce_start = np.uint64(0)
        result[1] = 0  # Reset the flag
        d_result.copy_to_device(result)

        print("Starting GPU processing...")
        start_time = time.time()

        # Variables for controlling print intervals
        print_interval = 5  # Print progress every 5 seconds
        last_print_time = start_time
        range_start_nonce = nonce_start  # First nonce of the range reported next
        total_hashes_processed = np.uint64(0)  # To track total hashes processed

        try:
            while True:
                # Launch GPU kernel for the current batch
                find_nonce_kernel[blocks_per_grid, threads_per_block](
                    d_challenge_bytes, challenge_length, nonce_start, target_uint32, d_result
                )
                cuda.synchronize()  # Ensure kernel execution is complete

                # Copy results back to the host
                result = d_result.copy_to_host()
                total_hashes_processed += batch_size

                # Check if solution found
                if result[1] == 1:
                    elapsed_time = time.time() - start_time
                    print(f"\nSolution found at nonce {result[0]} (Elapsed time: {elapsed_time:.2f}s)")
                    return int(result[0])

                nonce_start += batch_size  # Increment nonce_start for the next batch

                # Only print the progress message if a certain amount of time has passed
                current_time = time.time()
                if current_time - last_print_time >= print_interval:
                    elapsed_time = current_time - start_time
                    hashes_done = float(total_hashes_processed)
                    # expected_hashes is only an average; clamp so the estimate
                    # never goes negative once the search runs past it.
                    hashes_remaining = max(expected_hashes - hashes_done, 0.0)
                    est_time_remaining = hashes_remaining / hash_rate
                    progress = min((hashes_done / expected_hashes) * 100, 100)
                    print(f"Processed nonce range {range_start_nonce} - {nonce_start} "
                          f"(Elapsed: {elapsed_time:.2f}s, Remaining: {est_time_remaining:.2f}s, "
                          f"Progress: {progress:.2f}%)")
                    last_print_time = current_time
                    range_start_nonce = nonce_start  # Update the starting nonce for the next range
        except KeyboardInterrupt:
            print("\nProcess interrupted by user.")
            return None
    except Exception as e:
        print(f"An error occurred during GPU computation: {e}")
        print("Falling back to CPU solver...")
        return find_nonce_cpu(challenge, difficulty)
- # Worker function for multiprocessing CPU solver
def cpu_worker(args):
    """
    Scan a half-open nonce range for a DJB2 hash below the target.

    Intended to run inside a multiprocessing pool, so all inputs arrive packed
    in one tuple.

    :param args: Tuple ``(challenge_bytes, nonce_start, nonce_end, target,
        return_dict, worker_id)``. Nonces ``nonce_start <= n < nonce_end`` are
        tested; ``worker_id`` is not used by the search.
    :return: True if a nonce was found (it is stored in
        ``return_dict['nonce']``), otherwise False.
    """
    challenge_bytes, nonce_start, nonce_end, target, return_dict, _worker_id = args

    # The hashed message is challenge_bytes followed by the nonce, so the hash
    # state after the challenge prefix is the same for every nonce: compute it once.
    prefix_hash = 5381
    for c in challenge_bytes:
        prefix_hash = ((prefix_hash * 33) + c) & 0xFFFFFFFF  # Keep 32-bit unsigned

    for nonce in range(nonce_start, nonce_end):
        hash_value = prefix_hash
        for c in nonce.to_bytes(8, byteorder='little'):
            hash_value = ((hash_value * 33) + c) & 0xFFFFFFFF
        if hash_value < target:
            return_dict['nonce'] = nonce
            return True  # Found solution
    return False  # Did not find solution
- # Solve the challenge using CPU
def find_nonce_cpu(challenge, difficulty):
    """
    Solve the proof-of-work challenge on the CPU using a process pool.

    After a single-process benchmark that prints a time estimate, nonce ranges
    of ``chunk_size`` are handed to ``cpu_worker`` on every core, round after
    round, until one worker reports a nonce whose DJB2 hash is below
    ``0xFFFFFFFF >> difficulty``.

    :param challenge: The challenge string (hexadecimal).
    :param difficulty: The difficulty level (number of bits); must be below 32.
    :return: The nonce that solves the challenge, or None when the difficulty
        is unsupported, the user interrupts the search, or an error occurs.
    """
    # Check for maximum difficulty
    if difficulty >= 32:
        print("Error: Maximum difficulty level for this implementation is 31.")
        return None

    # With difficulty <= 31 the target is always at least 1, so no separate
    # zero-target check is needed.
    max_hash_value = 0xFFFFFFFF  # Standard Python integer
    target = max_hash_value >> difficulty
    challenge_bytes = bytes.fromhex(challenge)

    # Calculate the expected number of hashes needed
    probability = target / float(max_hash_value + 1)
    expected_hashes = 1 / probability

    # Perform a benchmark to estimate hash rate
    print("Performing benchmark to estimate hash rate...")
    benchmark_nonces = 100000  # Number of nonces to process during the benchmark
    start_time = time.time()
    for nonce in range(benchmark_nonces):
        data = challenge_bytes + nonce.to_bytes(8, byteorder='little')
        # Compute DJB2 hash
        hash_value = 5381
        for c in data:
            hash_value = ((hash_value * 33) + c) & 0xFFFFFFFF
    # Guard against a zero reading from a coarse clock.
    elapsed_time = max(time.time() - start_time, 1e-9)

    # Calculate hash rate
    hash_rate_per_core = benchmark_nonces / elapsed_time  # Hashes per second

    # Estimate total expected time
    num_cores = multiprocessing.cpu_count()
    hash_rate = hash_rate_per_core * num_cores
    estimated_total_time = expected_hashes / hash_rate
    print(f"Estimated hash rate per core: {hash_rate_per_core:.2f} hashes/second")
    print(f"Total estimated hash rate: {hash_rate:.2f} hashes/second")
    print(f"Expected number of hashes to find solution: {expected_hashes:.0f}")
    print(f"Estimated time to complete: {estimated_total_time:.2f} seconds ({estimated_total_time/60:.2f} minutes)")

    # Start multiprocessing pool
    print(f"Starting CPU processing on {num_cores} cores...")
    nonce_start = 0
    chunk_size = 1000000  # Adjust as needed
    total_hashes_processed = 0

    # Using the manager as a context manager shuts its server process down on
    # every exit path instead of leaving it running.
    with multiprocessing.Manager() as manager:
        return_dict = manager.dict()
        pool = multiprocessing.Pool(processes=num_cores)
        start_time = time.time()
        try:
            while True:
                # One contiguous chunk of nonces per core for this round.
                jobs = []
                for i in range(num_cores):
                    nonce_range_start = nonce_start + i * chunk_size
                    nonce_range_end = nonce_range_start + chunk_size
                    args = (challenge_bytes, nonce_range_start, nonce_range_end, target, return_dict, i)
                    jobs.append(pool.apply_async(cpu_worker, args=(args,)))

                # Wait for jobs to finish or for a solution to be found
                for job in jobs:
                    if job.get():
                        elapsed_time = time.time() - start_time
                        nonce_found = return_dict['nonce']
                        print(f"\nSolution found at nonce {nonce_found} (Elapsed time: {elapsed_time:.2f}s)")
                        return nonce_found

                nonce_start += num_cores * chunk_size
                total_hashes_processed += num_cores * chunk_size

                # Progress update
                elapsed_time = time.time() - start_time
                # expected_hashes is only an average; clamp so the estimate
                # never goes negative once the search runs past it.
                hashes_remaining = max(expected_hashes - total_hashes_processed, 0.0)
                est_time_remaining = hashes_remaining / hash_rate
                progress = min((total_hashes_processed / expected_hashes) * 100, 100)
                print(f"Processed up to nonce {nonce_start} "
                      f"(Elapsed: {elapsed_time:.2f}s, Remaining: {est_time_remaining:.2f}s, "
                      f"Progress: {progress:.2f}%)")
        except KeyboardInterrupt:
            print("\nProcess interrupted by user.")
            return None
        except Exception as e:
            print(f"An error occurred during CPU computation: {e}")
            return None
        finally:
            # Every exit path abandons outstanding work, so stop the workers
            # immediately and reap them.
            pool.terminate()
            pool.join()
- # Generate a challenge
def generate_challenge(link, difficulty):
    """
    Generate a new random challenge and encrypt the link for it.

    :param link: The hidden link to encrypt.
    :param difficulty: The difficulty level for the challenge (0-31).
    :return: The combined string ``<challenge>:<difficulty>:<encrypted_link>``
        (challenge and encrypted link in hex), or None if the difficulty is
        out of range.
    """
    # A negative difficulty would produce a string that parse_combined_string
    # rejects, so refuse it here instead of emitting an unusable challenge.
    if difficulty < 0:
        print("Error: Difficulty level must not be negative.")
        return None
    # Check for maximum difficulty
    if difficulty >= 32:
        print("Error: Maximum difficulty level for this implementation is 31.")
        return None

    challenge = os.urandom(16).hex()  # Random 16-byte challenge
    challenge_with_difficulty = f"{challenge}:{difficulty}"  # Embed difficulty
    # NOTE(review): the key is derived only from the challenge and difficulty,
    # both of which appear in clear in the returned string; the nonce is not
    # part of the key. Confirm this is the intended protection level.
    key = hashlib.sha256(challenge_with_difficulty.encode()).digest()[:BLOCK_SIZE]  # Derive key
    encrypted_link = encrypt_link(link, key)
    return f"{challenge}:{difficulty}:{encrypted_link.hex()}"
- # Parse combined input string
def parse_combined_string(combined_string):
    """
    Parse the combined string into challenge, difficulty, and encrypted link.

    :param combined_string: String of the form
        ``<challenge>:<difficulty>:<encrypted_link>`` where the challenge and
        the encrypted link are hexadecimal.
    :return: Tuple ``(challenge, difficulty, encrypted_link)``: the challenge
        as the original hex string, the difficulty as a non-negative int, and
        the encrypted link decoded to bytes.
    :raises ValueError: If the string does not have three fields, the
        challenge is not an even-length hex string, the difficulty is not a
        non-negative integer, or the encrypted link is not valid hex.
    """
    hex_digits = '0123456789abcdefABCDEF'
    try:
        # At most two splits: everything after the second colon is the link field.
        parts = combined_string.split(":", 2)
        if len(parts) != 3:
            raise ValueError
        challenge, difficulty_str, encrypted_link_hex = parts

        if not all(c in hex_digits for c in challenge):
            raise ValueError
        # The solvers decode the challenge with bytes.fromhex, which needs whole
        # bytes; reject odd-length input here rather than failing mid-solve.
        if len(challenge) % 2 != 0:
            raise ValueError

        difficulty = int(difficulty_str)
        if difficulty < 0:
            raise ValueError

        encrypted_link = bytes.fromhex(encrypted_link_hex)
        return challenge, difficulty, encrypted_link
    except ValueError:
        # One uniform message for every malformed-input case; the internal
        # cause adds nothing for the user, so it is suppressed.
        raise ValueError(
            "Invalid input format. Expected format: <challenge>:<difficulty>:<encrypted_link>"
        ) from None
- # Main program
def main():
    """
    Command-line entry point.

    With ``--generate`` a combined challenge string is created for ``--link``
    and printed. With ``--solve`` the ``--input`` string is parsed, the
    proof-of-work is solved (on the GPU when CUDA is available, otherwise on
    the CPU) and the decrypted link is printed. Problems are reported on
    standard output and the function simply returns.
    """
    examples = """
Examples:
Generate a new challenge with a hidden link:
python gpu_hard_challenge_ex.py --generate --link "https://example.com/hidden" --difficulty 25
Solve a given challenge and reveal the hidden link:
python gpu_hard_challenge_ex.py --solve --input "<challenge>:<difficulty>:<encrypted_link>"
View this help message:
python gpu_hard_challenge_ex.py --help
"""
    parser = argparse.ArgumentParser(
        description="GPU Hard Challenge: A Proof-of-Work System Using GPU Acceleration.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    # Exactly one of the two modes must be selected.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--generate", action="store_true", help="Generate a new challenge string with an encrypted link.")
    mode.add_argument("--solve", action="store_true", help="Solve a given challenge to reveal the hidden link.")
    parser.add_argument("--link", type=str, help="The hidden link to encrypt (required when using --generate).")
    parser.add_argument("--difficulty", type=int, default=5, help="Difficulty level for the proof-of-work (default: 5). Acceptable range: 0-31.")
    parser.add_argument("--input", type=str, help="The combined input string to solve, in the format <challenge>:<difficulty>:<encrypted_link> (required when using --solve).")
    args = parser.parse_args()

    # ---- Generate mode ----
    if args.generate:
        if not args.link:
            print("Error: Please provide a link using --link when generating a challenge.")
            parser.print_help()
            return
        try:
            combined_string = generate_challenge(args.link, args.difficulty)
            if combined_string:
                print("\nGenerated Combined String:\n")
                print(combined_string)
                print("\nShare this combined string with the solver.\n")
        except Exception as e:
            print(f"An error occurred during challenge generation: {e}")
        return

    if not args.solve:
        return

    # ---- Solve mode ----
    if not args.input:
        print("Error: Please provide the combined input string using --input when solving a challenge.")
        parser.print_help()
        return

    try:
        challenge, difficulty, encrypted_link = parse_combined_string(args.input)
    except ValueError as e:
        print(e)
        return

    # Check for maximum difficulty
    if difficulty >= 32:
        print("Error: Maximum difficulty level for this implementation is 31.")
        return

    print("Solving challenge...")
    if cuda.is_available():
        print("CUDA detected, using GPU...")
        solver = find_nonce_gpu
    else:
        print("CUDA not detected, falling back to CPU...")
        solver = find_nonce_cpu
    nonce = solver(challenge, difficulty)

    if nonce is None:
        print("Failed to find a valid nonce.")
        return

    print(f"Found Nonce: {nonce}")
    # The key is re-derived from the challenge and difficulty, mirroring
    # generate_challenge.
    key = hashlib.sha256(f"{challenge}:{difficulty}".encode()).digest()[:BLOCK_SIZE]
    try:
        hidden_link = decrypt_link(encrypted_link, key)
        print(f"Hidden Link: {hidden_link}")
    except Exception as e:
        print(f"An error occurred during decryption: {e}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment