I have a large collection (+90 000 objects) and I would like to run while loop in parallel on it, source of my function is below
val context = newSingleThreadAsyncContext()
return KtxAsync.async(context) {
val fields = regularMazeService.generateFields(colsNo, rowsNo)
val time = measureTimeMillis {
withContext(newAsyncContext(10)) {
while (availableFieldsWrappers.isNotEmpty()) {
val wrapper = getFirstShuffled(availableFieldsWrappers.lastIndex)
.let { availableFieldsWrappers[it] }
if (wrapper.neighborsIndexes.isEmpty()) {
availableFieldsWrappers.remove(wrapper)
continue
}
val nextFieldIndex = getFirstShuffled(wrapper.neighborsIndexes.lastIndex)
.let {
val fieldIndex = wrapper.neighborsIndexes[it]
wrapper.neighborsIndexes.removeAt(it)
fieldIndex
}
if (visitedFieldsIndexes.contains(nextFieldIndex)) {
wrapper.neighborsIndexes.remove(nextFieldIndex)
fields[nextFieldIndex].neighborFieldsIndexes.remove(wrapper.index)
continue
}
val nextField = fields[nextFieldIndex]
availableFieldsWrappers.add(FieldWrapper(nextField, nextFieldIndex))
visitedFieldsIndexes.add(nextFieldIndex)
wrapper.field.removeNeighborWall(nextFieldIndex)
nextField.removeNeighborWall(wrapper.index)
}
}
}
Gdx.app.log("maze-time", "$time")
On top of class
private val availableFieldsWrappers = Collections.synchronizedList(mutableListOf<FieldWrapper>())
private val visitedFieldsIndexes = Collections.synchronizedList(mutableListOf<Int>())
I test it a few times results are below:
What I'm doing wrong?
You are using Collections.synchronizedList
from Java standard library, which returns a list wrapper that leverages blocking synchronized
mechanism to ensure thread safety. This mechanism is not compatible with coroutines, as in it blocks the other threads from accessing the collection until the operation is finished. You should generally use non-blocking concurrent collections when accessing data from multiple coroutines or protect the shared data with a non-blocking mutex.
List.contains
will be become slower and slower (O(n)) as more and more elements are added. Instead of a list, you should use a set for visitedFieldsIndexes
. Just make sure to either protect it with a mutex or use a concurrent variant. Similarly, removal of values with random indices from the availableFieldsWrappers
is pretty costly - instead, you can shuffle the list once and use simple iteration.
You are not reusing the coroutine contexts. In general, you can create asynchronous context once and reuse its instance instead of creating a new thread pool each time you need coroutines. You should invoke and assign the result of newAsyncContext(10)
just once and reuse it throughout your application.
The code you have currently written does not leverage coroutines very well. Instead of thinking of coroutines dispatcher as a thread pool where you can launch N big tasks in parallel (i.e. your while availableFieldsWrappers.isNotEmpty
loop), you should think of it as an executor of hundreds or thousands of small tasks, and adjust your code accordingly. I think you could avoid the available/visited collections altogether by rewriting your code with introduction of e.g. Kotlin flows or just multiple KtxAsync.async
/KtxAsync.launch
calls that handle smaller portion of the logic.
Unless some of the functions are suspending or use coroutines underneath, you're not really leveraging the multiple threads of an asynchronous context at all. withContext(newAsyncContext(10))
launches a single coroutine that handles the whole logic sequentially, leveraging only a single thread. See 4. for some ideas on how you can rewrite the code. Try collecting (or just printing) the thread hashes and names to see if you are using all of the threads well.