Использование RxJs и Angular 2 для обработки событий, отправленных сервером

Я пытаюсь отобразить значения, отправленные сервером, в приложении angular 2/RxJs.

Серверная часть регулярно отправляет отдельные строки клиенту через события, отправленные сервером.

Я не уверен, как обращаться с полученными значениями на стороне angular 2/RxJs.

Вот мой клиент (компонент ng):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

Бэкэнд-метод выглядит следующим образом (и использует RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

Я только что заметил, что приложение зависает на запросе и на странице ничего не отображается.

Я подозреваю, что есть проблема с использованием метода карты, то есть .map(this.extractData).

Я просто хотел бы добавить входящие строки в массив и отобразить этот массив в шаблоне, который будет обновляться по мере поступления строк.

Кто-нибудь может помочь?

изменить: вот рабочее решение (благодаря ответу Тьерри ниже):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}

person balteo    schedule 23.04.2016    source источник
comment
Не могли бы вы сообщить мне, смогли ли вы заставить это работать с angular 4? Я использую import {Component, OnInit} из '@angular/core'; и он не находит класс EventSource. Не могли бы вы сообщить мне, если вы установили какой-либо другой пользовательский пакет? Был бы признателен, если бы вы могли поделиться полным кодом на github или около того, для справки. Спасибо.   -  person csharpnewbie    schedule 20.12.2017


Ответы (3)


Вы не можете использовать класс Http Angular2 для обработки событий на стороне сервера, поскольку он основан на объекте XHR.

Вы можете использовать объект EventSource:

var source = new EventSource('/...');
source.addListener('message', (event) => {
  (...)
});

См. эти статьи:

person Thierry Templier    schedule 23.04.2016
comment
Привет, Тьерри. Это вопрос немного не по теме: рекомендуется ли сегодня использовать SSE? Будет ли SSE заменена http2? Есть ли лучшие альтернативы SSE? Я заметил, что некоторые браузеры вообще не поддерживают SSE... - person balteo; 24.04.2016

Вот рабочий пример:

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

Компонент приложения

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}
person abahet    schedule 29.12.2017
comment
Небольшое объяснение было бы неплохо, но я все равно не могу перестать голосовать за это :) - person pulsejet; 22.01.2018
comment
очень секси! Самоочевидный, очень красиво сделанный абахет. Отлично спасибо! - person user3777549; 07.03.2018
comment
как бы вы добавили это в NGRX Effects? - person FussinHussin; 18.09.2018
comment
Отличный ответ! и красиво сделано... - person Neminda Prabhashwara; 26.12.2020

Чтобы добавить к ответ Тьерри, по умолчанию используется тип события "сообщение". Однако тип события может быть любым ('чат', 'журнал' и т. д.) в зависимости от реализации на стороне сервера. В моем случае первые два события с сервера были «сообщением», а остальные — «журналом». Мой код выглядит так, как показано ниже

var source = new EventSource('/...'); source.addListener('message', message => { (...) }); source.addListener('log', log => { (...) });

person rudrasiva86    schedule 26.01.2018