Replace latching flag with observable

I have a bluetooth-le connection to a device that occasionally drops. When this happens I try to reconnect. The user can also choose to disconnect. I need to differentiate between these events.

  return this.ble.connect(macAddress, this.onDisconnect.bind(this))
        .then(connection => this.commsData.broadcastConnection(macAddress))

communications data service

  private _connection$: Subject<IConnectionEvent> = new Subject();
  public get connection$(): Observable<IConnectionEvent> {
    return this._connection$.asObservable();
  }
  public broadcastConnection(macAddress: string) {
    this._connection$.next({event: ConnectionEvent.CONNECT, macAddress});
  }
  public broadcastDisconnection(connectionEvent: ConnectionEvent) {
    this._connection$.next({event: connectionEvent});
  }

The bluetooth-le connect method takes an onDisconnect callback that fires in both situations. I have a subject that returns observables emitting connection states as they change. A connection emits ‘connect’, a manual disconnection emits ‘disconnect’ and I need a way to signal a dropped connection.

  private isExpectedDisconnection: boolean = false;

  onDisconnect(macAddress: string) {
    // default behaviour is to retry connection
    if (!this.isExpectedDisconnection) {
      return this.commsData.broadcastDisconnection(ConnectionEvent.DROP);
    }
    this.isExpectedDisconnection = false;
  }

I’ve achieved this with a flag, ‘isExpectedDisconnection’, which is set false by default, true when a user disconnection occurs and reset to false in the onDisconnect callback function. This is a really imperative way of doing something I feel should be handled reactively. But I’m still having a hard time thinking that way.

  this.commsData.connection$
    .subscribe({
      next: (connectionEvent: IConnectionEvent) => {
        if (!connectionEvent) return;

        if (connectionEvent.event === ConnectionEvent.CONNECT) return;

        if (connectionEvent.event === ConnectionEvent.DISCONNECT) {
          this.isExpectedDisconnection = true;
          return this.ble.disconnect(this.macAddress);
        }

        if (connectionEvent.event === ConnectionEvent.DROP) {
          return this.connect(this.macAddress);
        }
      }
    })

connection event

  export enum ConnectionEvent {
    CONNECT = 'connected',
    DISCONNECT = 'disconnected',
    DROP = 'dropped'
  }

How can (or should) I have another observable merged with connection$ and only act if the disconnect was unexpected?

Hi there

The Observable way to achieve this is to use rxjs operators to combine two Observable streams into one. A typical operator for this is switchMap.

So you already have one stream, _connection$, nicely created using a Subject. Now you need the other event stream to be of the same type, so you can switchMap them.

P.S. At least I believe switchMap does the trick. There are others that behave differently depending on which stream emitted first, and so on.

Thanks for your response. My issue really is that I want to perform an action if an event didn’t occur. If _connection$ emits a disconnect and there was no preceding user action then I need to reconnect.

I can combine observables to act if both emit values. I can defer action after a UI-instigated disconnect until another event occurs. But a dropped connection only emits a disconnect; there’s no other observable to combine it with.

I think I’ve got it working with the scan operator.

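// needs: import { map, scan } from 'rxjs/operators';
this.commsData.connection$
      .pipe(
        // Keep the raw previous event in the accumulator. Returning the
        // (possibly null) result directly would make `prev` null on the next event.
        scan((acc, curr) => {
          const isExpected = !!acc.prev && acc.prev.isStopped && curr.isDisconnected;
          return { prev: curr, event: isExpected ? null : curr };
        }, { prev: null, event: null } as { prev: ConnectionEvent | null; event: ConnectionEvent | null }),
        map(acc => acc.event)
      )
      .subscribe({
        next: (connectionEvent: ConnectionEvent | null) => {
          if (!connectionEvent) return; // <-- skip the deliberate disconnect
          if (connectionEvent.isConnected) return;

          if (connectionEvent.isStopped) {
            return this.ble.disconnect(this.macAddress);
          }

          if (connectionEvent.isDisconnected) {
            return this.connect(this.macAddress);
          }
        }
      });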
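To sanity-check the idea without a device or RxJS, here is a minimal sketch of the accumulator pulled out as a plain function. The `Kind` strings, `ScanState`, `step` and `run` are hypothetical names made up for illustration, not part of the real service. `step` has the shape scan expects, so I'd assume it could be passed as `scan(step, seed)` and followed by a map that picks out `emit`.

```typescript
// Hypothetical event kinds, for illustration only.
type Kind = 'connected' | 'stopped' | 'disconnected';

interface ScanState {
  prev: Kind | null; // last raw event seen
  emit: Kind | null; // what downstream should act on (null = suppressed)
}

const seed: ScanState = { prev: null, emit: null };

// Suppress a disconnect that directly follows a user-requested stop.
function step(state: ScanState, curr: Kind): ScanState {
  const isExpected = state.prev === 'stopped' && curr === 'disconnected';
  return { prev: curr, emit: isExpected ? null : curr };
}

// Feed an array through the accumulator the way scan would feed a stream.
function run(events: Kind[]): (Kind | null)[] {
  const out: (Kind | null)[] = [];
  let state = seed;
  for (const event of events) {
    state = step(state, event);
    out.push(state.emit);
  }
  return out;
}
```

Because the raw previous event is stored separately from the emitted value, a suppressed disconnect never ends up as the "previous" value for the next event.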

Ok nice

There are many operators that merge streams so you can monitor any of them, and even more that help with working across different event streams, but you may already know this.

Might it be possible to do this with strategic timing of subscription?

You have a Reconnecter that listens for DROPs and reconnects when it sees one. When the user wants to manually disconnect, you first unsubscribe the Reconnecter and put it to sleep. When the user connects again, after the connection is forged, resubscribe the Reconnecter.
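A rough sketch of what I mean, with the caveat that everything here is made up for illustration: `EventStream` is a tiny stand-in for connection$ (a real app would subscribe to the RxJS observable and hold its Subscription instead), and `wake`/`sleep` are hypothetical method names.

```typescript
type Listener = (event: string) => void;

// Hypothetical stand-in for connection$.
class EventStream {
  private listeners = new Set<Listener>();

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: Listener): () => void {
    this.listeners.add(listener);
    return () => { this.listeners.delete(listener); };
  }

  emit(event: string): void {
    this.listeners.forEach(listener => listener(event));
  }
}

class Reconnecter {
  private stop: (() => void) | null = null;
  private stream: EventStream;
  private reconnect: () => void;

  constructor(stream: EventStream, reconnect: () => void) {
    this.stream = stream;
    this.reconnect = reconnect;
  }

  // Start listening; any disconnect seen while awake is treated as a drop.
  wake(): void {
    if (this.stop) return;
    this.stop = this.stream.subscribe(event => {
      if (event === 'disconnected') this.reconnect();
    });
  }

  // Stop listening, e.g. just before a user-requested disconnect.
  sleep(): void {
    if (this.stop) this.stop();
    this.stop = null;
  }
}
```

The flow would be: `wake()` once the connection is established, `sleep()` right before calling disconnect on the user's behalf, then `wake()` again after the next successful connect.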

That’s an interesting option. Rather than listening and ignoring based on previous state, I just stop listening. But starting and stopping subscriptions doesn’t seem idiomatic. I’m doing my best to get out of the habit of conditional subscriptions.

I’m just finding my feet here but it seems better to have listeners dealing with all incoming streams all of the time. That way they can act independently rather than follow procedure.

For now I’ll stick to the scan operator and go back to figuring out why it’s dropping the connection in the first place!

(Excuse the account switch)