pythondjangocelerydjango-celerycelery-task

how to get task status in celery


I have a celery task and I want the status of the task using task ID. I have read the previous answers but couldn't make it work. I used the command

celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8pe

command result

celery.py

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "coutoEditor.settings")
app = Celery("coutoEditor")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

settings.py

CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

tasks.py

@shared_task()
def speed_up_vid_task(input_path, speed_factor, start, end):


    '''
    Method Params:
    input_path: The video file url or directory path file name
    speed_factor: The factor for the speed up process
    start: start of the video part that needs to be sped up (in secs)
    end: end of the video part that needs to be sped up (in secs)
    '''

    start = convert_to_sec(start)
    end = convert_to_sec(end)

    filename = str(uuid.uuid4())
    print(filename, "new")
    temporary_dir = BASE_DIR + '/' + editor_speedUp_temp_dir  # editor_temp_dir = media/editor/speed_vid/temp/"
    output_dir = BASE_DIR + '/' + editor_speedUp_output_dir  # editor_speedUp_output_dir = media/editor/speed_vid/

    # Check for broken url
    r = requests.get(input_path, stream=True)
    if not r.status_code == 200:
        return Response({
            'message': "media file is corrupted",
            'data': "broken url process could not be completed",
            'status': False
        }, status=status.HTTP_400_BAD_REQUEST)

    if not os.path.exists(output_dir):
        os.mkdir(output_dir)

    if not os.path.exists(temporary_dir):
        os.mkdir(temporary_dir)

    stream = os.popen(
        "ffmpeg.ffprobe -loglevel error -select_streams a -show_entries stream=codec_type -of csv=p=0 '{}'".format(
            input_path))
    output = stream.read()
    if len(output) == 0:
        input_path_vid = os.path.join(BASE_DIR, temporary_dir) + filename + "_temp_video.mp4"
        cmd = "ffmpeg -f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 -i '{}' -c:v copy -c:a aac -shortest {}".format(
            input_path, input_path_vid)
        os.system(cmd)
    else:
        # check if it's a directory or a url
        if(os.path.isfile(input_path)):
            input_path_vid = BASE_DIR + input_path
            pass
        else:
            ext_name = filename + '_temp_video.mp4'
            ext_path = temporary_dir + ext_name
            r = requests.get(input_path)
            with open(ext_path, 'wb') as outfile:
                outfile.write(r.content)
            outfile.close()
            input_path_vid = ext_path

    output_path = os.path.join(BASE_DIR, editor_speedUp_output_dir + filename + ".mp4")

    cmd = 'ffmpeg -i ' + input_path_vid + ' \
            -filter_complex \
            "[0:v]trim=0:' + str(start) + ',setpts=PTS-STARTPTS[v1]; \
            [0:v]trim=' + str(start) + ':' + str(end) + ',setpts=1/' + str(speed_factor) + '*(PTS-STARTPTS)[v2]; \
            [0:v]trim=' + str(end) + ',setpts=PTS-STARTPTS[v3]; \
            [0:a]atrim=0:' + str(start) + ',asetpts=PTS-STARTPTS[a1]; \
            [0:a]atrim=' + str(start) + ':' + str(end) + ',asetpts=PTS-STARTPTS,atempo=' + str(speed_factor) + '[a2]; \
            [0:a]atrim=' + str(end) + ',asetpts=PTS-STARTPTS[a3]; \
            [v1][a1][v2][a2][v3][a3]concat=n=3:v=1:a=1" \
            -preset superfast -profile:v baseline ' + output_path

    os.system(cmd)

    generated_video = open(output_path, "rb")
    generated_video_file = TemporaryFiles.objects.create(temp_file=File(generated_video, name=filename + ".mp4"),
                                                         created_at=datetime.utcnow())
    generated_video.close()

    if os.path.exists(input_path_vid):
        os.remove(input_path_vid)

    if os.path.exists(output_path):
        os.remove(output_path)

    res_dict = {}
    res_dict["video_url"] = os.path.join(BASE_URL, generated_video_file.temp_file.url[1:])

    return res_dict

views.py

class speed_up_video(APIView):
    def post(self,request):
        video_url = request.data["video_url"]
        speed_factor = request.data["speed_factor"]
        start = request.data["start"]
        end = request.data["end"]
        result_vid = speed_up_vid_task.delay(video_url, speed_factor, start, end)
        return Response(result_vid.get())```



I was trying to implement celery on django and after some trials it worked and showed the output in the celery terminal. I want to get the status of a task from terminal using the task id. The django, redis and celery servers are running fine. I am learning to implement celery and I am stuck on getting this part. I am using redis as a broker and backend database. I have seen a blog about getting task status from redis but couln't make it work. If there is any other way to make the query then please answer.

when I do bind=True in task decorator my django app throws argument error.

Thanks.


Solution

  • Django have django-celery-results have model TaskResult for keep result of task. You can query for check status.

    But it run in celery, you must wait task queue, run and finished before can get data. One way todo it is loop and wait. Like this:

    all_tasks = []
    task = speed_up_vid_task.delay(video_url, speed_factor, start, end)
    all_tasks.append(task.task_id)
    while len(all_tasks) > 0:
        time.sleep(1)
        for task in TaskResult.objects.filter(task_id__in=all_tasks):
            if task.status in ['SUCCESS', 'FAILURE']:
                # do anything you want in this
                all_tasks.remove(task.task_id)