django, redis, celery, flower

Celery/redis tasks don't always complete - not sure why or how to fix it


I am running celery v4.0.3 / redis v4.0.9 in a django v3.0.1 application (Python v3.6.9). I am also using face_recognition in a celery task, find_faces, to find faces in the images I have uploaded to the application, among other image-processing celery tasks. There are no issues processing five or fewer image files: all of the image-processing celery tasks complete successfully.

When I have the image-processing tasks (including find_faces) iterate over 100 images, there are 10-30 images for which the find_faces task does not complete. When I use flower v0.9.7 to look at the celery tasks, I see that the find_faces task status is "started" for the images that did not complete; all the other images have a find_faces task status of "success". The status of these "started" tasks never changes, and no errors or exceptions are reported. I can then run the image-processing tasks, including find_faces, on each of these images individually, and the task status is "success". These results do not change whether I run celery as a daemon or locally, or whether I run the django app with wsgi and apache or with runserver. Flower also reports retries = 0 for all my tasks.

I have CELERYD_TASK_SOFT_TIME_LIMIT = 60 set globally in the django app, and max_retries=5 for the find_faces task.

@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
    logger.debug("find_faces_task START")
    try:
        temp_image = None
        temp_file = None
        from memorabilia.models import TaskStatus, Document
        args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
        ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
        ts.save()
        import time
        time_start = time.time()
        # Check if we already have the faces for this document
        from biometric_identification.models import Face
        if Face.objects.filter(document_id=document_id).exists():
            # This document has already been scanned, so need to remove it and rescan
            # Have to manually delete each object per django docs to insure the 
            # model delete method is run to update the metadata.
            logger.debug("Document %s has already been scanned" % document_id)
            faces = Face.objects.filter(document_id=document_id)
            for face in faces:
                face.delete()
                logger.debug("Deleted face=%s" % face.tag_value.value)
        document = Document.objects.get(document_id=document_id)
        image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
        image_path = image_file.path
        time_start_looking = time.time()
        temp_file = open(image_path, 'rb')
        temp_image = Image.open(temp_file)
        logger.debug("temp_image.mode=%s" % temp_image.mode)
        width, height = temp_image.size
        image = face_recognition.load_image_file(temp_file)
        # Get the coordinates of each face
        if use_cuda:
            # With CUDA installed
            logger.debug("Using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0) 
        else:
            # without CUDA installed
            logger.debug("NOT using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
        time_find_faces = time.time()
        # Get the face encodings for each face in the picture    
        face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations) 
        logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
        time_face_encodings = time.time()
        # Save the faces found in the database
        for location, encoding in zip(face_locations, face_encodings):
            # Create the new Face object and load in the document, encoding, and location of a face found
            # Locations seem to be of the form (y,x)
            from memorabilia.models import MetaData, MetaDataValue
            tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
            tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
            new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size={'width': width, "height":height}, tag_type=tag_type_people, tag_value=tag_value_unknown)         
            # save the newly found Face object
            new_face.save()
            logger.debug("Saved new_face %s" % new_face.face_file) 
        time_end = time.time()
        logger.debug("total time = {}".format(time_end - time_start))
        logger.debug("time to find faces = {}".format(time_find_faces - time_start_looking))
        logger.debug("time to find encodings = {}".format(time_face_encodings - time_find_faces))
        ts.task_status = TaskStatus.SUCCESS
        ts.comment = "Found %s faces" % len(face_encodings)
        return document_id
    except Exception as e:
        logger.exception("Hit an exception in find_faces_task %s" % str(e))
        ts.task_status = TaskStatus.ERROR
        ts.comment = "An exception while finding faces: %s" % repr(e)
    finally:
        logger.debug("Finally clause in find_faces_task")
        if temp_image:
            temp_image.close()
        if temp_file:
            temp_file.close()
        ts.save(update_fields=['task_status', 'comment'])
        logger.debug("find_faces_task END")

The find_faces task is called as part of a larger chain of tasks that manipulate the images. Each image file goes through this chain, where step_1 and step_2 are chords for different image processing steps:

step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() )  # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())

@app.task
def chordfinisher( *args, **kwargs ):
    return "OK"

The images are large, so it can take up to 30 seconds for the find_faces task to complete. I thought the CELERYD_TASK_SOFT_TIME_LIMIT = 60 would take care of this long processing time.

I am by no means a celery expert, so I assume there is a celery setting or option that I need to enable to make sure the find_faces task completes all the time. I just don't know what that would be.


Solution

  • After some more research, I came up with this suggestion from Lewis Carroll, in the post "Beware the oom-killer, my son! The jaws that bite, the claws that catch!", along with the posts "Chaining Chords produces enormously big messages causing OOM on workers" and "WorkerLostError: Worker exited prematurely: exitcode 155".

    It seems my celery workers may have been running out of memory, as I did find traces of the dreaded oom-killer in my syslogs. I reconfigured my tasks into a plain chain (removed all the groups and chords) so each task runs individually in sequence for each image, and the tasks all completed successfully, no matter how many images I processed.
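    A sketch of what the restructured workflow looks like: one flat chain per image instead of chords, so each step runs in sequence and no chord has to carry the whole group's results in a single message. The task names other than find_faces_task and change_state_task are illustrative placeholders, as are the app instance and broker URL:

    ```python
    from celery import Celery, chain

    app = Celery("sketch", broker="redis://localhost:6379/0")

    # Placeholder tasks standing in for the real image-processing steps.
    @app.task
    def clean_task(document_id): ...

    @app.task
    def find_faces_task(document_id): ...

    @app.task
    def ocr_task(document_id): ...

    @app.task
    def change_state_task(document_id, state): ...

    def build_workflow(document_id):
        # One flat chain: each task runs only after the previous one
        # succeeds, removing the groups/chords that built up large messages.
        return chain(
            clean_task.si(document_id),
            find_faces_task.si(document_id),
            ocr_task.si(document_id),
            change_state_task.si(document_id, "ready"),
        )

    # Fired after the surrounding DB transaction commits, as before:
    # transaction.on_commit(lambda: build_workflow(document_id).delay())
    ```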