genBTC

goxapi queueing mysql commits - python

Apr 20th, 2013
151
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.90 KB | None | 0 0
  1. """
  2. trading robot breadboard
  3. """
  4. import time
  5. import goxapi
  6. import MySQLdb
  7. import decimal
  8. import threading
  9. import Queue
  10.  
  11. cPrec = 1E5
  12. bPrec = 1E8
  13.  
  14. class Strategy(goxapi.BaseObject):
  15.     # pylint: disable=C0111,W0613,R0201
  16.  
  17.     def __init__(self, gox):
  18.         goxapi.BaseObject.__init__(self)
  19.         self.signal_debug.connect(gox.signal_debug)
  20.         gox.signal_keypress.connect(self.slot_keypress)
  21.         gox.signal_strategy_unload.connect(self.slot_before_unload)
  22.         gox.signal_ticker.connect(self.slot_tick)
  23.         gox.signal_depth.connect(self.slot_depth)
  24.         gox.signal_trade.connect(self.slot_trade)
  25.         gox.signal_userorder.connect(self.slot_userorder)
  26.         gox.signal_wallet.connect(self.slot_wallet)
  27.         gox.orderbook.signal_changed.connect(self.slot_obook)
  28.         self.gox = gox
  29.         self.name = "%s.%s" % (__name__, self.__class__.__name__)
  30.         self.debug("%s loaded" % self.name)
  31.         import MySQLdb
  32.         self.conn = MySQLdb.connect(host= "127.0.0.1",
  33.         user="genBTC",
  34.         passwd="genBTC",
  35.         db="bitcoins")  
  36.         self.commit_queue = Queue.Queue()
  37.         self._terminate_commits = False
  38.         self._commit_thread = self.start_thread(self.do_commit_queue)
  39.         self.obook_lastcommit = 0
  40.          
  41.     def __del__(self):
  42.         self.debug("%s unloaded" % self.name)
  43.  
  44.     def start_thread(self, thread_func):
  45.         """start a new thread to execute the supplied function"""
  46.         thread = threading.Thread(target=thread_func)
  47.         thread.daemon = True
  48.         thread.start()
  49.         return thread
  50.  
  51.     def slot_wallet(self, gox, _dummy):
  52.         currencies = ['EUR','GBP','USD','AUD','JPY','RUB','PLN','BTC']
  53.         for cur in currencies:
  54.             cdec = cPrec
  55.             if cur == 'BTC':
  56.                 cdec = bPrec
  57.             self.commit_queue.put(("UPDATE market_data SET WalletBal=%s WHERE Item_Name=%s",((float(gox.wallet[cur])/cdec),repr(cur))))
  58.    
  59.     def slot_obook(self, orderbook, _dummy):
  60.         if time.time() - self.obook_lastcommit > 2:
  61.             for y in range(0,10):
  62.                 bidprice = float(self.gox.orderbook.bids[y].price)/cPrec
  63.                 bidvol = float(self.gox.orderbook.bids[y].volume)/bPrec
  64.                 self.commit_queue.put(("INSERT INTO depth_eur (price, volume, type, RecordTime) VALUES (%s,%s,'bids',%s)",(bidprice, bidvol, now)))
  65.             for y in range(0,10):
  66.                 askprice = float(self.gox.orderbook.asks[y].price)/cPrec
  67.                 askvol = float(self.gox.orderbook.asks[y].volume)/bPrec
  68.                 self.commit_queue.put(("INSERT INTO depth_eur (price, volume, type, RecordTime) VALUES (%s,%s,'asks',%s)",(askprice, askvol, now)))
  69.             self.commit_queue.put(("DELETE FROM depth_eur WHERE RecordTime != (SELECT x.RT FROM (SELECT MAX(t.RecordTime) AS RT FROM depth_eur t) x)",()))
  70.             self.obook_lastcommit = time.time()
  71.         else:
  72.             return
  73.  
  74.     def do_commit_queue(self):
  75.         """send queued requests to the DB"""
  76.         while self._terminate_commits == False:
  77.             success = False
  78.             message,allvalues = self.commit_queue.get(True)
  79.             while success == False:
  80.                 try:
  81.                     if allvalues == ():
  82.                         self.conn.cursor().execute(message)
  83.                     else:
  84.                         self.conn.cursor().execute(message,allvalues)
  85.                     success = True
  86.                 except (AttributeError, MySQLdb.OperationalError):
  87.                     self.debug("### Error,failure.Reconnecting")
  88.                     self.conn = MySQLdb.connect(host= "127.0.0.1", user="genBTC", passwd="genBTC", db="bitcoins")
  89.             self.commit_queue.task_done()
  90.        
  91.     def slot_before_unload(self, _sender, _data):
  92.         self.debug("%s before unload" % self.name)
  93.         self.conn.close()
  94.  
  95.     def slot_keypress(self, gox, (key)):
  96.         self.debug("someone pressed the %s key" % chr(key))
  97.  
  98.     def slot_tick(self, gox, (bid, ask)):
  99.         now = time.strftime('%Y-%m-%d %H:%M:%S')
  100.         newask = float(ask)/cPrec
  101.         newbid = float(bid)/cPrec
  102.         self.commit_queue.put(("UPDATE market_data SET AskVal=%s, BidVal=%s, TTime=%s WHERE Item_Name='EUR'",(newask, newbid, now)))
  103.  
  104.     def slot_depth(self, gox, (typ, price, volume, total_volume)):
  105.         pass
  106.  
  107.     def slot_trade(self, gox, (date, price, volume, typ, own)):
  108.         now = time.strftime('%Y-%m-%d %H:%M:%S')
  109.         goxlag = self.gox.order_lag/1E6
  110.         newprice = float(price)/cPrec
  111.         setVal = {"bid":"BidVal","ask":"AskVal"}
  112.         self.commit_queue.put(("UPDATE market_data SET {0}=%s, TTime=%s WHERE Item_Name='EUR'".format(setVal[typ]),(newprice, now)))
  113.         self.commit_queue.put(("UPDATE market_data SET Value1=%s WHERE Item_Name='GOXLAG'",(goxlag)))
  114.  
  115.     def slot_userorder(self, gox, (price, volume, typ, oid, status)):
  116.         pass
Add Comment
Please, Sign In to add comment