html, angular, rxjs, rxjs-pipeable-operators, angular-signals

Prevent last emission - causes duplicate message


I need the LAST emission to call signals.onResponse() (hence the tap), so that my Deep Chat UI completes and re-enables the submit button after the last message. Deep Chat does not support multiple messages per response out of the box, so I have to work around it by calling addMessage for each message that is emitted. That is what the streamPending$ observable is for: it emits false when all messages have been received, and that is when I need to call signals.onResponse(). I am working with Angular, RxJS and Deep Chat. Below is the code relevant to the question (messages is an Angular input, so this.messages() reads the input rather than calling a regular method).

combineLatest([this.service.streamPending$.pipe(skip(1)), this.messages()]).pipe(
  tap(([bool, message]) => {
    console.log('bool')
    if (bool) {
      signals.onResponse({ text: message } as MessageContent);
      signals.onClose();
    }
  }),
  takeWhile(([bool]) => !bool)
).subscribe(([bool, message]) => {
  if (!bool) {
    (this.chatElementRef()?.nativeElement as any).addMessage({
      text: message,
      role: 'ai'
    });
  }
});

console.log('bool') runs twice for the second message (if there is one), so the second message shows up twice in my UI. I understand why, but I don't know how to fix it. Help appreciated!
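A likely cause, sketched in isolation: combineLatest emits whenever either source emits, so once both sources have a value, a new message followed by a new pending emission produces two outputs that carry the same message. The two Subjects below are hypothetical stand-ins for streamPending$ and the messages input, not the real service.

```typescript
import { Subject, combineLatest } from 'rxjs';

// Hypothetical stand-ins for this.service.streamPending$ and this.messages().
const pending$ = new Subject<boolean>();
const message$ = new Subject<string>();

// Records every combined emission so the duplicate is visible.
const seen: string[] = [];

combineLatest([pending$, message$]).subscribe(([bool, message]) => {
  seen.push(`${bool}:${message}`);
});

pending$.next(false);    // no output yet: message$ has not emitted
message$.next('first');  // output: false:first
message$.next('second'); // output: false:second
pending$.next(false);    // output: false:second again (the duplicate)

console.log(seen);
```

Under these assumptions the first message is combined once, while the second is combined twice: once when the message arrives and once when the pending flag emits again.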


Solution

  • For now I have abandoned the pure RxJS setup and read the latest message from the API out of a variable instead. This works, but I would still prefer an RxJS solution :)

    const pendingSub = this.service.streamPending$.pipe(skip(1)).subscribe(bool => {
      // use variable instead
      const currentMessage = this.service.lastMessage;
      if (bool) {
        signals.onResponse({ text: currentMessage } as MessageContent);
        pendingSub.unsubscribe();
        signals.onClose();
      } else {
        (this.chatElementRef()?.nativeElement as any).addMessage({
          text: currentMessage,
          role: 'ai'
        });
      }
    });
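One possible RxJS-only variant of the workaround above, as a minimal sketch rather than a tested Deep Chat integration: let only the pending flag drive the pipeline and pick up the newest message with withLatestFrom, then use takeWhile with its inclusive flag so the final emission still gets through before the stream completes. pairWithLatestMessage is a hypothetical helper name. The sketch assumes, as in the code above, that a truthy value means the stream has finished and that each message is emitted before the matching pending emission.

```typescript
import { Observable, Subject, takeWhile, withLatestFrom } from 'rxjs';

// Hypothetical helper: emits once per pending$ emission, paired with the
// most recent message. Assumption: done === true marks the end of the stream.
function pairWithLatestMessage(
  pending$: Observable<boolean>,
  message$: Observable<string>
): Observable<[boolean, string]> {
  return pending$.pipe(
    // Only pending$ triggers output; message$ is just sampled.
    withLatestFrom(message$),
    // inclusive = true lets the final (done) emission through, then completes.
    takeWhile(([done]) => !done, true)
  );
}

// Stand-in Subjects for streamPending$ and the messages input.
const pending$ = new Subject<boolean>();
const message$ = new Subject<string>();
const calls: string[] = [];

pairWithLatestMessage(pending$, message$).subscribe(([done, text]) => {
  // In the component: signals.onResponse(...) and signals.onClose() when done,
  // addMessage(...) otherwise.
  calls.push(done ? `onResponse:${text}` : `addMessage:${text}`);
});

message$.next('first');
pending$.next(false);
message$.next('second');
pending$.next(false);
message$.next('last');
pending$.next(true);
pending$.next(false); // ignored: takeWhile has already completed the stream

console.log(calls);
```

In the component, the helper would presumably be called with this.service.streamPending$.pipe(skip(1)) and this.messages(). Note that withLatestFrom drops any pending emission that arrives before the first message, so the ordering assumption matters.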