Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import logging
- import random
- import time
- from django.db import connection
- from suds.client import Client
- from .models import Instrument
- from django.conf import settings
- import threading
- from threading import Thread
class DatastreamException(Exception):
    """Raised when a Datastream request returns a non-zero status code."""
class DatastreamClient:
    """
    Minimal client to interact with the Thomson Reuters Dataworks Enterprise
    (DWE, former Thomson Reuters Datastream) SOAP service.
    """

    WSDL_URL = "http://dataworks.thomson.com/dataworks/enterprise/1.0/webServiceClient.asmx?wsdl"

    # Per-ticker overrides of the default query fields used by
    # build_query_instrument_fields (defaults: price 'P', return index 'RI',
    # currency suffix '~~E').  E.g. funds report 'NAV', FX rates 'ER'.
    INSTRUMENT_CUSTOM_PRICE_FIELDS = {
        'UCITAIG': {'ri': 'NAV', 'ri_currency': ''},
        'UCITLSE': {'ri': 'NAV', 'ri_currency': ''},
        'UCITEMN': {'ri': 'NAV', 'ri_currency': ''},
        'SGIETOT': {'ri': 'TR', 'ri_currency': ''},
        'ALEURSP': {'price': 'ER', 'ri_currency': ''},
        'HKEURSP': {'price': 'ER', 'ri_currency': ''},
        'UKEURSP': {'price': 'ER', 'ri_currency': ''},
        'NWEURSP': {'price': 'ER', 'ri_currency': ''},
        'SWEURSP': {'price': 'ER', 'ri_currency': ''},
        'NZEURSP': {'price': 'ER', 'ri_currency': ''},
        'AUEURSP': {'price': 'ER', 'ri_currency': ''},
        'JPEURSP': {'price': 'ER', 'ri_currency': ''},
        'USEURSP': {'price': 'ER', 'ri': 'ER', 'ri_currency': ''},
        'DCBGLTR': {'ri': 'TR', 'ri_currency': ''},
        'LCP3MTH': {'ri': 'P', 'ri_currency': ''},
        'GOLDBLN': {'ri': 'P', 'ri_currency': ''},
        'STEUBOE': {'ri': 'ER', 'ri_currency': ''},
        'JPEUBOE': {'ri': 'ER', 'ri_currency': ''},
        'CHIYUA$': {'ri': 'ER', 'ri_currency': ''},
        'CVXCS00': {'ri': 'PS', 'ri_currency': ''},
        'JPMPTOT': {'ri': 'RI', 'ri_currency': ''},
        'JGEMCOM': {'ri': 'RI', 'ri_currency': ''},
        'UCITAIS': {'ri': 'NAV', 'ri_currency': ''},
        'MLEXPTE': {'ri': 'RI', 'ri_currency': ''},
        'MLEXPAE': {'ri': 'RI', 'ri_currency': ''},
        'JPMEGTO': {'ri': 'RI', 'ri_currency': ''},
        'MLD1T3E': {'ri': 'RI', 'ri_currency': ''},
        'MLEMUCU': {'ri': 'RI', 'ri_currency': ''},
        'ML£BADE': {'ri': 'RI', 'ri_currency': ''},
        'MLHGBCU': {'ri': 'RI', 'ri_currency': ''},
        'MLUCCAE': {'ri': 'RI', 'ri_currency': ''},
        'UCITAFX': {'ri': 'NAV', 'ri_currency': ''},
        'MSUSAM$': {'ri': 'RI', 'ri_currency': ''},
        'UCITAED': {'ri': 'NAV', 'ri_currency': ''},
        'UCITAIM': {'ri': 'NAV', 'ri_currency': ''},
        'E:CTG': {'ri': 'RI', 'ri_currency': ''},
        'HFRUEHE': {'ri': 'RI', 'ri_currency': ''},
        'HFRUHCE': {'ri': 'RI', 'ri_currency': ''}
    }

    def __init__(self, username, password, wsdl=WSDL_URL):
        """Create a connection to the Thomson Reuters Dataworks Enterprise
        (DWE) server (former Thomson Reuters Datastream).

        :param username: DWE account username
        :param password: DWE account password
        :param wsdl: WSDL endpoint describing the SOAP service
        """
        self.client = Client(wsdl, username=username, password=password)
        self.userdata = self.client.factory.create('UserData')
        self.userdata.Username = username
        self.userdata.Password = password

    def version(self):
        """Return the version of the TR DWE service as a dotted string."""
        res = self.client.service.Version()
        return '.'.join([str(x) for x in res[0]])

    def info(self):
        """Return a {field_name: value} mapping describing the remote system.

        BUG FIX: the original helper formatting array-valued fields was
        written as ``def fmt(xs): expr`` with no ``return``, so every
        array-valued field was reported as None.
        """
        response = self.client.service.SystemInfo()

        def fmt(xs):
            if hasattr(xs, 'ArrayValue'):
                return '.'.join([str(x) for x in xs.ArrayValue[0]])
            return ''

        return {f.Name: f.Value if hasattr(f, 'Value') else fmt(f)
                for f in response.Field}

    def fetch(self, query, source='Datastream', fields=None, options=None,
              symbol_set=None, tag=None):
        """Run a single request and return (raw records, parsed status)."""
        rd = self.build_request_data(query, source, fields, options,
                                     symbol_set, tag)
        records = self.client.service.RequestRecordAsXml(self.userdata, rd, 0)
        status = self.parse_status(records.Record)
        return records, status

    def bulk_fetch(self, queries):
        """Run several request strings in a single SOAP round trip."""
        rds = self.client.factory.create('ArrayOfRequestData')
        rds.RequestData = []
        for query in queries:
            rds.RequestData.append(self.build_request_data(query))
        records = self.client.service.RequestRecordsAsXml(self.userdata, rds, 0)
        return records

    def build_request_data(self, query, source='Datastream', fields=None,
                           options=None, symbol_set=None, tag=None):
        """Build a suds ``RequestData`` object for the given query string."""
        rd = self.client.factory.create('RequestData')
        rd.Source = source
        rd.Instrument = query
        if fields is not None:
            rd.Fields = self.client.factory.create('ArrayOfString')
            rd.Fields.string = fields
        rd.SymbolSet = symbol_set
        rd.Options = options
        rd.Tag = tag
        return rd

    def build_query_instrument_fields(self, ticker, date_start):
        """Build the price/return-index range query for *ticker*.

        Default fields (price 'P', return index 'RI', currency suffix '~~E')
        may be overridden per ticker via INSTRUMENT_CUSTOM_PRICE_FIELDS.
        The requested range runs from *date_start* to yesterday.

        :param ticker: Datastream ticker code
        :param date_start: datetime to request data since
        :return: (query string, effective instrument_fields dict)
        """
        instrument_fields = {'price': 'P', 'ri': 'RI', 'ri_currency': '~~E'}
        if ticker in self.INSTRUMENT_CUSTOM_PRICE_FIELDS:
            custom_params = self.INSTRUMENT_CUSTOM_PRICE_FIELDS[ticker]
            instrument_fields = {**instrument_fields, **custom_params}
        price_field = instrument_fields['price']
        ri_field = instrument_fields['ri']
        ri_currency = instrument_fields['ri_currency']
        yesterday = datetime.datetime.now() - datetime.timedelta(1)
        query = '%s(%s),%s(%s)%s~%s~:%s' % (ticker, price_field,
                                            ticker, ri_field, ri_currency,
                                            date_start.strftime("%Y-%m-%d"),
                                            yesterday.strftime("%Y-%m-%d"))
        return query, instrument_fields

    def fetch_multiprices(self, batch):
        """Download price/RI series for a whole batch in one SOAP call.

        Each *batch* item is a dict with 'code', 'since', 'instrument_id' and
        'is_index'; items are enriched in place with their query string and
        effective field names.

        :return: list of dicts {'instrument_id', 'code', 'since',
            'data': [(ref_date, price, ri), ...]}.  Items whose request
            failed (status code > 0) are logged and skipped.
        """
        date_field = 'DATE'
        for item in batch:
            query, instrument_fields = self.build_query_instrument_fields(item['code'], item['since'])
            item['query'] = query
            item['instrument_fields'] = instrument_fields
            item['price_field'] = instrument_fields['price']
            item['ri_field'] = instrument_fields['ri']
            item['date_field'] = 'DATE'
        records = self.bulk_fetch([q['query'] for q in batch])
        records = records.Records[0]
        # Index the batch items by query string to match responses back.
        instruments = {}
        for item in batch:
            instruments[item['query']] = item
        results = []
        for record in records:
            status = self.parse_status(record)
            instrument = instruments[status['Request']]
            result = {'instrument_id': instrument['instrument_id'],
                      'code': instrument['code'],
                      'since': instrument['since'],
                      'data': []}
            if status['StatusCode'] > 0:
                logging.info("Skipped:" + status['Request'])
                continue
            fields = record.Fields
            # A single data point comes back as a scalar, not a 1-item list.
            many_prices = isinstance(fields[date_field].Item, list)
            tot_downloaded = len(fields[date_field].Item) if many_prices else 1

            def ref_date(ix):
                return fields[date_field].Item[ix] \
                    if many_prices else fields[date_field].Item

            def price(ix):
                # Missing price -> -1 for indices (insert will NOT fail),
                # "NULL" otherwise (row is filtered out by the upsert code).
                price_field = instrument['price_field']
                price = -1 if instrument['is_index'] else "NULL"
                has_price = hasattr(fields, price_field)
                if has_price:
                    price = fields[price_field].Item[ix] \
                        if many_prices else fields[price_field].Item
                return price

            def ri(ix):
                ri_field = instrument['ri_field']
                has_ri = hasattr(fields, ri_field)
                ri_val = "NULL"
                if has_ri:
                    ri_val = fields[ri_field].Item[ix] \
                        if many_prices else fields[ri_field].Item
                return ri_val

            result['data'] = [(ref_date(i), price(i), ri(i))
                              for i in range(0, tot_downloaded)]
            results.append(result)
        return results

    def fetch_prices(self, ticker, date_start, is_index):
        """
        Get PRICE and RETURN INDEX (or NAV) from Datastream for a given
        instrument CODE and PERIOD. To customize fields queried for an
        instrument use self.INSTRUMENT_CUSTOM_PRICE_FIELDS.
        Retrieval PERIOD is set between date_start and (TODAY - 1 DAY).

        Returned data population strategy:
        - If PRICE value is NULL
            -> if INSTRUMENT is 'index' -> return -1 (insert will NOT fail)
            -> else -> return "NULL" (insert will fail)
        - if RI field is NULL -> return "NULL"

        :param ticker: Datastream ticker code
        :param date_start: date to request data since
        :param is_index: true if the given item is an index
        :return: list of (reference_date, price, return_index) tuples
        :raises DatastreamException: if the request status code is non-zero
        """
        query, instrument_fields = self.build_query_instrument_fields(ticker, date_start)
        price_field = instrument_fields['price']
        ri_field = instrument_fields['ri']
        date_field = 'DATE'
        prices, status = self.fetch(query)
        if status['StatusCode'] > 0:
            raise DatastreamException(str(status))
        fields = prices.Record.Fields
        many_prices = isinstance(fields[date_field].Item, list)
        tot_downloaded = len(fields[date_field].Item) if many_prices else 1

        def ref_date(idx):
            return fields[date_field].Item[idx] \
                if many_prices else fields[date_field].Item

        def price(idx):
            price = -1 if is_index else "NULL"
            has_price = hasattr(fields, price_field)
            if has_price:
                price = fields[price_field].Item[idx] \
                    if many_prices else fields[price_field].Item
            return price

        def ri(idx):
            has_ri = hasattr(fields, ri_field)
            ri_val = "NULL"
            if has_ri:
                ri_val = fields[ri_field].Item[idx] \
                    if many_prices else fields[ri_field].Item
            return ri_val

        price_and_ri = [(ref_date(i), price(i), ri(i))
                        for i in range(0, tot_downloaded)]
        return price_and_ri

    @classmethod
    def instance(cls, user=None, password=None):
        """Build a client, defaulting credentials from Django settings."""
        return DatastreamClient(username=user if user else settings.WS_USER,
                                password=password if password else settings.WS_PWD)

    @classmethod
    def build_query(cls, ticker, fields=None, ref_date=None,
                    date_from=None, date_to=None, freq=None):
        """Construct a request string for querying TR DWE.

        :param ticker: ticker or symbol (or list of them)
        :param fields: list of fields
        :param ref_date: date for a single-date query
        :param date_from: range start (used only if ref_date is None)
        :param date_to: range end (used only if ref_date is None)
        :param freq: frequency of data: daily('D'), weekly('W'), monthly('M');
            use 'REP' for static requests

        Some available fields: P (adjusted close), PO (open), PH (high),
        PL (low), VO (volume, 1000s of shares), UP (unadjusted price),
        OI (open interest), MV (market value), EPS, DI (dividend index),
        MTVB (market to book value), PTVB (price to book value), ...
        The full list of data fields is available at http://dtg.tfn.com/.
        """
        fmt = '%Y-%m-%d'
        u = lambda x: ','.join(x) if isinstance(x, list) and len(x) > 0 else x
        query = u(ticker)
        if fields is not None:
            query += '~=' + u(fields)
        if ref_date is not None:
            query += '~@' + ref_date.strftime(fmt)
        else:
            if date_from is not None:
                query += '~' + date_from.strftime(fmt)
            if date_to is not None:
                query += '~:' + date_to.strftime(fmt)
        if freq is not None:
            query += '~' + freq
        return query

    @classmethod
    def parse_status(cls, record=None):
        """Extract the status dict from a retrieved record.

        Returns None when no record is given.

        status['StatusType']: 'Connected' - the data is fine
                              'Stale'     - the source is unavailable; retry later
                              'Failure'   - data could not be obtained
                              'Pending'   - for internal use only
        status['StatusCode']: 0 'No Error', 1 'Disconnected', 2 'Source Fault',
                              3 'Network Fault', 4 'Access Denied',
                              5 'No Such Item', 11 'Blocking Timeout',
                              12 'Internal'
        """
        last_status = None
        if record is not None:
            last_status = {'Source': str(record['Source']),
                           'StatusType': str(record['StatusType']),
                           'StatusCode': int(record['StatusCode']),
                           'StatusMessage': str(record['StatusMessage']),
                           'Request': str(record['Instrument'])}
        return last_status
- SIX_YEARS_AGO = datetime.datetime.now() - datetime.timedelta(365 * 6)
def refresh_isin_from_file(path):
    """Read one ISIN per line from *path* and refresh prices for the
    matching instruments."""
    with open(path) as handle:
        isins = [line.rstrip('\r\n') for line in handle]
    matching = Instrument.objects.filter(isin_code__in=isins)
    refresh_instrument_prices(matching)
def split(arr, size):
    """Partition *arr* into consecutive chunks of at most *size* elements.

    An empty *arr* yields a single empty chunk, matching the historical
    behavior of this helper.
    """
    chunks = [arr[i:i + size] for i in range(0, len(arr), size)]
    return chunks or [arr]
def divide_instrument(instruments):
    """Split *instruments* into 150-element chunks and distribute the chunks
    across two worker lists; when the chunk count is odd the first list gets
    the extra chunk.

    BUG FIX: in the odd case the original returned
    ``splitted[0:n//2+1]`` and ``splitted[n//2:]`` -- the middle chunk
    appeared in BOTH halves and was downloaded/upserted twice.  The halves
    are now disjoint and together cover every chunk exactly once.

    :param instruments: list of Instrument objects
    :return: (first_half_chunks, second_half_chunks)
    """
    chunks = split(instruments, 150)
    mid = (len(chunks) + 1) // 2  # first worker takes the extra chunk when odd
    return chunks[:mid], chunks[mid:]
def prepare_batch(instruments, fallback_period_start, last_dloaded_prices, overlap_days_ago):
    """Build the per-instrument work items consumed by
    DatastreamClient.fetch_multiprices.

    BUG FIX: ``since`` was hard-coded to (now - 15 days), a leftover debug
    override -- the intended expression was left in a trailing comment and
    matches the documented DATE_START selection.  Restored: the download
    start is the last date with a price in the DB (falling back to
    instrument.base_date, then *fallback_period_start*), minus
    *overlap_days_ago* days of overlap.

    :param instruments: iterable of Instrument objects
    :param fallback_period_start: datetime used when an instrument has no
        base_date and no previously downloaded price
    :param last_dloaded_prices: {instrument_id: last date having a price in DB}
    :param overlap_days_ago: days re-downloaded before the start date
    :return: list of dicts with instrument_id / code / since / is_index
    """
    result = []
    for instrument in instruments:
        fallback_start_date = instrument.base_date if instrument.base_date else fallback_period_start
        since = last_dloaded_prices.get(instrument.id, fallback_start_date) \
            - datetime.timedelta(overlap_days_ago)
        is_index = instrument.type == 'index'
        # Prefer the explicit Datastream code; fall back to the ISIN.
        code = instrument.datastream_code if instrument.datastream_code else instrument.isin_code
        result.append({'instrument_id': instrument.id,
                       'code': code,
                       'since': since,
                       'is_index': is_index})
    return result
def refresh_instrument_prices(instruments, fallback_period_start=SIX_YEARS_AGO):
    """
    Query prices for each given instrument on datastream and update the DB.
    Price/Return index data is downloaded between
    (DATE_START - OVERLAP_DAYS) and today.
    If not null, instrument.datastream_code is used to query Datastream,
    instrument.isin_code otherwise.
    DATE_START is selected as it follows:
    - last day we have a price in DB for current instrument or
    - instrument.base_date if no price downloaded yet
    - fallback date if instrument.base_date is null and no price downloaded yet
    OVERLAP_DAYS is set to 9 days
    :param instruments: a list of Instrument objects
    :param fallback_period_start: fallback value to query Instrument price since
    :return: error count for instrument updates
    """
    # NOTE(review): flipping pg_index.indisvalid directly disables the index
    # to speed up the bulk upserts below; it is a Postgres catalog hack that
    # needs elevated privileges.  The index is re-validated and the table
    # reindexed at the end of this function.
    print("Disable Trigger")
    with connection.cursor() as cursor:
        cursor.execute('''update pg_index set indisvalid = false where indexrelid = 'idx_price_instrument_id'::regclass;''')
    def upsert_instruments_prices(instrument_prices):
        # Batch variant of upsert_instrument_prices below: each item is a
        # dict produced by DatastreamClient.fetch_multiprices
        # ({'instrument_id', 'code', 'since', 'data': [(date, price, ri), ...]}).
        # NOTE(review): values are spliced in with %-formatting rather than
        # bound parameters; inputs come from Datastream, not end users, but
        # this is still fragile -- consider parameterized execution.
        for instrument in instrument_prices:
            SQL_TMPL = '''INSERT INTO price ("ref_date", "price", "ri", "instrument_id")
                VALUES ('%s', %s, %s, '%s') ON CONFLICT ON CONSTRAINT %s DO
                UPDATE SET price = %s, ri = %s WHERE price.ref_date = '%s' AND price.instrument_id = %s;'''
            sql_pkey = 'price_ref_date_ea332e77_uniq'
            sql_prices_upsert = []
            for xs in instrument['data']:
                # Skip sentinel values ("NULL"/NaN) so the INSERT cannot fail.
                has_price_and_ri = xs[1] != "NULL" and xs[1] != 'NaN' and xs[2] != 'NaN'
                if has_price_and_ri:
                    query = SQL_TMPL % (*xs, instrument['instrument_id'], sql_pkey,
                                        xs[1], xs[2], xs[0], instrument['instrument_id'])
                    sql_prices_upsert.append(query)
            if len(sql_prices_upsert) > 0:
                with connection.cursor() as cursor:
                    cursor.execute('\n'.join(sql_prices_upsert))
    def upsert_instrument_prices(instrument, instrument_prices):
        """
        Build and execute upsert queries to update instrument prices and return
        index for a given instrument
        :param instrument: a investable.models.Instrument object
        :param instrument_prices: list of instrument prices to be inserted,
        format: [(ref_date, price_val, ri_val), ...]
        :return:
        """
        # Used only by the disabled sequential implementation kept in the
        # string literal at the bottom of this function.
        SQL_TMPL = '''INSERT INTO price ("ref_date", "price", "ri", "instrument_id")
            VALUES ('%s', %s, %s, '%s') ON CONFLICT ON CONSTRAINT %s DO
            UPDATE SET price = %s, ri = %s WHERE price.ref_date = '%s' AND price.instrument_id = %s;'''
        sql_pkey = 'price_ref_date_ea332e77_uniq'  # TODO: move outside as a PARAMETER
        sql_prices_upsert = []
        for xs in instrument_prices:
            has_price_and_ri = xs[1] != "NULL" and xs[1] != 'NaN' and xs[2] != 'NaN'
            if has_price_and_ri:
                query = SQL_TMPL % (*xs, instrument.id, sql_pkey,
                                    xs[1], xs[2], xs[0], instrument.id)
                sql_prices_upsert.append(query)
        if len(sql_prices_upsert) > 0:
            with connection.cursor() as cursor:
                cursor.execute('\n'.join(sql_prices_upsert))
    def instruments_lastupdated():
        """
        find last date having price for each intrument
        :return: a dictionary like {insrument_id: last_retrieved_date}
        """
        SQL_TMPL = '''SELECT instrument_id, MAX(ref_date) FROM price GROUP BY instrument_id;'''
        cursor = connection.cursor()
        cursor.execute(SQL_TMPL)
        return {k: v for k, v in cursor.fetchall()}
    start_time = time.time()
    datastream_c1 = DatastreamClient.instance()
    # SECURITY NOTE(review): hard-coded credentials for the second Datastream
    # account -- move to settings/environment variables.
    datastream_c2 = DatastreamClient.instance(user='DS:ZEFJ001',password='EBONY292')
    last_dloaded_prices = instruments_lastupdated()
    # NOTE(review): these counters are never updated by the threaded path
    # below, so this function currently always returns 0.
    updated_count, error_count = 0, 0
    overlap_days_ago = 9
    # Two halves of 150-item chunks, one half per worker thread / account.
    bulk1, bulk2 = divide_instrument([i for i in instruments])
    def worker1():
        # Drains bulk1 one chunk at a time using the first client.
        print("Worker 1: presenti "+str(len(bulk1))+" bulk")
        while len(bulk1) > 0:
            print("Lavorazione Bulk1_"+str(len(bulk1)))
            work_bulk(bulk1.pop(), datastream_c1, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices)
    def worker2():
        # Same as worker1, but on bulk2 with the second client/account.
        print("Worker 2: presenti " + str(len(bulk2)) + " bulk")
        while len(bulk2) > 0:
            print("Lavorazione Bulk2_" + str(len(bulk2)))
            work_bulk(bulk2.pop(), datastream_c2, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices)
    t1 = Thread(target=worker1)
    t2 = Thread(target=worker2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # The string literal below is the disabled legacy sequential
    # implementation, kept for reference.
    """
    for instrument in instruments:
        code = None
        try:
            if updated_count > 0 and ((updated_count + error_count) % 850 == 0):
                logging.info("Refreshing datastream client...")
                # Wait for a random period...
                time.sleep(random.randint(0, 1000) / 1000)
                datastream = DatastreamClient.instance()
            updated_count += 1
            fallback_start_date = instrument.base_date if instrument.base_date else fallback_period_start
            since = last_dloaded_prices.get(instrument.id, fallback_start_date) - datetime.timedelta(overlap_days_ago)
            is_index = instrument.type == 'index'
            code = instrument.datastream_code if instrument.datastream_code else instrument.isin_code
            item_prices = datastream.fetch_prices(code, since, is_index)
            upsert_instrument_prices(instrument, item_prices)
            logging.info("Updated instrument: %s, isin: %s, type: %s, since: %s, downloaded_prices: %d" %
                         (instrument.id, code, instrument.type, since.strftime("%Y-%m-%d"), len(item_prices)))
        except Exception as e:
            logging.error("Error on instrument: %s, isin: '%s', name: '%s', type: '%s', status: '%s'"
                          % (instrument.id, code, instrument.name, instrument.type, e))
            error_count += 1
    """
    elapsed = time.time() - start_time
    logging.info("Elapsed: %f" % elapsed)
    print("Enabe Trigger")
    with connection.cursor() as cursor:
        cursor.execute("update pg_index set indisvalid = true where indexrelid = 'idx_price_instrument_id'::regclass;")
        cursor.execute("REINDEX TABLE price")
    return error_count
def work_bulk(bulk, datastream, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices):
    """Download and upsert prices for one chunk ("bulk") of instruments.

    Any exception during the bulk download/upsert is logged and swallowed so
    the worker thread can continue with the next chunk.

    BUG FIX: the original periodic client-refresh branch
    (``updated_count % 800 == 0``) was unreachable -- the counter is reset to
    0 on every call and checked before any increment -- so it has been
    removed.  The function also silently discarded its counters; it now
    returns them (callers that ignore the return value are unaffected).

    :param bulk: list of Instrument objects (one chunk from divide_instrument)
    :param datastream: DatastreamClient used for the download
    :param last_dloaded_prices: {instrument_id: last date with a price in DB}
    :param overlap_days_ago: days of overlap re-downloaded before the start date
    :param upsert_instruments_prices: callback persisting the fetched prices
    :return: (updated_count, error_count)
    """
    updated_count, error_count = 0, 0
    try:
        fallback_period_start = datetime.datetime.now() - datetime.timedelta(1)
        batch = prepare_batch(bulk, fallback_period_start, last_dloaded_prices, overlap_days_ago)
        instrument_prices = datastream.fetch_multiprices(batch)
        updated_count += len(instrument_prices)
        upsert_instruments_prices(instrument_prices)
        for instrument in instrument_prices:
            logging.info("Updated instrument: %s, isin: %s, since: %s, downloaded_prices: %d" %
                         (instrument['instrument_id'], instrument['code'],
                          instrument['since'].strftime("%Y-%m-%d"), len(instrument['data'])))
    except Exception as e:
        logging.error("Error on bulk processing:" + str(e))
        error_count += 1
    return updated_count, error_count
def refresh_isin_prices(isin_ids, fallback_period_start=SIX_YEARS_AGO):
    """Refresh prices for a single ISIN code or an iterable of ISIN codes.

    BUG FIX: a single ISIN passed as a *string* is itself iterable, so the
    original ``hasattr(isin_ids, '__iter__')`` check let it through
    unwrapped and ``isin_code__in`` then matched individual characters.
    Strings are now wrapped in a list like any other scalar.

    :param isin_ids: one ISIN code or an iterable of ISIN codes
    :param fallback_period_start: earliest date to query prices since
    :return: error count from refresh_instrument_prices
    """
    if isinstance(isin_ids, str) or not hasattr(isin_ids, '__iter__'):
        isin_ids = [isin_ids]
    insts = Instrument.objects.filter(isin_code__in=isin_ids)
    return refresh_instrument_prices(insts, fallback_period_start)
def refresh_analisys_driver_prices():
    """Refresh prices for all analysis-driver instruments.

    NOTE: the "analisys" typo in the function name is kept deliberately --
    renaming would break existing callers.
    """
    return refresh_instrument_prices(Instrument.analysis_drivers())
def refresh_all_instrument_prices():
    """Refresh prices for every instrument returned by Instrument.all()."""
    return refresh_instrument_prices(Instrument.all())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement