I have an array of objects from which I need to pass each object separately into async method (process behind is handled with Promise and then converted back to Observable via Observable.fromPromise(...)
- this way is needed because the same method is used in case just single object is passed anytime; the process is saving objects into database). For example, this is an array of objects:
[
{
"name": "John",
...
},
{
"name": "Anna",
...
},
{
"name": "Joe",,
...
},
{
"name": "Alexandra",
...
},
...
]
Now I have the method called insert which
which inserts object into database. The store
method from database instance returns newly created id. At the end the initial object is copied and mapped with its new id:
insert(user: User): Observable<User> {
return Observable.fromPromise(this.database.store(user)).map(
id => {
let storedUser = Object.assign({}, user);
storedUser.id = id;
return storedUser;
}
);
}
This works well in case I insert single object. However, I would like to add support for inserting multiple objects which just call the method for single insert. Currently this is what I have, but it doesn't work:
insertAll(users: User[]): Observable<User[]> {
return Observable.forkJoin(
users.map(user => this.insert(user))
);
}
The insertAll
method is inserting users as expected (or something else filled up the database with that users), but I don't get any response back from it. I was debugging what is happening and seems that forkJoin
is getting response just from first mapped user, but others are ignored. Subscription to insertAll
does not do anything, also there is no any error either via catch on insertAll
or via second parameter in subscribe to insertAll
.
So I'm looking for a solution where the Observable (in insertAll
) would emit back an array of new objects with users in that form:
[
{
"id": 1,
"name": "John",
...
},
{
"id": 2,
"name": "Anna",
...
},
{
"id": 3,
"name": "Joe",,
...
},
{
"id": 4,
"name": "Alexandra",
...
},
...
]
How can this be resolved?
To convert from array to observable you can use Rx.Observable.from(array)
.
To convert from observable to array, use obs.toArray()
. Notice this does return an observable of an array, so you still need to .subscribe(arr => ...)
to get it out.
That said, your code with forkJoin
does look correct. But if you do want to try from
, write the code like this:
insertAll(users: User[]): Observable<User[]> {
return Observable.from(users)
.mergeMap(user => this.insert(user))
.toArray();
}
Another more rx like way to do this would be to emit values as they complete, and not wait for all of them like forkJoin
or toArray
does. We can just omit the toArray
from the previous example and we got it:
insertAll(users: User[]): Observable<User> {
return Observable.from(users)
.mergeMap(user => this.insert(user));
}
As @cartant mentioned, the problem might not be in Rx, it might be your database does not support multiple connections. In that case, you can replace the mergeMap
with concatMap
to make Rx send only 1 concurrent request:
insertAll(users: User[]): Observable<User[]> {
return Observable.from(users)
.concatMap(user => this.insert(user))
.toArray(); // still optional
}