Callbacks not executing completely (for async events)

Hi folks, I posted this on SO (no response yet), but also wanted to post it here for folks who may be able to offer advice. I am pretty sure I am fundamentally misunderstanding some core flow, and I don’t know what.

Edited: removed unnecessary promise wrap, explicitly defined return type

I wrote a simple function to intercept the accelerometer data using the standard subscribe pattern and it worked absolutely great. It was basically like this:

// freq is 500ms
this.accSub = this.deviceMotion.watchAcceleration({ frequency: this.freq }).subscribe((acceleration: DeviceMotionAccelerationData) => {
  console.log(">>>>>>>>>>>>> ACC CALLBACK");
  this.processCharts(acceleration, this.charts.accChart, 'acc');
});

I then decided to move this code to an @Injectable and separate out the processCharts() from the subscription code.

In other words, I created a service, let’s call it sensors, with this code:

// note: instead of Function, I also tried my own function type like so:
// type SensorCallbackFunction = (data:any) => void;

startAllSensors(onAccData: Function) {
  // listen to acc. data
  this.accSub = this.deviceMotion.watchAcceleration({ frequency: this.freq }).subscribe((acceleration: DeviceMotionAccelerationData) => {
    console.log(">>>>>>>>>>>>> ACC CALLBACK");
    onAccData(acceleration);
  });
}

That is, I now wrote a service that accepts a callback that I could invoke from the calling component.

In the calling component, I invoke it this way:

this.sensors.startAllSensors(this.accDataReceived)

where this.accDataReceived is:

accDataReceived(data) { // my code here
  console.log("acc data:" + JSON.stringify(data));
}

All awesome. Works great, I see the callback being repeatedly called.

What’s completely confusing me is that things break when I add more logic to accDataReceived (logic that takes, let’s say, some finite time to complete).

If I change accDataReceived to this:

accDataReceived(data) {
  console.log("acc data:" + JSON.stringify(data));
  let dataClone = data;
  //let dataClone = data.map(x => Object.assign({}, x));
  console.log("ACC" + JSON.stringify(dataClone));
  console.log("HERE acc");
  this.processCharts(dataClone, this.charts.accChart, 'acc');
}

Then what I observe is that the service subscribe is no longer called with new data. So naturally, I assumed that this was because it invoked a callback, that did not return immediately, and that messed up its state.

So I converted both accDataReceived() and processCharts into promises like so:

accDataReceived(data): Promise<boolean> {
  console.log("acc data:" + JSON.stringify(data));
  //let dataClone = data;
  console.log("ACC" + JSON.stringify(dataClone));
  let dataClone = data.map(x => Object.assign({}, x));
  console.log("HERE acc");
  return this.processCharts(dataClone, this.charts.accChart, 'acc');
}

Note that in addition to doing that, I also made a copy of the data (a new object per element), thinking the data might get corrupted between the callback execution and the subscription update.

What I see now is the “ACC” console log but nothing after it (I don’t even see “HERE acc”), and the subscription continues to update correctly. This makes me believe there is something fundamentally wrong in how I am approaching this, but I don’t know what.

Any advice? Thanks.

“return new Promise” is almost always the first three words of an antipattern. If you want a simple rule, never do it. The correct structure is along the lines of

foo(data: typeOfData): Promise<someType> {
  const dataClone = data; // replace with whatever modification of data you need
  return this.processCharts(dataClone, otherParameters); // return type of processCharts is Promise<someType>
}

To be honest, I don’t quite understand what you need yet, but I figured I’d start here.

True. I just took off the unnecessary promise wrapping and updated the code.

However, my core issue is this:

  • When I synchronously execute all the code inside the sensor subscription, everything works well.
  • When I try to split the sensor subscription and the callback via a function callback as illustrated above, it doesn’t work.

I’m struggling to figure out why. The reason I want to split this functionality is that startAllSensors() makes sense inside an injectable, while the action to be performed on the data makes sense in the calling component (there are multiple components that use a similar function).

Why are you passing a function though? Why not just
this.sensors.startAllSensors().then( _ => nextFunctionOfInterest);

Edit: to do this, you need startAllSensors to return a Promise.

startAllSensors returns a constant stream of data (every time you move your phone), so I can try and do this in the calling component:

this.sensors.startAllSensors().subscribe( _ => nextFunctionOfInterest);

That being said, startAllSensors actually starts the accelerometer, gyroscope and GPS watchers, so it made sense to pass three callbacks, one for each watcher. I simplified the problem in my original post.

I’m still very keen to know what the core problem is in my approach.

So it’s named wrong? It doesn’t start sensors? It provides a stream of sensor data? If that’s what’s going on you could do something like


sensorStream(): Observable<something> { /* build and return the stream here */ }

responseToChanges = this.sensorStream().subscribe(sensorData => doSomething(sensorData));

responseToChanges.unsubscribe(); // stop responding to changes

You could break this down as accelerometerStream etc, or you could use combineLatest like

sensorStream() {
  return combineLatest(accelerometerStream, gyroscopeStream, gpsStream);
}

if you wanted an array of all three most recent values.

In my mind, startSensors also implies stream but I like streamSensors too.

Anyway, I’m going to try your approach given I can’t figure out what’s going wrong in mine. Will report back

Cool. Just one point about “starting.” There’s a critical difference between a Hot Observable and a Cold Observable. Hot means the stream exists and emits items whether your app listens or not. Cold means the values only start emitting if you subscribe. Probably you’re dealing with hot Observables, where the device GPS is emitting values whether your app listens or not. But maybe I’m wrong, and it’s important for you to know what is going on. If you really are starting sensors, life is different from you starting to listen to sensors that are already started.

Resident opinionated curmudgeon here chiming in that I think the very idea of callbacks is an antipattern in Angular applications. Transforming Observables is definitely the way to go here.

Lots to report on the observable approach, but a simple question for now:

startAndStreamSensors(): Observable<[DeviceMotionAccelerationData, GyroscopeOrientation, Geoposition]> {
  // listen to acc. data
  this.sensorHandles.accSub = this.deviceMotion.watchAcceleration({ frequency: this.freq });
  this.sensorHandles.gyrSub = this.gyroscope.watch({ frequency: this.freq });

  this.perm.checkPermission(this.perm.PERMISSION.ACCESS_FINE_LOCATION)
    .then(_ => {
      this.sensorHandles.gpsSub = this.geo.watchPosition();
      return Observable.combineLatest(this.sensorHandles.accSub, this.sensorHandles.gyrSub, this.sensorHandles.gpsSub);
    })
    .catch(err => {
      this.utils.presentToast("Error latching to GPS", "error");
      return Observable.combineLatest(this.sensorHandles.accSub, this.sensorHandles.gyrSub, this.sensorHandles.gpsSub);
    });

  //return Observable.combineLatest(this.sensorHandles.accSub, this.sensorHandles.gyrSub, this.sensorHandles.gpsSub);
}

As you see, I need to wait for permission for one of the sensors before I do a combineLatest. If I do a return Observable.combineLatest at the end of the function, it will contain an invalid reference to gpsSub. If I don’t return (see the last commented statement), TypeScript complains that the function must return a value. What’s the right way to write this?

Whatever you do, make the first word of the function be return. That should get you started on the right path.
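
A small illustration of why the first word matters, using plain Promises rather than RxJS. The names checkPermission, startWithoutReturn and startWithReturn are made up for this sketch, and the permission check is a stub. It only shows that a return inside a then() callback does not return from the enclosing function.

```typescript
// Hypothetical stub standing in for this.perm.checkPermission(...).
function checkPermission(): Promise<boolean> {
  return Promise.resolve(true);
}

// The `return` below belongs to the arrow function passed to then(),
// not to startWithoutReturn(), so the caller gets undefined back.
function startWithoutReturn(): void {
  checkPermission().then(() => {
    return "sensor stream";
  });
}

// Returning the whole chain gives the caller something to wait on.
function startWithReturn(): Promise<string> {
  return checkPermission().then(() => "sensor stream");
}

const nothing: unknown = startWithoutReturn();
const chain: unknown = startWithReturn();

console.log(nothing === undefined);    // true
console.log(chain instanceof Promise); // true
```

The same idea carries over to a function that must hand back an Observable: build the whole chain as one expression and return that expression.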

I do need some more help here!
this.perm.checkPermission returns a promise. I can’t just return that.
Trying to create a wrapper with Observable.create() doesn’t work either (maybe I’m doing it wrong).

Any observable expert willing to rewrite my startAndStreamSensors to return the observable only when it’s ready to be returned (inside checkPermission), without blowing up my subscribe?

Close, but you want fromPromise. Also look at mergeMap for chaining Observables like you do with then for Promises.

I did try fromPromise - the problem is wrapping the promise in an observable only returns one value. My observable returns 3.

This is my subscribe code:

this.sensors.startAndStreamSensors()
  .subscribe(combined => {
    console.log("got combined");
    const [acc, gyro, gps] = combined;
    this.accDataReceived(acc);
    this.gyroDataReceived(gyro);
    this.gpsDataReceived(gps);
  });

And this is my observable code: (which needs a proper return)

startAndStreamSensors(): Observable<[DeviceMotionAccelerationData, GyroscopeOrientation, Geoposition]> {
  this.sensorHandles.accSub = this.deviceMotion.watchAcceleration({ frequency: this.freq });
  this.sensorHandles.gyrSub = this.gyroscope.watch({ frequency: this.freq });

  this.perm.checkPermission(this.perm.PERMISSION.ACCESS_FINE_LOCATION)
    .then(_ => {
      this.sensorHandles.gpsSub = this.geo.watchPosition();
      console.log("**** good return");
      return Observable.combineLatest(this.sensorHandles.accSub, this.sensorHandles.gyrSub, this.sensorHandles.gpsSub);
    })
    .catch(err => {
      this.utils.presentToast("Error latching to GPS", "error");
      return Observable.combineLatest(this.sensorHandles.accSub, this.sensorHandles.gyrSub, this.sensorHandles.gpsSub);
    });
}

If I wrap this code inside Observable.create() I get undefined values in my subscribe. I’m not sure how to use fromPromise() and still make sure my subscription code works correctly.

Holy Batman. I figured it out. I switched back to my old callback mechanism - I really think this is a better approach than observables because it keeps the complete subscription/unsubscription code inside the service, which is the right place for it. The calling component only needs to deal with the ‘actOnNewData’ part.

My problem was this:

Component A was passing a function callback (A.f) to S (Service)
S was latching onto the sensors and creating a subscription (Hot observable)
When new data was arriving, S was invoking A.f(data), which was fine.
But inside A.f, I was trying to call this.process which is another function inside A.

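The problem was that the context of this was now wrong. Since S was invoking the function on its own rather than as a method of A, this no longer referred to A, so this.process could not be found.

So I fixed it by explicitly binding this to Component A: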

this.sensors.startAndStreamSensors(this.accDataReceived.bind(this), this.gyroDataReceived.bind(this))

Now I bet someone will thwack me on the head saying this is terrible, but I find it to be exactly as contained as I want it to be :)
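
For anyone landing here later, a minimal sketch of the lost-this problem and two ways around it. FakeSensorService and ChartComponent are hypothetical stand-ins written for this example, not the real service or component from this thread, and emit() simulates a sensor reading arriving.

```typescript
// Simplified shape of one accelerometer sample (assumed for this sketch).
interface Sample { x: number; y: number; z: number; }
type SensorCallback = (data: Sample) => void;

// Hypothetical stand-in for the service: stores a callback and later
// invokes it as a plain function, like onAccData(acceleration) above.
class FakeSensorService {
  private onAccData: SensorCallback | null = null;

  start(onAccData: SensorCallback): void {
    this.onAccData = onAccData;
  }

  emit(sample: Sample): void {
    const cb = this.onAccData;
    if (cb) { cb(sample); } // plain call: no component attached as `this`
  }
}

class ChartComponent {
  processed: Sample[] = [];

  // Ordinary method: `this` depends on how the caller invokes it.
  accDataReceived(data: Sample): void {
    this.processCharts(data);
  }

  // Arrow-function property: `this` is fixed to the instance at creation.
  accDataReceivedArrow = (data: Sample): void => {
    this.processCharts(data);
  };

  private processCharts(data: Sample): void {
    this.processed.push(data);
  }
}

const service = new FakeSensorService();
const component = new ChartComponent();
const sample: Sample = { x: 0.1, y: 0.2, z: 9.8 };

// 1. Unbound method reference: the call throws because `this` is not the component.
let unboundFailed = false;
service.start(component.accDataReceived);
try {
  service.emit(sample);
} catch (e) {
  unboundFailed = e instanceof TypeError;
}

// 2. Explicit bind(), as in the fix above.
service.start(component.accDataReceived.bind(component));
service.emit(sample);

// 3. Arrow-function property, no bind() needed at the call site.
service.start(component.accDataReceivedArrow);
service.emit(sample);

console.log(unboundFailed, component.processed.length); // true 2
```

Whether to use bind() at the call site or an arrow-function property in the component is mostly a matter of taste; both give the callback a fixed this.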