javascript typescript rxjs observable concatmap

Complete a dynamic array of observables one by one (concatMap)


I am making a chat app and I would like to save messages in the correct order.

Suppose I have a fixed number of messages:

// 4 messages: an array of fixed length
const chatMessages: string[] = ['hello', 'world', 'and', 'stack overflow members'];

Now, let's create save observables for them:

// Named differently from the string array above so the two declarations do not clash
const saveRequests: Observable<ChatMessage>[] = chatMessages.map((message: string) => {
    return chatService.saveMessage(message); // returns an Observable that calls the API
});

Now I want to save them one by one, one after another.

I do it this way:

from(saveRequests).pipe(
    concatMap((observable) => observable), // subscribe to each save only after the previous one completes
    toArray(),
    take(1)
).subscribe();

Now my question: what if the initial chatMessages array is dynamic, i.e. a message can be added at any point in time, even while saving is in progress? How do I loop over the array to save the chat messages one by one, keeping the order in which they were added?

For example: two out of four messages have been saved, the third is being processed, and at that moment a fifth message is added to the chatMessages array. How do I handle that?


Solution

  • If the initial chatMessages array is dynamic (a message can be added at any point in time, even during saving)

    You are describing an Observable of messages (strings), not an array! Since you are processing items one at a time, there is no need for an array.

    You can just use a simple Subject that emits messages as they are received, with a single subscription to that stream that saves the messages for you:

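    import { Subject } from 'rxjs';
    import { concatMap } from 'rxjs/operators';

    // Stream of incoming messages; takes the place of the array
    const chatMessage$ = new Subject<string>();

    // Call this whenever a new message is added
    function saveMessage(message: string) {
        chatMessage$.next(message);
    }

    // Single subscription: concatMap starts each save only after the previous one completes
    chatMessage$.pipe(
        concatMap(message => chatService.saveMessage(message))
    ).subscribe();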

    This will process the new messages one at a time, in the correct order.
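To check the scenario from the question (a fifth message arriving while the third is still being saved), here is a minimal runnable sketch. It assumes RxJS 6 or later. `fakeSave` and `runDemo` are hypothetical names introduced only for this illustration: `fakeSave` stands in for `chatService.saveMessage`, and the 20 ms and 50 ms delays are arbitrary. The Subject is completed at the end only so that `toArray()` can emit; a real chat stream would normally stay open.

```typescript
import { Subject, Observable, timer } from 'rxjs';
import { concatMap, map, toArray } from 'rxjs/operators';

// Hypothetical stand-in for chatService.saveMessage: each "API call" takes 20 ms.
function fakeSave(message: string): Observable<string> {
  return timer(20).pipe(map(() => `saved: ${message}`));
}

// Runs the demo and resolves with the save results in the order they completed.
function runDemo(): Promise<string[]> {
  const chatMessage$ = new Subject<string>();

  const result = new Promise<string[]>((resolve, reject) => {
    chatMessage$.pipe(
      // concatMap queues messages and starts the next save only when the current one completes
      concatMap(message => fakeSave(message)),
      // collect everything so the order can be inspected once the stream completes
      toArray()
    ).subscribe({ next: resolve, error: reject });
  });

  // Four messages arrive at once.
  ['hello', 'world', 'and', 'stack overflow members'].forEach(m => chatMessage$.next(m));

  // A fifth message arrives after 50 ms, while the third save is still in flight.
  setTimeout(() => {
    chatMessage$.next('late message');
    chatMessage$.complete(); // demo only: lets toArray() emit
  }, 50);

  return result;
}
```

Calling `runDemo().then(console.log)` should log the five results in the order the messages were pushed, with the late message last, because `concatMap` buffers messages that arrive while an earlier save is still running.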