Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import { Injectable } from '@angular/core';
- import { BehaviorSubject, Observable, Subject } from 'rxjs/Rx';
- import * as Stomp from '@stomp/stompjs';
- import * as SockJS from 'sockjs-client';
- import { ApiWorkerService } from '../api/api-worker.service';
- import { filter, map } from 'rxjs/operators';
- @Injectable({
- providedIn: 'root'
- })
- export class WebSocketService {
- private messageSubject: Subject<{ code: string, body: any }> = new Subject<{ code: string, body: any }>();
- private state: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
- private readonly ws: WebSocket;
- private readonly client: Stomp.Client;
- constructor(private api: ApiWorkerService) {
- this.ws = new SockJS(this.api.prepareUrl('ws'));
- this.client = Stomp.over(this.ws);
- this.client.connect({}, () => {
- this.state.next(true);
- this.client.subscribe('/subscribe', message => {
- this.messageSubject.next({
- code: message.headers['code'],
- body: message.body
- });
- });
- }, () => {
- console.error('WebSocket connection error');
- this.state.next(false);
- });
- }
- public getObservable<T>(code: string): Observable<T> {
- return this.messageSubject.asObservable().pipe(
- filter(data => {
- return data.code === code;
- }),
- map(data => {
- return data.body as T;
- })
- );
- }
- }
Add Comment
Please, Sign In to add comment