Advertisement
ajr-dev

Rotating pool of proxies with delay

Nov 30th, 2020 (edited)
214
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.26 KB | None | 0 0
  1. import os
  2. import io
  3. import math
  4. import os.path
  5. import sys
  6. import glob
  7. import asyncio
  8. import requests
  9. import urllib.request
  10. import logging as log
  11. import pandas as pd
  12.  
  13. from time import sleep
  14. from random import randrange
  15. from datetime import datetime
  16. from typing import List, Dict, Tuple
  17. from pathlib import Path
  18. from proxybroker import Broker
  19. from itertools import cycle
  20.  
# Log DEBUG and above both to ./debug.log and to the console (stderr).
log.basicConfig(
    level=log.DEBUG,
    format=('%(asctime)s.%(msecs)03d:'
            '%(levelname)s:'
            '%(filename)s - %(module)s - %(funcName)s:\t'
            '%(message)s'),
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        log.FileHandler("debug.log"),
        log.StreamHandler()
    ]
)
  33.  
# AlphaVantage API keys; calls are rotated across them to raise throughput.
apikeys = ['XXXXXXXXXXXXXXXX', 'XXXXXXXXXXXXXXXX', 'XXXXXXXXXXXXXXXX']
num = len(apikeys)  # number of apikeys
# Presumably the provider allows ~1 request/min per key — TODO confirm;
# the +0.001 keeps us strictly past the minute boundary.
delay = 60 / num + 0.001 # the time to wait between API calls depends on the number of apikeys
api_idx = None  # index to select the next apikey; None = first call not made yet
proxy_dicts = []  # pool entries: {'proxy': 'host:port', 'dt': last-use time, 'burned': True}
proxy_idx = 0  # index to select the next proxy

BASE_URL = 'https://www.alphavantage.co/'
# To download the data in a subdirectory where the script is located
modpath = os.path.dirname(os.path.abspath(sys.argv[0]))
  44.  
  45.  
  46. def getProxies(n: int) -> List[str]:
  47.     '''Return a list of n working proxies
  48.    '''
  49.     async def show(proxies):
  50.         p = []
  51.         while True:
  52.                 proxy = await proxies.get()
  53.                 if proxy is None: break
  54.                 p.append(f'{proxy.host}:{proxy.port}')
  55.         return p
  56.  
  57.     proxies = asyncio.Queue()
  58.     broker = Broker(proxies)
  59.     tasks = asyncio.gather(broker.find(types=['HTTPS'], limit=n), show(proxies))
  60.     loop = asyncio.get_event_loop()
  61.     return loop.run_until_complete(tasks)[1]
  62.  
  63.  
  64. def list_to_list_of_dicts(lst: List, keyName) -> List[Dict]:
  65.     '''Convert a list to a list of dictionaries'''
  66.     return [{keyName: lst[i]} for i in range(len(lst))]
  67.  
  68.  
  69. def get_idx_list_dicts(list_dicts: List[Dict], key, value) -> int:
  70.     '''Return the index of the dictionary with the {key: value} pair'''
  71.     return next((i for i, dct in enumerate(list_dicts) if dct[key] == value), None)
  72.  
  73.  
  74. def get_dict(list_dicts: List[Dict], key, value) -> Dict:
  75.     '''Return the dictionary with the {key: value} pair from the list'''
  76.     return next((item for i, item in enumerate(list_dicts) if item[key] == value), None)
  77.  
  78.  
  79. def append_dict(list_dicts: List[Dict], key, values: List):
  80.     '''Append dictionaries that don't already exist
  81.    to a list of dictionaries
  82.    '''
  83.     for value in values:
  84.         idx = get_idx_list_dicts(list_dicts, key, value)
  85.         if idx is None:
  86.             list_dicts.append({key: value})
  87.  
  88.  
def get_proxy_without_delay() -> str:
    """Block until a pool proxy is eligible and return it as 'host:port'.

    A proxy is eligible when it is not marked 'burned' and either has
    never been used (no 'dt' stamp) or was last used more than `delay`
    seconds ago. On return, proxy_idx still points at the returned
    entry so update_last_proxy_use() can stamp it.

    NOTE(review): loops forever if every entry is burned — the caller
    is expected to keep the pool replenished beforehand.
    """
    global proxy_dicts, proxy_idx, delay

    while True:
        proxy_dict = proxy_dicts[proxy_idx]
        # Keep the proxy if it hasn't been used
        # or the proper amount of time has passed since its use
        if 'burned' not in proxy_dict.keys() and \
                ('dt' not in proxy_dict.keys() or \
                (datetime.now()-proxy_dict['dt']).total_seconds() > delay):
            proxy = proxy_dict['proxy']
            break
        proxy_idx = (proxy_idx + 1) % len(proxy_dicts)
        # NOTE(review): sleeps a full second even when the next entry
        # is already eligible — a minor throughput cost.
        sleep(1)
    return proxy
  104.  
  105.  
def update_last_proxy_use():
    """Stamp the just-used proxy (the one proxy_idx points at) with the
    current time and advance the rotation index to the next entry."""
    global proxy_dicts, proxy_idx
    proxy_dicts[proxy_idx]['dt'] = datetime.now() # update the time the proxy was used
    proxy_idx = (proxy_idx+1) % len(proxy_dicts)
  110.  
  111.  
  112. def download_with_proxy_wait(url: str) -> pd.DataFrame:
  113.     global proxy_dicts, proxy_idx, num
  114.  
  115.     while True:
  116.         if len(proxy_dicts) < num:
  117.             append_dict(proxy_dicts, 'proxy', getProxies(num*2))
  118.  
  119.         proxy = get_proxy_without_delay()
  120.  
  121.         try:
  122.             response = requests.get(url, proxies={'http': proxy, 'https': proxy}, timeout=1)
  123.             update_last_proxy_use()
  124.  
  125.             df = pd.read_csv(io.StringIO(response.content.decode('utf-8'))).iloc[::-1]
  126.             if len(df.columns) != 6:
  127.                 log.info('API limit hit:')
  128.                 log.info(df)
  129.                 log.info(f'API key: {apikeys[api_idx]}')
  130.                 log.info(f'Proxy: {proxy_dicts[proxy_idx]}')
  131.             elif response.status_code == 200:
  132.                 break
  133.         except (requests.Timeout, requests.exceptions.ProxyError) as err:
  134.             log.warning(f'{err}', stack_info=True)
  135.             proxy_dicts[proxy_idx]['burned'] = True
  136.  
  137.     return df
  138.  
  139.  
def get_apikey() -> str:
    '''Get an apikey ready to be used

    On the first call (api_idx is None) the module-level `apikeys` list
    of strings is rewritten into a list of {'apikey': ...} dicts so each
    key's last-use time can be tracked. A key is then picked at random
    and, if it was used less than `delay` seconds ago, this call sleeps
    until it is usable again.

    NOTE(review): random selection can re-pick a recently used key and
    sleep unnecessarily; round-robin would avoid the extra waits.
    '''
    global apikeys, api_idx, delay

    if api_idx is None:
        api_dicts = []  # list of dictionaries to save the last time an apikey was used
        append_dict(api_dicts, 'apikey', apikeys)
        apikeys = api_dicts

    api_idx = randrange(len(apikeys))
    api_dict = apikeys[api_idx]

    # Wait until the apikey can be used
    if 'dt' in api_dict.keys():
        secs = (datetime.now()-api_dict['dt']).total_seconds()
        if delay > secs:
            sleep(delay - secs)

    return api_dict['apikey']
  160.  
  161.  
def update_apikey_last_used():
    """Stamp the key selected by api_idx with the current time and
    advance api_idx (though get_apikey re-randomizes it anyway)."""
    global apikeys, api_idx

    apikeys[api_idx]['dt'] = datetime.now()
    api_idx = (api_idx+1) % len(apikeys)
  167.  
  168.  
  169. def download_previous_data(
  170.     file: str,
  171.     ticker: str,
  172.     timeframe: str,
  173.     _slice: str,
  174. ):
  175.     global apikeys, api_idx, proxy_dicts, proxy_idx
  176.     apikey = get_apikey()
  177.     url = f'{BASE_URL}query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol={ticker}&interval={timeframe}&slice={_slice}&apikey={apikey}&datatype=csv'
  178.     log.info(f'Downloading {_slice} of {timeframe} for {ticker}...')
  179.     try:
  180.         df = download_with_proxy_wait(url).content
  181.  
  182.         if os.path.exists(file):
  183.             pd.read_csv(file).append(df).drop_duplicates(inplace=True).to_csv(file, encoding='utf-8-sig')
  184.         else:
  185.             df.to_csv(file, mode='w', encoding='utf-8-sig')
  186.     except:
  187.         log.info(f"Couldn't download data for {ticker}")
  188.     update_apikey_last_used()
  189.  
  190.  
  191. def get_tickers(filepath) -> List[str]:
  192.     '''Get a list of all ticker symbols
  193.    '''
  194.     tickers = []
  195.     if not os.path.exists(filepath):
  196.         url = 'https://www.alphavantage.co/query?function=LISTING_STATUS&apikey=XXXXXXXXXXXXXXXX'
  197.         log.info('Downloading ticker symbols:')
  198.         urllib.request.urlretrieve(url, filepath)
  199.         sleep(173)
  200.  
  201.     df = pd.read_csv(filepath)
  202.     tickers = df.loc[df['exchange'] == 'NYSE'] \
  203.                 .loc[df['assetType'] == 'Stock']['symbol'].tolist()
  204.  
  205.     return df, tickers
  206.  
  207.  
  208. def create_download_folders(timeframes: List[str]):
  209.     for timeframe in timeframes:
  210.         download_path = f'{modpath}/{timeframe}'
  211.         Path(download_path).mkdir(parents=True, exist_ok=True)
  212.  
  213.  
  214. def use_stocks_from_file(filepath: str) -> List[str]:
  215.     filepath = f'{modpath}/{filepath}'
  216.     with open(filepath) as f:
  217.         tickers = f.read().replace('\n', '').split(',')
  218.     return tickers
  219.  
  220.  
  221. def get_data():
  222.     filepath = f'{modpath}/stocks_alphavantage.csv'
  223.     df, tickers = get_tickers(filepath)
  224.     timeframes = ['1min', '5min', '15min', '30min', '60min']
  225.  
  226.     tickers = use_stocks_from_file('my_traded_stocks.txt')
  227.     create_download_folders(timeframes)
  228.  
  229.     slices = ['year2month12', 'year2month11', 'year2month10',
  230.                 'year2month9', 'year2month8', 'year2month7',
  231.                 'year2month6', 'year2month5', 'year2month4',
  232.                 'year2month3', 'year2month2', 'year2month1',
  233.                 'year1month12', 'year1month11', 'year1month10',
  234.                 'year1month9', 'year1month8', 'year1month7',
  235.                 'year1month6', 'year1month5', 'year1month4',
  236.                 'year1month3', 'year1month2', 'year1month1']
  237.  
  238.     for _slice in slices:
  239.         for ticker in tickers:
  240.             if ticker not in df.values:
  241.                 log.info(f'{ticker} not available. Skiping...')
  242.                 continue
  243.             name = df.loc[df['symbol'] == ticker, 'name'].iat[0]
  244.             log.info(f'Downloading data for {ticker}: {name}...')
  245.             for timeframe in timeframes:
  246.                 download_path = f'{modpath}/{timeframe}'
  247.                 filepath = f'{download_path}/{ticker}.csv'
  248.  
  249.                 download_previous_data(filepath, ticker, timeframe, _slice)
  250.  
  251.  
def main():
    # Entry point: run the full download pipeline.
    get_data()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement