
How to collect items from one stream into a list and expose the resulting list via another stream in Dart (rxdart)?


I have a StreamController

final _controller = StreamController<List<LoadedPage<int, Entity>>>.broadcast();

and a method that pushes data into it, using code similar to this:

Future<void> getPagingData() async {
  final response = await _api.getData(...);
  final pages = _convertResponseToPadingData(response);
  // This is an example of exposed event:
  // <LoadedPage<int, Entity>>[
  //   LoadedPage(data: <Entity>[...]),
  //   LoadedPage(data: <Entity>[...]),
  //   LoadedPage(data: <Entity>[...]),
  // ]
  _controller.add(pages);
}

I need to collect all items from the data property of every page into a single List<Entity> and expose it as a single event via a Stream<List<Entity>>.

Let's say I have a list of 3 loaded pages, where each page contains a finite number of Entity items (for example, 30 items). I need to collect all entities from every page into a single List<Entity> (i.e. 90 items) and expose this list on a broadcast stream.

Stream<List<Entity>> get entities {
  // Here I need an expression which converts Stream<List<LoadedPage<int,Entity>>>
  // to Stream<List<Entity>>
}

Solution

  • After many experiments, I found a solution:

    import 'package:rxdart/rxdart.dart';
    
    class Page<T> {
      Page({this.data = const []});
    
      final List<T> data;
    
      @override
      String toString() => 'Page $data';
    }
    
    final pages = <Page<String>>[
      Page(data: ['1', '2', '3']),
      Page(data: ['5', '6', '7']),
      Page(data: ['8', '9', '0']),
    ];
    
    final _controller = BehaviorSubject<List<Page<String>>>();
    
    void main() async {
      print('Input: $pages');
      final stream = _controller.stream.asyncExpand(
        (pages) => Stream.value(
          pages.fold<List<String>>(
              [], (entities, page) => [...entities, ...page.data]),
        ),
      );
      stream.listen((data) => print('Output: $data'),
          onDone: () => print('Done'), onError: print);
      _controller.add(pages);
    }
    

    The result is:

    Input: [Page [1, 2, 3], Page [5, 6, 7], Page [8, 9, 0]]
    Output: [1, 2, 3, 5, 6, 7, 8, 9, 0]
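
Since every incoming list of pages produces exactly one output list, the same transformation can probably be written with a plain `map` and `Iterable.expand`, without rxdart. The following is only a sketch applied to the question's broadcast `StreamController`: the `Entity` and `LoadedPage` classes here are hypothetical stand-ins (the real ones are not shown in the post), and `flattenPages` is a helper name introduced for illustration.

```dart
import 'dart:async';

// Hypothetical stand-ins for the question's types; the real Entity and
// LoadedPage classes are not shown in the post.
class Entity {
  const Entity(this.id);
  final int id;
}

class LoadedPage<K, V> {
  const LoadedPage({this.data = const []});
  final List<V> data;
}

// Illustrative helper: concatenates the `data` lists of all pages,
// preserving page order.
List<Entity> flattenPages(List<LoadedPage<int, Entity>> pages) =>
    pages.expand((page) => page.data).toList();

final _controller =
    StreamController<List<LoadedPage<int, Entity>>>.broadcast();

// Each event on the source stream becomes exactly one flattened event.
Stream<List<Entity>> get entities =>
    _controller.stream.map((pages) => flattenPages(pages));

Future<void> main() async {
  // Subscribe before adding: a broadcast stream does not replay past events.
  entities.listen((all) => print('Received ${all.length} entities'));

  // Three pages of 30 entities each, as in the question's example.
  final pages = [
    for (var p = 0; p < 3; p++)
      LoadedPage<int, Entity>(
        data: [for (var i = 0; i < 30; i++) Entity(p * 30 + i)],
      ),
  ];
  _controller.add(pages);
  await _controller.close();
}
```

With three pages of 30 items, the listener should receive a single list of 90 entities. `asyncExpand` is mainly useful when one input event has to produce several output events (or an asynchronous sub-stream); for a one-to-one conversion like this, `map` keeps the intent clearer.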