# -*- coding:UTF-8 -*-
# based on https://github.com/yxw19870806/yxw19870806/blob/master/googlePlus/googlePlus.py
# license: WTFPL
import argparse
import json
import os
import re
import zlib
from multiprocessing.pool import ThreadPool

import requests
import slugify
import wget

URL = {
    'archive': 'https://get.google.com/albumarchive/{}',
    'album': 'https://get.google.com/albumarchive/{}/album/{}',
    'data': 'https://get.google.com/_/AlbumArchiveUi/data',
    'post_photos': 'https://get.google.com/albumarchive/{}/albums/photos-from-posts',
}
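# Illustrative use of the templates above (placeholder ids, not real values):
#   URL['album'].format('<user_id>', '<album_id>')
#   -> 'https://get.google.com/albumarchive/<user_id>/album/<album_id>'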

def find_between(input, first_string, last_string):
    # Return the text between the last occurrence of first_string and the
    # following last_string; a falsy last_string means "up to the end".
    try:
        start = input.rindex(first_string) + len(first_string)
        end = input.rindex(last_string, start) if last_string else None
        return input[start:end]
    except ValueError:
        return ""

def check_sub_key(needles, haystack):
    # True only if haystack is a dict containing every key listed in needles.
    if not isinstance(needles, tuple):
        needles = tuple(needles)
    if isinstance(haystack, dict):
        for needle in needles:
            if needle not in haystack:
                return False
        return True
    return False

def parse_url(url):
    # Extract the user id and (optional) album id from an Album Archive URL.
    pattern = r'https://get.google.com/albumarchive/([^/]*)(/album/([^/#]*)|)'
    match = re.findall(pattern, url)
    if match:
        match = match[0]
        user_id = match[0]
        if match[2]:
            album_id = match[2]
        else:
            album_id = None
    else:
        user_id = None
        album_id = None
    return user_id, album_id
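# For example (placeholder ids, URL shapes as matched by the pattern above):
#   parse_url('https://get.google.com/albumarchive/<user_id>/album/<album_id>')
#       -> ('<user_id>', '<album_id>')
#   parse_url('https://get.google.com/albumarchive/<user_id>')
#       -> ('<user_id>', None)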

def get_list_of_albums(user_id):
    # Scrape the album overview page for (album title, album link) pairs.
    pattern = r'<h2\s*class="?[^">]*"\s*jsname="?[^">]*"\s*aria-label="?([^">]*)">\s*</h2>\s*<a\s*href="?([^">]*)"\s*class="?[^">]*">'
    url = URL['archive'].format(user_id)
    albums_list = {}
    matches = re.findall(pattern, requests.get(url).text)
    for match in matches:
        if '/album/' in match[1]:  # too lazy to parse posts and profile albums
            albums_list[match[1].split('/album/')[1]] = match[0]
    return albums_list

def get_list_for_album(user_id, album_id):
    # Collect every image URL in one album, following the continuation tokens
    # embedded in the page's AF_initDataCallback payload.
    album_page_url = URL['album'].format(user_id, album_id)
    image_url_list = []
    album_title = []
    album_page_response = requests.get(album_page_url)
    if album_page_response.status_code == 200:
        output = album_page_response.text
        album_title = re.findall(r'<div\s*class="?[^"]*">?([^<]*)</div><div\s*class="?[^"]*"><span\s*class="?[^"]*">?[^<]*</span>', output)
        script_data = find_between(output, "AF_initDataCallback({key: 'ds:0'", "</script>")
        script_data = find_between(script_data, "return ", "}});")
        try:
            script_data = json.loads(script_data)
            user_key = script_data[4][0]
            continue_token = script_data[3]
            for data in script_data[4][1]:
                image_url_list.append(str(data[1]))
        except (ValueError, IndexError):
            return image_url_list, album_title
        while continue_token:
            continue_image_page_url = URL['data']
            post_data = {"f.req": '[[[113305010,[{"113305010":["%s",null,24,"%s"]}],null,null,0]]]' % (user_key, continue_token)}
            continue_image_page_response = requests.post(continue_image_page_url, data=post_data)
            if continue_image_page_response.status_code == 200:
                continue_data = find_between(continue_image_page_response.text, ")]}'", None).strip()
                try:
                    continue_data = json.loads(continue_data)
                    continue_token = continue_data[0][2]["113305010"][3]
                    for data in continue_data[0][2]["113305010"][4][1]:
                        image_url_list.append(str(data[1]))
                except ValueError:
                    image_url_list = []
                    continue_token = ""
            else:
                # Stop paging instead of looping forever on a failed request.
                continue_token = ""
    return image_url_list, album_title

def get_photos_from_posts(user_id):
    # Parse the "photos from posts" page; returns a dict with the post albums
    # found ("blog_info_list") and the key needed for continuation requests.
    post_photos_url = URL['post_photos'].format(user_id)
    extra_info = {
        "is_error": True,
        "blog_info_list": [],
        "key": None,
        "json_data": None,
    }
    post_photos_response = requests.get(post_photos_url)
    if post_photos_response.status_code == 200:
        script_data = find_between(post_photos_response.text, "AF_initDataCallback({key: 'ds:0'", "</script>")
        script_data = find_between(script_data, "return ", "}});")
        try:
            script_data = json.loads(script_data)
        except ValueError:
            script_data = []
        else:
            extra_info["json_data"] = script_data
        if len(script_data) == 3:
            if script_data[1] is not None:
                extra_info["is_error"] = False
                for data in script_data[1]:
                    extra_blog_info = {
                        "blog_id": None,
                        "blog_time": None,
                        "json_data": data,
                    }
                    if (len(data) >= 2 and check_sub_key(("113305016",), data[1])
                            and len(data[1]["113305016"]) == 1 and len(data[1]["113305016"][0]) >= 5):
                        extra_blog_info["blog_id"] = str(data[1]["113305016"][0][0])
                        if isinstance(data[1]["113305016"][0][4], int):
                            extra_blog_info["blog_time"] = int(data[1]["113305016"][0][4] / 1000)
                    extra_info["blog_info_list"].append(extra_blog_info)
            extra_info["key"] = str(script_data[2])
    return extra_info
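# Shape of the dict returned above (keys exactly as built in extra_info):
#   is_error        -> False only when the page parsed and script_data[1] was present
#   blog_info_list  -> list of {'blog_id': str or None, 'blog_time': int or None, 'json_data': raw entry}
#   key             -> str(script_data[2]) or None
#   json_data       -> the parsed 'ds:0' payload or None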

def download_file(params):
    # Thread-pool worker: params is a (url, silent, dir) tuple.
    url, silent, dir = params
    try:
        filename = wget.download(url, out=dir, bar=None)
        if not silent:
            print(filename)
    except Exception:
        print('ERROR {}'.format(url))

def download_pool(list_of_files, dir, threads=8, silent=False):
    # Download all URLs into dir using a pool of worker threads.
    pool = ThreadPool(processes=threads)
    pool.map(download_file, [(url, silent, dir) for url in list_of_files])
    pool.close()
    pool.join()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Google archive - download images from album.")
    parser.add_argument("-u", "--url", help="album url", nargs='+', default=[])
    parser.add_argument("-i", "--input-file", help="file with urls separated by new lines", default=None)
    parser.add_argument("-t", "--threads", default=8, help="number of download threads (def. 8)", type=int)
    parser.add_argument("-s", "--silent", action="store_true", help="no output to stdout")
    parser.add_argument("--assume-yes", action="store_true", help="assume yes for redownloading albums")
    parser.add_argument("--assume-no", action="store_true", help="assume no for redownloading albums")
    args = parser.parse_args()

    if args.input_file:
        with open(args.input_file) as file:
            for line in file:
                args.url.append(line.strip())

    for album_url in args.url:
        user_id, album_id = parse_url(album_url)
        if not album_id:
            print('auto albums download not yet supported!')
            continue
        print('Gathering started')
        image_url_list, album_title = get_list_for_album(user_id, album_id)
        # Fall back to the raw album id if no title could be scraped.
        album_name = slugify.slugify(album_title[0] if album_title else album_id, separator='_')
        album_dir = user_id + '-' + hex(zlib.crc32(bytes(album_id, 'ascii')))[2:] + '-' + album_name
        if os.path.exists(album_dir):
            print('{} already exists!'.format(album_dir))
            if not args.assume_yes and not args.assume_no:
                user_input = input('Should I process anyway? [y/n]: ')
            elif args.assume_no:
                continue
            elif args.assume_yes:
                user_input = 'y'
            if user_input in ['y', 'yes']:
                download_pool(image_url_list, album_dir, args.threads, args.silent)
            else:
                continue
        else:
            print('{} - download started'.format(album_dir))
            os.makedirs(album_dir)
            download_pool(image_url_list, album_dir, args.threads, args.silent)
    print('done.')
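
# Example invocations (illustrative only; the script filename is arbitrary,
# the flags are those defined by the argparse setup above):
#   python albumarchive_download.py -u https://get.google.com/albumarchive/<user_id>/album/<album_id>
#   python albumarchive_download.py -i urls.txt -t 16 --silent --assume-yes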