Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# -*- coding: utf-8 -*-
"""
instaripper.py

usage: instaripper.py [-h] [-t THREADS] [--overwrite] username directory

@tobe81cwb
"""
import argparse
import json
import logging
import os
import re
import sys
import threading

try:
    import queue
except ImportError:  # Python 2
    import Queue as queue

try:
    from urllib import parse
    from urllib.request import urlopen, urlretrieve
    from urllib.error import HTTPError
    from urllib.parse import urlparse
except ImportError:  # Python 2
    import urlparse as parse
    from urllib import urlretrieve
    from urllib2 import urlopen
    from urllib2 import HTTPError
    from urlparse import urlparse
logger = logging.getLogger(__name__)


class InstaRipper(object):
    """
    Download every photo and video referenced by an Instagram profile page.

    The profile page is fetched over HTTP, the JSON object assigned to
    ``window._sharedData`` is extracted from it, and the media codes found
    there are pushed onto a queue that a pool of worker threads drains.
    """

    # Browser-like User-Agent string.
    # NOTE(review): nothing in this class attaches it to a request yet.
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/' \
                 '537.36 (KHTML, like Gecko) Chrome/' \
                 '51.0.2704.103 Safari/537.36'
    profile_info = None

    # Matches the JSON literal assigned to window._sharedData in the page.
    _SHARED_DATA_RE = re.compile(r'window\._sharedData\s?=\s?({.*});')

    def __init__(self, username, directory):
        """
        :param username: Instagram username to rip
        :param directory: Directory where media files are saved
        """
        self.username = username
        self.directory = directory
        # download queue
        self.q = queue.Queue()
        # downloaded media (for progress); guarded by _progress_lock because
        # several worker threads increment it
        self.downloaded = 0
        self._progress_lock = threading.Lock()
        # capture profile information of the instagram
        self.capture_profile_info()

    @staticmethod
    def _read_url(url):
        """
        Fetch a URL and return the raw response body.
        :param url: URL to fetch
        :return: Response body as bytes
        """
        # compatible with python 2 and 3 (the python 2 response object is not
        # a context manager, hence try/finally)
        response = urlopen(url)
        try:
            return response.read()
        finally:
            response.close()

    def check_dir(self):
        """
        Check if directory exist, and creates if necessary
        """
        if not os.path.isdir(self.directory):
            os.makedirs(self.directory)

    def get_profile_url(self):
        """
        Get the full URL of instagram
        """
        return parse.urljoin('https://instagram.com', self.username)

    def capture_shared_data(self, cursor=None):
        """
        Capture the shared data (json object) from instagram page
        :param cursor: Instagram cursor / page
        :return: Shared data as object
        :raises InstaRipperProfileNotFoundError: if the page request fails
            with an HTTP error
        :raises ValueError: if the page holds no ``window._sharedData`` object
        """
        url = self.get_profile_url()
        if cursor:
            url += '/?max_id={0}'.format(cursor)
        logger.debug(u'Capturing shared data of {0}'.format(url))
        try:
            html = self._read_url(url)
        except HTTPError:
            raise InstaRipperProfileNotFoundError
        match = self._SHARED_DATA_RE.search(html.decode('utf-8'))
        if match is None:
            # Fail with a readable message instead of an AttributeError on None.
            raise ValueError(
                'No window._sharedData object found at {0}'.format(url))
        return json.loads(match.group(1))

    def capture_profile_info(self):
        """
        Capture the profile information of instagram account and store it
        on ``self.profile_info``
        """
        logger.info(u'Capturing profile info')
        self.profile_info = self.capture_shared_data()['entry_data']['ProfilePage'][0]['user']

    def get_media_info(self, media_code):
        """
        Get JSON from media
        :param media_code: Code of the instagram media
        :return: Object with media info
        """
        url = 'https://www.instagram.com/p/{code}/?taken-by={username}&__a=1'.format(
            code=media_code, username=self.username)
        return json.loads(self._read_url(url).decode('utf-8'))

    def download_media(self, media_code, overwrite=False):
        """
        Download a photo or video from instagram
        :param media_code: Instagram media code
        :param overwrite: Should overwrite local files?
        """
        media = self.get_media_info(media_code)['media']
        url = media['video_url'] if media['is_video'] else media['display_src']
        filename = os.path.join(
            self.directory,
            os.path.basename(urlparse(url).path)
        )
        if not overwrite and os.path.isfile(filename):
            return
        # Download to a temporary name first so an interrupted transfer never
        # leaves a truncated file that a later run would skip as "done".
        partial = filename + '.part'
        try:
            # compatible with python 2 and 3
            urlretrieve(url, partial)
        except Exception:
            if os.path.exists(partial):
                os.remove(partial)
            raise
        if os.path.exists(filename):
            # os.rename refuses to replace an existing file on Windows
            os.remove(filename)
        os.rename(partial, filename)

    def capture_media_codes(self, cursor=None):
        """
        Capture all media codes from instagram page
        :param cursor: Instagram cursor / page
        :return: Tuple of (list of media codes, next cursor or None)
        """
        logger.debug(u'Capturing page on {0} position'.format(cursor))
        media_info = self.capture_shared_data(cursor)['entry_data']['ProfilePage'][0]['user']['media']
        page_info = media_info['page_info']
        # A real list (not a lazy map object) so callers can test it for
        # emptiness and iterate it more than once on python 3.
        media_codes = [node['code'] for node in media_info['nodes']]
        next_cursor = page_info['end_cursor'] if page_info['has_next_page'] else None
        return media_codes, next_cursor

    def get_full_name(self):
        """
        Get full name of instagram user
        :return: Full name of instagram user
        """
        return self.profile_info['full_name']

    def get_posts_count(self):
        """
        Get total posts of instagram user
        :return: Total posts
        """
        return self.profile_info['media']['count']

    def download_thread_producer(self, download_threads=10, overwrite=False):
        """
        Initiate the download threads, feed them every media code of the
        profile and wait until they are done
        :param download_threads: Number of download threads (at least 1)
        :param overwrite: Should overwrite local files?
        :raises ValueError: if download_threads is lower than 1
        """
        if download_threads < 1:
            # With no consumer the queue would never drain and join() would hang.
            raise ValueError('download_threads must be at least 1')
        logger.info(u'Capturing media files')
        logger.info(u'User {user} ({name}) has {total_posts} posts on Instagram.'
                    .format(user=self.username,
                            name=self.get_full_name(),
                            total_posts=self.get_posts_count()))
        # create the threads
        threads = []
        for _ in range(download_threads):
            worker = threading.Thread(target=self.download_thread_consumer, args=(overwrite,))
            worker.start()
            threads.append(worker)
        try:
            # get media codes and put on queue
            cursor = None
            while True:
                media_codes, cursor = self.capture_media_codes(cursor)
                for media_code in media_codes:
                    self.q.put(media_code)
                if not cursor:
                    break
            # wait until every queued media was processed
            self.q.join()
        finally:
            # Always release the workers, even when capturing a page failed;
            # otherwise they block on q.get() and keep the process alive.
            for _ in threads:
                self.q.put(None)
            for worker in threads:
                worker.join()

    def download_thread_consumer(self, overwrite=False):
        """
        Work on download queue until a ``None`` sentinel is received
        :param overwrite: Should overwrite local files?
        """
        while True:
            media_code = self.q.get()
            if media_code is None:
                break
            try:
                logger.debug(u'Downloading media {0}'.format(media_code))
                self.download_media(media_code, overwrite)
            except Exception:
                # One broken media must not kill the worker: log and move on.
                logger.exception(u'Failed to download media {0}'.format(media_code))
            else:
                with self._progress_lock:
                    self.downloaded += 1
                    downloaded = self.downloaded
                logger.info(u'Downloaded media {0}/{1}'.format(
                    downloaded, self.get_posts_count()
                ))
            finally:
                # Must run for every item taken, or q.join() never returns.
                self.q.task_done()

    def download_all_medias(self, download_threads=10, overwrite=False):
        """
        Download all media files from instagram account
        :param download_threads: Number of download threads
        :param overwrite: Should overwrite local files?
        :raises InstaRipperBlockedAccountError: if the profile is blocked
        :raises InstaRipperPrivateAccountError: if the profile is private
        """
        if self.profile_info['blocked_by_viewer'] or self.profile_info['country_block']:
            raise InstaRipperBlockedAccountError
        if self.profile_info['is_private']:
            raise InstaRipperPrivateAccountError
        self.check_dir()
        self.download_thread_producer(download_threads, overwrite)
class InstaRipperError(Exception):
    """Base class for the errors raised by InstaRipper"""


class InstaRipperProfileNotFoundError(InstaRipperError):
    """Raised if the profile is an invalid profile"""


class InstaRipperBlockedAccountError(InstaRipperError):
    """Raised if the profile account is blocked for the user"""


class InstaRipperPrivateAccountError(InstaRipperError):
    """Raised if the profile is private"""
def init_logger():
    """
    Configure the module logger: INFO level, records written to the console
    as ``[time][level] - message``.

    Calling it more than once is safe; the console handler is attached only
    when the logger has no handler yet.
    """
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured: a second handler would print every record twice.
        return
    formatter = logging.Formatter('[%(asctime)s][%(levelname)s] - %(message)s')
    # console logger
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    logger.addHandler(console)
def main():
    """
    Command line entry point: parse the arguments, rip the requested
    profile and report handled failures through the logger.

    :return: 0 when the download ran, 1 when a handled InstaRipper error
        was logged
    """
    def positive_int(value):
        # argparse ``type`` callable: a pool of zero (or fewer) download
        # threads could never drain the queue.
        number = int(value)
        if number < 1:
            raise argparse.ArgumentTypeError(
                'must be at least 1, got {0}'.format(value))
        return number

    # parse arguments
    parser = argparse.ArgumentParser(description='InstaRipper')
    parser.add_argument('username', help='Instagram username')
    parser.add_argument('directory', help='Where to save the images')
    parser.add_argument('-t', '--threads', type=positive_int, default=10, help='Download threads')
    parser.add_argument('--overwrite', action='store_true', help='Overwrite existing files')
    args = parser.parse_args()

    init_logger()
    try:
        ripper = InstaRipper(args.username, args.directory)
        ripper.download_all_medias(download_threads=args.threads, overwrite=args.overwrite)
    except InstaRipperProfileNotFoundError:
        logger.error(u'Invalid profile. Check the profile name and try again.')
    except InstaRipperPrivateAccountError:
        logger.error(u'This account is private.')
    except InstaRipperBlockedAccountError:
        logger.error(u'This account is blocked for you.')
    else:
        return 0
    # Let shells and scripts see that the run failed.
    return 1


if __name__ == '__main__':
    sys.exit(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement