I am trying to run my pytest tests in parallel using the plugins pytest-parallel 0.1.1 and pytest-xdist 3.2.1 together with the --tests-per-worker n flag. Some of my tests require a preprocessing step that must run in a critical section, which I protect with a multiprocessing lock so that multiple workers cannot execute it at the same time.
However, despite using the lock, the workers enter the critical section simultaneously, leading to synchronization problems.
Here is a simplified version of the problematic code:
Test code:
import pytest


@pytest.mark.parametrize("preprocess", ["config1"], indirect=True)
def test_example1(preprocess):
    # Use the preprocessed data in the test
    print(f"Test using preprocessed data: {preprocess}")
    # do something with preprocess


@pytest.mark.parametrize("preprocess", ["config2"], indirect=True)
def test_example2(preprocess):
    # Use the preprocessed data in the test
    print(f"Test using preprocessed data: {preprocess}")
    # do something with preprocess
conftest.py file:
import pytest
import multiprocessing

lock = multiprocessing.Lock()


@pytest.fixture
def preprocess(request):
    with lock:
        # critical section: run the preprocessing for request.param here
        ...
    return request.param
Why is my lock not preventing simultaneous entry into the critical section when running the tests in parallel? How can I resolve this synchronization problem?
I appreciate any assistance with this matter!
The problem is that each worker runs in a separate Python process, and every process imports conftest.py independently, so the module-level multiprocessing.Lock() is a different object in every worker and protects nothing. You want a file-based lock, so that all processes synchronize on the same underlying OS resource instead of each creating its own lock. A library such as https://pypi.org/project/fasteners/ provides exactly this:
import fasteners

lock = fasteners.InterProcessLock('path/to/lock.file')

with lock:
    ...  # exclusive access
Then your code becomes thread-safe / process-safe.
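To make the mechanism concrete: an inter-process lock like this is built on OS-level file locking. Here is a minimal, standard-library sketch of the same idea (POSIX-only, using fcntl.flock; the file paths and function names are hypothetical, for illustration). Several processes serialize a critical section through a shared lock file, which is exactly what a single in-memory multiprocessing.Lock cannot do for independently started workers:

```python
import fcntl
import multiprocessing
import os
import tempfile

# Hypothetical paths for this demo
LOCK_PATH = os.path.join(tempfile.gettempdir(), "demo_preprocess.lock")
LOG_PATH = os.path.join(tempfile.gettempdir(), "demo_preprocess.log")


def critical_section(worker_id):
    """Append an enter/exit pair to the log under an exclusive file lock."""
    with open(LOCK_PATH, "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks until exclusive
        try:
            # Both lines are written (and flushed on close) before unlock,
            # so no other process's lines can appear between them.
            with open(LOG_PATH, "a") as log:
                log.write(f"enter {worker_id}\n")
                log.write(f"exit {worker_id}\n")
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)


def run_demo(n_workers=4):
    """Run the critical section in several processes; return the log lines."""
    open(LOG_PATH, "w").close()  # truncate the log
    ctx = multiprocessing.get_context("fork")  # fork is available on POSIX
    procs = [ctx.Process(target=critical_section, args=(i,))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    with open(LOG_PATH) as f:
        return f.read().splitlines()


if __name__ == "__main__":
    lines = run_demo()
    # Every "enter i" is immediately followed by its "exit i":
    # the processes never overlapped inside the critical section.
    for j in range(0, len(lines), 2):
        assert lines[j].split()[0] == "enter"
        assert lines[j + 1] == "exit " + lines[j].split()[1]
    print("no interleaving observed")
```

fasteners.InterProcessLock wraps this kind of OS primitive portably, so in your conftest.py you would simply replace the multiprocessing.Lock() with an InterProcessLock pointing at a fixed path, and every worker process will then contend for the same lock file.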