Help making Observable.forkJoin() work


#1

Hello guys, I’m making an interceptor to catch HTTP requests and add some headers to them.

My interceptor is as follows:

intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    if(request.url.includes("apigee") && !request.url.includes(this.urlLoginApigee)) {   // if the request goes to Sanitas (those pass through apigee) and is not the login request, add the headers
      request = request.clone({
        setHeaders: {
          'Content-Type':  'application/json',
          'Authorization': 'Bearer ' + this.globalVariable.apigeeToken
        }
      });
    }

    return next.handle(request)
  }

This works fine so far. Next I’ve modified the get method of the automatically generated api provider in order to do the following:

  • Perform a http request (a get)
  • If the request goes well and doesn’t return a 401 (unauthorized) error, then return the response.
  • If the request gets a 401 error then call a method that gets the apigee token and then relaunch the initial request.

This is how I do it. I’m not sure if it’s the right method, but it works. This is from a provider that I’ll call api from now on:

get(endpoint: string, params?: any, reqOpts?: any): Observable<any> {
    if (!reqOpts) {
      reqOpts = {
        params: new HttpParams()
      };
    }

    if (params) {
      reqOpts.params = new HttpParams();
      for (let k in params) {
        reqOpts.params = reqOpts.params.set(k, params[k]);
      }
    }

    return new Observable((observer) => {
      this.http.get(endpoint, reqOpts).subscribe(response => {
          observer.next(response);
        }, (err: any) => {
            if (err.status == 401) {
              this.apigee.login().subscribe(data => {   //the login method is just a http post done to apigee to get the token
                this.apigee.setToken(data.access_token);
                this.get(endpoint, params, reqOpts).subscribe((response: any) => {
                  observer.next(response);
                });
              })
            }
        });
    }).catch(error => {return Observable.throw("");});
  }

Ok now the problem is that if I try to do

Observable.forkJoin(method1, method2).subscribe(data => console.log("hello"))

Where method1 and method2 are two methods that return an Observable using the get method that I put above.

With this, the forkJoin never reaches the subscribe callback; it never prints hello.

And I’ve proven that it’s because of how I have the get method from the api provider because if I use the http angular library directly it works perfectly.

Or even if I use the following:

Observable.forkJoin(Observable.of(1), Observable.of(2)).subscribe(data => console.log("hello"))

It works.

Does anyone know how to fix my get method to make this work?


#2

Hopefully there would be a way to restructure this so that you’re not needing to manually create Observables, but I can tell you what the fundamental problem is, because it bit me as well. forkJoin waits for all of its children to complete (not just emit something), and if any of them don’t, it will turn into a mysterious black hole. So the first thing I would try is adding observer.complete() calls after the places where you call next().
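To make the "complete, not just emit" rule concrete, here is a small dependency-free sketch. It does not use RxJS: `MiniObserver`, `MiniSource` and `forkJoinLike` are hypothetical stand-ins written only for this illustration, and they model just the one behaviour discussed here (the combined result is delivered only once every source has completed).

```typescript
// Minimal stand-in types; NOT the RxJS API, just enough to show the rule.
interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}
type MiniSource<T> = (observer: MiniObserver<T>) => void;

// Delivers the array of last values only after every source has completed.
function forkJoinLike<T>(sources: MiniSource<T>[], onResult: (values: T[]) => void): void {
  const last: T[] = new Array<T>(sources.length);
  const hasValue: boolean[] = sources.map(() => false);
  let remaining = sources.length;
  sources.forEach((source, i) => {
    source({
      next: (value) => { last[i] = value; hasValue[i] = true; },
      complete: () => {
        remaining -= 1;
        if (remaining === 0 && hasValue.every(Boolean)) onResult(last);
      },
    });
  });
}

// A source that emits and completes, comparable to Observable.of(1).
const completes: MiniSource<number> = (o) => { o.next(1); o.complete(); };
// A source that emits but never completes, like the hand-built Observable in get().
const neverCompletes: MiniSource<number> = (o) => { o.next(2); };

const results: number[][] = [];
forkJoinLike([completes, completes], (v) => results.push(v));      // callback runs
forkJoinLike([completes, neverCompletes], (v) => results.push(v)); // callback never runs
```

Only the first call delivers a result. The second one stays silent forever, which matches the "black hole" described above.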


#3

Thank you very much, this drove me crazy today.

Putting observer.complete() after observer.next(...) fixed it.

I’m seeing that observables are a whole world of their own, with a lot to learn.
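For reference, a rough sketch of where the complete() calls end up in the retry-on-401 flow. To keep it runnable without Angular or RxJS, everything here is a hypothetical stand-in: `RetryObserver` and `RetrySource` mimic the callback given to `new Observable(observer => ...)`, while `httpGet` and `login` are fakes for `this.http.get` and `this.apigee.login`. Non-401 errors are also forwarded in this sketch, which the get method in the first post does not do.

```typescript
// Stand-in types (hypothetical, not RxJS), shaped like an Observable's subscriber.
interface RetryObserver<T> {
  next(v: T): void;
  error(e: unknown): void;
  complete(): void;
}
type RetrySource<T> = (o: RetryObserver<T>) => void;

// Fakes for this.http.get and this.apigee.login; a 401 is returned until a token exists.
let token = "";
const calls: string[] = [];
const httpGet = (url: string): RetrySource<string> => (o) => {
  calls.push(`GET ${url} token=${token || "none"}`);
  if (token === "") { o.error({ status: 401 }); return; }
  o.next(`data from ${url}`);
  o.complete();
};
const login: RetrySource<{ access_token: string }> = (o) => {
  o.next({ access_token: "abc" });
  o.complete();
};

// Same flow as get() in the first post, with complete() and error() forwarded.
function get(url: string): RetrySource<string> {
  return (observer) => {
    httpGet(url)({
      next: (response) => observer.next(response),
      complete: () => observer.complete(), // lets forkJoin see the end of the stream
      error: (err: any) => {
        if (err.status !== 401) { observer.error(err); return; }
        login({
          next: (data) => { token = data.access_token; },
          error: (e) => observer.error(e),
          // once the token is stored, replay the request and pass everything through
          complete: () => get(url)(observer),
        });
      },
    });
  };
}

const events: string[] = [];
get("/policies")({
  next: (v) => events.push(`next: ${v}`),
  error: () => events.push("error"),
  complete: () => events.push("complete"),
});
```

The subscriber sees one value followed by completion, after two requests: the first rejected with 401, the second sent once the token was stored. Note that if the login never yields a working token, this sketch would retry indefinitely, so a real version would want a retry limit.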

Thanks


#4

Also @rapropos I’d like to ask you if that’s how the get method (after adding the observer.complete()) should be written for what I want to achieve.

It works now but wanna know if that’s how it should be done if you wanna do something before returning the definitive value

Thanks


#5

I would do something more like this (off the top of my head, totally untested):

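import { Injectable } from '@angular/core';
import { HttpClient, HttpErrorResponse, HttpEvent, HttpHandler, HttpInterceptor, HttpRequest } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError, switchMap, tap } from 'rxjs/operators';

@Injectable()
class TokenService {
  token = "";

  constructor(private _http: HttpClient) {}

  // Fetch a fresh token and cache it for later requests.
  // TOKEN_URL is a placeholder for wherever the token comes from.
  refresh(): Observable<string> {
    return this._http.get<string>(TOKEN_URL).pipe(
      tap(t => this.token = t)
    );
  }
}

@Injectable()
class ApigeeHttpInterceptor implements HttpInterceptor {
  constructor(private _tokens: TokenService) {}

  private _addAuthHeader(req: HttpRequest<any>): HttpRequest<any> {
    return req.clone({setHeaders: {Authorization: 'Bearer ' + this._tokens.token}});
  }

  intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return next.handle(this._addAuthHeader(req)).pipe(
      catchError((err) => {
        if (err instanceof HttpErrorResponse && err.status === 401) {
          // token missing or expired: refresh it, then retry the original request
          return this._tokens.refresh().pipe(
            switchMap(() => next.handle(this._addAuthHeader(req)))
          );
        }
        // deal with other errors; catchError must return an Observable, so rethrow here
        return throwError(err);
      })
    );
  }
}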