htmlangularrxjsserver-sent-eventsngzone

ERROR Error: InvalidPipeArgument: '[object Object]' for pipe 'AsyncPipe' even when returning an observable


I found few questions with same title and, as far as I could see, some of them are suggesting that the solution is basically return an Observable instead of an array (others are about FireBase which isn't my case). Well, as far I am concern, the code below does return an Observable (look at "getServerSentEvent(): Observable {return Observable.create ...")

My final goal is get all events from a stream returned from a Rest WebFlux. I didn't past bellow the backend because I am pretty sure the issue is related to some mistake in Angular.

On top of that, I can debug and see the events properlly comming to extratos$ from app.component.ts(see image bellow).

Whole logs

core.js:6185 ERROR Error: InvalidPipeArgument: '[object Object]' for pipe 'AsyncPipe'
    at invalidPipeArgumentError (common.js:5743)
    at AsyncPipe._selectStrategy (common.js:5920)
    at AsyncPipe._subscribe (common.js:5901)
    at AsyncPipe.transform (common.js:5879)
    at Module.ɵɵpipeBind1 (core.js:36653)
    at AppComponent_Template (app.component.html:8)
    at executeTemplate (core.js:11949)
    at refreshView (core.js:11796)
    at refreshComponent (core.js:13229)
    at refreshChildComponents (core.js:11527)

app.component.ts

import { Component, OnInit } from '@angular/core';
import { AppService } from './app.service';
import { SseService } from './sse.service';
import { Extrato } from './extrato';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  providers: [SseService],
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  //extratos: any;
  extratos$ : Observable<any>;

  constructor(private appService: AppService, private sseService: SseService) { }

  ngOnInit() {
    this.getExtratoStream();
  }

  getExtratoStream(): void {
    this.sseService
      .getServerSentEvent("http://localhost:8080/extrato")
      .subscribe(
        data => {
          this.extratos$ = data;
        }
      );
  }
}

sse.service.ts

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';

@Injectable({
  providedIn: "root"
})
export class SseService {
  extratos: Extrato[] = [];
  constructor(private _zone: NgZone) { }

  //getServerSentEvent(url: string): Observable<Array<Extrato>> {
  getServerSentEvent(url: string): Observable<any> {
    return Observable.create(observer => {
      const eventSource = this.getEventSource(url);
      eventSource.onmessage = event => {
        this._zone.run(() => {
          let json = JSON.parse(event.data);
          this.extratos.push(new Extrato(json['id'], json['descricao'], json['valor']));
          observer.next(this.extratos);
        });
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      }

    });
  }
  private getEventSource(url: string): EventSource {
    return new EventSource(url);
  }
}

app.component.html

<h1>Extrato Stream</h1>
<div *ngFor="let ext of extratos$ | async">
  <div>{{ext.descricao}}</div>
</div>

Evidence that the observable extratos$ is filled in

enter image description here


Solution

  • When you write this observer.next(this.extratos);, that means this.extratos is what you get on component side in the data argument of the callback, so when you do this this.extratos$ = data; you are actually storing the extratos Array. TypeScript doesn't complain about it, probably because it's not smart enough to infer the types when you build an Observable from scratch like you did.

    Try this:

    this.extratos$ = this.sseService
          .getServerSentEvent("http://localhost:8080/extrato");
    

    and in the template: <div *ngFor="let ext of extratos$ | async">