Advertisement
Guest User

Untitled

a guest
Apr 26th, 2017
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.35 KB | None | 0 0
  1. """ Post trades from Gemini to MongoDB """
  2.  
  3. from gemini.client import Client
  4. from pymongo import MongoClient
  5.  
  6. import os
  7. import datetime
  8. import time
  9.  
  10. def calc_sma(prices, periods):
  11. # TODO: implement later
  12. if len(prices) < periods:
  13. return 0
  14. else:
  15. return int(sum(prices[-periods::])/len(prices[-periods::]))
  16.  
  17. def calc_ema(prices, periods, ema):
  18. # TODO: implement later
  19. multiplier = (2 / (periods + 1))
  20.  
  21. if len(ema) == 0:
  22. return 0
  23. else:
  24. current_price = prices[-1]
  25. prev_ema = ema[-1]
  26. return ((current_price - prev_ema)*multiplier)+prev_ema
  27.  
  28. if __name__ == '__main__':
  29. # connect to Mongo
  30. try:
  31. client = MongoClient('mongodb://localhost:27017')
  32. except:
  33. print("Cannot connect to Mongo")
  34.  
  35. # epoch timestamps with ms
  36. current_time = int(time.time()*1000)
  37.  
  38. # client object for the exchange
  39. c = Client(os.environ['GEMINI_API_KEY'], os.environ['GEMINI_API_SECRET'])
  40.  
  41. # select database
  42. db = client['trades']
  43. gemini = db['gemini']
  44.  
  45. # figure out where we are starting
  46. pipeline = [
  47. {
  48. "$group": {
  49. "_id": "Latest Record",
  50. "latest": {
  51. "$max": "$timestampms"
  52. }
  53. }
  54. }
  55. ]
  56.  
  57. latest = list(db.gemini.aggregate(pipeline))
  58. try:
  59. start_time = latest[0]['latest']+1
  60. except IndexError:
  61. # new collection - start over with 1/1/2015
  62. start_time = 1420070400000
  63. print("Blank collection...setting new time")
  64.  
  65. while start_time < current_time:
  66.  
  67. s = start_time / 1000.0
  68. print(datetime.datetime.fromtimestamp(s).strftime('%Y-%m-%d %H:%M:%S.%f'))
  69.  
  70. # list to hold documents for bulk inserts
  71. doc = list()
  72.  
  73. data = c.get_trade_history(symbol='btcusd', since=start_time, limit_trades=500)
  74. if data:
  75. data.reverse()
  76.  
  77. for d in data:
  78. if d['timestampms'] > start_time:
  79. start_time = d['timestampms']+1
  80.  
  81. doc.append({
  82. "price": str(d['price']),
  83. "timestampms": d['timestampms'],
  84. "tid": d['tid'],
  85. "type": str(d['type']),
  86. "timestamp": d['timestamp'],
  87. "amount": d['amount'],
  88. "exchange": str(d['exchange'])
  89. })
  90.  
  91. # bulk insert this batch
  92. doc_ids = gemini.insert_many(doc)
  93. else:
  94. "Incrementing start time"
  95. start_time += 1
  96.  
  97. # cleanup: remove any duplicate transaction id's
  98. pipeline = [{
  99. "$group": {
  100. "_id": {
  101. "tid": "$tid"
  102. },
  103. "count": {
  104. "$sum": 1
  105. }
  106. }
  107. },{
  108. "$match": {
  109. "count": {
  110. "$gt": 1
  111. }
  112. }
  113. }]
  114.  
  115. dups = list(db.gemini.aggregate(pipeline))
  116. for d in dups:
  117. deleteOne = gemini.delete_one(d['_id'])
  118. print("Cleaning up duplicate tid's...")
  119.  
  120. print("Done\n")
  121.  
  122. # print some stats/sanity checks
  123. stats = db.command("collstats", "gemini")
  124. dups = list(db.gemini.aggregate(pipeline))
  125.  
  126. print("Size on disk:\t%s" % stats['storageSize'])
  127. print("Number of documents:\t%s" % gemini.count())
  128. print("Duplicate tid's:\t%s" % len(dups))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement