I'm trying to get some intuition about how Dart's async/await works.
Here is a simple Dart program in which there is:
a producer function that adds strings to pipe (a List<String>) after random delays;
a consumer function that polls pipe every 100ms, removing any strings found.
Both functions have been made to run concurrently as 'tasks on the event loop' by 'futurizing' them (what's the correct word?). I don't care about the actual value reported by the Futures, just that I get two concurrent tasks on the event loop.
Also, there is no need to "synchronize" access to pipe (i.e. make access thread-safe) as there really is only one CPU thread doing all the work. Additionally, the consumer function must call Future.delayed() to yield the CPU so that the producer is able to run at all.
Is there a way to perform waiting on pipe until it is not-empty rather than checking every 100ms? Am I supposed to use a Dart stream for that?
import 'dart:async';
import 'dart:math';

final pipe = <String>[];
final rand = Random();

String now() {
  return '${DateTime.now()}';
}

Future producer() async {
  print('Producer starts at ${now()}');
  for (int i = 0; i < 5; i++) {
    int delay_ms = 500 + rand.nextInt(3000);
    print('Producer delaying by $delay_ms ms ...');
    await Future.delayed(Duration(milliseconds: delay_ms));
    final payload = 'payload $i';
    pipe.add(payload);
    print("Producer produced '$payload' at ${now()}");
  }
  pipe.add('DONE');
  print('Producer ends at ${now()}');
}

Future consumer() async {
  print('Consumer starts at ${now()}');
  String? received;
  while (received != 'DONE') {
    // do nothing for 100ms, giving the producer CPU time
    // --- but is there a better way? ---
    await Future.delayed(const Duration(milliseconds: 100));
    received = null;
    while (pipe.isNotEmpty && received != 'DONE') {
      received = pipe.removeAt(0);
      print("Consumer received '$received' at ${now()}");
    }
  }
  print('Consumer ends at ${now()}');
}

Future<void> main() async {
  print('main(): Starting');
  final Future producerFuture = producer().then((_) {
    print('Producer finished');
  });
  print('main(): Producer is running');
  final Future consumerFuture = consumer().then((_) {
    print('Consumer finished');
  });
  print('main(): Consumer is running');
  final Future commonFuture = Future.wait([producerFuture, consumerFuture]);
  await commonFuture;
  print('main(): Done');
}
An example run might be:
main(): Starting
Producer starts at 2024-05-28 15:39:38.021731
Producer delaying by 2092 ms ...
main(): Producer is running
Consumer starts at 2024-05-28 15:39:38.040345
main(): Consumer is running
Producer produced 'payload 0' at 2024-05-28 15:39:40.125837
Producer delaying by 2526 ms ...
Consumer received 'payload 0' at 2024-05-28 15:39:40.170401
Producer produced 'payload 1' at 2024-05-28 15:39:42.652816
Producer delaying by 2167 ms ...
Consumer received 'payload 1' at 2024-05-28 15:39:42.695814
Producer produced 'payload 2' at 2024-05-28 15:39:44.820820
Producer delaying by 3091 ms ...
Consumer received 'payload 2' at 2024-05-28 15:39:44.917827
Producer produced 'payload 3' at 2024-05-28 15:39:47.912795
Producer delaying by 2285 ms ...
Consumer received 'payload 3' at 2024-05-28 15:39:47.947775
Producer produced 'payload 4' at 2024-05-28 15:39:50.198882
Producer ends at 2024-05-28 15:39:50.199027
Producer finished
Consumer received 'payload 4' at 2024-05-28 15:39:50.271304
Consumer received 'DONE' at 2024-05-28 15:39:50.271424
Consumer ends at 2024-05-28 15:39:50.271494
Consumer finished
main(): Done
Is there a way to perform waiting on pipe until it is not-empty rather than checking every 100ms? Am I supposed to use a Dart stream for that?
Yes, there is, and you guessed right: a Stream is the proper way to handle this.
Instead of manually checking every 100ms, make the pipe a Stream<String> and let the consumer react to new items by listening to it.
To push items into the stream, use a StreamController. From its documentation:
A controller with the stream it controls. This controller allows sending data, error and done events on its stream. This class can be used to create a simple stream that others can listen on, and to push events to that stream.
Example:
import 'dart:async';
import 'dart:math';

final StreamController<String> controller = StreamController<String>();

String now() {
  return '${DateTime.now()}';
}

Future<void> runProducer() async {
  print('Producer starts at ${now()}');
  for (int i = 0; i < 5; i++) {
    final delayMs = 500 + Random().nextInt(3000);
    print('Producer delaying by $delayMs ms ...');
    // Awaiting the delay returns control to the event loop, so the
    // stream listener can be called with "payload i-1" in the meantime
    await Future.delayed(Duration(milliseconds: delayMs));
    final payload = 'payload $i';
    controller.add(payload);
    print("Producer produced '$payload' at ${now()}");
  }
  print('Producer ends at ${now()}');
  // Closing sends the done event, which triggers onDone in the consumer
  await controller.close();
}

Future<void> runConsumer() async {
  print('Consumer starts at ${now()}');
  // listen() only registers the callbacks and returns immediately
  controller.stream.listen((String received) {
    print("Consumer received '$received' at ${now()}");
  }, onDone: () {
    print('Consumer ends at ${now()}');
  });
}
Main:
Future<void> main() async {
  print('main(): Starting');
  final Future commonFuture = Future.wait([runProducer(), runConsumer()]);
  await commonFuture;
  print('main(): Done');
}
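Btw, make sure to close the StreamController when it is no longer needed, for example once the producer has added its last payload. Closing sends the done event (which is what triggers onDone in the listener) and frees up resources:
await controller.close();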
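If you want the consumer's Future to complete only once the stream has been closed (so that waiting on it really means "the consumer is done"), one option is to read the stream with await for instead of listen. The following is a minimal sketch of that idea, not a drop-in replacement: the names produce and consume, the payload count and the 10ms delay are made up for illustration.

```dart
import 'dart:async';

// Hypothetical producer: adds `count` payloads, then closes the controller.
Future<void> produce(StreamController<String> controller, int count) async {
  try {
    for (var i = 0; i < count; i++) {
      // Illustrative short delay; awaiting it yields to the event loop.
      await Future<void>.delayed(const Duration(milliseconds: 10));
      controller.add('payload $i');
    }
  } finally {
    // Closing delivers the done event, which ends the consumer's
    // await-for loop.
    await controller.close();
  }
}

// Hypothetical consumer: its Future completes only after the stream
// has been closed and every item has been read.
Future<List<String>> consume(Stream<String> stream) async {
  final received = <String>[];
  await for (final item in stream) {
    received.add(item);
  }
  return received;
}

Future<void> main() async {
  final controller = StreamController<String>();
  final producing = produce(controller, 3); // starts running right away
  final received = await consume(controller.stream);
  await producing;
  print(received); // prints [payload 0, payload 1, payload 2]
}
```

With this shape there is no polling and no 'DONE' sentinel: the consumer is suspended until an item arrives, and closing the controller is what tells it to stop.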