Advertisement
Guest User

ma.py

a guest
Apr 7th, 2018
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.47 KB | None | 0 0
  1. import csv
  2. import re
  3. from functools import reduce
  4. from dateutil import parser
  5.  
  6. CSV_FILE = 'input.csv'
  7. EMA_LENGTH = 5
  8. EMA_SOURCE = 'close'
  9.  
  10. candles = []
  11.  
  12. # Reads the input file and saves to `candles` all the candles found. Each candle is
  13. # a dict with the timestamp and the OHLC values.
  14. def read_candles():
  15.     with open(CSV_FILE, 'r') as csvfile:
  16.         reader = csv.reader(csvfile, delimiter=',')
  17.         for row in reader:
  18.             try:
  19.                 candles.append({
  20.                     'ts': parser.parse(row[0]),
  21.                     'low': float(re.sub(",", "", row[1])),
  22.                     'high': float(re.sub(",", "", row[2])),
  23.                     'open': float(re.sub(",", "", row[3])),
  24.                     'close': float(re.sub(",", "", row[4]))
  25.                 })
  26.             except:
  27.                 print('Error parsing {}'.format(row))
  28.  
  29. # Calculates the SMA of an array of candles using the `source` price.
  30. def calculate_sma(candles, source):
  31.     length = len(candles)
  32.     sum = reduce((lambda last, x: { source: last[source] + x[source] }), candles)
  33.     sma = sum[source] / length
  34.     return sma
  35.  
  36. # Calculates the EMA of an array of candles using the `source` price.
  37. def calculate_ema(candles, source):
  38.     length = len(candles)
  39.     target = candles[0]
  40.     previous = candles[1]
  41.  
  42.     # if there is no previous EMA calculated, then EMA=SMA
  43.     if 'ema' not in previous or previous['ema'] == None:
  44.         return calculate_sma(candles, source)
  45.  
  46.     else:
  47.         # multiplier: (2 / (length + 1))
  48.         # EMA: (close - EMA(previous)) x multiplier + EMA(previous)
  49.         multiplier = 2 / (length + 1)
  50.         ema =  ((target[source] - previous['ema']) * multiplier) + previous['ema']
  51.  
  52.         return ema
  53.  
  54. def calculate(candles, source):
  55.     sma = calculate_sma(candles, source)
  56.     ema = calculate_ema(candles, source)
  57.     candles[0]['sma'] = sma
  58.     candles[0]['ema'] = ema
  59.  
  60. if __name__ == '__main__':
  61.     read_candles()
  62.  
  63.     # progress through the array of candles to calculate the indicators for each
  64.     # block of candles
  65.     position = 0
  66.     while position + EMA_LENGTH <= len(candles):
  67.         current_candles = candles[position:(position+EMA_LENGTH)]
  68.         current_candles = list(reversed(current_candles))
  69.         calculate(current_candles, EMA_SOURCE)
  70.         position += 1
  71.  
  72.     for candle in candles:
  73.         if 'sma' in candle:
  74.             print('{}: sma={} ema={}'.format(candle['ts'], candle['sma'], candle['ema']))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement