Guest User

Untitled

a guest
Dec 19th, 2018
142
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.42 KB | None | 0 0
  1. from decimal import Decimal
  2.  
  3. from cryptofeed.defines import BID, ASK
  4. from cryptofeed.backends._util import book_convert
  5.  
  6. import asyncio
  7. import aio_pika
  8. import json
  9.  
  10. class RabbitCallback:
  11. def __init__(self,
  12. host='127.0.0.1',
  13. port=5672,
  14. username='guest',
  15. password='guest',
  16. routing_key='test_queue',
  17. **kwargs):
  18. self.url = f'amqp://{username}:{password}@{host}:{port}/'
  19. self.routing_key = routing_key
  20.  
  21. async def connect(self):
  22. loop = asyncio.get_event_loop()
  23. return await aio_pika.connect_robust(self.url, loop=loop)
  24.  
  25. async def publish(self, message):
  26. connection = await self.connect()
  27. channel = await connection.channel()
  28. print(message)
  29. await channel.default_exchange.publish(
  30. aio_pika.Message(
  31. body=message.encode()
  32. ),
  33. routing_key=self.routing_key
  34. )
  35. await connection.close()
  36.  
  37.  
  38. class TradeRabbitCallback(RabbitCallback):
  39. async def __call__(self, *, feed: str, pair: str, side: str, amount: Decimal, price: Decimal, order_id=None, timestamp=None):
  40. trade = {'feed': feed, 'pair': pair, 'id': order_id, 'timestamp': timestamp, 'side': side, 'amount': float(amount), 'price': float(price)}
  41. data = {'type': 'trade', 'data': trade}
  42. await self.publish(json.dumps(data))
  43.  
  44.  
  45. class FundingRabbitCallback(RabbitCallback):
  46. async def __call__(self, **kwargs):
  47. for key in kwargs:
  48. if isinstance(kwargs[key], Decimal):
  49. kwargs[key] = float(kwargs[key])
  50.  
  51. data = {'type': 'funding', 'data': kwargs}
  52. await self.publish(json.dumps(data))
  53.  
  54.  
  55. class BookRabbit(RabbitCallback):
  56. def __init__(self, *args, **kwargs):
  57. super().__init__(*args, **kwargs)
  58. self.depth = kwargs.get('depth', None)
  59. self.previous = {BID: {}, ASK: {}}
  60.  
  61. async def __call__(self, *, feed, pair, book, timestamp):
  62. data = {'timestamp': timestamp, BID: {}, ASK: {}}
  63. book_convert(book, data, self.depth)
  64. upd = {'type': 'book', 'feed': feed, 'pair': pair, 'data': data}
  65.  
  66. if self.depth:
  67. if upd['data'][BID] == self.previous[BID] and upd['data'][ASK] == self.previous[ASK]:
  68. return
  69. self.previous[ASK] = upd['data'][ASK]
  70. self.previous[BID] = upd['data'][BID]
  71.  
  72. await self.publish(json.dumps(upd))
Add Comment
Please, Sign In to add comment