Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import numpy as np
- from pandas import Timestamp
- import dask.dataframe as dd
- from dask.multiprocessing import get
# Sample interval data for a single ticker ('1COV.DE'): one row per 5-minute
# `interval` timestamp from 14:05 to 15:55 on 2019-05-10. calculate_vwap reads
# IVWP as the per-interval price and INTV as the weight (volume) applied to it.
- vwap_data = pd.DataFrame.from_dict({'TKER': {3118788: '1COV.DE', 3158847: '1COV.DE', 3198911: '1COV.DE', 3238976: '1COV.DE', 3279046: '1COV.DE', 3319119: '1COV.DE', 3359306: '1COV.DE', 3399515: '1COV.DE', 3439737: '1COV.DE', 3479971: '1COV.DE', 3520217: '1COV.DE', 3560473: '1COV.DE', 3600771: '1COV.DE', 3641081: '1COV.DE', 3681396: '1COV.DE', 3721720: '1COV.DE', 3762055: '1COV.DE', 3802398: '1COV.DE', 3842746: '1COV.DE', 3883119: '1COV.DE', 3923500: '1COV.DE', 3963885: '1COV.DE', 4004273: '1COV.DE'}, 'INTV': {3118788: 5248, 3158847: 7581, 3198911: 2575, 3238976: 18449, 3279046: 5729, 3319119: 4629, 3359306: 19876, 3399515: 7860, 3439737: 12346, 3479971: 7124, 3520217: 4957, 3560473: 3837, 3600771: 12622, 3641081: 9488, 3681396: 7079, 3721720: 9782, 3762055: 6074, 3802398: 8314, 3842746: 2989, 3883119: 9277, 3923500: 9921, 3963885: 12375, 4004273: 8330}, 'IVWP': {3118788: 44.0004, 3158847: 44.0465, 3198911: 44.0, 3238976: 44.0082, 3279046: 43.9813, 3319119: 43.931, 3359306: 43.9507, 3399515: 43.9736, 3439737: 44.0426, 3479971: 44.0168, 3520217: 43.9636, 3560473: 43.8867, 3600771: 43.8018, 3641081: 43.7593, 3681396: 43.7531, 3721720: 43.6628, 3762055: 43.6628, 3802398: 43.7449, 3842746: 43.6952, 3883119: 43.6071, 3923500: 43.498, 3963885: 43.5711, 4004273: 43.6618}, 'interval': {3118788: Timestamp('2019-05-10 14:05:00'), 3158847: Timestamp('2019-05-10 14:10:00'), 3198911: Timestamp('2019-05-10 14:15:00'), 3238976: Timestamp('2019-05-10 14:20:00'), 3279046: Timestamp('2019-05-10 14:25:00'), 3319119: Timestamp('2019-05-10 14:30:00'), 3359306: Timestamp('2019-05-10 14:35:00'), 3399515: Timestamp('2019-05-10 14:40:00'), 3439737: Timestamp('2019-05-10 14:45:00'), 3479971: Timestamp('2019-05-10 14:50:00'), 3520217: Timestamp('2019-05-10 14:55:00'), 3560473: Timestamp('2019-05-10 15:00:00'), 3600771: Timestamp('2019-05-10 15:05:00'), 3641081: Timestamp('2019-05-10 15:10:00'), 3681396: Timestamp('2019-05-10 15:15:00'), 3721720: Timestamp('2019-05-10 15:20:00'), 3762055: 
Timestamp('2019-05-10 15:25:00'), 3802398: Timestamp('2019-05-10 15:30:00'), 3842746: Timestamp('2019-05-10 15:35:00'), 3883119: Timestamp('2019-05-10 15:40:00'), 3923500: Timestamp('2019-05-10 15:45:00'), 3963885: Timestamp('2019-05-10 15:50:00'), 4004273: Timestamp('2019-05-10 15:55:00')}} )
# Sample lookup windows: each row names a RIC plus a start and an end
# timestamp. The driver code passes these three columns, in this order, to
# calculate_vwap. The '0UBn.DE' rows have no matching TKER in vwap_data.
- crosses_data = pd.DataFrame.from_dict({'RIC': {0: '0UBn.DE', 1: '0UBn.DE', 2: '1COV.DE', 3: '1COV.DE', 4: '1COV.DE', 5: '1COV.DE', 6: '1COV.DE', 7: '1COV.DE', 8: '1COV.DE', 9: '1COV.DE', 10: '1COV.DE', 11: '1COV.DE', 12: '1COV.DE', 13: '1COV.DE', 14: '1COV.DE', 15: '1COV.DE', 16: '1COV.DE', 17: '1COV.DE', 18: '1COV.DE', 19: '1COV.DE'}, 'Interval_Start_Human': {0: Timestamp('2019-05-10 15:10:11'), 1: Timestamp('2019-05-10 06:59:45'), 2: Timestamp('2019-05-10 14:15:12'), 3: Timestamp('2019-05-10 14:15:10'), 4: Timestamp('2019-05-10 14:15:10'), 5: Timestamp('2019-05-10 14:20:10'), 6: Timestamp('2019-05-10 14:20:12'), 7: Timestamp('2019-05-10 14:55:10'), 8: Timestamp('2019-05-10 15:05:09'), 9: Timestamp('2019-05-10 14:25:09'), 10: Timestamp('2019-05-10 14:10:08'), 11: Timestamp('2019-05-10 14:10:08'), 12: Timestamp('2019-05-10 14:10:10'), 13: Timestamp('2019-05-10 14:10:10'), 14: Timestamp('2019-05-10 14:03:01'), 15: Timestamp('2019-05-10 14:00:13'), 16: Timestamp('2019-05-10 14:05:06'), 17: Timestamp('2019-05-10 14:00:12'), 18: Timestamp('2019-05-10 14:00:11'), 19: Timestamp('2019-05-10 15:15:11')}, 'Interval_End_Human': {0: Timestamp('2019-05-10 15:37:00'), 1: Timestamp('2019-05-10 15:37:00'), 2: Timestamp('2019-05-10 14:20:11'), 3: Timestamp('2019-05-10 14:20:09'), 4: Timestamp('2019-05-10 14:20:10'), 5: Timestamp('2019-05-10 14:25:09'), 6: Timestamp('2019-05-10 14:25:12'), 7: Timestamp('2019-05-10 15:00:09'), 8: Timestamp('2019-05-10 15:10:08'), 9: Timestamp('2019-05-10 14:30:08'), 10: Timestamp('2019-05-10 14:15:06'), 11: Timestamp('2019-05-10 14:15:07'), 12: Timestamp('2019-05-10 14:15:08'), 13: Timestamp('2019-05-10 14:15:09'), 14: Timestamp('2019-05-10 15:35:00'), 15: Timestamp('2019-05-10 15:35:00'), 16: Timestamp('2019-05-10 14:10:06'), 17: Timestamp('2019-05-10 14:05:12'), 18: Timestamp('2019-05-10 14:05:09'), 19: Timestamp('2019-05-10 15:20:11')}})
def calculate_vwap(ric_id, interval_start, interval_finish, vwap_data, row_n):
    """Return the INTV-weighted average IVWP for one ticker in a time window.

    Rows of ``vwap_data`` are selected where ``TKER == ric_id`` and
    ``interval_start < interval < interval_finish`` (both bounds exclusive).

    Args:
        ric_id: Ticker to match against the ``TKER`` column.
        interval_start: Exclusive lower bound for the ``interval`` column.
        interval_finish: Exclusive upper bound for the ``interval`` column.
        vwap_data: DataFrame with ``TKER``, ``interval``, ``IVWP`` and
            ``INTV`` columns.
        row_n: Unused; kept so existing callers keep working.

    Returns:
        A four-element ``pd.Series`` holding, in order: the INTV-weighted
        average of IVWP, the first selected IVWP, the last selected IVWP
        (first/last in ``vwap_data`` row order) and the summed INTV.
        ``None`` when no row matches or the summed INTV is zero.
    """
    # DataFrame.query resolves the @-prefixed names from the calling scope,
    # so this call has to stay directly inside the function body.
    window = vwap_data.query(
        'TKER == @ric_id and interval > @interval_start and interval < '
        '@interval_finish '
    )[['IVWP', 'INTV']]

    prices = window['IVWP']
    volumes = window['INTV']

    # One vectorised total serves as the emptiness check, the divisor and the
    # reported volume, so all three always agree.
    total_volume = volumes.sum()
    if not total_volume:
        # Covers both "no matching rows" and "all volumes are zero"; the
        # latter would otherwise divide by zero.
        return None

    weighted_price = (prices * volumes).sum() / total_volume
    return pd.Series(
        [weighted_price, prices.iloc[0], prices.iloc[-1], total_volume]
    )
# Labels for the four values calculate_vwap packs into its result Series.
VWAP_COLUMNS = ['vwap', 'first_ivwp', 'last_ivwp', 'volume']


def _vwap_for_rows(frame):
    """Run calculate_vwap for every row of *frame*; always return a DataFrame.

    calculate_vwap returns None when nothing matches. Those rows are filled
    with NaN here so that every call - and therefore every dask partition -
    produces the same float columns regardless of which rows it contains.
    Columns are read by label rather than by position.
    """
    records = []
    for row_n, ric_id, start, finish in zip(
        frame.index,
        frame['RIC'],
        frame['Interval_Start_Human'],
        frame['Interval_End_Human'],
    ):
        result = calculate_vwap(ric_id, start, finish, vwap_data, row_n)
        if result is None:
            records.append([np.nan] * len(VWAP_COLUMNS))
        else:
            records.append(list(result))
    return pd.DataFrame(
        records, index=frame.index, columns=VWAP_COLUMNS, dtype=float
    )


# Guarded because a process-based scheduler may re-import this module in its
# worker processes; without the guard each worker would rerun the script.
if __name__ == '__main__':
    # Single-process run.
    serial_result = _vwap_for_rows(crosses_data)

    # Multi-process run. The previous version failed with
    # "AttributeError: 'Series' object has no attribute 'columns'" because
    # partitions returned different types (an object Series when the first
    # row yielded None, a DataFrame otherwise) and no `meta` was supplied.
    # The helper now has a fixed output schema, which is declared up front.
    parallel_result = (
        dd.from_pandas(crosses_data, npartitions=4)
        .map_partitions(
            _vwap_for_rows,
            meta={column: 'float64' for column in VWAP_COLUMNS},
        )
        .compute(scheduler=get)
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement