Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from extract_emails import ExtractEmails
- import re
- import cloudscraper
- from bs4 import BeautifulSoup
- import pandas as pd
- import os
- import concurrent.futures
- import phonenumbers
- import sys
# The first command-line argument is stored as ``command_arg`` and later used
# as the region passed to the phone-number matcher. Its value is not
# validated here; only its presence is required.
if len(sys.argv) < 2:
    sys.exit("\nPlease provide a command line argument\nFor example 'python name_of_script.py AU' for Australian domains or 'python name_of_script.py US' for US domains")
command_arg = sys.argv[1]

# Accumulates one (url, phones, emails, has_ads) tuple per scraped URL.
RESULTS = []
def locate_txt_files(path=os.getcwd):
    """Return the names of the ``.txt`` entries directly inside a directory.

    Args:
        path: Directory to scan, given either as a path (``str`` or
            ``os.PathLike``) or as a zero-argument callable returning one.
            The default, ``os.getcwd``, is invoked at call time, so the
            working directory in effect when the function runs is scanned.

    Returns:
        A list of bare entry names (not joined with the directory), in the
        order ``os.listdir`` reports them. Matching is a case-sensitive
        ``endswith(".txt")`` check on the name only.
    """
    # Callers used to be forced to pass a callable; a plain path string
    # raised "TypeError: 'str' object is not callable". Accept both.
    directory = path() if callable(path) else path
    return [entry for entry in os.listdir(directory) if entry.endswith(".txt")]
def open_and_prepare_urls2(txt_file_array):
    """Collect every ``.au`` domain found in the given text files.

    Each file is read as UTF-8 and scanned for runs of characters (anything
    except whitespace, ``|`` and ``,``) that end in ``.au``. Whatever follows
    the ``.au`` suffix inside the same run, such as a URL path, is dropped.

    Args:
        txt_file_array: Iterable of paths to the text files to read.

    Returns:
        A flat list of the matched domains, in file order and then in order
        of appearance. Duplicates are kept.

    Side effects:
        Prints a per-file count followed by a blank-line separator.
    """
    # The previous implementation split the text on the bare substring "au",
    # which cut domains such as "laundry.com.au" in half and glued unrelated
    # entries together. Matching the ".au" suffix keeps each entry intact.
    # The lookahead stops ".au" from matching inside labels like ".aus" or
    # ".auction".
    domain_pattern = re.compile(r"[^\s|,]+?\.au(?![A-Za-z0-9-])")

    final_domains = []
    for txt_file in txt_file_array:
        with open(txt_file, encoding="utf-8") as f:
            content = f.read()
        domain_urls = domain_pattern.findall(content)
        final_domains.extend(domain_urls)
        print(f"Total of {len(domain_urls)} '.au' domains found in {txt_file}")
        print("\n")
    return final_domains
- ###### New Additions ####
def getEmail(url):
    """Return the ``emails`` attribute of an ``ExtractEmails`` run on *url*.

    The extractor is constructed with ``depth=5`` and ``ssl_verify=True``.
    """
    return ExtractEmails(url, depth=5, ssl_verify=True).emails
def getPhone(source):
    """Extract phone-number-like strings from *source* that start with a known prefix.

    Candidates are found with a permissive regex (optional leading ``+`` or
    ``(``, then digits mixed with spaces, dots, hyphens and parentheses).
    A candidate is kept when its raw text starts with one of the fixed
    prefixes and, once spaces are removed, is at most 12 characters long.

    Args:
        source: Text to search.

    Returns:
        A list of the kept numbers with spaces removed (other punctuation is
        left in place), de-duplicated, ordered by prefix-list position and
        then by position in *source*.
    """
    candidates = re.findall(r'[\+\(]?[0-9][0-9 .\-\(\)]{8,}[0-9]', source)
    # Presumably Australian service, area and mobile codes, plus their
    # country-code (61) forms -- verify against the intended region.
    prefixes = ("1300", "1800",
                "02", "03", "04", "05", "07", "08",
                "612", "613", "614", "615", "617", "618")

    # NOTE(review): the prefix test runs on the raw match, so candidates that
    # begin with "+" or "(" are never kept -- confirm this is intended.
    kept = []
    for prefix in prefixes:
        for candidate in candidates:
            compact = candidate.replace(" ", "")
            if candidate.startswith(prefix) and len(compact) <= 12:
                kept.append(compact)

    # dict.fromkeys drops duplicates while preserving first-seen order in a
    # single pass (the old set + list.index sort was quadratic and wrapped in
    # a bare ``except`` that could only hide real errors).
    return list(dict.fromkeys(kept))
def googleScrapePhones(text):
    """Return the phone numbers found in *text* as one comma-separated string.

    Matching is delegated to ``phonenumbers.PhoneNumberMatcher`` with the
    module-level ``command_arg`` as the region. Every match is formatted as
    E.164, duplicates are removed keeping the first occurrence, and the
    remainder is joined with ``", "``. No matches yields an empty string.
    """
    e164 = phonenumbers.PhoneNumberFormat.E164
    formatted = [
        phonenumbers.format_number(found.number, e164)
        for found in phonenumbers.PhoneNumberMatcher(text, command_arg)
    ]
    unique_in_order = list(dict.fromkeys(formatted))
    return ", ".join(unique_in_order)
# Single module-level HTTP client; getContent() issues every request through it.
_BROWSER_PROFILE = "chrome"
scraper = cloudscraper.CloudScraper(browser=_BROWSER_PROFILE)
def has_ads(text, keyword="ads"):
    """Report whether *keyword* occurs anywhere in *text*.

    This is a plain containment test, so the keyword also matches inside
    longer words (for example "ads" inside "uploads").
    """
    found = keyword in text
    return found
def getContent(url):
    """Fetch *url* and record its phone numbers, e-mail addresses and ads flag.

    The page is downloaded through the module-level ``scraper`` and parsed
    with BeautifulSoup. The re-serialised markup is checked with ``has_ads``
    and scanned by ``googleScrapePhones``, and ``getEmail`` is run on the URL.

    Args:
        url: Absolute URL to fetch.

    Returns:
        None. One ``(url, phones, emails, has_ads)`` tuple is appended to the
        module-level ``RESULTS`` list; ``phones`` and ``emails`` are
        ``", "``-joined strings, empty when nothing was found or when the
        corresponding extractor failed.

    Raises:
        Whatever ``scraper.get`` raises; a failed download is not caught here
        and no row is recorded for that URL.
    """
    r = scraper.get(url)
    soup = BeautifulSoup(r.text, "html.parser")
    Source = str(soup)
    is_ad = has_ads(text=Source)

    # Set the fallback before the attempt. Previously a failing extractor
    # left the name unbound, so the truthiness check that followed raised
    # UnboundLocalError and the whole row was lost.
    fones = ""
    try:
        fones = googleScrapePhones(text=Source) or ""
    except Exception:
        # Best effort: a phone-extraction failure must not drop the row.
        pass

    e_mails = ""
    try:
        found_emails = getEmail(url)
    except Exception:
        # Best effort: an e-mail crawl failure must not drop the row.
        found_emails = None
    if found_emails:
        e_mails = ", ".join(found_emails)

    RESULTS.append((url, fones, e_mails, is_ad))
def getData(url):
    """Ensure *url* has an HTTP(S) scheme, then scrape it with ``getContent``.

    Args:
        url: A domain or URL. When it does not already start with
            ``http://`` or ``https://`` (case-insensitive), ``http://`` is
            prepended.

    Returns:
        None. Results are recorded by ``getContent``.
    """
    # Test the scheme prefix, not a substring: the old '"http" not in url'
    # check skipped hosts that merely contain "http" (e.g. "httpbin.org.au"),
    # leaving them without a scheme.
    if not url.lower().startswith(("http://", "https://")):
        url = "http://" + url
    getContent(url)
- ##### Apply #####
# Driver: read the domain lists, scrape every domain concurrently, then split
# the collected rows into four Excel reports (general vs. "+614" numbers,
# each with and without the ads flag).
txt_files_list = locate_txt_files()
domains = open_and_prepare_urls2(txt_files_list)

# Rough estimate printed for the user: 6.7 seconds per domain.
total_time = len(domains) * 6.7
print("Starting scraper\nPlease wait...\nEstimated time in sec(s):\n"+ str(total_time))

# NOTE: the iterator returned by map() is never consumed, so an exception
# raised inside a worker is not re-raised here; that domain simply produces
# no row in RESULTS. Leaving the ``with`` block waits for all workers.
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(getData, domains)

# Declared up front so the frames keep their columns even with zero rows;
# without this an empty result list produced a frame with no "ads" column
# and the filters below raised KeyError.
_REPORT_COLUMNS = ["Domain", "Phone Number", "Email", "ads"]

general_result = []
result_04 = []
for domain, phones, emails, is_ad in RESULTS:
    # "".split(", ") yields [""], so drop empty pieces explicitly.
    numbers = [number for number in phones.split(", ") if number]
    mobile_numbers = [number for number in numbers if number.startswith("+614")]
    other_numbers = [number for number in numbers if not number.startswith("+614")]

    # Only domains with at least one "+614" number go into the 04 report.
    if mobile_numbers:
        result_04.append({
            "Domain": domain,
            # join() avoids the dangling ", " the old += concatenation left.
            "Phone Number": ", ".join(mobile_numbers),
            "Email": emails,
            "ads": is_ad,
        })

    # Every scraped domain gets a general row holding its remaining numbers.
    general_result.append({
        "Domain": domain,
        "Phone Number": ", ".join(other_numbers),
        "Email": emails,
        "ads": is_ad,
    })

df = pd.DataFrame(general_result, columns=_REPORT_COLUMNS)
df_04 = pd.DataFrame(result_04, columns=_REPORT_COLUMNS)

# Filter conditions
filt_ads = (df["ads"] == True)
filt_no_ads = (df["ads"] == False)
filt_04_ads = (df_04["ads"] == True)
filt_04_no_ads = (df_04["ads"] == False)

# Split each frame by the ads flag.
df_ads = df[filt_ads]
df_no_ads = df[filt_no_ads]
df_04_ads = df_04[filt_04_ads]
df_04_no_ads = df_04[filt_04_no_ads]

df_ads.to_excel("result_landline_ads.xlsx", index=False)
df_no_ads.to_excel("result_landline_no_ads.xlsx", index=False)
df_04_ads.to_excel("result_04_mobile_ads.xlsx", index=False)
df_04_no_ads.to_excel("result_04_mobile_no_ads.xlsx", index=False)
Advertisement
Add Comment
Please, Sign In to add comment