Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # AmBotTools.py
- import certifi
- import ssl
- from urllib.request import urlopen, Request
- from http.client import HTTPResponse
- import bs4
- from bs4 import BeautifulSoup
class AmBotTools:
    """Small toolbox for fetching URLs and scraping <img>/<a> data from HTML.

    Every request goes through ``consumeUrl``, which uses an SSL context
    built from the certifi CA bundle.
    """

    # Keys of the dicts produced by getAnchors().
    KEY_ANCHOR = "anchor"
    KEY_HREF = "href"

    # User-agent strings available to callers.
    CHROME_SIGNATURE = "Chrome"
    FF78_SIGNATURE = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) "
        "Gecko/20100101 Firefox/78.0"
    )

    def __init__(self):
        """Create the SSL context used by all subsequent requests."""
        self.mCAFile = certifi.where()
        self.mSSLContext = ssl.create_default_context(cafile=self.mCAFile)

    def consumeUrl(
        self,
        pUrl: str,
        pbPreferBytes: bool = False,
        pStrEncoding="UTF-8",
        pHeaders: dict = None,
    ):
        """Fetch ``pUrl`` and return its body.

        :param pUrl: the URL to request
        :param pbPreferBytes: when True return the raw bytes, otherwise a str
        :param pStrEncoding: encoding used to decode the body into a str
        :param pHeaders: custom HTTP headers; None (the default) sends none
        :return: bytes when ``pbPreferBytes`` is True, str otherwise
        :raises: whatever ``urlopen`` raises, and UnicodeDecodeError when the
            body cannot be decoded with ``pStrEncoding``
        """
        # A fresh dict per call instead of a shared mutable default argument.
        theHeaders = {} if pHeaders is None else pHeaders
        theRequest = Request(url=pUrl, headers=theHeaders)

        # The context manager closes the response even when read() fails.
        with urlopen(theRequest, context=self.mSSLContext) as response:
            theBytes = response.read()

        if pbPreferBytes:
            return theBytes
        return str(theBytes, pStrEncoding)

    def _fetchSoup(self, pUrl: str):
        """Fetch ``pUrl`` and parse it with html5lib; None when the body is empty."""
        strContent = self.consumeUrl(pUrl=pUrl)
        if not strContent:
            return None
        # parser => python -m pip install html5lib
        return BeautifulSoup(strContent, "html5lib")

    def getImgs(self, pUrl: str):
        """Return the ``src`` value of every <img> at ``pUrl`` that has one.

        :param pUrl: address of the HTML page to scan
        :return: list of src strings, in document order (possibly empty)
        """
        soup = self._fetchSoup(pUrl)
        if soup is None:
            return []
        # img.attrs is a dict; <img> tags without a src are skipped.
        return [
            img.attrs["src"]
            for img in soup.find_all("img")
            if "src" in img.attrs
        ]

    def getAnchors(self, pUrl: str):
        """Return one dict per <a> element found at ``pUrl``.

        Each dict maps ``KEY_ANCHOR`` to the anchor text and ``KEY_HREF`` to
        the href attribute, or to "" when the anchor has no href.

        :param pUrl: address of the HTML page to scan
        :return: list of dicts, in document order (possibly empty)
        """
        soup = self._fetchSoup(pUrl)
        if soup is None:
            return []
        return [
            {
                AmBotTools.KEY_ANCHOR: anchor.text,
                AmBotTools.KEY_HREF: anchor.attrs.get("href", ""),
            }
            for anchor in soup.find_all("a")
        ]

    @staticmethod
    def getFilteredByHrefAnchors(pListOfAnchors: list, pStrHrefFilter: str):
        """Keep only the anchors whose href contains ``pStrHrefFilter``.

        Example::

            AmBotTools.getFilteredByHrefAnchors(
                theAnchors,  # a list, as returned by getAnchors
                "4cdn.org"   # the filter expression
            )

        :param pListOfAnchors: list of dicts holding a ``KEY_HREF`` entry
        :param pStrHrefFilter: substring that the href must contain
        :return: a new list with the matching dicts, in their original order
        """
        return [
            thing
            for thing in pListOfAnchors
            if pStrHrefFilter in thing[AmBotTools.KEY_HREF]
        ]

    # use for jpg, png, webm, etc, NOT for text
    def downloadBin(
        self,
        pUrlBin: str,
        pDestinationName: str,
        pHeaders: dict = None,
    ):
        """Download ``pUrlBin`` and save its bytes as ``pDestinationName``.

        :param pUrlBin: URL of the binary resource
        :param pDestinationName: path of the file to (over)write
        :param pHeaders: HTTP headers; None (the default) sends the Firefox 78
            user-agent and an empty referer
        :return: True when the file was written; False when writing failed
            (the error is printed)
        :raises: whatever ``consumeUrl`` raises; download errors are NOT caught
        """
        if pHeaders is None:
            pHeaders = {
                "user-agent": AmBotTools.FF78_SIGNATURE,
                "referer": "",
            }

        theBytes = self.consumeUrl(
            pUrl=pUrlBin,
            pbPreferBytes=True,
            pHeaders=pHeaders,
        )

        try:
            # "wb" = write binary; the with-block closes the file on failure too.
            with open(pDestinationName, "wb") as fw:
                fw.write(theBytes)
        except (OSError, ValueError) as e:
            print(str(e))
            return False
        return True
- *****
- from AmBotTools import AmBotTools
- import json
class BotFlickr:
    """Scraper for one Flickr user's photos, with a TSV download log.

    The log file (``MEMORY``) holds one record per line with three
    tab-separated fields: description, original-size URL, and whether the
    download succeeded ("True"/"False").
    """

    MEMORY = "memory.DB"
    BASE_URL = "https://www.flickr.com/photos"
    FILTER_FOR_IMG_SRC = "//live.staticflickr.com"

    # Markers delimiting the photo data embedded in a photo page's source.
    MARK_IMG_DICT_START = "name: 'photo-page-scrappy-view',"
    MARK_IMG_DICT_END = "modelExport:"

    # Tabs and line breaks would split a TSV record; they are written as spaces.
    _TSV_UNSAFE = str.maketrans("\t\r\n", "   ")

    def __init__(self, pUserName):
        """Bind the bot to ``pUserName`` and load the download log."""
        self.mUserName = pUserName
        self.mBot = AmBotTools()
        self.mMemory = self.readFromMemory()

    def getUserUrl(self):
        """Return the URL of the user's photo stream."""
        return f"{BotFlickr.BASE_URL}/{self.mUserName}"

    def getUserUrlForUserPage(self, pN):
        """Return the URL of page ``pN`` of the user's photo stream."""
        # e.g. https://www.flickr.com/photos/projectapolloarchive/page2
        return f"{BotFlickr.BASE_URL}/{self.mUserName}/page{pN}"

    def getPhotoIdsAtUrl(self, pUrl: str):
        """Return the photo ids of the Flickr-hosted images found at ``pUrl``.

        :param pUrl: address of the page to scan
        :return: list of id strings, one per matching <img>, in page order
        """
        theIds = []
        for src in self.mBot.getImgs(pUrl):
            # example: src='//live.staticflickr.com/65535/51357978988_cee7be2c65_z.jpg'
            if BotFlickr.FILTER_FOR_IMG_SRC in src:
                theImgFileName = src.split("/")[-1]  # '51357978988_cee7be2c65_z.jpg'
                theIds.append(theImgFileName.split("_")[0])
        return theIds

    def getPhotoUrlFromPhotoId(self, pPhotoId):
        """Return the URL of the photo page for ``pPhotoId``."""
        # https://www.flickr.com/photos/<user name>/<photo id>/
        return self.getUserUrl() + f"/{pPhotoId}/"

    def getPhotoDictFromPhotoId(self, pPhotoId):
        """Extract the photo data embedded in the photo page's source.

        Takes the text between ``MARK_IMG_DICT_START`` and
        ``MARK_IMG_DICT_END``, keeps its first line (``params: {...}``) and
        parses it as a JSON object with the single key "params".

        :param pPhotoId: id of the photo whose page is fetched
        :return: the parsed dict, or False when either marker is missing
        :raises json.JSONDecodeError: when the extracted text is not valid JSON
        """
        url = self.getPhotoUrlFromPhotoId(pPhotoId)
        src = self.mBot.consumeUrl(url)

        iWhereDictStarts = src.find(BotFlickr.MARK_IMG_DICT_START)
        if iWhereDictStarts == -1:
            return False
        srcDict = src[iWhereDictStarts + len(BotFlickr.MARK_IMG_DICT_START):]

        iWhereDictEnds = srcDict.find(BotFlickr.MARK_IMG_DICT_END)
        if iWhereDictEnds == -1:
            return False

        # add the starting curly-brace
        srcDict = "{" + srcDict[:iWhereDictEnds].strip()
        # Quote the 1st key only; later "params:" occurrences belong to the data.
        srcDict = srcDict.replace("params:", "\"params\":", 1)
        # A trailing comma (the separator before the next JS property) would
        # make the closed object invalid JSON, so it is dropped.
        firstLine = srcDict.split("\n")[0].rstrip().rstrip(",")
        return json.loads(firstLine + "}")

    def _getDescriptionAndSizeUrl(self, pPhotoId, pSizeKey):
        """Return (description, "https:"-prefixed src) for one size key."""
        theDict = self.getPhotoDictFromPhotoId(pPhotoId)
        if not isinstance(theDict, dict):
            raise ValueError(
                f"Could not extract the photo data for photo id {pPhotoId!r}"
            )
        photoModel = theDict['params']['photoModel']
        theDescription = photoModel['description']
        return theDescription, "https:" + photoModel['sizes'][pSizeKey]['src']

    def getDescriptionAndOriginalUrlFromPhotoId(self, pPhotoId):
        """Return (description, URL of the original-size image, key 'o').

        :raises ValueError: when the photo data cannot be found in the page
        :raises KeyError: when the data lacks the expected entries
        """
        return self._getDescriptionAndSizeUrl(pPhotoId, 'o')

    def getDescriptionAndSmallUrlFromPhotoId(self, pPhotoId):
        """Return (description, URL of the image under size key 'sq').

        :raises ValueError: when the photo data cannot be found in the page
        :raises KeyError: when the data lacks the expected entries
        """
        return self._getDescriptionAndSizeUrl(pPhotoId, 'sq')

    def downloadPhotoOriginalIfNotAlreadyDownloaded(self, pPhotoId):
        """Download the original-size image unless the log says it succeeded before.

        The file is saved under the last path segment of its URL, and the
        attempt (successful or not) is appended to the log.

        :return: the result of ``writeToMemory`` when a download was
            attempted; False when the photo was already downloaded
        """
        desc, url = self.getDescriptionAndOriginalUrlFromPhotoId(pPhotoId)
        if self.alreadyDownloaded(url):
            return False

        # https://live.staticflickr.com/65535/51357005762_3f48a89897_o.jpg
        filename: str = url.split("/")[-1]
        bSuccess: bool = self.mBot.downloadBin(
            pUrlBin=url,
            pDestinationName=filename
        )
        return self.writeToMemory(desc, url, bSuccess)

    def writeToMemory(self, desc, url, bSuccessOnDownload):
        """Append one record to the log file and to ``self.mMemory``.

        Tabs and line breaks in ``desc`` are replaced by spaces so that the
        record stays on one line with exactly three fields.

        :return: True once the record has been written
        :raises OSError: when the log file cannot be opened or written
        """
        cleanDesc = f"{desc}".translate(BotFlickr._TSV_UNSAFE).strip()
        # TSV = Tab Separated Values
        record: str = f"{cleanDesc}\t{url}\t{bSuccessOnDownload}\n"
        with open(BotFlickr.MEMORY, "a", encoding="utf-8") as fw:
            fw.write(record)

        # Keep the in-session memory in step with the file.
        self.mMemory.append(
            {"desc": cleanDesc, "url": url, "dl": str(bSuccessOnDownload)}
        )
        return True

    def alreadyDownloaded(self, url):
        """Tell whether the log holds a SUCCESSFUL download of ``url``.

        The "dl" field is text, so it is compared against "True" rather than
        tested for truthiness (the string "False" is truthy).
        """
        return any(
            record["url"] == url and str(record["dl"]) == "True"
            for record in self.mMemory
        )

    def readFromMemory(self):
        """Load the log file into a list of {"desc", "url", "dl"} dicts.

        Lines that do not have exactly three tab-separated fields, or whose
        URL field is empty, are ignored. All values are kept as strings.

        :return: list of records; empty when the file is missing or unreadable
        """
        try:
            with open(
                BotFlickr.MEMORY, "r", encoding="utf-8", errors="replace"
            ) as fr:
                strContent = fr.read()
        except OSError:
            # No log yet (first run) or unreadable file: start empty.
            return []

        listOfRecords = []
        for line in strContent.split("\n"):
            # Strip per field, so that an empty description keeps its column.
            lineParts = [part.strip() for part in line.split("\t")]
            if len(lineParts) == 3 and lineParts[1]:
                listOfRecords.append(
                    {
                        "desc": lineParts[0],
                        "url": lineParts[1],
                        "dl": lineParts[2],
                    }
                )
        return listOfRecords
- *******************
- from bot_flickr import BotFlickr
# Case-insensitive search over the descriptions stored in the bot's memory.
FLICKR_USER = "projectapolloarchive"

# Lower-case both sides of the comparison to make the search case-insensitive.
whatToSearchFor = input("What do you want to search for?").lower()

bot = BotFlickr(FLICKR_USER)

# Keep every memory record whose description contains the search expression.
listOfFindings = [
    record
    for record in bot.mMemory
    if whatToSearchFor in record["desc"].lower()
]

if not listOfFindings:
    print ("Nothing found.")
else:
    print(listOfFindings)
- ****************************
- from bot_flickr import BotFlickr
- TEST_USER = "projectapolloarchive"
- bot = BotFlickr(TEST_USER)
- url = bot.getUserUrl()
- print(url)
- #print (bot.getUserUrlForUserPage(159))
- #theSrcs = bot.mBot.getImgs(url)
- #print(theSrcs)
- theIds = bot.getPhotoIdsAtUrl(url)
- print(theIds)
- urlFor1stPhoto = bot.getPhotoUrlFromPhotoId(theIds[0])
- print(urlFor1stPhoto)
- #print(bot.getPhotoDictFromPhotoId(theIds[0]))
- """
- desc, urlOriginal = bot.getDescriptionAndOriginalUrlFromPhotoId(theIds[2])
- print(desc)
- print(urlOriginal)
- """
- """
- TEST_URL = "https://live.staticflickr.com/65535/51358764550_c5ae627240_o.jpg"
- respostaSeJaDescarregado = bot.alreadyDownloaded(TEST_URL)
- print (f"Já descarregado? {respostaSeJaDescarregado}")
- exit()
- """
- r = bot.downloadPhotoOriginalIfNotAlreadyDownloaded(theIds[3])
- print(r)
Advertisement
Add Comment
Please, Sign In to add comment