Advertisement
Guest User

Untitled

a guest
Mar 14th, 2018
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 24.17 KB | None | 0 0
  1. import datetime
  2. import logging
  3. import random
  4. import time
  5.  
  6. from django.db import connection
  7. from suds.client import Client
  8.  
  9. from .models import Instrument
  10. from django.conf import settings
  11.  
  12. import threading
  13. from threading import Thread
  14.  
  15.  
  16. class DatastreamException(Exception):
  17. pass
  18.  
  19.  
  20. class DatastreamClient:
  21. """
  22. Minimal client to interact with Datastream SOAP service
  23. """
  24. WSDL_URL = "http://dataworks.thomson.com/dataworks/enterprise/1.0/webServiceClient.asmx?wsdl"
  25.  
  26. INSTRUMENT_CUSTOM_PRICE_FIELDS = {
  27. 'UCITAIG': {'ri': 'NAV', 'ri_currency': ''},
  28. 'UCITLSE': {'ri': 'NAV', 'ri_currency': ''},
  29. 'UCITEMN': {'ri': 'NAV', 'ri_currency': ''},
  30. 'SGIETOT': {'ri': 'TR', 'ri_currency': ''},
  31. 'ALEURSP': {'price': 'ER', 'ri_currency': ''},
  32. 'HKEURSP': {'price': 'ER', 'ri_currency': ''},
  33. 'UKEURSP': {'price': 'ER', 'ri_currency': ''},
  34. 'NWEURSP': {'price': 'ER', 'ri_currency': ''},
  35. 'SWEURSP': {'price': 'ER', 'ri_currency': ''},
  36. 'NZEURSP': {'price': 'ER', 'ri_currency': ''},
  37. 'AUEURSP': {'price': 'ER', 'ri_currency': ''},
  38. 'JPEURSP': {'price': 'ER', 'ri_currency': ''},
  39. 'USEURSP': {'price': 'ER', 'ri': 'ER', 'ri_currency': ''},
  40. 'DCBGLTR': {'ri': 'TR', 'ri_currency': ''},
  41. 'LCP3MTH': {'ri': 'P', 'ri_currency': ''},
  42. 'GOLDBLN': {'ri': 'P', 'ri_currency': ''},
  43. 'STEUBOE': {'ri': 'ER', 'ri_currency': ''},
  44. 'JPEUBOE': {'ri': 'ER', 'ri_currency': ''},
  45. 'CHIYUA$': {'ri': 'ER', 'ri_currency': ''},
  46. 'CVXCS00': {'ri': 'PS', 'ri_currency': ''},
  47. 'JPMPTOT': {'ri': 'RI', 'ri_currency': ''},
  48. 'JGEMCOM': {'ri': 'RI', 'ri_currency': ''},
  49. 'UCITAIS': {'ri': 'NAV', 'ri_currency': ''},
  50. 'MLEXPTE': {'ri': 'RI', 'ri_currency': ''},
  51. 'MLEXPAE': {'ri': 'RI', 'ri_currency': ''},
  52. 'JPMEGTO': {'ri': 'RI', 'ri_currency': ''},
  53. 'MLD1T3E': {'ri': 'RI', 'ri_currency': ''},
  54. 'MLEMUCU': {'ri': 'RI', 'ri_currency': ''},
  55. 'ML£BADE': {'ri': 'RI', 'ri_currency': ''},
  56. 'MLHGBCU': {'ri': 'RI', 'ri_currency': ''},
  57. 'MLUCCAE': {'ri': 'RI', 'ri_currency': ''},
  58. 'UCITAFX': {'ri': 'NAV', 'ri_currency': ''},
  59. 'MSUSAM$': {'ri': 'RI', 'ri_currency': ''},
  60. 'UCITAED': {'ri': 'NAV', 'ri_currency': ''},
  61. 'UCITAIM': {'ri': 'NAV', 'ri_currency': ''},
  62. 'E:CTG': {'ri': 'RI', 'ri_currency': ''},
  63. 'HFRUEHE': {'ri': 'RI', 'ri_currency': ''},
  64. 'HFRUHCE': {'ri': 'RI', 'ri_currency': ''}
  65. }
  66.  
  67.  
  68. def __init__(self, username, password, wsdl=WSDL_URL):
  69. """Creating connection to the Thomson Reuters Dataworks Enterprise (DWE) server
  70. (former Thomson Reuters Datastream).
  71. """
  72. self.client = Client(wsdl, username=username, password=password)
  73. self.userdata = self.client.factory.create('UserData')
  74. self.userdata.Username = username
  75. self.userdata.Password = password
  76.  
  77. # Trying to connect
  78. # try:
  79. # self.ver = self.version()
  80. # except:
  81. # raise Exception('Unable to connect')
  82.  
  83. def version(self):
  84. """Return version of the TR DWE."""
  85. res = self.client.service.Version()
  86. return '.'.join([str(x) for x in res[0]])
  87.  
  88. def info(self):
  89. response = self.client.service.SystemInfo()
  90. def fmt(xs): '.'.join([str(x) for x in xs.ArrayValue[0]]) if hasattr(xs, 'ArrayValue') else ''
  91. return {f.Name: f.Value if hasattr(f, 'Value') else fmt(f) for f in response.Field}
  92.  
  93. def fetch(self, query, source='Datastream', fields=None, options=None, symbol_set=None, tag=None):
  94. rd = self.build_request_data(query,source,fields,options,symbol_set,tag)
  95. records = self.client.service.RequestRecordAsXml(self.userdata, rd, 0)
  96. status = self.parse_status(records.Record)
  97. return records, status
  98.  
  99. def bulk_fetch(self, queries):
  100. rds = self.client.factory.create('ArrayOfRequestData')
  101. rds.RequestData = []
  102. for query in queries:
  103. rds.RequestData.append(self.build_request_data(query))
  104. records = self.client.service.RequestRecordsAsXml(self.userdata, rds, 0)
  105. return records
  106.  
  107. def build_request_data(self, query, source='Datastream', fields=None, options=None, symbol_set=None, tag=None):
  108. rd = self.client.factory.create('RequestData')
  109. rd.Source = source
  110. rd.Instrument = query
  111. if fields is not None:
  112. rd.Fields = self.client.factory.create('ArrayOfString')
  113. rd.Fields.string = fields
  114. rd.SymbolSet = symbol_set
  115. rd.Options = options
  116. rd.Tag = tag
  117. return rd
  118.  
  119. def build_query_instrument_fields(self, ticker, date_start):
  120. instrument_fields = {'price': 'P', 'ri': 'RI', 'ri_currency': '~~E'}
  121. if ticker in self.INSTRUMENT_CUSTOM_PRICE_FIELDS.keys():
  122. custom_params = self.INSTRUMENT_CUSTOM_PRICE_FIELDS[ticker]
  123. instrument_fields = {**instrument_fields, **custom_params}
  124. price_field = instrument_fields['price']
  125. ri_field = instrument_fields['ri']
  126. ri_currency = instrument_fields['ri_currency']
  127. yesterday = datetime.datetime.now() - datetime.timedelta(1)
  128. query = '%s(%s),%s(%s)%s~%s~:%s' % (ticker, price_field,
  129. ticker, ri_field, ri_currency,
  130. date_start.strftime("%Y-%m-%d"),
  131. yesterday.strftime("%Y-%m-%d"))
  132. return query, instrument_fields
  133.  
  134.  
  135.  
  136. def fetch_multiprices(self,batch):
  137. date_field = 'DATE'
  138. for item in batch:
  139. query, instrument_fields = self.build_query_instrument_fields(item['code'], item['since'])
  140. item['query'] = query
  141. item['instrument_fields'] = instrument_fields
  142. item['price_field'] = instrument_fields['price']
  143. item['ri_field'] = instrument_fields['ri']
  144. item['date_field'] = 'DATE'
  145.  
  146. records = self.bulk_fetch([q['query'] for q in batch])
  147.  
  148. records = records.Records[0]
  149.  
  150. instruments = {}
  151. for item in batch:
  152. instruments[item['query']] = item
  153.  
  154. results = []
  155. for record in records:
  156. status = self.parse_status(record)
  157. instrument = instruments[status['Request']]
  158. result = {'instrument_id':instrument['instrument_id'],
  159. 'code':instrument['code'],
  160. 'since':instrument['since'],
  161. 'data':[]}
  162. if status['StatusCode'] > 0:
  163. logging.info("Skipped:"+status['Request'])
  164. continue
  165.  
  166. fields = record.Fields
  167. many_prices = isinstance(fields[date_field].Item, list)
  168. tot_downloaded = len(fields[date_field].Item) if many_prices else 1
  169.  
  170. def ref_date(ix):
  171. return fields[date_field].Item[ix] \
  172. if many_prices else fields[date_field].Item
  173.  
  174. def price(ix):
  175. price_field = instrument['price_field']
  176. price = -1 if instrument['is_index'] else "NULL"
  177. has_price = hasattr(fields, price_field)
  178. if has_price:
  179. price = fields[price_field].Item[ix] \
  180. if many_prices else fields[price_field].Item
  181. return price
  182.  
  183. def ri(ix):
  184. ri_field = instrument['ri_field']
  185. has_ri = hasattr(fields, ri_field)
  186. ri_val = "NULL"
  187. if has_ri:
  188. ri_val = fields[ri_field].Item[ix] \
  189. if many_prices else fields[ri_field].Item
  190. return ri_val
  191.  
  192.  
  193. result['data'] = [(ref_date(i), price(i), ri(i))
  194. for i in range(0, tot_downloaded)]
  195. results.append(result)
  196. return results
  197.  
  198.  
  199. def fetch_prices(self, ticker, date_start, is_index):
  200. """
  201. Get PRICE and RETURN INDEX (or NAV) from Datastream for a given
  202. instrument CODE and PERIOD. To customize fields query for an
  203. instrument use self.INSTRUMENT_CUSTOM_PRICE_FIELDS.
  204.  
  205. Retrieval PERIOD is set between date_start and (TODAY - 1 DAY)
  206.  
  207. Returned data population strategy:
  208.  
  209. - If PRICE value is NULL
  210. -> if INSTRUMENT is 'index'
  211. -> return -1 (insert query will NOT throw a error)
  212. -> else
  213. -> return "NULL" (insert query will throw a error)
  214.  
  215. - if RI field is NULL
  216. -> return NULL (insert query will NOT throw error)
  217.  
  218. :param ticker: datastrem ticker code
  219. :param date_start: date to request data since
  220. :param is_index: true if given item is an index
  221. :return: a generator of tuple containing reference_date,
  222. price and return_index for the given ticker and period
  223. """
  224.  
  225. query, instrument_fields = self.build_query_instrument_fields(ticker,date_start)
  226. price_field = instrument_fields['price']
  227. ri_field = instrument_fields['ri']
  228. date_field = 'DATE'
  229.  
  230. prices, status = self.fetch(query)
  231. if status['StatusCode'] > 0:
  232. raise DatastreamException(str(status))
  233.  
  234. fields = prices.Record.Fields
  235. many_prices = isinstance(fields[date_field].Item, list)
  236. tot_downloaded = len(fields[date_field].Item) if many_prices else 1
  237.  
  238. def ref_date(idx):
  239. return fields[date_field].Item[idx] \
  240. if many_prices else fields[date_field].Item
  241.  
  242. def price(idx):
  243. price = -1 if is_index else "NULL"
  244. has_price = hasattr(fields, price_field)
  245. if has_price:
  246. price = fields[price_field].Item[idx] \
  247. if many_prices else fields[price_field].Item
  248. return price
  249.  
  250. def ri(idx):
  251. has_ri = hasattr(fields, ri_field)
  252. ri_val = "NULL"
  253. if has_ri:
  254. ri_val = fields[ri_field].Item[idx] \
  255. if many_prices else fields[ri_field].Item
  256. return ri_val
  257.  
  258. price_and_ri = [(ref_date(i), price(i), ri(i))
  259. for i in range(0, tot_downloaded)]
  260. return price_and_ri
  261.  
  262. @classmethod
  263. def instance(cls, user=None, password=None):
  264. return DatastreamClient(username=user if user else settings.WS_USER,
  265. password=password if password else settings.WS_PWD)
  266.  
  267. @classmethod
  268. def build_query(cls, ticker, fields=None, ref_date=None,
  269. date_from=None, date_to=None, freq=None):
  270. """Construct a request string for querying TR DWE.
  271. tickers - ticker or symbol
  272. fields - list of fields.
  273. date - date for a single-date query
  274. date_from, date_to - date range (used only if "date" is not specified)
  275. freq - frequency of data: daily('D'), weekly('W') or monthly('M')
  276. Use here 'REP' for static requests
  277. Some of available fields:
  278. P - adjusted closing price
  279. PO - opening price
  280. PH - high price
  281. PL - low price
  282. VO - volume, which is expressed in 1000's of shares.
  283. UP - unadjusted price
  284. OI - open interest
  285. MV - market value
  286. EPS - earnings per share
  287. DI - dividend index
  288. MTVB - market to book value
  289. PTVB - price to book value
  290. ...
  291. The full list of data fields is available at http://dtg.tfn.com/.
  292. """
  293. fmt = '%Y-%m-%d'
  294. u = lambda x: ','.join(x) if isinstance(x, list) and len(x) > 0 else x
  295.  
  296. query = u(ticker)
  297. if fields is not None:
  298. query += '~=' + u(fields)
  299. if ref_date is not None:
  300. query += '~@' + ref_date.strftime(fmt)
  301. else:
  302. if date_from is not None:
  303. query += '~' + date_from.strftime(fmt)
  304. if date_to is not None:
  305. query += '~:' + date_to.strftime(fmt)
  306. if freq is not None:
  307. query += '~' + freq
  308. return query
  309.  
  310. @classmethod
  311. def parse_status(cls, record=None):
  312. """Extract status from the retrieved data and save it as a property of an object.
  313. If record with data is not specified then the status of previous operation is
  314. returned.
  315. status - dictionary with data source, string with request and status type,
  316. code and message.
  317. status['StatusType']: 'Connected' - the data is fine
  318. 'Stale' - the source is unavailable. It may be
  319. worthwhile to try again later
  320. 'Failure' - data could not be obtained (e.g. the
  321. instrument is incorrect)
  322. 'Pending' - for internal use only
  323. status['StatusCode']: 0 - 'No Error'
  324. 1 - 'Disconnected'
  325. 2 - 'Source Fault'
  326. 3 - 'Network Fault'
  327. 4 - 'Access Denied' (user does not have permissions)
  328. 5 - 'No Such Item' (no instrument with given name)
  329. 11 - 'Blocking Timeout'
  330. 12 - 'Internal'
  331. """
  332. last_status = None
  333. if record is not None:
  334. last_status = {'Source': str(record['Source']),
  335. 'StatusType': str(record['StatusType']),
  336. 'StatusCode': int(record['StatusCode']),
  337. 'StatusMessage': str(record['StatusMessage']),
  338. 'Request': str(record['Instrument'])}
  339. return last_status
  340.  
  341.  
  342. SIX_YEARS_AGO = datetime.datetime.now() - datetime.timedelta(365 * 6)
  343.  
  344.  
  345. def refresh_isin_from_file(path):
  346. with open(path) as f:
  347. a = f.readlines()
  348. isins = [l.rstrip('\r\n') for l in a]
  349. insts = Instrument.objects.filter(isin_code__in=isins)
  350. refresh_instrument_prices(insts)
  351.  
  352.  
  353. def split(arr, size):
  354. arrs = []
  355. while len(arr) > size:
  356. pice = arr[:size]
  357. arrs.append(pice)
  358. arr = arr[size:]
  359. arrs.append(arr)
  360. return arrs
  361.  
  362. def divide_instrument(instruments):
  363. splitted = split(instruments,150)
  364. instrument_size = len(splitted)
  365. array1 = []
  366. array2 = []
  367. if instrument_size % 2 == 0:
  368. array1 = splitted[0:int(instrument_size/2)]
  369. array2 = splitted[int(instrument_size/2):]
  370. else:
  371. array1 = splitted[0:int(instrument_size / 2)+1]
  372. array2 = splitted[int(instrument_size / 2):]
  373. return array1,array2
  374.  
  375. def prepare_batch(instruments, fallback_period_start, last_dloaded_prices,overlap_days_ago):
  376. result = []
  377. for instrument in instruments:
  378. fallback_start_date = instrument.base_date if instrument.base_date else fallback_period_start
  379. #datetime.datetime.now() - datetime.timedelta(5)
  380. since = datetime.datetime.now() - datetime.timedelta(15) # last_dloaded_prices.get(instrument.id, fallback_start_date) - datetime.timedelta(overlap_days_ago)
  381. is_index = instrument.type == 'index'
  382. code = instrument.datastream_code if instrument.datastream_code else instrument.isin_code
  383. result.append({'instrument_id':instrument.id,
  384. 'code':code,
  385. 'since':since,
  386. 'is_index':is_index})
  387. return result
  388.  
  389. def refresh_instrument_prices(instruments, fallback_period_start=SIX_YEARS_AGO):
  390. """
  391. Query prices for each given instrument on datastream and update the DB.
  392. Price/Return index data is downloaded between
  393. (DATE_START - OVERLAP_DAYS) and today
  394.  
  395.  
  396. if not null instrument.datastream_code is used to query Datastream,
  397. instrument.isin_code otherwise.
  398.  
  399. DATE_START is selected as it follows:
  400. - last day we have a price in DB for current instrument or
  401. - instrument.base_date if no price downloaded yet
  402. - fallback date if instrument.bdate is null and no price downloaded yet
  403.  
  404. OVERLAP_DAYS is set to 9 days
  405.  
  406. :param instruments: a list of Instrument objects
  407. :param fallback_period_start: fallback value to query Instrument price since
  408. :return: error count for instrument updates
  409. """
  410.  
  411. print("Disable Trigger")
  412. with connection.cursor() as cursor:
  413. cursor.execute('''update pg_index set indisvalid = false where indexrelid = 'idx_price_instrument_id'::regclass;''')
  414.  
  415. def upsert_instruments_prices(instrument_prices):
  416. for instrument in instrument_prices:
  417. SQL_TMPL = '''INSERT INTO price ("ref_date", "price", "ri", "instrument_id")
  418. VALUES ('%s', %s, %s, '%s') ON CONFLICT ON CONSTRAINT %s DO
  419. UPDATE SET price = %s, ri = %s WHERE price.ref_date = '%s' AND price.instrument_id = %s;'''
  420. sql_pkey = 'price_ref_date_ea332e77_uniq'
  421. sql_prices_upsert = []
  422. for xs in instrument['data']:
  423. has_price_and_ri = xs[1] != "NULL" and xs[1] != 'NaN' and xs[2] != 'NaN'
  424. if has_price_and_ri:
  425. query = SQL_TMPL % (*xs, instrument['instrument_id'], sql_pkey,
  426. xs[1], xs[2], xs[0], instrument['instrument_id'])
  427. sql_prices_upsert.append(query)
  428.  
  429. if len(sql_prices_upsert) > 0:
  430. with connection.cursor() as cursor:
  431. cursor.execute('\n'.join(sql_prices_upsert))
  432.  
  433.  
  434. def upsert_instrument_prices(instrument, instrument_prices):
  435. """
  436. Build and execute upsert queries to update instrument prices and return
  437. index for a given instrument
  438.  
  439. :param instrument: a investable.models.Instrument object
  440. :param instrument_prices: list of instrument prices to be inserted,
  441. format: [(ref_date, price_val, ri_val), ...]
  442. :return:
  443. """
  444. SQL_TMPL = '''INSERT INTO price ("ref_date", "price", "ri", "instrument_id")
  445. VALUES ('%s', %s, %s, '%s') ON CONFLICT ON CONSTRAINT %s DO
  446. UPDATE SET price = %s, ri = %s WHERE price.ref_date = '%s' AND price.instrument_id = %s;'''
  447. sql_pkey = 'price_ref_date_ea332e77_uniq' # TODO: move outside as a PARAMETER
  448. sql_prices_upsert = []
  449. for xs in instrument_prices:
  450. has_price_and_ri = xs[1] != "NULL" and xs[1] != 'NaN' and xs[2] != 'NaN'
  451. if has_price_and_ri:
  452. query = SQL_TMPL % (*xs, instrument.id, sql_pkey,
  453. xs[1], xs[2], xs[0], instrument.id)
  454. sql_prices_upsert.append(query)
  455.  
  456. if len(sql_prices_upsert) > 0:
  457. with connection.cursor() as cursor:
  458. cursor.execute('\n'.join(sql_prices_upsert))
  459.  
  460. def instruments_lastupdated():
  461. """
  462. find last date having price for each intrument
  463. :return: a dictionary like {insrument_id: last_retrieved_date}
  464. """
  465. SQL_TMPL = '''SELECT instrument_id, MAX(ref_date) FROM price GROUP BY instrument_id;'''
  466. cursor = connection.cursor()
  467. cursor.execute(SQL_TMPL)
  468. return {k: v for k, v in cursor.fetchall()}
  469.  
  470. start_time = time.time()
  471.  
  472. datastream_c1 = DatastreamClient.instance()
  473. datastream_c2 = DatastreamClient.instance(user='DS:ZEFJ001',password='EBONY292')
  474. last_dloaded_prices = instruments_lastupdated()
  475. updated_count, error_count = 0, 0
  476. overlap_days_ago = 9
  477.  
  478. bulk1, bulk2 = divide_instrument([i for i in instruments])
  479.  
  480. def worker1():
  481. print("Worker 1: presenti "+str(len(bulk1))+" bulk")
  482. while len(bulk1) > 0:
  483. print("Lavorazione Bulk1_"+str(len(bulk1)))
  484. work_bulk(bulk1.pop(), datastream_c1, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices)
  485.  
  486. def worker2():
  487. print("Worker 2: presenti " + str(len(bulk2)) + " bulk")
  488. while len(bulk2) > 0:
  489. print("Lavorazione Bulk2_" + str(len(bulk2)))
  490. work_bulk(bulk2.pop(), datastream_c2, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices)
  491.  
  492.  
  493. t1 = Thread(target=worker1)
  494. t2 = Thread(target=worker2)
  495.  
  496. t1.start()
  497. t2.start()
  498. t1.join()
  499. t2.join()
  500.  
  501. """
  502. for instrument in instruments:
  503. code = None
  504. try:
  505. if updated_count > 0 and ((updated_count + error_count) % 850 == 0):
  506. logging.info("Refreshing datastream client...")
  507. # Wait for a random period...
  508. time.sleep(random.randint(0, 1000) / 1000)
  509. datastream = DatastreamClient.instance()
  510.  
  511. updated_count += 1
  512. fallback_start_date = instrument.base_date if instrument.base_date else fallback_period_start
  513. since = last_dloaded_prices.get(instrument.id, fallback_start_date) - datetime.timedelta(overlap_days_ago)
  514. is_index = instrument.type == 'index'
  515. code = instrument.datastream_code if instrument.datastream_code else instrument.isin_code
  516.  
  517. item_prices = datastream.fetch_prices(code, since, is_index)
  518. upsert_instrument_prices(instrument, item_prices)
  519.  
  520. logging.info("Updated instrument: %s, isin: %s, type: %s, since: %s, downloaded_prices: %d" %
  521. (instrument.id, code, instrument.type, since.strftime("%Y-%m-%d"), len(item_prices)))
  522. except Exception as e:
  523. logging.error("Error on instrument: %s, isin: '%s', name: '%s', type: '%s', status: '%s'"
  524. % (instrument.id, code, instrument.name, instrument.type, e))
  525. error_count += 1
  526. """
  527. elapsed = time.time() - start_time
  528. logging.info("Elapsed: %f" % elapsed)
  529. print("Enabe Trigger")
  530. with connection.cursor() as cursor:
  531. cursor.execute("update pg_index set indisvalid = true where indexrelid = 'idx_price_instrument_id'::regclass;")
  532. cursor.execute("REINDEX TABLE price")
  533. return error_count
  534.  
  535.  
  536. def work_bulk(bulk, datastream, last_dloaded_prices, overlap_days_ago, upsert_instruments_prices):
  537. updated_count,error_count = 0,0
  538. try:
  539. if updated_count > 0 and (updated_count % 800 == 0):
  540. logging.info("Refreshing datastream client...")
  541. time.sleep(random.randint(0, 1000) / 1000)
  542. datastream = DatastreamClient.instance(user=datastream.userdata.Username,
  543. password=datastream.userdata.Password)
  544. logging.info("New DataStream client with username:"+datastream.userdata.Username)
  545. fallback_period_start = datetime.datetime.now() - datetime.timedelta(1)
  546. batch = prepare_batch(bulk, fallback_period_start, last_dloaded_prices, overlap_days_ago)
  547. instrument_prices = datastream.fetch_multiprices(batch)
  548. updated_count += len(instrument_prices)
  549. upsert_instruments_prices(instrument_prices)
  550. for instrument in instrument_prices:
  551. logging.info("Updated instrument: %s, isin: %s, since: %s, downloaded_prices: %d" %
  552. (instrument['instrument_id'], instrument['code'], instrument['since'].strftime("%Y-%m-%d"), len(instrument['data'])))
  553. except Exception as e:
  554. logging.error("Error on bulk processing:" +str(e))
  555. error_count += 1
  556.  
  557. def refresh_isin_prices(isin_ids, fallback_period_start=SIX_YEARS_AGO):
  558. if not hasattr(isin_ids, '__iter__'):
  559. isin_ids = [isin_ids]
  560. insts = Instrument.objects.filter(isin_code__in=isin_ids)
  561. return refresh_instrument_prices(insts, fallback_period_start)
  562.  
  563.  
  564. def refresh_analisys_driver_prices():
  565. return refresh_instrument_prices(Instrument.analysis_drivers())
  566.  
  567.  
  568. def refresh_all_instrument_prices():
  569. return refresh_instrument_prices(Instrument.all())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement