from twitter.scraper import Scraper

# EMAIL, USERNAME, and PASSWORD are credential placeholders for the account
# used to scrape; fill them in before running.
email, username, password = EMAIL, USERNAME, PASSWORD
scraper = Scraper(email, username, password)
import asyncio
import re

import aiofiles
import requests
from httpx import AsyncClient
from pathlib import Path
from tqdm.asyncio import tqdm_asyncio
from twitter.util import find_key
from urllib.parse import urlsplit


def already_downloadeds(username):
    """Return the set of tweet ids that already have media saved under media/<username>."""
    test_path = Path('media') / username
    downloaded_ids = set()
    for test_item in test_path.iterdir():
        # Saved files are named i_status_<tweet id>_<original file name>.
        result = re.match(r'i_status_(\d+)_.*\.(jpg|png)', test_item.name)
        if result:  # skip files that don't follow the naming scheme
            downloaded_ids.add(result.group(1))
    return downloaded_ids
# Address of the tracking server that records per-user download progress.
# (Placeholder: the original paste uses tracking_server_ip without defining it.)
tracking_server_ip = '127.0.0.1'

# ANSI color codes used in the error log line (also undefined in the original paste).
RED, RESET = '\x1b[91m', '\x1b[0m'


def download_media(tweets, username, after_id=None):
    urls = []
    out = Path('media') / username
    out.mkdir(parents=True, exist_ok=True)
    downloaded_ids = already_downloadeds(username)
    # If no starting id was given, ask the tracking server where we left off.
    if not after_id:
        response = requests.get(f'http://{tracking_server_ip}:8080/twitter-user?username={username}')
        after_id = response.json()['lastTweetDownloaded']
    for tweet in tweets:
        tweet_id = find_key(tweet, 'id_str')[0]
        if tweet_id not in downloaded_ids and int(tweet_id) > int(after_id):
            url = f'https://twitter.com/i/status/{tweet_id}'
            media = [y for x in find_key(tweet, 'media') for y in x]
            # Deduplicate photo URLs; skip video thumbnails (ext_tw_video_thumb).
            photo_urls = list({u for m in media if 'ext_tw_video_thumb' not in (u := m['media_url_https'])})
            urls.extend([url, photo] for photo in photo_urls)
            downloaded_ids.add(tweet_id)
    chunk_size = 8192
    print(f"Downloading {len(urls)} items")

    async def process():
        # Reuse the scraper's authenticated headers/cookies for the CDN requests.
        async with AsyncClient(headers=scraper.session.headers, cookies=scraper.session.cookies) as client:
            tasks = (download(client, x, y) for x, y in urls)
            if scraper.pbar:
                return await tqdm_asyncio.gather(*tasks, desc='Downloading media')
            return await asyncio.gather(*tasks)

    async def download(client: AsyncClient, post_url: str, cdn_url: str) -> None:
        # '/i/status/123' -> 'i_status_123'; the CDN file name is kept as the suffix.
        name = urlsplit(post_url).path.replace('/', '_')[1:]
        ext = urlsplit(cdn_url).path.split('/')[-1]
        try:
            r = await client.get(cdn_url)
            async with aiofiles.open(out / f'{name}_{ext}', 'wb') as fp:
                for chunk in r.iter_bytes(chunk_size=chunk_size):
                    await fp.write(chunk)
        except Exception as e:
            scraper.logger.error(f'[{RED}error{RESET}] Failed to download media: {post_url} {e}')

    asyncio.run(process())
    latest_downloaded = max(int(download_id) for download_id in downloaded_ids)
    print(f"Latest downloaded is {latest_downloaded}")
    # Report the newest downloaded tweet id back to the tracking server.
    requests.post(f'http://{tracking_server_ip}:8080/twitter-user-done', json={
        'username': username,
        'lastId': latest_downloaded,
    })
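
# --- Hypothetical tracking-server stub (not in the original paste) ---
# download_media() assumes a separate service on port 8080 with two endpoints:
# GET /twitter-user returning the last downloaded tweet id, and
# POST /twitter-user-done recording the new one. A minimal in-memory stand-in,
# sketched here with the same FastAPI stack used further down, could be:

from fastapi import FastAPI
from pydantic import BaseModel

tracker = FastAPI()
last_ids = {}  # username -> newest tweet id downloaded (lost on restart)


class DoneModel(BaseModel):
    username: str
    lastId: int


@tracker.get("/twitter-user")
def get_user(username: str):
    # Default to 0 so an unseen user downloads everything.
    return {"lastTweetDownloaded": last_ids.get(username, 0)}


@tracker.post("/twitter-user-done")
def user_done(body: DoneModel):
    last_ids[body.username] = body.lastId
    return {"ok": True}

# Run it as a separate process, e.g.: uvicorn tracker_module:tracker --port 8080
# (module name is hypothetical).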

def entries_of_media(media_result):
    """Extract the timeline entries from one page of media-timeline results."""
    entries = media_result['data']['user']['result']['timeline_v2']['timeline']['instructions'][0]['entries']
    # Keep only actual tweet items; cursors and modules use other entry types.
    return [entry for entry in entries if entry['content']['entryType'] == 'TimelineTimelineItem']


def user_entries(username, after_id=None):
    # Resolve the username to its numeric rest_id, then pull the media timeline.
    users = scraper.users([username])
    user_id = users[0]['data']['user']['result']['rest_id']
    media_results = scraper.media([user_id])
    return [entry for media_result in media_results for entry in entries_of_media(media_result)]
from fastapi import FastAPI
from pydantic import BaseModel


class UserModel(BaseModel):
    username: str


app = FastAPI()


# Deliberately a sync (def, not async def) endpoint: FastAPI runs it in a worker
# thread, so the asyncio.run() inside download_media() doesn't collide with the
# server's own event loop (it would raise RuntimeError in an async endpoint).
@app.post("/download")
def download_user(json: UserModel):
    username = json.username
    media_entries = user_entries(username)
    download_media(media_entries, username)
    return f'{username} downloaded'
import asyncio
import uvicorn

config = uvicorn.Config(app)
server = uvicorn.Server(config)
# Top-level await only works where an event loop is already running (e.g. a
# Jupyter notebook); in a plain script, use asyncio.run(server.serve()) instead.
await server.serve()
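
# Usage sketch (run from another process; assumes uvicorn's default bind of
# 127.0.0.1:8000 and a hypothetical username):
#
#   import requests
#   r = requests.post('http://127.0.0.1:8000/download', json={'username': 'example_user'})
#   print(r.text)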