import argparse
import asyncio
import hashlib
import os
import pickle
import re
import sys
import time
from multiprocessing import Pool, cpu_count

import aiohttp
from bip32utils import BIP32Key
from loguru import logger
from mnemonic import Mnemonic
from tqdm import tqdm

# BIP32 derivation paths probed for each candidate mnemonic. Note that
# BIP32Key.Address() used below always produces a legacy P2PKH address, so
# the segwit/taproot paths will not yield the address formats those BIPs
# actually define.
DERIVATION_PATHS = [
    "m/44'/0'/0'/0/0",  # BIP44 legacy
    "m/84'/0'/0'/0/0",  # BIP84 native segwit
    "m/49'/0'/0'/0/0",  # BIP49 segwit
    "m/86'/0'/0'/0/0",  # BIP86 taproot
]

class StateManager:
    """Manages the state persistence of the application."""

    def __init__(self, state_file="mnemonic_scanner_state.pkl"):
        self.state_file = state_file
        self.state = {
            "processed_permutations": set(),
            "valid_mnemonics": set(),
            "active_wallets": set(),
            "current_paragraph_index": 0,
            "current_batch_index": 0,
            "total_processed": 0,
        }

    def save_state(self):
        """Save the current state to a file."""
        try:
            with open(self.state_file, 'wb') as f:
                pickle.dump(self.state, f)
            logger.info(f"State saved to {self.state_file}")
            return True
        except Exception as e:
            logger.error(f"Error saving state: {e}")
            return False

    def load_state(self):
        """Load state from a file if it exists."""
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, 'rb') as f:
                    self.state = pickle.load(f)
                logger.info(f"State loaded from {self.state_file}")
                logger.info(f"Resuming from paragraph {self.state['current_paragraph_index']}, "
                            f"batch {self.state['current_batch_index']}")
                logger.info(f"Previously found {len(self.state['valid_mnemonics'])} valid mnemonics "
                            f"and {len(self.state['active_wallets'])} active wallets")
                return True
            except Exception as e:
                logger.error(f"Error loading state: {e}")
                return False
        return False

    def update_state(self, key, value):
        """Update a specific state key."""
        if key in self.state:
            self.state[key] = value

    def add_processed_permutation(self, perm_hash):
        """Add a processed permutation hash to the state."""
        self.state["processed_permutations"].add(perm_hash)

    def add_valid_mnemonic(self, mnemonic):
        """Add a valid mnemonic to the state."""
        self.state["valid_mnemonics"].add(mnemonic)

    def add_active_wallet(self, mnemonic, address, path):
        """Add an active wallet to the state."""
        self.state["active_wallets"].add((mnemonic, address, path))

    def get_state(self):
        """Get the current state."""
        return self.state

    def is_permutation_processed(self, perm_hash):
        """Check if a permutation hash has been processed."""
        return perm_hash in self.state["processed_permutations"]

    def increment_processed(self):
        """Increment the total processed counter."""
        self.state["total_processed"] += 1

def hash_permutation(perm):
    """Generate a hash for a permutation to track processed items."""
    perm_str = ' '.join(perm)
    return hashlib.md5(perm_str.encode()).hexdigest()


def is_valid_mnemonic(words, lang='english'):
    """Check whether a word sequence is a valid mnemonic per the BIP39 spec."""
    try:
        mnemo = Mnemonic(lang)
        return mnemo.check(' '.join(words))
    except Exception as e:
        logger.error(f"Error checking mnemonic: {e}")
        return False

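# Sanity check: the all-"abandon" test vector (11 x "abandon" + "about") is a
# well-known valid BIP39 phrase, while 12 x "abandon" fails the checksum.
#
#     >>> is_valid_mnemonic(("abandon " * 11 + "about").split())
#     True
#     >>> is_valid_mnemonic(["abandon"] * 12)
#     False
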
def get_address_from_mnemonic(mnemonic, path="m/44'/0'/0'/0/0"):
    """Generate a Bitcoin address from a mnemonic.

    Caveat: BIP32Key.Address() returns a legacy P2PKH address regardless of
    the derivation path, so BIP49/84/86 paths will not produce the segwit or
    taproot address formats those standards define.
    """
    try:
        seed = Mnemonic.to_seed(mnemonic)
        root_key = BIP32Key.fromEntropy(seed)
        # Walk the derivation path one segment at a time
        child_key = root_key
        for part in path.split('/')[1:]:
            if part.endswith("'"):
                # Hardened derivation
                index = int(part[:-1]) + 0x80000000
            else:
                # Normal derivation
                index = int(part)
            child_key = child_key.ChildKey(index)
        return child_key.Address()
    except Exception as e:
        logger.error(f"Error generating address: {e}")
        return None

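# Usage sketch (hedged): derive the first BIP44 receive address for the test
# mnemonic above. The output is deterministic but not asserted here; the
# legacy-P2PKH caveat in the docstring applies to every path.
#
#     >>> m = ("abandon " * 11 + "about").strip()
#     >>> get_address_from_mnemonic(m, "m/44'/0'/0'/0/0")
#     '1...'  # a legacy address starting with 1
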
def extract_bip39_words(text):
    """Extract the words from text that appear in the BIP39 English wordlist."""
    # Use a set for O(1) membership tests instead of scanning the 2048-word list
    wordlist = set(Mnemonic('english').wordlist)
    words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
    return [word for word in words if word in wordlist]

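# Example: only exact (case-folded) wordlist matches survive, preserving
# order and duplicates; filler words like "the" or "is" are dropped.
#
#     >>> extract_bip39_words("The zoo is a zone of zebra noise")
#     ['zoo', 'zone', 'zebra', 'noise']
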
async def check_address_balance_async(session, address):
    """Check the balance of a Bitcoin address, with room for API fallbacks.

    Only mempool.space is configured below; append more endpoints to the list
    to get actual fallback behaviour.
    """
    apis = [
        f"https://mempool.space/api/address/{address}",
    ]
    for api_url in apis:
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(api_url, timeout=timeout) as response:
                if response.status == 200:
                    data = await response.json()
                    # Different APIs may use different response formats
                    if "chain_stats" in data:
                        # Esplora format (mempool.space, blockstream.info)
                        chain_stats = data.get('chain_stats', {})
                        mempool_stats = data.get('mempool_stats', {})
                        tx_count = chain_stats.get('tx_count', 0) + mempool_stats.get('tx_count', 0)
                        funded_sum = chain_stats.get('funded_txo_sum', 0) + mempool_stats.get('funded_txo_sum', 0)
                        spent_sum = chain_stats.get('spent_txo_sum', 0) + mempool_stats.get('spent_txo_sum', 0)
                        balance = funded_sum - spent_sum
                        return True, {
                            "balance": balance,
                            "tx_count": tx_count,
                            "data": data,
                        }
                    elif "balance" in data:
                        # Generic format
                        return True, {
                            "balance": data.get('balance', 0),
                            "tx_count": data.get('tx_count', 0),
                            "data": data,
                        }
                    return True, {"data": data}
        except asyncio.TimeoutError:
            logger.debug(f"Timeout with API {api_url}")
            continue
        except Exception as e:
            logger.debug(f"Error with API {api_url}: {e}")
            continue
    return False, None

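# Usage sketch (hedged): probe a single address inside an event loop. The
# address below is the well-known genesis-block address, used here purely as
# an illustration.
#
#     async def demo():
#         async with aiohttp.ClientSession() as session:
#             ok, info = await check_address_balance_async(
#                 session, "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")
#             if ok:
#                 print(info.get("balance"), info.get("tx_count"))
#
#     # asyncio.run(demo())
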
def factorial(n):
    """Calculate n! iteratively (equivalent to math.factorial)."""
    if n <= 1:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

# Worker function: runs sequentially per batch; parallelism comes from the
# process pool in process_permutations_async
def process_batch_permutations(batch, state_manager=None):
    """Validate a batch of permutations, returning valid mnemonics and their hashes."""
    valid_mnemonics = []
    processed_hashes = []
    # Build the validator once per batch for efficiency
    mnemo = Mnemonic('english')
    for perm in batch:
        perm_str = ' '.join(perm)
        perm_hash = hashlib.md5(perm_str.encode()).hexdigest()
        processed_hashes.append(perm_hash)
        # Keep only permutations with a valid BIP39 checksum
        if mnemo.check(perm_str):
            valid_mnemonics.append(perm_str)
    return valid_mnemonics, processed_hashes

def process_permutation(perm, state_manager=None):
    """Process a single permutation of words, returning True if it is a valid mnemonic."""
    perm_hash = hash_permutation(perm)
    # Skip permutations the state manager has already seen
    if state_manager and state_manager.is_permutation_processed(perm_hash):
        return False, None, perm_hash
    if is_valid_mnemonic(perm):
        return True, ' '.join(perm), perm_hash
    return False, None, perm_hash

class PermutationGenerator:
    """Memory-efficient generator of permutations with state tracking.

    Uses the same index/cycle algorithm as itertools.permutations, so a
    resumed run can skip ahead without materialising the skipped permutations.
    """

    def __init__(self, elements, batch_size=10000, start_batch=0, state_manager=None):
        self.elements = elements
        self.batch_size = batch_size
        self.state_manager = state_manager
        self.current_batch = start_batch
        self.total_perms = factorial(len(elements))

    def __iter__(self):
        indices = list(range(len(self.elements)))
        cycles = list(range(len(self.elements), 0, -1))
        # Skip ahead to the current batch if resuming
        if self.current_batch > 0:
            skip_count = self.current_batch * self.batch_size
            logger.info(f"Skipping {skip_count:,} permutations to resume from batch {self.current_batch}")
            count = 0
            while count < skip_count and count < self.total_perms:
                # Advance the index/cycle state by one permutation
                for i in range(len(self.elements) - 1, -1, -1):
                    cycles[i] -= 1
                    if cycles[i] == 0:
                        indices[i:] = indices[i + 1:] + indices[i:i + 1]
                        cycles[i] = len(self.elements) - i
                    else:
                        j = cycles[i]
                        indices[i], indices[-j] = indices[-j], indices[i]
                        count += 1
                        break
                else:
                    break
        batch = []
        yielded = 0
        while yielded < self.total_perms:
            perm = tuple(self.elements[i] for i in indices)
            batch.append(perm)
            yielded += 1
            if len(batch) >= self.batch_size:
                self.current_batch += 1
                if self.state_manager:
                    self.state_manager.update_state("current_batch_index", self.current_batch)
                yield batch
                batch = []
            # Advance to the next permutation; exhaustion breaks the loop
            for i in range(len(self.elements) - 1, -1, -1):
                cycles[i] -= 1
                if cycles[i] == 0:
                    indices[i:] = indices[i + 1:] + indices[i:i + 1]
                    cycles[i] = len(self.elements) - i
                else:
                    j = cycles[i]
                    indices[i], indices[-j] = indices[-j], indices[i]
                    break
            else:
                break
        if batch:
            yield batch

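# Example: for 3 elements with batch_size=2, iteration yields three batches
# of two permutations each, in the same order itertools.permutations would
# produce: (a,b,c), (a,c,b), (b,a,c), (b,c,a), (c,a,b), (c,b,a).
#
#     >>> [len(b) for b in PermutationGenerator(['a', 'b', 'c'], batch_size=2)]
#     [2, 2, 2]
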
async def check_addresses_batch_async(mnemonics, session, derive_paths=None, state_manager=None):
    """Check a batch of mnemonics against blockchain APIs."""
    if derive_paths is None:
        derive_paths = DERIVATION_PATHS
    active_wallets = []
    # Build (mnemonic, address, path) tasks for every combination
    tasks = []
    for mnemonic in mnemonics:
        for path in derive_paths:
            address = get_address_from_mnemonic(mnemonic, path)
            if address:
                tasks.append((mnemonic, address, path))
    # Process in smaller batches to avoid overwhelming the APIs
    batch_size = 20  # Adjust based on API rate limits
    for i in range(0, len(tasks), batch_size):
        current_batch = tasks[i:i + batch_size]
        subtasks = [check_address_balance_async(session, address)
                    for _, address, _ in current_batch]
        # Wait for all subtasks to complete
        results = await asyncio.gather(*subtasks)
        # Flag any address with a balance or transaction history
        for (mnemonic, address, path), (success, data) in zip(current_batch, results):
            if success and data:
                balance = data.get("balance", 0)
                tx_count = data.get("tx_count", 0)
                if balance > 0 or tx_count > 0:
                    logger.success(f"FOUND ACTIVE WALLET! Address: {address}, Path: {path}")
                    logger.success(f"Mnemonic: {mnemonic}")
                    logger.success(f"Balance: {balance} satoshis, Transactions: {tx_count}")
                    active_wallets.append((mnemonic, address, path))
                    if state_manager:
                        state_manager.add_active_wallet(mnemonic, address, path)
                        state_manager.save_state()
    return active_wallets

async def process_permutations_async(words, num_procs=None, batch_size=10000,
                                     max_permutations=None, state_manager=None):
    """Find valid mnemonics among permutations and check their addresses in batches."""
    if num_procs is None:
        num_procs = cpu_count()
    total_permutations = factorial(len(words))
    if max_permutations and total_permutations > max_permutations:
        logger.warning(f"Limiting to {max_permutations:,} permutations out of {total_permutations:,}")
        total_permutations = max_permutations
    logger.info(f"Total possible permutations: {total_permutations:,}")
    logger.info(f"Using {num_procs} processes for computation")
    active_wallets = []
    valid_mnemonics_total = 0
    processed = 0
    # Initialize progress bar
    pbar = tqdm(total=total_permutations, desc="Validating permutations")
    # Resume from the saved batch index if available
    start_batch = 0
    if state_manager:
        start_batch = state_manager.get_state()["current_batch_index"]
        processed = state_manager.get_state()["total_processed"]
        pbar.update(processed)
    # Initialize the permutation generator with resume state
    perm_gen = PermutationGenerator(words, batch_size, start_batch, state_manager)
    # Create a process pool for batch validation
    with Pool(processes=num_procs) as pool:
        # Open an HTTP session for address checks
        connector = aiohttp.TCPConnector(limit=100)  # Increased connection limit
        async with aiohttp.ClientSession(connector=connector) as session:
            # Process permutations in batches
            for batch in perm_gen:
                batch_start_time = time.time()
                if max_permutations and processed >= max_permutations:
                    logger.info(f"Reached maximum permutation count: {max_permutations:,}")
                    break
                # Trim the final batch if it would exceed the cap
                if max_permutations and processed + len(batch) > max_permutations:
                    batch = batch[:max_permutations - processed]
                # Split the batch into chunks for parallel validation
                chunk_size = max(1, len(batch) // num_procs)
                chunks = [batch[i:i + chunk_size] for i in range(0, len(batch), chunk_size)]
                # Note: pool.map blocks the event loop while workers run; this
                # is acceptable here since no other coroutines are pending.
                batch_results = pool.map(process_batch_permutations, chunks)
                # Collect valid mnemonics and processed hashes
                batch_valid_mnemonics = []
                batch_processed_hashes = []
                for valid_mnems, proc_hashes in batch_results:
                    batch_valid_mnemonics.extend(valid_mnems)
                    batch_processed_hashes.extend(proc_hashes)
                # Update persisted state
                if state_manager:
                    for perm_hash in batch_processed_hashes:
                        state_manager.add_processed_permutation(perm_hash)
                    state_manager.update_state("total_processed",
                                               state_manager.get_state()["total_processed"] + len(batch))
                processed += len(batch)
                pbar.update(len(batch))
                # Save state periodically
                if state_manager and processed % (batch_size * 5) == 0:
                    state_manager.save_state()
                # Check any valid mnemonics found in this batch
                if batch_valid_mnemonics:
                    valid_mnemonics_total += len(batch_valid_mnemonics)
                    logger.info(f"Found {len(batch_valid_mnemonics)} valid mnemonics in this batch")
                    # Register valid mnemonics in state
                    if state_manager:
                        for mnemonic in batch_valid_mnemonics:
                            state_manager.add_valid_mnemonic(mnemonic)
                    # Check addresses for valid mnemonics
                    batch_active_wallets = await check_addresses_batch_async(
                        batch_valid_mnemonics, session, DERIVATION_PATHS, state_manager
                    )
                    if batch_active_wallets:
                        logger.success(f"Found {len(batch_active_wallets)} active wallets in this batch!")
                        active_wallets.extend(batch_active_wallets)
                batch_duration = time.time() - batch_start_time
                perms_per_second = len(batch) / batch_duration if batch_duration > 0 else 0
                logger.debug(f"Batch processed in {batch_duration:.2f} seconds ({perms_per_second:.2f} perms/sec)")
    # If we found any active wallets, persist the final state
    if active_wallets and state_manager:
        state_manager.save_state()
    # Complete the progress bar
    pbar.close()
    # Report statistics
    logger.info(f"Processed {processed:,} permutations")
    if valid_mnemonics_total > 0:
        percentage = (valid_mnemonics_total / processed) * 100 if processed > 0 else 0
        logger.success(f"Found {valid_mnemonics_total} valid mnemonics out of {processed:,} permutations")
        logger.success(f"Valid permutations: {percentage:.6f}%")
    return active_wallets, valid_mnemonics_total, processed

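# Back-of-envelope (hedged estimate): a 12-word mnemonic carries a 4-bit
# checksum, so roughly 1 in 16 of the 12! = 479,001,600 orderings of 12
# distinct words should pass the checksum test -- on the order of 30 million
# valid permutations. The address-checking stage, not validation, therefore
# dominates runtime for full searches.
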
async def process_paragraph_async(paragraph, max_combinations=None, num_procs=None, batch_size=10000,
                                  state_manager=None):
    """Process a paragraph: find valid permutations and check addresses in batches."""
    bip39_words = extract_bip39_words(paragraph)
    logger.info(f"Found {len(bip39_words)} BIP39 words: {', '.join(bip39_words)}")
    if len(bip39_words) < 12:
        logger.warning("Not enough BIP39 words in this paragraph")
        return []
    # Special case: exactly 12 words -- try the original order first
    if len(bip39_words) == 12:
        logger.info("Testing words in original order first")
        mnemonic = ' '.join(bip39_words)
        if is_valid_mnemonic(bip39_words):
            logger.success(f"Original order is a valid mnemonic: {mnemonic}")
            async with aiohttp.ClientSession() as session:
                for path in DERIVATION_PATHS:
                    addr = get_address_from_mnemonic(mnemonic, path)
                    logger.info(f"Bitcoin address ({path}): {addr}")
                    # Check whether the address has a balance or transaction history
                    success, data = await check_address_balance_async(session, addr)
                    if success and data:
                        balance = data.get("balance", 0)
                        tx_count = data.get("tx_count", 0)
                        if balance > 0 or tx_count > 0:
                            logger.success(f"FOUND ACTIVE WALLET! Address: {addr}")
                            logger.success(f"Mnemonic: {mnemonic}")
                            logger.success(f"Balance: {balance} satoshis, Transactions: {tx_count}")
                            if state_manager:
                                state_manager.add_active_wallet(mnemonic, addr, path)
                                state_manager.save_state()
                            return [(mnemonic, addr, path)]
    # More than 12 words: optionally reduce to the first 12. Without
    # max_combinations the full word list is permuted as-is, which can never
    # validate unless its length is a legal mnemonic size (12, 15, 18, 21, 24).
    if len(bip39_words) > 12 and max_combinations:
        logger.info(f"Testing combinations of 12 words from {len(bip39_words)} words")
        # Number of ways to choose 12 of n words: C(n, 12)
        total_combinations = factorial(len(bip39_words)) // (factorial(len(bip39_words) - 12) * factorial(12))
        logger.info(f"Total possible combinations: {total_combinations:,}")
        if total_combinations > max_combinations:
            logger.warning(f"Too many combinations, limiting to {max_combinations}")
            # Just use the first 12 words instead of random combinations
            bip39_words = bip39_words[:12]
            logger.info(f"Using first 12 words: {bip39_words}")
    # Find and check valid permutations in batches
    start_time = time.time()
    logger.info(f"Finding and validating permutations with {num_procs or cpu_count()} parallel processes")
    active_wallets, valid_count, total_count = await process_permutations_async(
        bip39_words, num_procs, batch_size, max_combinations, state_manager
    )
    duration = time.time() - start_time
    # Report final statistics
    logger.info(f"Processing completed in {duration:.2f} seconds")
    if active_wallets:
        logger.success(f"Found {len(active_wallets)} active wallets!")
        for mnemonic, address, path in active_wallets:
            logger.success(f"Mnemonic: {mnemonic}")
            logger.success(f"Address: {address} (Path: {path})")
    else:
        logger.info("No active wallets found")
    return active_wallets

async def main_async():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Bitcoin Mnemonic Scanner with Batch Processing')
    parser.add_argument('--threads', type=int, default=None,
                        help='Number of worker processes to use (default: auto)')
    parser.add_argument('--max-combinations', type=int, default=None,
                        help='Maximum number of combinations to test')
    parser.add_argument('--batch-size', type=int, default=10000,
                        help='Batch size for permutation processing (default: 10000)')
    parser.add_argument('--state-file', type=str, default="mnemonic_scanner_state.pkl",
                        help='State file for resuming (default: mnemonic_scanner_state.pkl)')
    parser.add_argument('--force-restart', action='store_true',
                        help='Force restart and ignore any existing state file')
    parser.add_argument('--paragraphs-file', type=str,
                        help='File containing paragraphs to scan, one per line')
    args = parser.parse_args()
    # Default the worker count to the number of CPUs
    if args.threads is None:
        args.threads = cpu_count()
    logger.info(f"Starting BIP39 mnemonic search with {args.threads} worker processes...")
    # Initialize state manager
    state_manager = StateManager(args.state_file)
    # Load prior state unless a restart was forced
    resumed = False
    if not args.force_restart:
        resumed = state_manager.load_state()
    # Load paragraphs
    paragraphs = []
    if args.paragraphs_file and os.path.exists(args.paragraphs_file):
        with open(args.paragraphs_file, 'r') as f:
            paragraphs = [line.strip() for line in f if line.strip()]
        logger.info(f"Loaded {len(paragraphs)} paragraphs from {args.paragraphs_file}")
    else:
        # Default test paragraph
        paragraphs = [
            "This is a very long journey, don't expect that in few years everybody will achieve it. "
            "Have patience, perseverance and dedication into fighting this war against tyranny."
        ]
    logger.info(f"Testing {len(paragraphs)} paragraphs for valid BIP39 mnemonics...")
    # Resume from the saved paragraph index if applicable
    start_index = 0
    if resumed:
        start_index = state_manager.get_state()["current_paragraph_index"]
    for i in range(start_index, len(paragraphs)):
        p = paragraphs[i]
        logger.info(f"\n=== Testing Paragraph {i+1}/{len(paragraphs)} ===")
        logger.info(p)
        # Record progress so an interrupted run can resume here
        state_manager.update_state("current_paragraph_index", i)
        state_manager.update_state("current_batch_index", 0)  # Reset batch index for a new paragraph
        state_manager.save_state()
        start_time = time.time()
        active_wallets = await process_paragraph_async(
            p, max_combinations=args.max_combinations,
            num_procs=args.threads, batch_size=args.batch_size,
            state_manager=state_manager
        )
        duration = time.time() - start_time
        logger.info(f"Paragraph processing completed in {duration:.2f} seconds")
        if active_wallets:
            logger.success(f"Found {len(active_wallets)} active wallets in paragraph {i+1}")
        else:
            logger.info(f"No active wallets found in paragraph {i+1}")

def main():
    # Handle keyboard interrupts gracefully
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        logger.warning("Process interrupted by user. State has been saved.")
        logger.warning("Resume by running the same command without the --force-restart flag.")
        sys.exit(1)


if __name__ == "__main__":
    main()

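# Example invocations (the script filename is hypothetical; the flags match
# the argparse definitions above):
#
#   python mnemonic_scanner.py --paragraphs-file texts.txt --threads 8
#   python mnemonic_scanner.py --max-combinations 1000000 --batch-size 5000
#   python mnemonic_scanner.py --force-restart   # ignore any saved state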