Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import io
- import math
- import os.path
- import sys
- import glob
- import asyncio
- import requests
- import urllib.request
- import logging as log
- import pandas as pd
- from time import sleep
- from random import randrange
- from datetime import datetime
- from typing import List, Dict, Tuple
- from pathlib import Path
- from proxybroker import Broker
- from itertools import cycle
# Configure logging once at import time: millisecond-resolution timestamps,
# mirrored to both a file ("debug.log") and the console.
log.basicConfig(
    level=log.DEBUG,
    format=('%(asctime)s.%(msecs)03d:'
            '%(levelname)s:'
            '%(filename)s - %(module)s - %(funcName)s:\t'
            '%(message)s'),
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[log.FileHandler("debug.log"), log.StreamHandler()],
)
# Alpha Vantage API keys, used round-robin by get_apikey().
apikeys = ['XXXXXXXXXXXXXXXX', 'XXXXXXXXXXXXXXXX', 'XXXXXXXXXXXXXXXX']
num = len(apikeys)  # number of apikeys
# Minimum spacing between calls made with the same key: spread one minute's
# allowance evenly across the key pool (tiny epsilon to stay on the safe side).
delay = 60 / num + 0.001
api_idx = None  # index to select the next apikey (None until first use)
proxy_dicts = []  # per-proxy records: {'proxy': ..., 'dt': ..., 'burned': ...}
proxy_idx = 0  # index to select the next proxy
BASE_URL = 'https://www.alphavantage.co/'
# Download destination: a subdirectory next to this script.
modpath = os.path.dirname(os.path.abspath(sys.argv[0]))
def getProxies(n: int) -> List[str]:
    '''Return a list of n working proxies
    '''
    async def show(proxies):
        # Drain the queue until Broker signals completion by enqueueing None.
        p = []
        while True:
            proxy = await proxies.get()
            if proxy is None: break
            p.append(f'{proxy.host}:{proxy.port}')
        return p
    proxies = asyncio.Queue()
    broker = Broker(proxies)
    # Run the HTTPS proxy search and the queue consumer concurrently;
    # gather() yields [find() result, show() result] — index 1 is the list.
    tasks = asyncio.gather(broker.find(types=['HTTPS'], limit=n), show(proxies))
    # NOTE(review): asyncio.get_event_loop() outside a running loop is
    # deprecated since Python 3.10 — consider asyncio.new_event_loop().
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(tasks)[1]
def list_to_list_of_dicts(lst: List, keyName) -> List[Dict]:
    '''Wrap each element of lst in a single-entry dict keyed by keyName.

    E.g. list_to_list_of_dicts([1, 2], 'k') -> [{'k': 1}, {'k': 2}].
    '''
    # Iterate the values directly instead of indexing via range(len(...)).
    return [{keyName: value} for value in lst]
def get_idx_list_dicts(list_dicts: List[Dict], key, value) -> int:
    '''Return the index of the dictionary with the {key: value} pair.

    Returns None when no dictionary in the list matches.
    '''
    for idx, entry in enumerate(list_dicts):
        if entry[key] == value:
            return idx
    return None
def get_dict(list_dicts: List[Dict], key, value) -> Dict:
    '''Return the dictionary with the {key: value} pair from the list.

    Returns None when no dictionary matches.
    '''
    # The original wrapped this in enumerate() but never used the index.
    return next((item for item in list_dicts if item[key] == value), None)
def append_dict(list_dicts: List[Dict], key, values: List):
    '''Append dictionaries that don't already exist to a list of dictionaries.

    For each value, a {key: value} dict is appended unless one with that
    exact pair is already present.
    '''
    for candidate in values:
        # Inline membership check (equivalent to get_idx_list_dicts(...) is None).
        already_present = any(entry[key] == candidate for entry in list_dicts)
        if not already_present:
            list_dicts.append({key: candidate})
def get_proxy_without_delay() -> str:
    '''Return the next proxy that is neither burned nor still cooling down.

    Cycles through proxy_dicts (sleeping 1s per step) until it finds a
    proxy that has not been marked 'burned' and either has never been
    used or was last used more than `delay` seconds ago.
    '''
    global proxy_dicts, proxy_idx, delay
    while True:
        candidate = proxy_dicts[proxy_idx]
        burned = 'burned' in candidate
        cooled_down = ('dt' not in candidate
                       or (datetime.now() - candidate['dt']).total_seconds() > delay)
        if not burned and cooled_down:
            return candidate['proxy']
        proxy_idx = (proxy_idx + 1) % len(proxy_dicts)
        sleep(1)
def update_last_proxy_use():
    '''Stamp the current proxy with its last-use time and advance the cursor.'''
    global proxy_dicts, proxy_idx
    proxy_dicts[proxy_idx]['dt'] = datetime.now() # update the time the proxy was used
    # Move to the next proxy, wrapping around the pool.
    proxy_idx = (proxy_idx+1) % len(proxy_dicts)
def download_with_proxy_wait(url: str) -> pd.DataFrame:
    '''Fetch a CSV payload from url through rotating proxies, retrying
    until a valid response arrives; return it as a DataFrame.
    '''
    global proxy_dicts, proxy_idx, num
    while True:
        # Top up the proxy pool whenever it runs low.
        if len(proxy_dicts) < num:
            append_dict(proxy_dicts, 'proxy', getProxies(num*2))
        proxy = get_proxy_without_delay()
        try:
            response = requests.get(url, proxies={'http': proxy, 'https': proxy}, timeout=1)
            update_last_proxy_use()
            # Reverse row order so the data runs oldest-first.
            df = pd.read_csv(io.StringIO(response.content.decode('utf-8'))).iloc[::-1]
            if len(df.columns) != 6:
                # A payload without the expected 6 columns is treated as a
                # rate-limit/error message; log it and retry on the next pass.
                log.info('API limit hit:')
                log.info(df)
                # NOTE(review): update_last_proxy_use() has already advanced
                # proxy_idx, so the proxy logged below is the NEXT one, not
                # the one that made this request — confirm intent.
                log.info(f'API key: {apikeys[api_idx]}')
                log.info(f'Proxy: {proxy_dicts[proxy_idx]}')
            elif response.status_code == 200:
                break
        except (requests.Timeout, requests.exceptions.ProxyError) as err:
            log.warning(f'{err}', stack_info=True)
            # Mark the proxy so get_proxy_without_delay() skips it from now on.
            proxy_dicts[proxy_idx]['burned'] = True
    # df is guaranteed bound here: the loop only exits via the break above,
    # which follows a successful read_csv.
    return df
def get_apikey() -> str:
    '''Get an apikey ready to be used, sleeping out any remaining cooldown.

    On first use, converts the plain global `apikeys` list of strings into
    a list of dicts (so each key can carry its last-use timestamp) and
    starts at a random position in the pool.
    '''
    global apikeys, api_idx, delay
    if api_idx is None:
        key_records = []  # dicts that track when each apikey was last used
        append_dict(key_records, 'apikey', apikeys)
        apikeys = key_records
        api_idx = randrange(len(apikeys))
    record = apikeys[api_idx]
    # Block until this key's per-minute cooldown has fully elapsed.
    if 'dt' in record.keys():
        elapsed = (datetime.now() - record['dt']).total_seconds()
        remaining = delay - elapsed
        if remaining > 0:
            sleep(remaining)
    return record['apikey']
def update_apikey_last_used():
    '''Record when the current apikey was used and advance to the next one.'''
    global apikeys, api_idx
    # NOTE: after get_apikey()'s first call, `apikeys` holds dicts, not strings.
    apikeys[api_idx]['dt'] = datetime.now()
    api_idx = (api_idx+1) % len(apikeys)
def download_previous_data(
    file: str,
    ticker: str,
    timeframe: str,
    _slice: str,
):
    '''Download one month slice of intraday data for a ticker and merge it
    into the per-ticker CSV file.

    file: destination CSV path; ticker: stock symbol; timeframe: bar
    interval (e.g. '1min'); _slice: Alpha Vantage slice name
    (e.g. 'year1month1'). Always advances the apikey rotation on exit.
    '''
    global apikeys, api_idx, proxy_dicts, proxy_idx
    apikey = get_apikey()
    url = f'{BASE_URL}query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol={ticker}&interval={timeframe}&slice={_slice}&apikey={apikey}&datatype=csv'
    log.info(f'Downloading {_slice} of {timeframe} for {ticker}...')
    try:
        # BUG FIX: download_with_proxy_wait returns a DataFrame, so the old
        # `.content` attribute access raised AttributeError on every call.
        df = download_with_proxy_wait(url)
        if os.path.exists(file):
            # BUG FIX: drop_duplicates(inplace=True) returns None, so chaining
            # .to_csv() onto it crashed. Also use pd.concat instead of the
            # deprecated DataFrame.append.
            merged = pd.concat([pd.read_csv(file), df]).drop_duplicates()
            merged.to_csv(file, encoding='utf-8-sig')
        else:
            df.to_csv(file, mode='w', encoding='utf-8-sig')
    except Exception:
        # Best-effort: log and move on to the next download.
        log.info(f"Couldn't download data for {ticker}")
    update_apikey_last_used()
def get_tickers(filepath) -> Tuple[pd.DataFrame, List[str]]:
    '''Return the full listing DataFrame and the NYSE stock ticker symbols.

    Downloads the Alpha Vantage listing CSV to `filepath` if it is not
    already present, then reads it and filters for NYSE common stocks.
    (The original annotation claimed List[str], but the function has
    always returned the (df, tickers) pair.)
    '''
    if not os.path.exists(filepath):
        url = 'https://www.alphavantage.co/query?function=LISTING_STATUS&apikey=XXXXXXXXXXXXXXXX'
        log.info('Downloading ticker symbols:')
        urllib.request.urlretrieve(url, filepath)
        # Presumably waits out the API rate limit before further calls —
        # TODO confirm the 173s figure.
        sleep(173)
    df = pd.read_csv(filepath)
    # Keep only common stocks listed on the NYSE.
    tickers = df.loc[(df['exchange'] == 'NYSE')
                     & (df['assetType'] == 'Stock'), 'symbol'].tolist()
    return df, tickers
def create_download_folders(timeframes: List[str]):
    '''Ensure a download subdirectory exists under modpath for every timeframe.'''
    for tf in timeframes:
        Path(f'{modpath}/{tf}').mkdir(parents=True, exist_ok=True)
def use_stocks_from_file(filepath: str) -> List[str]:
    '''Read a comma-separated ticker list from a file located under modpath.

    Newlines are stripped before splitting, so the list may span lines.
    '''
    full_path = f'{modpath}/{filepath}'
    with open(full_path) as handle:
        contents = handle.read()
    return contents.replace('\n', '').split(',')
def get_data():
    '''Download two years of extended intraday history for the watched tickers.

    Fetches the exchange listing, builds the per-timeframe download folders,
    then walks every (slice, ticker, timeframe) combination, writing each
    ticker's data to `<modpath>/<timeframe>/<ticker>.csv`.
    '''
    filepath = f'{modpath}/stocks_alphavantage.csv'
    df, tickers = get_tickers(filepath)
    timeframes = ['1min', '5min', '15min', '30min', '60min']
    # NOTE(review): the listing-derived ticker list is immediately replaced
    # by the local watch-list file — confirm that is intended.
    tickers = use_stocks_from_file('my_traded_stocks.txt')
    create_download_folders(timeframes)
    # All 24 month-slices offered by TIME_SERIES_INTRADAY_EXTENDED,
    # oldest (year2month12) first.
    slices = ['year2month12', 'year2month11', 'year2month10',
              'year2month9', 'year2month8', 'year2month7',
              'year2month6', 'year2month5', 'year2month4',
              'year2month3', 'year2month2', 'year2month1',
              'year1month12', 'year1month11', 'year1month10',
              'year1month9', 'year1month8', 'year1month7',
              'year1month6', 'year1month5', 'year1month4',
              'year1month3', 'year1month2', 'year1month1']
    for _slice in slices:
        for ticker in tickers:
            if ticker not in df.values:
                # Typo fix: "Skiping" -> "Skipping".
                log.info(f'{ticker} not available. Skipping...')
                continue
            name = df.loc[df['symbol'] == ticker, 'name'].iat[0]
            log.info(f'Downloading data for {ticker}: {name}...')
            for timeframe in timeframes:
                download_path = f'{modpath}/{timeframe}'
                filepath = f'{download_path}/{ticker}.csv'
                download_previous_data(filepath, ticker, timeframe, _slice)
def main():
    '''Script entry point: kick off the full download run.'''
    get_data()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement