Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from urllib.parse import unquote
- import csv
- import logging
- import sys
- import traceback
- import re
- import locale
- import bs4
- import requests
- contacts = [
- "Контакты",
- "контакты",
- "О нас",
- "о нас",
- "О компании",
- "о компании",
- "Связаться с нами",
- "связаться с нами",
- ]
- contacts = [x for x in contacts if x]
- ignor_emails = [
- "web@coffeestudio.ru",
- "Rating@Mail.ru",
- "Рейтинг@Mail.ru",
- "Ðåéòèíã@Mail.ru",
- "",
- ]
- ignor_emails = [x for x in ignor_emails if x]
- # инициализация
- headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61",
- "accept-language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7"}
- logging.basicConfig(datefmt="%Y-%m-%d %H:%M:%S",
- format="%(asctime)s: %(message)s",
- handlers=[logging.FileHandler('errors.log', 'a', 'utf-8')])
- email_regexp = re.compile("""(>|[\s]+|"|')([^>\s="']+@[^<\s\.]+\.[^<\s=^"']+)([\s]+|<|"|')""")
- locale_encoding = locale.getpreferredencoding()
- # функция для приведения адреса к читаемому виду в случае с кавычками и html заквторированными символами (html entities)
- def repair_email(email):
- if '\\' in email or '/' in email: return ''
- res = re.findall('&#[\d]+;', email)
- buf_email = email
- for finder in res:
- buf_email = buf_email.replace(finder, chr(int(finder[2:-1])))
- if buf_email.startswith('"'): buf_email = buf_email[1:]
- if buf_email.startswith("'"): buf_email = buf_email[1:]
- if buf_email.endswith('"'): buf_email = buf_email[:-1]
- if buf_email.endswith("'"): buf_email = buf_email[:-1]
- if ':' in buf_email: buf_email = buf_email.split(':')[1]
- if ';' in buf_email: buf_email = buf_email.split(';')[1]
- #if buf_email.endswith(","): buf_email = buf_email[:-1]
- buf_email = buf_email.strip('.,')
- if buf_email.endswith(".jpg") or buf_email.endswith(".png"): return ''
- if buf_email.startswith('<!--'): buf_email = buf_email[4:]
- if '@' not in buf_email: return ''
- return buf_email.strip()
- # поиск нужного тега со ссылкой на раздел контактов с адресами почты
- def has_text_contact(tag, contact):
- if tag.name == 'a':
- if contact in tag.text:
- return tag
- if tag.get('title', None) == contact:
- return tag
- # составление csv файла с перечнем организаций и их сайтов по запросу в google
- def parse_companies(query):
- try:
- # инициализация
- true_query = '+'.join(query.split())
- companies = []
- # выполняем основной запрос в Google
- url = "https://www.google.com/search?q=%s" % true_query
- res = requests.get(url, headers=headers)
- soup = bs4.BeautifulSoup(res.text, "html.parser")
- #with open("first_page.html", 'w', encoding = "utf-8") as f: f.write(soup.prettify())
- # ищем элемент с надписью "Другие места", а затем предыдущий тег <a> со ссылкой
- places_span_tag = soup.find("span", text="Другие места")
- next_page_url = None
- for buf_tag in places_span_tag.parents:
- if buf_tag.name == 'a':
- next_page_url = "https://www.google.com" + unquote(buf_tag["href"])
- break
- page_number = 1
- # в цикле читаем все адреса и названия всех компаний на всех страницах
- while next_page_url:
- print('page_number %s url "%s"' % (page_number, next_page_url))
- page_number += 1
- # выполняем чтение новой страницы
- res = requests.get(next_page_url, headers=headers)
- soup = bs4.BeautifulSoup(res.text, "html.parser")
- #with open("next_page.html", 'w', encoding = "utf-8") as f: f.write(soup.prettify())
- # ищем все теги сайтов и по каждому из них находим имя организации
- for site_div in soup.find_all("div", text="Сайт"):
- for buf_tag in site_div.parents:
- if buf_tag.name == 'a':
- site_url = unquote(buf_tag["href"])
- buf_element = buf_tag.previous_sibling # нахрдим предыдущий тег <a>
- title_element = buf_element.find("div", role="heading") # внутри тега <div> с именем
- company_name = title_element.text.strip()
- if not [True for buf_name, buf_url in companies if buf_url.replace('www.', '') == site_url.replace('www.', '')] and \
- not [True for buf_name, buf_url in companies if company_name == buf_name]:
- if company_name.startswith('"') and company_name.endswith('"'):
- company_name = company_name[1:-1]
- elif company_name.startswith("'") and company_name.endswith("'"):
- company_name = company_name[1:-1]
- company_name = company_name.replace('«', '"').replace('»', '"')
- company_name = company_name.replace('""', '"').strip()
- companies.append([company_name, site_url])
- break
- # ищем следующую страницу, если на есть
- next_page = soup.find("span", text="Следующая")
- if next_page:
- for buf_tag in next_page.parents:
- if buf_tag.name == 'a':
- next_page_url = "https://www.google.com" + unquote(buf_tag["href"])
- break
- else:
- next_page_url = None
- #break
- # сортируем компании в лексикографическом порядке
- companies.sort()
- #for name, site in companies: print(name, site)
- #print('found %s comanies' % len(companies))
- if not companies:
- print('sorry, but there is no companies found for query "%s"' % query)
- else:
- # пишем в csv файл найденные данные по компаниям
- companies.insert(0, ["title", "site"])
- with open("%s.csv" % query, 'w', newline='\n', encoding=locale_encoding) as csv_file:
- writer = csv.writer(csv_file, delimiter=';', quoting=csv.QUOTE_NONE, quotechar='')
- for line in companies:
- writer.writerow(line)
- print("csv saved (%s companies)\n" % len(companies))
- return True
- except Exception as err:
- msg = "error in parse_companies: %s" % str(err)
- print(msg)
- logging.exception(msg)
- # ищем e-mail адреса на сайтах организаций на всех предполагаемых страницах контактов
- def parse_contacts(query):
- # для тестовой отладки
- igonre_tiltes = [
- ]
- try:
- all_emails = []
- empties = []
- companies = []
- # открываем полученный ранее пропарсенный список организаций и их сайтов
- with open(query+".csv", 'r', encoding=locale_encoding) as f:
- data = [row[:-1].split(';') for row in f.readlines()]
- data.pop(0)
- # ищем почтовые адреса для всех компаний по очереди
- for number, (name, site) in enumerate(data):
- #if number < N: continue
- if site.endswith('/en/'): site = site[:-4] + '/ru/'
- elif site.endswith('/en'): site = site[:-3] + '/ru'
- print('%s/%s name="%s" site="%s"' % (number+1, len(data), name, site))
- emails = set()
- # получаем главную страницу сайта компании
- try:
- res = requests.get(site, headers=headers)
- soup = bs4.BeautifulSoup(res.text, "html.parser")
- except Exception as err:
- msg = 'error in parse_contacts main site "%s": %s' % (site, str(err))
- print('empty')
- logging.error(msg)
- continue
- # получаем все адреса почты на главной странице
- buf_emails = email_regexp.findall(res.text)
- buf_emails = [x[1] for x in buf_emails]
- emails.update(set(buf_emails))
- # получаем все почтовые адреса со всех найденных гипотетических страниц контактов
- buf_hrefs = []
- for contact_title in contacts:
- # ищем тег со ссылкой на страницу конактов
- find_a_tag = soup.find(lambda tag: has_text_contact(tag, contact_title))
- if find_a_tag:
- try:
- buf_href = find_a_tag['href']
- except:
- msg = 'error in parse_contacts - failed to find href site "%s", contact_title "%s", tag %s' % (site, contact_title, find_a_tag)
- #print('contact pass')
- logging.error(msg)
- continue
- # строим абсолютную ссылку на страницу контактов
- if buf_href.startswith('//'):
- buf_href = 'https:' + buf_href
- elif not buf_href.startswith('http'):
- if not buf_href.startswith('/'):
- buf_href = '/' + buf_href
- if site.endswith('/'):
- buf_href = site[:-1] + buf_href
- else:
- buf_href = site + buf_href
- # запрашиваем страницу с контактами
- buf_hrefs.append([contact_title, buf_href])
- try:
- res = requests.get(buf_href, headers=headers)
- #with open("temp_contact_%s.html" % contact_title, 'w', encoding = "utf-8") as f: f.write(res.text)
- except Exception as err:
- msg = 'error in parse_contacts - failed to request contact_url "%s" for site "%s": %s' % (buf_href, site, str(err))
- #print('contact pass')
- logging.error(msg)
- #print(msg)
- continue
- buf_emails = email_regexp.findall(res.text)
- buf_emails = [x[1] for x in buf_emails]
- emails.update(set(buf_emails))
- #print('find contact url:', contact_title, buf_href, buf_emails)
- emails = [repair_email(email) for email in emails]
- emails = [email for email in emails if email and email not in ignor_emails]
- emails = sorted(list(set(emails)))
- #all_emails.extend(emails)
- #print('site emails:', emails)
- if not emails:
- print('empty')
- # запоминаем компании в список с адресами и в список пустых адресов
- if not emails:
- empties.append([name.replace('""', '"'), site])
- else:
- companies.append([name.replace('""', '"'), site, ', '.join(emails)])
- """
- # для отладки
- if not emails:
- print('no emails')
- # если не было найден никакой страницы со ссылками, то вывести в файл страницу
- if not buf_hrefs:
- with open("temp.html", 'w', encoding = "utf-8") as f:
- f.write(soup.prettify())
- else:
- for buf_row in buf_hrefs:
- print(buf_row)
- if name in igonre_tiltes: continue
- break
- """
- # пишем в csv файл найденные данные по компаниям
- companies.insert(0, ["title", "site", "emails"])
- with open("%s (emails).csv" % query, 'w', newline='\n', encoding=locale_encoding) as csv_file:
- for line in companies:
- if ';' in line:
- print('!', line)
- csv_file.write(';'.join(line) + '\n')
- print("csv saved (%s companies)\n" % (len(companies)-1))
- # выводим список компаний без найденных почтовых адресов
- print('empties: %s' % len(empties))
- for number, (name, site) in enumerate(empties):
- print('%s/%s name="%s" site="%s"' % (number+1, len(empties), name, site))
- # вывод результатов финальной статистики
- print('\nresult statistic: produced contacts %s, empty contacts %s' % (len(companies)-1, len(empties)))
- # для отладки
- #all_emails.sort()
- #for email in all_emails:
- # print(email)
- except Exception as err:
- msg = "error in parse_companies: %s" % str(err)
- print(msg)
- logging.exception(msg)
- if __name__ == '__main__':
- query = input('Введите запрос для google поиска: ').strip()
- #query = 'Заводы Санкт-Петербург'
- assert query, 'Пустой запрос'
- res = parse_companies(query)
- if res:
- parse_contacts(query)
- print("\ndone")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement