How to project one observable to another?

Hello folks,
I need to make an Observable<User[]> from an Observable<Transaction[]>.

Transaction and User classes look like:

export class Transaction {
  userId: string;
  // some other properties
}

export class User {
  userId: string;
  // some other properties
}

To get a User object for a given userId, I should use my UserService as follows:

this.userService.getUser(userId)
  .subscribe(u => this.users = u);

Similarly, I’ve got a TransactionService that gives me all the Transaction objects as an Observable<Transaction[]>:

let transactionCandidates: Observable<Transaction[]> = this.transactionService.getTransactions();

What I need is a list of all users that have made a transaction. To do this I tried the following, but it doesn’t work. Can you please let me know what my mistakes are?

users: Observable<User[]>;
...
...
let transactionCandidates: Observable<Transaction[]> = this.transactionService.getTransactions();
this.users = transactionCandidates
   .map<Transaction[], User[]>(transactions => {
         let users: User[]; 
          transactions.forEach(transaction => {
             this.userService.getUser(transaction.userId)
                .subscribe(user => users.push(user));
          });
          return users;
     });

Ionic Framework: 3.9.2
Ionic App Scripts: 3.1.10
Angular Core: 5.2.11
Angular Compiler CLI: 5.2.11
Node: 8.11.1
OS Platform: Windows 7
Navigator Platform: Win32
User Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36

The fundamental rule I try to follow to avoid problems like this is “always separate production and consumption”. In practical terms, this means avoiding subscribe in providers.

So what do we have?

  1. a source of Observable<Transaction[]>.
  2. a way to convert a Transaction into an Observable<User>

When your #2 is capable of turning an A into a B, map is your friend. However, as we have here, when it can turn an A into an Observable<B>, we need mergeMap instead.

transactedUsers(): Observable<User[]> {
  return this.transactionService.getTransactions().pipe(
    mergeMap(txen => txen.map(tx => this.userService.getUser(tx.userId))));
}
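
To make the map versus mergeMap distinction concrete, here is a minimal sketch assuming RxJS 6 style imports. The lookupName function is a hypothetical stand-in for any call that returns an Observable; it is not part of the services discussed in this thread.

```typescript
import { Observable, of } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

// Hypothetical stand-in for a lookup that returns an Observable.
function lookupName(id: number): Observable<string> {
  return of(`user-${id}`);
}

// map only wraps the inner Observable, so the result is nested.
const nested: Observable<Observable<string>> = of(1).pipe(
  map(id => lookupName(id))
);

// mergeMap subscribes to the inner Observable and forwards its values.
const flat: Observable<string> = of(1).pipe(
  mergeMap(id => lookupName(id))
);

const seen: string[] = [];
flat.subscribe(name => seen.push(name));
```

With map, a subscriber receives another Observable; with mergeMap, it receives the string itself.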

Thanks for the code snippet. The thing is that all the services mentioned above are third-party APIs that I have no control over. I tried your code. It’s close, but not working as expected because tx is actually an array of type Transaction[].

By the way I imported mergeMap using import { mergeMap } from "rxjs/operators"; I hope it’s correct.

OK, see edited post above.

Thanks for the prompt reply. This is even closer, but it returns an Observable<Observable<User>> instead of an Observable<User[]>.

Serves me right for trying to type these out on the fly instead of actually testing them. Take a look at this wombat service; I think you will be able to adapt the fundamental structure to your use case.

Perfect, thanks. I changed the code per your wombat sample, and it seems to be working. I don’t get a compile error anymore and I’m sure I will get the correct results when running the app.

So for other people who may have a similar question: based on @rapropos’s code, the solution is

this.users =  this.transactionService.getTransactions().pipe(
        mergeMap(ns => forkJoin(ns.map(n => this.userService.getUser(n.userId)))));
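
For anyone who wants to try this pattern in isolation, the following is a sketch under stated assumptions rather than the actual services: transactionService and userService are hypothetical stubs built with of(), and the imports assume RxJS 6.

```typescript
import { Observable, forkJoin, of } from "rxjs";
import { mergeMap } from "rxjs/operators";

interface Transaction { userId: string; }
interface User { userId: string; }

// Hypothetical stubs standing in for the third-party services.
const transactionService = {
  getTransactions: (): Observable<Transaction[]> =>
    of([{ userId: "a" }, { userId: "b" }]),
};
const userService = {
  getUser: (userId: string): Observable<User> => of({ userId }),
};

function transactedUsers(): Observable<User[]> {
  return transactionService.getTransactions().pipe(
    // txs.map(...) yields an array of Observable<User>; forkJoin turns that
    // array into one Observable<User[]> that emits once every lookup completes.
    mergeMap(txs => forkJoin(txs.map(tx => userService.getUser(tx.userId))))
  );
}

const results: User[][] = [];
transactedUsers().subscribe(users => results.push(users));
```

forkJoin keeps the users in the same order as the input array, whichever lookup finishes first.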

So in practice, the above code didn’t work. But the following does work:

let usersArray: User[] = []; 
this.users = Observable.of(usersArray); 

this.transactionService.getTransactions()
  .switchMap(transactionsArray => 
    transactionsArray.map(transaction =>
      this.userService.getUser(transaction.userId)).reduce(x => x)).subscribe(
        user => usersArray.push(user));

Sorry, but I know the wombat code works, and the way you have structured this you are combining Observable creation and subscription in the same place, which makes for overly coupled and hard-to-maintain designs.

The problem in your previous suggestion is that you are trying to assign to this.users. You don’t want to be assigning anything in functions that create Observables. You simply return the Observable, and let the client (usually a page) update its own copy of concrete data (such as User[]) using subscribe.
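
A rough sketch of that separation, with hypothetical class names (UserDirectory as the provider, UsersPage as the client) and a placeholder of() where the real pipeline would go:

```typescript
import { Observable, of } from "rxjs";

interface User { userId: string; }

// Producer (hypothetical provider): builds and returns the Observable.
// It never subscribes and never assigns to a field.
class UserDirectory {
  transactedUsers(): Observable<User[]> {
    return of([{ userId: "a" }]); // placeholder for the real pipeline
  }
}

// Consumer (hypothetical page): owns its concrete data and updates it
// inside subscribe.
class UsersPage {
  users: User[] = [];
  private directory: UserDirectory;

  constructor(directory: UserDirectory) {
    this.directory = directory;
  }

  ngOnInit(): void {
    this.directory.transactedUsers().subscribe(users => (this.users = users));
  }
}

const page = new UsersPage(new UserDirectory());
page.ngOnInit();
```

The provider can then be reused by any number of pages, each deciding for itself what to do with the emitted User[].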

I would respectfully ask you to reconsider this structure, especially because it is a common enough scenario that you are likely to end up married to whatever you first feel satisfied with.

Thanks for the comment, but if I do

getUsers(): Observable<User[]> {
 retrun  this.transactionService.getTransactions().pipe(
        mergeMap(ns => forkJoin(ns.map(n => this.userService.getUser(n.userId)))));
}

And then in my page I do:

getUsers.subscribe(s => {
   console.log("there is something to subscribed to: ", s);
});

Nothing is logged on the console!!!

Actually, in my real code I need to use this Observable<User[]>, i.e. users, in the HTML as follows:

<button ion-item *ngFor="let user of users | async" class="user" (click)="doSomthing(user)">

Apologies if this is just a typo, but this isn’t calling getUsers.

My apologies for not adding full code in my previous comment. It’s called:

ngOnInit() {
    console.log("ngOnInit => getUsers()");
    this.users = this.getUsers();  
  }

And I was hoping that *ngFor="let user of users | async" would subscribe to it and do its magic.

Does your code still state retrun instead of return?

Oh sorry, no, that’s my typo. The code builds fine with no errors on my machine, but no buttons are added to the HTML, which means the observable doesn’t work.

I resurrected the wombats from that other post and used this in HomePage:

wombats: Observable<string[]>;

constructor(private _wombats: WombatService) {
}

ngOnInit(): void {
  this.wombats = this._wombats.wombats();
}

<div>{{wombats | async}}</div>

After some time passes, I see “4 wombats, 16 wombats, 36 wombats, 64 wombats”. I wonder what could be different from your situation. Could it possibly be an rxjs version issue? I’m still using ionic 3 / rxjs 5, fwiw.
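
Another difference worth ruling out, besides the RxJS version: forkJoin only emits after every inner Observable has completed. This is an assumption about the cause, not something confirmed in this thread. If getUser returns a long-lived stream that never completes, forkJoin never emits, and an empty input array makes it complete without emitting at all. The sketch below uses a BehaviorSubject as a hypothetical never-completing source, with RxJS 6 imports.

```typescript
import { BehaviorSubject, forkJoin } from "rxjs";
import { take } from "rxjs/operators";

interface User { userId: string; }

// Hypothetical long-lived source: emits a value but never completes.
const liveUser = new BehaviorSubject<User>({ userId: "a" });

// forkJoin keeps waiting for completion, so nothing is pushed here.
const withoutTake: User[][] = [];
forkJoin([liveUser]).subscribe(users => withoutTake.push(users));

// take(1) completes after the first value, which lets forkJoin emit.
const withTake: User[][] = [];
forkJoin([liveUser.pipe(take(1))]).subscribe(users => withTake.push(users));
```

If that turns out to be the situation, piping take(1) onto each getUser call, or switching to combineLatest, are two options to consider.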

Mine are (in package.json):

"rxjs": "^6.2.2",
"rxjs-compat": "^6.2.2",