## for scraping urls according to queues and maintaining said queues ##################
## A more object-oriented version of original [ https://pastebin.com/TBtYja5D ] #######
## a simplified version [using csv, json and global variables instead of a database] ##
## example of usage at
### https://docs.google.com/spreadsheets/d/1rMgkO9S1s_HKXBzJps5qfvrjVf-YtmqJ9Hp6_UFqGls
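## [a minimal runnable sketch is also included at the bottom of this file]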
from urllib.parse import urljoin, urlsplit
import pandas, json

##################################################################################################
### FOR FETCHING AND PARSING HTML FROM URL #######################################################
### from https://pastebin.com/rBTr06vy [also suggests variations with other libraries] ###########
import requests
from bs4 import BeautifulSoup
def linkToSoup(targetUrl, conf=None, isv=True, returnErr=False):
    ## default to html.parser [avoids bs4's guessed-parser warning when conf is empty]
    parser = conf.get('parser', 'html.parser') if conf else 'html.parser'
    try:
        r = requests.get(targetUrl)
        if isv: print(f'<{r.status_code} {r.reason}> /{parser} from {r.url}')
        if r.status_code == 200: return BeautifulSoup(r.content, parser) ## [CAN +MORE TESTS] ##
        errMsg = f'<{r.status_code} {r.reason}> - '
        errMsg = f'{errMsg}Failed to scrape {targetUrl}'
    except Exception as e: errMsg = f'Failed to scrape {targetUrl} \n - errorMsg: "{str(e)}"'
    if isv: print(errMsg)
    return errMsg if returnErr else None
#### SOME ALTERNATIVES:
###### with ScrapingAnt: https://pastebin.com/5ibz2F6p ###########################################
###### with selenium: https://pastebin.com/VLZ2vPYK , https://pastebin.com/kEC9gPC8 ##############
##################################################################################################
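
## [a minimal usage sketch for linkToSoup, assuming
##  https://example.com as a placeholder url]
# soup = linkToSoup('https://example.com', conf={'parser': 'html.parser'})
# if soup: print(soup.title)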

class qScrawlr:
    def __init__(self, starter, setVars=None, clrLog=False):
        self.starterUrl = starter # SOME VALID URL
        ## defaults:
        self.pageLimit = 10
        self.maxScrapes = 30
        self.scrapeCt = 0
        self.curUrlId = 0
        self.scrawlLogs = {} # [in a database this would be one or more tables]
        self.skipUrls = []
        self.samesite = False
        ## copy to avoid mutating a caller's [or a shared default] dict
        setVars = dict(setVars) if setVars else {}
        if isinstance(starter, str):
            setVars['starterUrl'] = starter
            self.setGlobals(setVars, clrLog)
        else:
            if isinstance(starter, dict): self.loadScrawlSess(**starter)
            else: self.loadScrawlSess(*starter)
    def setGlobals(self, varDict, clearLog=False):
        print('updating properties with', varDict)
        ## could be interacting with database instead of:
        if 'starterUrl' in varDict: self.starterUrl = varDict['starterUrl']
        if 'pageLimit' in varDict: self.pageLimit = varDict['pageLimit']
        if 'curUrlId' in varDict: self.curUrlId = varDict['curUrlId']
        if 'skipUrls' in varDict:
            self.skipUrls = list(set(varDict['skipUrls'] + self.skipUrls))
        if 'maxScrapes' in varDict: self.maxScrapes = varDict['maxScrapes']
        if 'scrapeCt' in varDict: self.scrapeCt = varDict['scrapeCt']
        if 'samesite' in varDict: self.samesite = varDict['samesite']
        if clearLog: self.scrawlLogs = {}
        print('properties after updating:', {
            k: v for k, v in self.__dict__.items() if k != 'scrawlLogs'})
    def get_next_fromScrawlQ(self):
        ## could be interacting with database instead of:
        if self.scrawlLogs == {}: return self.starterUrl
        if self.scrapeCt > self.maxScrapes: return None
        elig = [
            k for k, l in self.scrawlLogs.items()
            if l['status'] == 'queued'
        ][:1]
        return elig[0] if elig else None
    def get_urlId(self, iUrl, refUrlId=None):
        ## could be interacting with database instead of:
        print('', end=f'\r[curUrlId={self.curUrlId}] getting id for {iUrl}')
        ## uniquify empty or '!'-prefixed pseudo-urls
        if (not iUrl) or iUrl[0] == '!': iUrl = f'{iUrl}_{self.curUrlId+1}'
        if iUrl not in self.scrawlLogs:
            if not refUrlId: return None
            self.scrawlLogs[iUrl] = {
                'url': iUrl, 'refUrlId': refUrlId, 'status': None}
        if self.scrawlLogs[iUrl].get('urlId'):
            return self.scrawlLogs[iUrl]['urlId']
        self.curUrlId += 1
        self.scrawlLogs[iUrl]['urlId'] = self.curUrlId
        return self.curUrlId
    def addUrl_to_queue(self, aUrl, rootUrl, refUrlId, qStatus='queued'):
        ## could be interacting with database instead of:
        aUrl = urljoin(rootUrl, aUrl)
        if aUrl.startswith('javascript:'): return False
        auId = self.get_urlId(aUrl, refUrlId=refUrlId)
        print('', end=f'\r[curUrlId={self.curUrlId}] Adding [#{auId}]: {aUrl}')
        if self.scrawlLogs[aUrl]['status'] in [None, 'retry']:
            self.scrawlLogs[aUrl]['status'] = (
                'skip' if aUrl in self.skipUrls else qStatus)
            return aUrl not in self.skipUrls
        return False
    def logScrape(self, scrapeRes):
        ## could be interacting with database instead of:
        sUrl = scrapeRes.get('url', '!missingUrl') if isinstance(
            scrapeRes, dict) else '!missingUrl'
        if sUrl not in self.scrawlLogs: # defensively create the log row [avoids KeyError]
            self.scrawlLogs[sUrl] = {'url': sUrl, 'refUrlId': None, 'status': None}
        auId, em = self.get_urlId(sUrl), '' # initiate url logId & error msg
        print('', end=(f'\r[{self.curUrlId}] Logging ' +
                       f'[{self.scrapeCt}][#{auId}]: {sUrl}'))
        if not isinstance(scrapeRes, dict):
            em = f'ScrapeResults should be <dict> not {type(scrapeRes)}'
            self.scrawlLogs[sUrl]['errorMessage'] = f'Invalid Format - {em}'
            self.scrawlLogs[sUrl]['stringifiedResult'] = str(scrapeRes)
            self.scrawlLogs[sUrl]['status'] = '!fail'
            return self.get_next_fromScrawlQ()
        for k, v in scrapeRes.items():
            if k != 'pageUrls': self.scrawlLogs[sUrl][k] = v
        if sUrl == '!missingUrl':
            scrapeRes['errorMessage'], em = '!missingUrl', '[missingUrl]'
        pul = scrapeRes.get('pageUrls', [])
        pageUrls, ntq = (pul[:] if isinstance(pul, list) else []), 0
        for i, pl in enumerate(pageUrls):
            pageUrls[i], qstat = self.get_urlId(pl, auId), 'queued'
            if isinstance(self.pageLimit, int) and self.pageLimit < ntq:
                qstat = 'page_limit_exceeded'
            ntq += int(self.addUrl_to_queue(pl, sUrl, auId, qstat))
        pageUrls = [p for p in pageUrls if p]
        self.scrawlLogs[sUrl]['pageUrlCt'] = len(pageUrls)
        self.scrawlLogs[sUrl]['newToQueue'] = ntq
        if pageUrls: self.scrawlLogs[sUrl]['pageUrlIds'] = pageUrls
        self.scrawlLogs[sUrl]['status'] = f'!error{em}' if (
            'errorMessage' in scrapeRes) else ((
                '' if pageUrls else '?') + 'scraped')
        return self.get_next_fromScrawlQ()
    def getPageUrls(self, sUrl, scSoup):
        pUrls = [urljoin(sUrl, a['href']) for a in scSoup.select('a[href]')]
        if self.samesite: # keep only links on the same domain as sUrl
            rnl = urlsplit(sUrl).netloc
            pUrls = [u for u in pUrls if urlsplit(u).netloc == rnl]
        ## CAN ADD MORE FILTERS ##
        return pUrls
    def getDataFromPage(self, srcUrl, sSoup):
        pageUrls = self.getPageUrls(srcUrl, sSoup)
        ####################### EXTRACT SOME PAGE DATA #######################
        ## [only extracting page title and [truncated] body text]
        pageTitle = sSoup.select_one('head title')
        pageTitle = pageTitle.get_text().strip() if pageTitle else None
        pageText = ' '.join([word for word in (
            sSoup.main if sSoup.main else (sSoup.body if sSoup.body else sSoup)
        ).get_text(' ').split() if word])
        if len(pageText) > 70: # truncate to the first and last 33 characters
            pageText = pageText[:33] + '...' + pageText[-33:]
        ## for a function that can extract various data from html,
        #### take a look at "htreeToDict" at https://pastebin.com/BpjZSQPi
        #### [returns a python dictionary containing specified details]
        ######################################################################
        return {
            'url': srcUrl, 'pageTitle': pageTitle,
            'pageText': pageText, 'pageUrls': pageUrls
        }
    def scrapeUrl(self, sUrl):
        auId = self.get_urlId(
            sUrl, '[starter]' if sUrl == self.starterUrl else None)
        self.scrawlLogs[sUrl]['status'] = 'scraping'
        self.scrapeCt += 1
        print('', end=(
            f'\r[{self.curUrlId}] Scraping ' +
            f'[{self.scrapeCt}][#{auId}]: {sUrl}'))
        scSoup = linkToSoup(sUrl, isv=False, returnErr=True)
        if isinstance(scSoup, str): # linkToSoup returned an error message
            return {'url': sUrl, 'errorMessage': scSoup}
        return self.getDataFromPage(sUrl, scSoup)
    ## [probably wouldn't need if using a database]
    def saveScrawlSess(self, logPath, varsPath, logMode='w'):
        logPath, varsPath = str(logPath), str(varsPath)
        if logMode != 'a': logMode = 'w'
        scrawlDF = pandas.DataFrame(list(self.scrawlLogs.values()))
        scrawlDF.to_csv(logPath, index=False, mode=logMode)
        print('Saved scrawlLogs to', logPath)
        with open(varsPath, 'w') as f: json.dump({
            'starterUrl': self.starterUrl, 'pageLimit': self.pageLimit,
            'maxScrapes': self.maxScrapes, 'scrapeCt': self.scrapeCt,
            'curUrlId': self.curUrlId, 'skipUrls': self.skipUrls,
            'samesite': self.samesite # persist so loadScrawlSess restores it
        }, f, indent=4)
        print('Saved globals to', varsPath)
    ## [probably wouldn't need if using a database]
    def loadScrawlSess(self, logPath, varsPath, mode='continue'):
        prevLog, prevVars = [], {}
        try: prevLog = pandas.read_csv(logPath).to_dict('records')
        except Exception as e:
            print(f'Unable to load log from {logPath}: {e}')
        try: prevVars = json.load(open(varsPath, 'r'))
        except Exception as e:
            print(f'Unable to load vars from {varsPath}: {e}')
        self.scrawlLogs = {r['url']: r for r in (
            list(self.scrawlLogs.values()) + prevLog)}
        print('queue length after loading:', len(self.scrawlLogs))
        if mode != 'continue': prevVars['scrapeCt'] = 0
        self.setGlobals(prevVars)
        ## [if using database, this could be a scheduled procedure]
        def requeue(statusMatches): # re-queue up to pageLimit urls per referring page
            eligRefs, eligUrls = list(set(
                l['refUrlId'] for l in self.scrawlLogs.values()
                if statusMatches(l['status'])
            )), []
            for r in eligRefs:
                eligUrls += [
                    u for u, l in self.scrawlLogs.items()
                    if l['refUrlId'] == r
                ][:self.pageLimit]
            for u in eligUrls: self.scrawlLogs[u]['status'] = 'queued'
        if mode in ['q<--page_limit_exceeded', 'q<--allUnscraped']:
            requeue(lambda s: s == 'page_limit_exceeded')
        if mode in ['q<--NaN', 'q<--allUnscraped']:
            requeue(lambda s: (not s) or pandas.isna(s))
    def run(self, saveTo=None):
        nextUrl = self.get_next_fromScrawlQ()
        while nextUrl: nextUrl = self.logScrape(self.scrapeUrl(nextUrl))
        print()
        if isinstance(saveTo, dict): self.saveScrawlSess(**saveTo)
        elif hasattr(saveTo, '__iter__'): self.saveScrawlSess(*saveTo)
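
## [a minimal usage sketch, assuming https://example.com as a placeholder
##  starter url and scrawl_log.csv / scrawl_vars.json as arbitrary save paths]
if __name__ == '__main__':
    crawler = qScrawlr('https://example.com', setVars={
        'pageLimit': 5, 'maxScrapes': 10, 'samesite': True})
    crawler.run(saveTo={'logPath': 'scrawl_log.csv', 'varsPath': 'scrawl_vars.json'})
    ## to resume from the saved session later, re-queueing unscraped urls:
    # resumed = qScrawlr({'logPath': 'scrawl_log.csv',
    #                     'varsPath': 'scrawl_vars.json', 'mode': 'q<--allUnscraped'})
    # resumed.run(saveTo=('scrawl_log.csv', 'scrawl_vars.json'))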