import aiohttp
import asyncio
from lxml import html
from aiolimiter import AsyncLimiter
import random
import signal
from log_setup import setup_logger
from state_manager import create_hash, load_previous_state, save_current_state, save_progress, load_progress
from utils import get_categories
from upload_product_files import upload_all_product_files, load_config
from save_json_file import save_to_json
import cachetools

logger = setup_logger()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Fetch a URL with retries; successful responses are cached for an hour.
# cachetools.cached cannot wrap a coroutine directly (it would cache the
# coroutine object, which can only be awaited once), so the TTL cache is
# applied manually inside the function.
_page_cache = cachetools.TTLCache(maxsize=1000, ttl=3600)

async def fetch_with_retry(session, url, max_retries=3):
    if url in _page_cache:
        return _page_cache[url]
    for attempt in range(max_retries):
        try:
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    text = await response.text()
                    _page_cache[url] = text
                    return text
                elif response.status == 429:
                    wait_time = random.uniform(0.5, 1.5)
                    logger.warning(f"Too many requests. Waiting {wait_time:.2f} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"Error {response.status} while requesting {url}")
        except aiohttp.ClientError as e:
            logger.error(f"Error while requesting {url}: {str(e)}")
            wait_time = 2 ** attempt + random.uniform(0, 1)
            logger.info(f"Retrying in {wait_time:.2f} seconds...")
            await asyncio.sleep(wait_time)
    raise Exception(f"Failed to fetch data after {max_retries} attempts")
async def scrape_products(categories):
    base_url = "https://megamailerdata.com"
    product_count = 0
    file_counter = 1
    batch_size = 20
    limiter = AsyncLimiter(100, 1)
    state = load_previous_state()
    progress = load_progress()
    async with aiohttp.ClientSession() as session:
        try:
            start_index = categories.index(state['last_processed_category']) if state['last_processed_category'] in categories else 0
            for i in range(start_index, len(categories), batch_size):
                batch = categories[i:i + batch_size]
                logger.info(f"Processing category batch {i + 1}-{min(i + batch_size, len(categories))} of {len(categories)}")
                tasks = []
                for category_url in batch:
                    category_progress = progress if category_url == progress['category'] else {'category': category_url, 'page': 1, 'product_index': 0}
                    logger.info(f"Starting category: {category_url}")
                    save_progress(category_progress['category'], category_progress['page'], category_progress['product_index'])
                    task = asyncio.create_task(scrape_category(session, category_url, base_url, file_counter, limiter, category_progress, state))
                    tasks.append(task)
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for category_url, result in zip(batch, results):
                    if isinstance(result, Exception):
                        logger.error(f"Error while processing category {category_url}: {str(result)}")
                    elif result is not None:
                        category_product_count, file_counter = result
                        product_count += category_product_count
                        logger.info(f"Finished category: {category_url}. Total products: {product_count}")
                progress['category'] = batch[-1]
                state['last_processed_category'] = batch[-1]
                state['last_processed_page'] = 1
                progress['page'] = 1
                progress['product_index'] = 0
                save_progress(progress['category'], progress['page'], progress['product_index'])
                save_current_state(state)
        except KeyboardInterrupt:
            logger.warning("\nScraping stopped by the user. Progress has been saved.")
            save_progress(progress['category'], progress['page'], progress['product_index'])
            save_current_state(state)
    logger.info(f"Total products processed: {product_count}")
async def scrape_category(session, category_url, base_url, file_counter, limiter, progress, state):
    async with limiter:
        try:
            await asyncio.sleep(random.uniform(0.5, 1.5))
            html_content = await fetch_with_retry(session, category_url)
            tree = html.fromstring(html_content)
            hierarchy = extract_hierarchy(tree)
            logger.info(f"Processing category: {hierarchy}")
            products = await parse_category_page(session, tree, base_url, hierarchy, limiter, progress, state)
            logger.info(f"Attempting to save {len(products)} products for category {hierarchy}")
            if products:
                new_file_counter = save_to_json(products, file_counter)
                logger.info(f"Save complete. New file number: {new_file_counter}")
            else:
                logger.warning(f"No products to save in category {hierarchy}")
                new_file_counter = file_counter
            # Save progress only after the whole category has been processed
            progress['page'] = 1
            progress['product_index'] = 0
            save_progress(category_url, progress['page'], progress['product_index'])
            # Force-save the state after the category is processed
            save_current_state(state)
            logger.info(f"Products processed in category {hierarchy}: {len(products)}")
            return len(products), new_file_counter
        except Exception as e:
            logger.error(f"Error while processing category {category_url}: {str(e)}")
            return 0, file_counter
def extract_hierarchy(tree):
    pathway_divs = tree.xpath("//div[contains(@class, 'pathway')]")
    if pathway_divs:
        pathway_div = pathway_divs[0]
        hierarchy = []
        for link in pathway_div.xpath(".//a[contains(@class, 'pathwaylink')]"):
            hierarchy.append(link.text.strip())
        last_item = pathway_div.xpath(".//span[contains(@class, 'pathwaylink')]")
        if last_item:
            hierarchy.append(last_item[0].text.strip())
        return " / ".join(hierarchy)
    return "Нет данных"
async def parse_category_page(session, tree, base_url, hierarchy, limiter, progress, state):
    product_blocks = tree.xpath("//div[contains(@class, 'stretch coin-c coin-god')]")
    logger.info(f"Found {len(product_blocks)} products in category {hierarchy}")
    tasks = []
    for block in product_blocks[progress['product_index']:]:
        task = asyncio.create_task(parse_product(session, block, base_url, hierarchy, limiter, progress, state))
        tasks.append(task)
    products = await asyncio.gather(*tasks)
    return [product for product in products if product]
async def parse_product(session, block, base_url, hierarchy, limiter, progress, state):
    product_name = None
    try:
        product = {}
        product_name = block.xpath(".//h2/text()")[0].strip() if block.xpath(".//h2") else None
        img_tags = block.xpath(".//img[contains(@class, 'cboxPhoto')]")
        img_tag = img_tags[0] if img_tags else None
        product_image = (img_tag.get('data-src') or img_tag.get('src')) if img_tag is not None else "Нет изображения"
        if not product_name or product_image == "Нет изображения":
            return None
        product['hierarchy'] = hierarchy
        product['name'] = product_name
        product['image'] = product_image
        tables = block.xpath(".//div[@class='table']")
        if tables:
            table = tables[0]
            rows = table.xpath(".//div[@class='tr']")
            product['characteristics'] = []
            for row in rows:
                row_data = parse_row(row)
                # Parse prices
                price_block = row.xpath(".//div[contains(@class, 'prices')]")
                if price_block:
                    price_link = price_block[0].xpath(".//a")
                    if price_link:
                        price_count_text = price_link[0].xpath(".//span[2]/text()")
                        if price_count_text:
                            price_count_text = price_count_text[0].strip()
                            if price_count_text.isdigit() and int(price_count_text) > 0:
                                full_price_url = base_url + price_link[0].get('href')
                                logger.info(f"Found {price_count_text} prices for product {product_name}: {full_price_url}")
                                async with limiter:
                                    row_data['extra_prices'] = await parse_seller_page(session, full_price_url, limiter, state)
                            else:
                                row_data['extra_prices'] = []
                            row_data['prices'] = f"Цена: {price_count_text}"
                        else:
                            logger.warning(f"Could not find price text for product {product_name}")
                            row_data['prices'] = "Цена: не найдена"
                    else:
                        logger.warning(f"Price block found for product {product_name}, but it has no link")
                        row_data['prices'] = "Цена: 0 (нет ссылки)"
                else:
                    logger.warning(f"No price block found for product {product_name}")
                    row_data['prices'] = "Цена: 0 (блок цен не найден)"
                product['characteristics'].append(row_data)
        return product
    except Exception as e:
        logger.error(f"Error while processing product {product_name if product_name else 'Unknown'}: {str(e)}")
        return None
def parse_row(row):
    try:
        return {
            'year': row.xpath(".//div[contains(@class, 'year')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'year')]") else "Не указано",
            'sign': row.xpath(".//div[contains(@class, 'sign')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'sign')]") else "Не указано",
            'desc': "".join(row.xpath(".//div[contains(@class, 'desc')]//text()")).strip() if row.xpath(".//div[contains(@class, 'desc')]") else "Не указано",
            'gurt': row.xpath(".//div[contains(@class, 'gurt')]/@data-hide")[0].strip() if row.xpath(".//div[contains(@class, 'gurt')]/@data-hide") else "Не указано",
            'metal': row.xpath(".//div[contains(@class, 'metal')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'metal')]") else "Не указано",
            'bitkin': row.xpath(".//div[contains(@class, 'bitkin')]//a/@data-bitkin")[0].strip() if row.xpath(".//div[contains(@class, 'bitkin')]//a/@data-bitkin")
            else "".join(row.xpath(".//div[contains(@class, 'bitkin')]//text()")).strip() if row.xpath(".//div[contains(@class, 'bitkin')]") else "Не указано",
            'adrianov': row.xpath(".//div[contains(@class, 'adrian')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'adrian')]") else "Не указано",
            'fedorin': row.xpath(".//a[contains(@class, 'group-1427')]/@data-bitkin")[0] if row.xpath(".//a[contains(@class, 'group-1427')]/@data-bitkin") else "Не указано",
        }
    except Exception as e:
        logger.error(f"Error while parsing a row: {str(e)}")
        return {}
async def parse_seller_page(session, url, limiter, state):
    all_price_info = []
    current_page = 1
    total_prices = 0
    while True:
        page_url = f"{url}?page={current_page}#sort_table" if current_page > 1 else url
        logger.info(f"Processing price page: {page_url}")
        async with limiter:
            try:
                html_content = await fetch_with_retry(session, page_url)
                tree = html.fromstring(html_content)
                price_table = tree.xpath("//div[contains(@class, 'table')]")
                if price_table:
                    rows = price_table[0].xpath(".//div[contains(@class, 'tr')]")
                    if not rows:
                        logger.info(f"No price rows found on page {page_url}")
                        break
                    for row in rows:
                        price_info = await parse_price_row_async(session, row, tree, limiter, state)
                        if price_info:
                            all_price_info.append(price_info)
                            total_prices += 1
                else:
                    logger.warning(f"No price table found on page {page_url}")
                    break
                # Check whether there is a next page
                pagebar = tree.xpath("//div[contains(@class, 'pagebar')]")
                if pagebar:
                    next_page = pagebar[0].xpath(".//a[contains(@class, 'pagebar_page next')]")
                    if next_page:
                        current_page += 1
                        logger.info(f"Moving to the next page: {current_page}")
                    else:
                        logger.info(f"Reached the last page {current_page} for {url}")
                        break
                else:
                    logger.info(f"No pagination bar found on page {page_url}")
                    break
            except Exception as e:
                logger.error(f"Error while processing page {current_page} for {url}: {e}")
                break
    logger.info(f"Found {total_prices} prices for {url}")
    return all_price_info
async def parse_price_row_async(session, row, tree, limiter, state):
    row_data = parse_price_row(row)
    if row_data['price'] != "Нет данных":
        specs_data = parse_technical_specs(tree)
        row_data['technical'] = specs_data['technical']
        row_data['literature'] = specs_data['literature']
        photo_div = row.xpath(".//div[contains(@class, 'photo')]")
        if photo_div:
            lot_link = photo_div[0].xpath(".//a/@href")[0]
            full_lot_url = f"https://megamailerdata.com{lot_link}"
            current_hash = create_hash(row_data)
            # Skip the record if it was already processed with the same hash
            if full_lot_url in state['processed_data'] and state['processed_data'][full_lot_url] == current_hash:
                logger.info(f"Skipping already processed record: {full_lot_url}")
                return None
            async with limiter:
                seller_description, lot_url, evaluation = await parse_seller_description(session, full_lot_url)
            row_data['seller_description'] = seller_description
            row_data['lot_url'] = lot_url
            row_data['evaluation'] = evaluation
            # Update the stored state
            state['processed_data'][full_lot_url] = current_hash
            save_current_state(state)
        return row_data
    return None
async def parse_seller_description(session, url):
    try:
        html_content = await fetch_with_retry(session, url)
        tree = html.fromstring(html_content)
        description = "Описание продавца не найдено"
        lot_desc = tree.xpath("//div[contains(@class, 'lot-desc')]")
        if lot_desc:
            description_elements = lot_desc[0].xpath(".//p|.//ul|.//li")
            description_parts = []
            for element in description_elements:
                if element.tag == "p":
                    description_parts.append(element.text_content().strip())
                elif element.tag == "ul":
                    for li in element.xpath(".//li"):
                        description_parts.append(li.text_content().strip())
            description = "\n".join(description_parts)
        evaluation = "Нет данных"
        price_span = tree.xpath("//span[contains(@class, 'price_orig') and @data-cache-curs]")
        if price_span:
            evaluation = price_span[0].get('data-cache-curs')
        return description, url, evaluation
    except Exception as e:
        logger.error(f"Error while parsing the seller description for {url}: {str(e)}")
        return "Описание продавца не найдено", url, "Нет данных"
def parse_price_row(row):
    return {
        'date': row.xpath(".//div[contains(@class, 'date')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'date')]") else "Не указано",
        'seller': "".join(row.xpath(".//div[contains(@class, 'wrap-name')]//div[contains(@class, 'name')]//div/text()")).strip() if row.xpath(".//div[contains(@class, 'wrap-name')]//div[contains(@class, 'name')]") else "Не указано",
        'condition': row.xpath(".//div[contains(@class, 'sohr')]/text()")[0].strip() if row.xpath(".//div[contains(@class, 'sohr')]") else "Не указано",
        'price': row.xpath(".//span[contains(@class, 'price_orig')]/text()")[0].strip() if row.xpath(".//span[contains(@class, 'price_orig')]") else "Нет данных",
    }
def parse_technical_specs(tree):
    specs_data = {'technical': [], 'literature': []}
    specs_block = tree.xpath("//div[contains(@class, 'specs')]")
    if specs_block:
        tech_section = specs_block[0].xpath(".//ul")
        if tech_section:
            tech_list = tech_section[0].xpath("./li")
            specs_data['technical'] = [li.text_content().strip() for li in tech_list]
        literature_section = specs_block[0].xpath(".//ul[@id='literature']")
        if literature_section:
            literature_list = literature_section[0].xpath("./li")
            specs_data['literature'] = [li.text_content().strip() for li in literature_list]
    return specs_data
def signal_handler(signum, frame):
    raise KeyboardInterrupt()
if __name__ == "__main__":
    try:
        categories = get_categories()
        signal.signal(signal.SIGINT, signal_handler)
        logger.info("Script started")
        asyncio.run(scrape_products(categories))
    except KeyboardInterrupt:
        logger.info("Script interrupted by the user. Progress has been saved.")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
    finally:
        logger.info("Script finished")
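
# Reference sketch (an assumption, not part of the script): the state_manager
# and save_json_file helpers are not shown here. From the way their return
# values are used above, they appear to work roughly like the shapes below;
# the actual implementations live in state_manager.py / save_json_file.py and
# may differ.
#
#   state = {
#       'last_processed_category': '<category url>',
#       'last_processed_page': 1,
#       'processed_data': {'<lot url>': '<hash returned by create_hash>'},
#   }
#   progress = {'category': '<category url>', 'page': 1, 'product_index': 0}
#
#   save_to_json(products, file_counter) is assumed to write the product list
#   to a numbered JSON file and return the counter to use for the next file.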