pythondjangoelasticsearchelasticsearch-dslelasticsearch-dsl-py

How can I do parallel tests in Django with Elasticsearch-dsl?


Has anyone gotten parallel tests to work in Django with Elasticsearch? If so, can you share what configuration changes were required to make it happen?

I've tried just about everything I can think of to make it work including the solution outlined here. Taking inspiration from how Django itself does the parallel DB's, I currently have created a custom new ParallelTestSuite that overrides the init_worker to iterate through each index/doctype and change the index names roughly as follows:

_worker_id = 0
def _elastic_search_init_worker(counter):
    global _worker_id

    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()

    ### Everything above this is from the Django version of this function ###

    # Update index names in doctypes
    for doc in registry.get_documents():
        doc._doc_type.index += f"_{_worker_id}"

    # Update index names for indexes and create new indexes
    for index in registry.get_indices():
        index._name += f"_{_worker_id}"
        index.delete(ignore=[404])
        index.create()

    print(f"Started thread # {_worker_id}")

This seems to generally work, however, there's some weirdness that happens seemingly randomly (i.e. running the test suite again doesn't reliably reproduce the issue and/or the error messages change). The following are the various errors I've gotten and it seems to randomly fail on one of them each test run:

I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would be even possible since Django uses multiprocessing to parallelize the tests and thus they are each running in their own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something? I'm really at a loss of other things to try from here and would greatly appreciate some hints or even just confirmation that this is in fact possible to do.


Solution

  • I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would be even possible since Django uses multiprocessing to parallelize the tests and thus they are each running in their own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something?

    This is exactly what is happening. From the Elasticsearch DSL docs:

    Since we use persistent connections throughout the client it means that the client doesn’t tolerate fork very well. If your application calls for multiple processes make sure you create a fresh client after call to fork. Note that Python’s multiprocessing module uses fork to create new processes on POSIX systems.

    What I observed happening is that the responses get very weirdly interleaved with a seemingly random client that may have started the request. So a request to index a document might end up with a response to create an index which have very different attributes on them.

    The fix is to ensure that each test worker has its own Elasticsearch client. This can be done by creating worker-specific connection aliases and then overwriting the current connection aliases (with the private attribute _using) with the worker-specific one. Below is a modified version of the code you posted with the change

    _worker_id = 0
    def _elastic_search_init_worker(counter):
        global _worker_id
    
        with counter.get_lock():
            counter.value += 1
            _worker_id = counter.value
    
        for alias in connections:
            connection = connections[alias]
            settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
            # connection.settings_dict must be updated in place for changes to be
            # reflected in django.db.connections. If the following line assigned
            # connection.settings_dict = settings_dict, new threads would connect
            # to the default database instead of the appropriate clone.
            connection.settings_dict.update(settings_dict)
            connection.close()
    
        ### Everything above this is from the Django version of this function ###
    
        from elasticsearch_dsl.connections import connections
    
        # each worker needs its own connection to elasticsearch, the ElasticsearchClient uses
        # global connection objects that do not play nice otherwise
        worker_connection_postfix = f"_worker_{_worker_id}"
        for alias in connections:
            connections.configure(**{alias + worker_connection_postfix: settings.ELASTICSEARCH_DSL["default"]})
    
        # Update index names in doctypes
        for doc in registry.get_documents():
            doc._doc_type.index += f"_{_worker_id}"
            # Use the worker-specific connection
            doc._doc_type._using = doc.doc_type._using + worker_connection_postfix
    
        # Update index names for indexes and create new indexes
        for index in registry.get_indices():
            index._name += f"_{_worker_id}"
            index._using = doc.doc_type._using + worker_connection_postfix
            index.delete(ignore=[404])
            index.create()
    
        print(f"Started thread # {_worker_id}")