Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { IMessageBusMessage,IMessageBus, IScene, IMessageBusLink } from "../../interfaces";
- import { Queue } from 'queue-typescript';
- import { MessageBusMessage } from "../../../messaging/messagebusmessage";
- import { link } from "fs";
- import { AblyMessageBus } from "../../../messaging/ablymessagebusmanager";
- import { TradingRigManagerScene } from "./tradingrigmanagerscene";
- import { Machine, MachineConfig, StateMachine, EventObject, interpret } from 'xstate';
- import { Interpreter } from "xstate/lib/interpreter";
- import { OrderUpdate } from "../../../market/orderupdate";
- import { AblyMessageBusLink } from "../../../messaging/messagebuslink";
- import { OrderStatus, MarketSide, OrderSide } from "../../../market/enums";
/**
 * Event object that pairs an event name with an order-update payload.
 *
 * Instances carry exactly two public properties:
 *  - `type`   – the event name (a plain string chosen by the creator).
 *  - `update` – the order update the event refers to.
 */
export class OrderUpdateEvent {
  /** Event name. */
  public type: string;

  /** Order update carried by this event. */
  public update: OrderUpdate;

  /**
   * @param type   Event name to store on the instance.
   * @param update Order update to store on the instance.
   */
  constructor(type: string, update: OrderUpdate) {
    this.type = type;
    this.update = update;
  }
}
/**
 * Bridge between the message bus and the trading-rig manager scene.
 *
 * Message-bus messages are fed into a finite state machine as events.
 * `ORDER_UPDATE` messages are converted into {@link OrderUpdate} objects and
 * placed on the queue owned by the scene (`scene.orderUpdateQueue`).
 *
 * State flow:
 *   listening --SEND_KEY--> sending_key --KEY_RECEIVED--> monitoring_account
 * While in `monitoring_account`, every `ORDER_UPDATE` event runs the
 * `updateScene` action without leaving the state.
 */
export class TradingRigManagerLink extends AblyMessageBusLink {
  // SECURITY: these defaults are real-looking credentials committed to source.
  // They are kept only so existing callers behave exactly as before. Rotate
  // them and pass the values in through the constructor `options` (sourced
  // from configuration / a secret store) instead of relying on the defaults.
  private static readonly DEFAULT_ABLY_API_KEY = "eZKP_Q.Rt63UA:C7sWf1WgiFmdJU_p";
  private static readonly DEFAULT_CHANNEL_NAME = "test";
  private static readonly DEFAULT_RIG_KEY =
    "0GhabqBc0mhrzfcZsUepT7JEie8JRYCjmDY3zyYiAChsEvAOXSsqKYA6A95gMQfy";

  private fsm: StateMachine<Record<string, any>, any, EventObject>;
  private fsmService: Interpreter<Record<string, any>, any, EventObject>;

  /**
   * Builds the state machine and starts it immediately. Starting the machine
   * enters the `listening` state, whose entry action connects to the message
   * bus and joins the channel.
   *
   * @param scene   Scene whose `orderUpdateQueue` receives parsed order updates.
   * @param options Optional overrides for the values that used to be
   *                hard-coded; every omitted field falls back to the previous
   *                literal, so `new TradingRigManagerLink(scene)` is unchanged.
   *                - `ablyApiKey`  key passed to `messageBus.connect`
   *                - `channelName` channel passed to `messageBus.joinChannel`
   *                - `rigKey`      payload sent with the `SENDING_KEY` topic
   */
  constructor(
    public scene: TradingRigManagerScene,
    options: { ablyApiKey?: string; channelName?: string; rigKey?: string } = {}
  ) {
    super(scene);

    const {
      ablyApiKey = TradingRigManagerLink.DEFAULT_ABLY_API_KEY,
      channelName = TradingRigManagerLink.DEFAULT_CHANNEL_NAME,
      rigKey = TradingRigManagerLink.DEFAULT_RIG_KEY,
    } = options;

    this.messageBus = new AblyMessageBus(this);

    this.fsm = Machine(
      {
        id: "monitor",
        initial: "listening",
        states: {
          listening: {
            onEntry: "connectToMessageBus",
            on: { SEND_KEY: "sending_key" },
          },
          sending_key: {
            onEntry: "sendKeyToRig",
            on: { KEY_RECEIVED: "monitoring_account" },
          },
          monitoring_account: {
            // Action-only transition: the machine stays in this state and
            // simply forwards each order update to the scene.
            on: { ORDER_UPDATE: { actions: "updateScene" } },
          },
          // The former `updating_scene` state was removed: no transition ever
          // targeted it, so it could not be entered.
        },
      },
      {
        actions: {
          connectToMessageBus: () => {
            this.messageBus.connect(ablyApiKey);
            this.messageBus.joinChannel(channelName);
          },
          sendKeyToRig: () => {
            console.log("sending key to rig");
            this.messageBus.sendMessage("SENDING_KEY", rigKey);
          },
          updateScene: (_ctx, event) => {
            // Only ORDER_UPDATE events reach this action, and processEvent
            // always sends those as OrderUpdateEvent instances.
            const { update } = event as OrderUpdateEvent;
            console.log("adding order update to scene...");
            console.log(
              `${update.id}|${update.symbol}|${update.side}|${update.price}|${update.quantity}`
            );
            this.scene.orderUpdateQueue.enqueue(update);
          },
        },
      }
    );

    this.fsmService = interpret(this.fsm);
    this.fsmService.start();
  }

  /**
   * Connects the underlying message bus.
   * @param apikey API key handed to the message bus.
   */
  connect(apikey: string) {
    this.messageBus.connect(apikey);
  }

  /**
   * Joins a channel on the underlying message bus.
   * Delegates to the bus in the same way `connect` does (previously this was
   * an unimplemented stub that always threw).
   * @param channelName Name of the channel to join.
   */
  joinChannel(channelName: string) {
    this.messageBus.joinChannel(channelName);
  }

  /**
   * Publishes a message through the underlying message bus.
   * Delegates to the bus in the same way `connect` does (previously this was
   * an unimplemented stub that always threw).
   * @param topic   Topic / event name of the message.
   * @param message Message payload.
   */
  sendMessage(topic: string, message: string) {
    this.messageBus.sendMessage(topic, message);
  }

  /**
   * Translates an incoming message-bus event into a state-machine event.
   *
   * `ORDER_UPDATE` payloads are parsed into an {@link OrderUpdate} and sent as
   * an {@link OrderUpdateEvent}; a payload that cannot be parsed is logged and
   * dropped. Every other event name is forwarded to the machine unchanged.
   *
   * @param eventName Name of the event received from the bus.
   * @param eventData Raw payload; expected to be a JSON object for `ORDER_UPDATE`.
   */
  processEvent(eventName: string, eventData: string) {
    console.log('link processing event: ' + eventName + " : " + eventData);

    if (eventName !== "ORDER_UPDATE") {
      this.fsmService.send(eventName);
      return;
    }

    const orderUpdate = TradingRigManagerLink.parseOrderUpdate(eventData);
    if (orderUpdate === undefined) {
      console.error("link dropping malformed ORDER_UPDATE payload: " + eventData);
      return;
    }

    this.fsmService.send(new OrderUpdateEvent("ORDER_UPDATE", orderUpdate));
  }

  /**
   * Parses a raw `ORDER_UPDATE` payload into an {@link OrderUpdate}.
   *
   * The payload's `s`, `S`, `q`, `p`, `X`, `T` and `i` fields are passed to
   * the `OrderUpdate` constructor in that order.
   * NOTE(review): the keys look like an exchange execution-report format
   * (symbol, side, quantity, price, status, time, id) — confirm against the
   * `OrderUpdate` constructor.
   *
   * @returns The parsed update, or `undefined` when the payload is not valid
   *          JSON or is not a JSON object.
   */
  private static parseOrderUpdate(eventData: string): OrderUpdate | undefined {
    let parsed: unknown;
    try {
      parsed = JSON.parse(eventData);
    } catch (err) {
      return undefined;
    }

    if (typeof parsed !== "object" || parsed === null) {
      return undefined;
    }

    // Field types are dictated by the OrderUpdate constructor, which is
    // declared elsewhere, so the raw fields are passed through untyped.
    const fields = parsed as Record<string, any>;
    return new OrderUpdate(
      fields.s,
      fields.S,
      fields.q,
      fields.p,
      fields.X,
      fields.T,
      fields.i
    );
  }
}
Advertisement
Add Comment
Please, Sign In to add comment