Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { EventStreamConnectionOptions } from './EventStreamConnectionOptions';
- import client = require ('node-eventstore-client');
- import Long from 'long';
/**
 * Creates EventStore client connections addressed by host and port.
 */
export class EventStreamConnectionFactory {
  /**
   * Creates a connection object for `tcp://<host>:<port>`.
   *
   * This method only calls `client.createConnection`; it does not call
   * `connect()` on the result.
   *
   * @param host - Host name or IP address of the EventStore node.
   * @param port - TCP port, as a string or a number.
   * @param opts - Settings object forwarded unchanged as the first argument
   *   of `client.createConnection`. Kept as `any` so existing callers keep
   *   compiling; defaults to an empty object.
   * @returns The value returned by `client.createConnection`.
   */
  public static getConnection(
    host: string,
    port: string | number,
    opts: any = {},
  ): client.EventStoreNodeConnection {
    return client.createConnection(opts, `tcp://${host}:${port}`);
  }
}
/**
 * Base class for processing the events of one EventStore stream through a
 * catch-up subscription.
 *
 * Subclasses override {@link GenericEventStreamProcessor.onEventJSON}; the
 * default implementation throws. Call {@link GenericEventStreamProcessor.init}
 * to connect and start the subscription.
 */
export class GenericEventStreamProcessor {
  protected connection: client.EventStoreNodeConnection;
  /** Set to `true` by `liveProcessing`. */
  protected isLive: boolean = false;
  protected connOptions: EventStreamConnectionOptions;
  protected streamName: string;

  /**
   * Event number of the most recent event that `onEventJSON` handled without
   * throwing. Starts at 0.
   */
  highWaterMark: Long = Long.fromNumber(0);

  /**
   * @param streamName - Name of the stream to subscribe to.
   * @param opts - Connection options; `eventStoreServer` and `eventStorePort`
   *   are used here to create the connection, the remaining fields in `init`.
   */
  constructor(streamName: string, opts: EventStreamConnectionOptions) {
    this.connOptions = opts;
    this.connection = EventStreamConnectionFactory.getConnection(opts.eventStoreServer, opts.eventStorePort);
    this.streamName = streamName;
  }

  /**
   * Per-event hook meant to be overridden by subclasses.
   *
   * @param event - The resolved event as delivered by the subscription.
   * @param eventType - `event.event.eventType`.
   * @param json - Payload handed over by `handleRawEventJSON`. When the call
   *   originates from `eventAppeared` this is the result of `JSON.parse`, not
   *   the raw text, despite the declared `string` type.
   * @throws Error Always, in this base implementation.
   */
  onEventJSON(event: client.ResolvedEvent, eventType: string, json: string): void {
    throw new Error("Method not implemented.");
  }

  /**
   * Dispatches one event to `onEventJSON` and advances `highWaterMark` on
   * success. Events without an `event` record are ignored.
   *
   * If `onEventJSON` throws, the failure is logged and the process exits with
   * code 1.
   *
   * @param event - The resolved event.
   * @param json - Payload forwarded unchanged to `onEventJSON`.
   */
  handleRawEventJSON(event: client.ResolvedEvent, json: string): void {
    const recorded = event.event;
    if (!recorded) {
      return;
    }
    // Prefer the link's number when the event was reached through a link;
    // otherwise fall back to the event's own number so the mark also advances
    // on streams that contain plain (non-link) events.
    const checkpointSource = event.link || recorded;
    const eventType = recorded.eventType;
    try {
      this.onEventJSON(event, eventType, json);
      this.highWaterMark = checkpointSource.eventNumber;
    } catch (e) {
      console.log('Error processing event on stream ' + this.streamName + ' :');
      if (checkpointSource.eventNumber != null) {
        console.log('Event number ' + checkpointSource.eventNumber.toString());
      }
      console.log(e);
      process.exit(1);
    }
  }

  /**
   * Subscription callback for each delivered event. Parses the event data as
   * JSON and passes the parsed value to `handleRawEventJSON`. Events without
   * an `event` record or without data are skipped.
   *
   * A `JSON.parse` failure is not caught here and propagates to the caller.
   */
  public eventAppeared(subscription: client.EventStoreCatchUpSubscription, event: client.ResolvedEvent): void {
    if (event.event && event.event.data) {
      const payload = JSON.parse(event.event.data.toString());
      this.handleRawEventJSON(event, payload);
    }
  }

  /** Subscription callback passed as the "live processing started" handler; logs and sets `isLive`. */
  public liveProcessing(subscription: client.EventStoreCatchUpSubscription): void {
    console.log('gne live ' + this.streamName);
    this.isLive = true;
  }

  /**
   * Subscription callback for a dropped subscription. Logs the reason and
   * error, and exits with code 1 when `NODE_ENV` is `'production'`.
   */
  public subscriptionDropped(subscription: client.EventStoreCatchUpSubscription, reason: string, error?: Error): void {
    console.log('Error in event stream - subscription dropped:');
    console.log(reason);
    console.log(error);
    if (process.env.NODE_ENV === 'production') {
      process.exit(1);
    }
  }

  /**
   * Connects and starts the catch-up subscription on `streamName`, beginning
   * from `connOptions.highWatermark`, with link resolution enabled and a read
   * batch size of 1.
   *
   * Any error thrown while connecting or subscribing is logged and the
   * process exits with code 1.
   */
  public async init(): Promise<void> {
    try {
      // open a connection
      await this.connection.connect();
      // NOTE(review): `eventSTorePass` is spelled as in the original code; the
      // options type is declared elsewhere - confirm the property name there.
      const esCredentials = {
        username: this.connOptions.eventStoreUser || '',
        password: this.connOptions.eventSTorePass || '',
      };
      // start streaming
      this.connection.subscribeToStreamFrom(
        this.streamName,
        this.connOptions.highWatermark,
        true,
        (sub, event) => { this.eventAppeared(sub, event); },
        (sub) => { this.liveProcessing(sub); },
        // Forward the error as well; dropping it hid the failure cause.
        (sub, reason, error) => { this.subscriptionDropped(sub, reason, error); },
        esCredentials,
        1);
    } catch (e) {
      console.log('Error in event stream processor init:');
      console.log(e);
      process.exit(1);
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement