flutter dart bloc flutter-cubit

How to divide event stream effectively with Cubit


I'm a fairly experienced C# dev trying to get my head around the Cubit functionality of Flutter's bloc library.

I have an event stream from Flutter Blue which calls a method on my controlling class every time the device emits an update on a certain BLE characteristic. This works a charm and I can map the payload bytes onto this class:

class WorkoutStatus {
  double speedInKmh;
  double distanceInKm;
  int timeInSeconds;
  int indicatedCalories;
  int steps;

  WorkoutStatus({
    required this.speedInKmh,
    required this.distanceInKm,
    required this.timeInSeconds,
    required this.indicatedCalories,
    required this.steps,
  });

  // Mapping code redacted for brevity
}

However, a TreadmillControlService has to do a lot more than just process these events. It also has to manage the Bluetooth connection, for instance, and report the connection state back to the presentation layer. The selected speed and the actual speed also differ at times while the treadmill is ramping. I really think there are two distinct things going on here: managing the treadmill itself, and managing the workout (which needs the updates from the treadmill).

class TreadmillControlService extends Cubit<TreadmillWorkoutUnion> {
  BluetoothDevice? _device;
  BluetoothCharacteristic? _control;
  BluetoothCharacteristic? _workoutStatus;

  // Double underscore to ensure you use the setter
  double __requestedSpeed = 0;

  WorkoutStatus? _status;
  static const double minSpeed = 1;
  static const double maxSpeed = 6;

  TreadmillControlService(super.initialState) {
    FlutterBluePlus.setLogLevel(LogLevel.warning, color: false);
  } 
  
  // This method gets called when the treadmill connects
  Future<void> _setupServices() async {
    await _device!.discoverServices();
    var fitnessMachine = _device!.servicesList.firstWhere((s) => s.uuid == Guid("1826"));
    _control = fitnessMachine.characteristics.firstWhere((c) => c.uuid == Guid("2ad9"));
    _workoutStatus = fitnessMachine.characteristics.firstWhere((c) => c.uuid == Guid("2acd"));
    _workoutStatus!.onValueReceived.listen(_processStatusUpdate);
    // setNotifyValue returns a Future, so await it to surface any failure here
    await _workoutStatus!.setNotifyValue(true);
  }

  void _processStatusUpdate(List<int> value) {
    _status = WorkoutStatus.fromBytes(value);
    // This is where I need to emit the update
  }

  // Redacted all these bodies because they're irrelevant

  Future<void> connect() async {  }

  Future<void> _wakeup() async {  }

  Future<void> start() async {  }

  Future<void> stop() async {  }

  Future<void> _setSpeed(double speed) async {  }

  void pause() {  }

  Future<void> speedUp() async {  }

  Future<void> speedDown() async {  }

  set _requestedSpeed(double value) {  }

  double get _requestedSpeed => __requestedSpeed;
}

So the way I see it, I have two options. The first is to emit a union of the two and just accept that they're coupled:

class TreadmillWorkoutUnion {
  TreadmillState treadmillState;
  WorkoutStatus workoutStatus;

  TreadmillWorkoutUnion(this.treadmillState, this.workoutStatus);
}

I don't really like this approach, but it does work:

  void _processStatusUpdate(List<int> value) {
    final workoutStatus = WorkoutStatus.fromBytes(value);
    // Make a factory for this or something
    final treadmillState = TreadmillState(
        speedState: workoutStatus.speedInKmh == _requestedSpeed
            ? SpeedState.steady
            : workoutStatus.speedInKmh < _requestedSpeed
                ? SpeedState.increasing
                : SpeedState.decreasing,
        connectionState: _device!.isConnected ? ConnectionState.connected : ConnectionState.disconnected,
        requestedSpeed: _requestedSpeed,
        currentSpeed: workoutStatus.speedInKmh);
    emit(TreadmillWorkoutUnion(treadmillState, workoutStatus));
  }
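
The nested ternary above compares doubles with ==, which could misreport a steady belt if the treadmill reports, say, 2.98 against a requested 3.0. A minimal sketch of the factory hinted at in the comment, assuming a hypothetical classifySpeed helper and an arbitrary 0.05 km/h tolerance (neither is part of the original code; SpeedState is redeclared here only so the snippet stands alone):

```dart
// Redeclared so the sketch is self-contained; the real enum lives in the app.
enum SpeedState { steady, increasing, decreasing }

/// Hypothetical helper: classifies the belt speed relative to the requested
/// speed. [tolerance] is an assumed value; tune it to the device's resolution.
SpeedState classifySpeed(double current, double requested,
    {double tolerance = 0.05}) {
  final difference = requested - current;
  if (difference.abs() <= tolerance) return SpeedState.steady;
  // Belt is slower than requested, so it is still ramping up.
  return difference > 0 ? SpeedState.increasing : SpeedState.decreasing;
}
```

The TreadmillState constructor call could then take speedState: classifySpeed(workoutStatus.speedInKmh, _requestedSpeed).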

Alternatively, what I want to do (and what I'd do in C#) is split this into two completely separate event streams in my _processStatusUpdate method, with TreadmillState and WorkoutStatus each emitted through its own Cubit. However, I can't see how to do this. Where am I going wrong?


Solution

  • I would like to thank PurplePolyhedron for his answer, which set me off in the right direction but didn't quite get me where I wanted to go.


    The crux of the issue seems to be that I was bringing Cubit in at too low a level. At the service level, I really needed to work with plain streams, and only introduce Cubit where I needed to bridge to the presentation layer.

    To do this, I first changed my TreadmillControlService to own two StreamControllers created with the broadcast() constructor: one for treadmill state changes and one for workout status changes. In my characteristic listener, I can then write to these two streams separately.

    // Irrelevant methods redacted for brevity
    class TreadmillControlService implements Disposable {
      final StreamController<WorkoutStatus> _workoutStatusStreamController;
      final StreamController<TreadmillState> _treadmillStateStreamController;
    
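      // Typed streams so listeners receive WorkoutStatus/TreadmillState, not dynamic
      late final Stream<WorkoutStatus> workoutStatusStream;
      late final Stream<TreadmillState> treadmillStateStream;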
    
      TreadmillControlService()
          : _workoutStatusStreamController = StreamController<WorkoutStatus>.broadcast(),
            _treadmillStateStreamController = StreamController<TreadmillState>.broadcast() {
        workoutStatusStream = _workoutStatusStreamController.stream;
        treadmillStateStream = _treadmillStateStreamController.stream;
        FlutterBluePlus.setLogLevel(LogLevel.warning, color: false);
      }
    
      void _processStatusUpdate(List<int> value) {
        final workoutStatus = WorkoutStatus.fromBytes(value);
        // Make a factory for this or something
        final treadmillState = TreadmillState(
            speedState: workoutStatus.speedInKmh == _requestedSpeed
                ? SpeedState.steady
                : workoutStatus.speedInKmh < _requestedSpeed
                    ? SpeedState.increasing
                    : SpeedState.decreasing,
            connectionState: _device!.isConnected ? ConnectionState.connected : ConnectionState.disconnected,
            requestedSpeed: _requestedSpeed,
            currentSpeed: workoutStatus.speedInKmh);
        _workoutStatusStreamController.add(workoutStatus);
        _treadmillStateStreamController.add(treadmillState);
      }
    }
    

    This has the added benefit that other parts of the business logic can listen to these streams as well (hence the broadcast controllers).
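
    A stripped-down sketch of this fan-out in plain Dart, assuming a hypothetical StatusFanOut class with int and String stand-ins for WorkoutStatus and TreadmillState (none of these names come from the real service). It also shows closing both controllers on disposal, which is an assumption about what the redacted Disposable implementation would do:

    ```dart
    import 'dart:async';

    /// Hypothetical stand-in for the service: one raw input, two typed outputs.
    class StatusFanOut {
      final _steps = StreamController<int>.broadcast();
      final _speedLabels = StreamController<String>.broadcast();

      Stream<int> get steps => _steps.stream;
      Stream<String> get speedLabels => _speedLabels.stream;

      /// Plays the role of _processStatusUpdate: derive two values, publish each.
      void process(List<int> raw) {
        _steps.add(raw[0]);
        _speedLabels.add(raw[1] == 0 ? 'stopped' : 'moving');
      }

      /// Closes both controllers; listeners then receive their done event.
      Future<void> dispose() async {
        await Future.wait([_steps.close(), _speedLabels.close()]);
      }
    }

    /// Demo: two independent listeners on steps, one on speedLabels.
    Future<List<Object>> runDemo() async {
      final fanOut = StatusFanOut();
      final seen = <Object>[];
      fanOut.steps.listen((s) => seen.add('ui:$s'));
      fanOut.steps.listen((s) => seen.add('log:$s'));
      fanOut.speedLabels.listen(seen.add);
      fanOut.process([10, 3]);
      await fanOut.dispose();
      return seen;
    }
    ```

    Because both controllers are broadcast controllers, the second listener on steps does not throw, and each listener only sees events added after it subscribed.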

    Then I have two separate cubits for presenting these streams to the UI:

    Treadmill State:

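    class TreadmillStateCubit extends Cubit<TreadmillState> {
      final TreadmillControlService _treadmillControlService;
      // Needs `import 'dart:async';` for StreamSubscription.
      late final StreamSubscription _subscription;
    
      TreadmillStateCubit(this._treadmillControlService) : super(TreadmillState.initial()) {
        // Forward every service update to the UI as a new cubit state.
        _subscription = _treadmillControlService.treadmillStateStream.listen((state) {
          emit(state);
        });
      }
    
      // Stop listening when the cubit is closed; emitting after close() throws.
      @override
      Future<void> close() async {
        await _subscription.cancel();
        return super.close();
      }
    
      void connect() => _treadmillControlService.connect();
    
      void start() => _treadmillControlService.start();
    
      void stop() => _treadmillControlService.stop();
    
      void speedUp() => _treadmillControlService.speedUp();
    
      void speedDown() => _treadmillControlService.speedDown();
    }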
    

    Workout Status:

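    class WorkoutStatusCubit extends Cubit<WorkoutStatus> {
      final TreadmillControlService _treadmillControlService;
      // Needs `import 'dart:async';` for StreamSubscription.
      late final StreamSubscription _subscription;
    
      WorkoutStatusCubit(this._treadmillControlService) : super(WorkoutStatus.zero()) {
        _subscription = _treadmillControlService.workoutStatusStream.listen((state) {
          emit(state);
        });
      }
    
      // Cancel the subscription so no update arrives after the cubit is closed.
      @override
      Future<void> close() async {
        await _subscription.cancel();
        return super.close();
      }
    }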
    

    I also had to implement Equatable on both WorkoutStatus and TreadmillState so that bloc can accurately detect state changes. I don't think this would have been necessary for the stream coming from the real treadmill control service, as it creates new objects each time. However, I have a simulation that mutates the state between emits.
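
    For context, a cubit skips a new state that compares equal (==) to its current one, which is why equality matters here. A small plain-Dart sketch of the difference, assuming a hypothetical Reading class whose hand-written == and hashCode stand in for what Equatable derives from props (this is not the real WorkoutStatus):

    ```dart
    /// Hypothetical value type; Equatable would derive == and hashCode from props.
    class Reading {
      final int steps;
      const Reading(this.steps);

      /// Returns a fresh instance instead of mutating this one.
      Reading copyWith({int? steps}) => Reading(steps ?? this.steps);

      @override
      bool operator ==(Object other) => other is Reading && other.steps == steps;

      @override
      int get hashCode => steps.hashCode;
    }

    /// Roughly mimics the equality check a cubit performs before notifying.
    bool wouldNotify(Reading current, Reading next) => current != next;
    ```

    Under these assumptions, a duplicate reading with the same steps would be skipped, while a copy with a different steps value would be delivered. An object that is mutated in place and emitted again always equals itself, so a simulation is probably safer emitting a fresh copy per update.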

    I then had to wrap my controls in a MultiBlocProvider:

    class ControlPage extends StatelessWidget {
      final TreadmillControlService treadmillControlService;
      const ControlPage(this.treadmillControlService, {Key? key}) : super(key: key);
    
      @override
      Widget build(BuildContext context) {
        return MultiBlocProvider(
          providers: [
            BlocProvider<WorkoutStatusCubit>(
              create: (ctx) => WorkoutStatusCubit(treadmillControlService),
            ),
            BlocProvider<TreadmillStateCubit>(
              create: (ctx) => TreadmillStateCubit(treadmillControlService),
            ),
          ],
          child: const Column(
            crossAxisAlignment: CrossAxisAlignment.stretch,
            children: [
              WorkoutStatusPanel(),
              TreadmillControls(),
            ],
          ),
        );
      }
    }
    

    I also moved the relevant sub-widgets into their own classes, each with a BlocBuilder for its Cubit:

    WorkoutStatusPanel:

    class WorkoutStatusPanel extends StatelessWidget {
      const WorkoutStatusPanel({super.key});
    
      @override
      Widget build(BuildContext context) {
        return BlocBuilder<WorkoutStatusCubit, WorkoutStatus>(builder: (ctx, state) {
          return Column(crossAxisAlignment: CrossAxisAlignment.stretch, children: [
            Row(
              children: [
                UnitQuantityCard(state.timeInSeconds, "s", 0),
                UnitQuantityCard(state.distanceInKm, "km", 2),
              ],
            ),
            Row(
              children: [
                UnitQuantityCard(state.indicatedCalories, "kCal", 0),
                UnitQuantityCard(state.steps, "steps", 0),
              ],
            )
          ]);
        });
      }
    }
    

    TreadmillControls:

    class TreadmillControls extends StatelessWidget {
      const TreadmillControls({super.key});
    
      @override
      Widget build(BuildContext context) {
        final controlCubit = BlocProvider.of<TreadmillStateCubit>(context);
        return BlocBuilder<TreadmillStateCubit, TreadmillState>(builder: (ctx, state) {
          return Column(children: [
            Text("Connection: ${state.connectionState.name}"),
            Text("Speed ${state.speedState.name} (${state.currentSpeed}/${state.requestedSpeed})"),
            ElevatedButton(
              onPressed: () {
                controlCubit.connect();
              },
              child: const Text('Connect'),
            ),
            ElevatedButton(
              onPressed: () {
                controlCubit.start();
              },
              child: const Text('Start'),
            ),
            ElevatedButton(
              onPressed: () {
                controlCubit.stop();
              },
              child: const Text('Stop'),
            ),
            ElevatedButton(
              onPressed: () {
                controlCubit.speedUp();
              },
              child: const Text('Speed Up'),
            ),
            ElevatedButton(
              onPressed: () {
                controlCubit.speedDown();
              },
              child: const Text('Speed Down'),
            ),
          ]);
        });
      }
    }
    
    

    This now works nicely and we have a good division of responsibilities.