I'm trying to write a task queue class in flutter, but I need a way of detecting if a task is already running to prevent deadlocks.
import 'dart:async';
Future<void> main() async {
final a = doSomething(1);
final b = doSomethingElse(2);
final c = doSomething(3);
await Future.wait([a, b, c]);
print('done');
}
final queue = TaskQueue();
Future<void> doSomething(int count) async {
return queue.enqueue(() async {
await Future.delayed(const Duration(milliseconds: 500));
print('something #$count');
await doSomethingElse(count);
});
}
Future<void> doSomethingElse(int count) {
return queue.enqueue(() => print('something else #$count'));
}
typedef Task<T> = FutureOr<T> Function();
class TaskQueue {
final _queue = <_TaskEntry>[];
var _running = false;
bool get isRunning => _running;
Future _begin(Future first) async {
_running = true;
await first;
while (true) {
final task = _queue.firstOrNull;
if (task == null) {
break;
}
_queue.removeAt(0);
await task.run();
}
_running = false;
}
Future<T> enqueue<T>(Task<T> task) {
if (_running) {
final completer = Completer<T>();
_queue.add(_TaskEntry(task, completer));
return completer.future;
}
final result = task();
if (result is! Future<T>) {
return Future.value(result);
}
_begin(result);
return result;
}
}
class _TaskEntry<T> {
final Task<T> task;
final Completer<T> completer;
_TaskEntry(this.task, this.completer);
Future run() async {
try {
final result = await task();
completer.complete(result);
} catch (error, stack) {
completer.completeError(error, stack);
}
}
}
In this example, the program freezes because the doSomething method awaits doSomethingElse, which queues another task, but the queue never starts that task as it is still waiting for doSomething to complete. Is there a way of detecting whether the current TaskQueue is already running a task inside the enqueue method, and running it immediately?
The expected output is
something #1
something else #1
something else #2
something #3
something else #3
done
I've managed to get the desired behaviour using the Zone
API. By creating a zone for each task, I can use Zone.current
to lookup the currently executing task in a map and add it as a sub-task.
import 'dart:async';
Future<void> main() async {
final a = doSomething(1);
final b = doSomethingElse(2);
final c = doSomething(3);
await Future.wait([a, b, c]);
print('done');
}
final queue = TaskQueue();
Future<void> doSomething(int count) async {
return queue.enqueue(() async {
await Future.delayed(const Duration(milliseconds: 500));
print('something #$count');
await doSomethingElse(count);
});
}
Future<void> doSomethingElse(int count) {
return queue.enqueue(() async {
await Future.delayed(const Duration(milliseconds: 100));
print('something else #$count');
});
}
typedef Task<T> = FutureOr<T> Function();
class TaskQueue {
final Zone? _zone;
final _queue = <_TaskEntry>[];
final _map = <Zone, _TaskEntry>{};
var _running = false;
bool get isRunning => _running;
TaskQueue({Zone? zone}) : _zone = zone;
/// If [enqueue] is called inside a task, returns the [_TaskEntry] for that task.
_TaskEntry? _getParent(Zone zone) {
while (true) {
final entry = _map[zone];
if (entry != null) {
return entry;
}
final parent = zone.parent;
if (parent == null) {
return null;
}
zone = parent;
}
}
Future<void> _run(_TaskEntry entry, Zone parentZone) async {
final zone = parentZone.fork();
_map[zone] = entry;
try {
await zone.run(entry.execute);
} finally {
_map.remove(zone);
}
}
Future<void> _drain() async {
assert(!_running);
assert(_queue.isNotEmpty);
_running = true;
final zone = _zone ?? Zone.current;
try {
do {
final task = _queue.removeAt(0);
await _run(task, zone);
} while (_queue.isNotEmpty);
} finally {
_running = false;
}
}
Future<T> enqueue<T>(Task<T> task) {
final completer = Completer<T>();
final entry = _TaskEntry(task, completer);
final zone = Zone.current;
final parent = _getParent(zone);
if (parent != null) {
parent.subTasks.add(entry);
_run(entry, zone);
} else {
_queue.add(entry);
if (!_running) {
_drain();
}
}
return completer.future;
}
}
class _TaskEntry<T> {
final Task<T> task;
final Completer<T> completer;
Future<void>? _execution;
/// tasks that are enqueued inside [task] will be added here.
final subTasks = <_TaskEntry>[];
_TaskEntry(this.task, this.completer);
Future<void> execute() {
assert(_execution == null);
return _execution = _execute();
}
Future<void> _execute() async {
try {
final result = await task();
completer.complete(result);
} catch (error, stack) {
completer.completeError(error, stack);
return;
} finally {
// make sure sub-tasks that are enqueued but not awaited complete
for (final child in subTasks) {
await child._execution;
}
}
}
}