import requests import re import time import os from urllib.parse import quote from typing import List, Optional, Dict, Any # Configuration OUTPUT_FILE = 'found_keys.txt' GITHUB_TOKEN = '' HEADERS = { 'Authorization': f'token {GITHUB_TOKEN}', 'Accept': 'application/vnd.github.v3+json' } MAX_RETRIES = 3 DELAY_BETWEEN_REQUESTS = 5 # увеличили задержку class GitHubScanner: def __init__(self, token: str, output_file: str): self.token = token self.output_file = output_file self.headers = { 'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json' } def get_rate_limit(self) -> Dict: """ Проверка оставшихся лимитов API """ response = requests.get('https://api.github.com/rate_limit', headers=self.headers) return response.json()['resources']['search'] def search_github(self, query: str) -> Optional[Dict[str, Any]]: """ Поиск по GitHub с использованием API """ url = f'https://api.github.com/search/code?q={quote(query)}' print(f"Searching with query: {query}") # Проверяем лимиты перед запросом rate_limit = self.get_rate_limit() if rate_limit['remaining'] == 0: reset_time = rate_limit['reset'] wait_time = max(reset_time - time.time(), 0) + 1 print(f"Rate limit exceeded. Waiting {wait_time:.0f} seconds...") time.sleep(wait_time) for attempt in range(MAX_RETRIES): try: response = requests.get(url, headers=self.headers) if response.status_code == 200: results = response.json() print(f"Results found: {len(results.get('items', []))}") return results elif response.status_code == 403: print("Rate limit exceeded. Waiting for reset...") time.sleep(60) # Ждем минуту перед повторной попыткой else: print(f"Error {response.status_code}: {response.text}") time.sleep(DELAY_BETWEEN_REQUESTS) except Exception as e: print(f"Request failed: {str(e)}") time.sleep(DELAY_BETWEEN_REQUESTS) return None def get_raw_content(self, url: str) -> Optional[str]: """ Получение содержимого файла из GitHub """ try: raw_url = url.replace('github.com', 'raw.githubusercontent.com').replace('/blob/', '/') print(f"Fetching content from: {raw_url}") response = requests.get(raw_url, headers=self.headers) if response.status_code == 200: return response.text else: print(f"Error fetching content: {response.status_code}") return None except Exception as e: print(f"Error while fetching content: {str(e)}") return None def extract_keys(self, content: str) -> List[str]: """ Извлечение API ключей из контента """ if not content: return [] pattern = r'AIzaSy[A-Za-z][A-Za-z0-9-_]{32}' keys = re.findall(pattern, content) if keys: print(f"Found {len(keys)} potential API keys") return list(set(keys)) def save_keys(self, keys: List[str]) -> None: """ Сохранение найденных ключей в файл """ if not keys: return try: with open(self.output_file, 'a', encoding='utf-8') as f: for key in keys: f.write(f"{key}\n") print(f"Saved {len(keys)} keys to {self.output_file}") except Exception as e: print(f"Error saving keys: {str(e)}") def scan(self) -> None: """ Основной метод сканирования """ # Используем более общие поисковые запросы queries = [ 'AIzaSy', # Базовый поиск 'filename:config AIzaSy', # Поиск в конфигурационных файлах 'filename:.env AIzaSy', # Поиск в .env файлах 'filename:settings AIzaSy' # Поиск в файлах настроек ] for query in queries: results = self.search_github(query) if not results: continue for item in results.get('items', []): content = self.get_raw_content(item['html_url']) if content: keys = self.extract_keys(content) if keys: print(f"Found keys in {item['html_url']}") self.save_keys(keys) time.sleep(DELAY_BETWEEN_REQUESTS) def main(): scanner = GitHubScanner(GITHUB_TOKEN, OUTPUT_FILE) scanner.scan() if __name__ == '__main__': main()