Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2019
128
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import { EventStreamConnectionOptions } from './EventStreamConnectionOptions';
  2.  
  3. import client = require ('node-eventstore-client');
  4. import Long from 'long';
  5.  
  6.  
  7.  
  8. export class EventStreamConnectionFactory{
  9.     public static getConnection( host:string, port:string, opts: any = {} ){
  10.         return client.createConnection(opts, "tcp://" + host + ":" + port);
  11.     }
  12. }
  13.  
  14.    
  15.  
  16. export class GenericEventStreamProcessor{
  17.  
  18.     protected connection: client.EventStoreNodeConnection;
  19.     protected isLive: boolean = false;
  20.     protected connOptions: EventStreamConnectionOptions;
  21.     protected streamName: string;
  22.  
  23.     constructor( streamName: string, opts: EventStreamConnectionOptions ){
  24.         this.connOptions = opts;
  25.         this.connection = EventStreamConnectionFactory.getConnection(opts.eventStoreServer, opts.eventStorePort);
  26.         this.streamName = streamName;
  27.     }
  28.  
  29.  
  30.     highWaterMark: Long = Long.fromNumber(0);
  31.  
  32.     onEventJSON(event: client.ResolvedEvent, eventType: string, json: string) {
  33.         throw new Error("Method not implemented.");
  34.     }
  35.  
  36.     handleRawEventJSON(event: client.ResolvedEvent, json:string ){
  37.         if(!event.event)
  38.             return;
  39.         let eventType = event.event.eventType;
  40.  
  41.         try{
  42.             this.onEventJSON(event, eventType, json);
  43.  
  44.             if(event && event.link){
  45.                 this.highWaterMark = event.link.eventNumber;
  46.             }
  47.         }
  48.         catch(e){
  49.             console.log('Error processing event on stream' + this.streamName + ' :');
  50.             if(event.link && event.link.eventNumber)
  51.                 console.log('Event number ' + event.link.eventNumber.toString());
  52.             console.log(e);
  53.             process.exit(1);
  54.         }
  55.  
  56.     }
  57.  
  58.  
  59.     public eventAppeared(subscription: client.EventStoreCatchUpSubscription, event: client.ResolvedEvent){
  60.         if(event.event && event.event.data){
  61.             let jsonData = JSON.parse(event.event.data.toString());
  62.             this.handleRawEventJSON(event, jsonData);
  63.         }
  64.     }
  65.  
  66.     public liveProcessing(subscription: client.EventStoreCatchUpSubscription){
  67.         console.log('gne live ' + this.streamName);
  68.         this.isLive = true;        
  69.     }
  70.  
  71.     public subscriptionDropped(subscription: client.EventStoreCatchUpSubscription, reason: string, error?: Error){
  72.         console.log('Error in event stream - subscription dropped:');
  73.         console.log(reason);
  74.         console.log(error);
  75.         if(process.env.NODE_ENV == 'production')
  76.             process.exit(1);
  77.     }
  78.  
  79.     public async init(){
  80.         try{
  81.             // open a connection
  82.             await this.connection.connect();
  83.  
  84.             let esCredentials = { username: this.connOptions.eventStoreUser || '', password: this.connOptions.eventSTorePass || '' };
  85.                
  86.             // start streaming
  87.             this.connection.subscribeToStreamFrom(
  88.                 this.streamName,
  89.                 this.connOptions.highWatermark,
  90.                 true,
  91.                 (a,b) => {this.eventAppeared(a,b)},
  92.                 (sub)=>{this.liveProcessing(sub)},
  93.                 (a,b)=>{this.subscriptionDropped(a,b)},
  94.                 esCredentials,
  95.             1);
  96.                
  97.  
  98.            
  99.         }
  100.         catch(e){
  101.             console.log('Error in event stream processor init:');
  102.             console.log(e);
  103.             process.exit(1);
  104.         }
  105.     }
  106. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement