python-3.x django celery django-celery celery-task

How to get realtime stdout from a celery task?


I have a Django project in which I'm running some background tasks with Celery. One of my goals is to show the real-time stdout of the running task so that the user can see its progress. The issue is that I only get the stdout of the process once it has finished executing (everything is dumped at once).

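views.py

from django.http import HttpResponse

def run_script(request, file_type, file_id):
    # How `arguments` (the command to run) is built is omitted here
    run_features.delay(arguments)
    return HttpResponse(status=202)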

tasks.py

import subprocess

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer


@shared_task
def run_features(arguments):
    process = subprocess.Popen(arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

    channel_layer = get_channel_layer()

    # Read the child's stdout line by line and forward each line to the group
    while True:
        output = process.stdout.readline()
        if output == '' and process.poll() is not None:
            break
        if output:
            print(output)
            async_to_sync(channel_layer.group_send)(
                'output_group',
                {
                    'type': 'send_output',
                    'output': output.strip()
                }
            )

    process.wait()
    return process.returncode

Celery INFO

 -------------- celery@ritik v5.3.1 (emerald-rush)
--- ***** ----- 
-- ******* ---- Linux-6.5.0-25-generic-x86_64-with-glibc2.35 2024-03-21 09:30:54
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         Automation:0x7e2bb4073280
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

Expectation: I get the stdout, line by line, while the task is running.
Actual result: I get the stdout, all at once, after the subprocess has finished.

Also, I'm not sure whether this is a Django problem or a Celery problem.

Thank you for any help! This is my first Stack Overflow question.


Solution

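  • Found the solution! Just use pexpect.spawn() instead of subprocess.Popen(). pexpect runs the child in a pseudo-terminal, so the child most likely sees its stdout as a terminal and flushes every line, instead of block-buffering its output the way many programs do when writing to a pipe.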

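    import pexpect

    # Start the process (pexpect attaches it to a pseudo-terminal)
    child = pexpect.spawn(command, args, encoding='utf-8', timeout=180)

    # Log the output line by line as it arrives
    while True:
        try:
            line = child.readline()
            if not line:
                # readline() returns an empty string at end of output
                break
            print(line.strip())
        except pexpect.EOF:
            break

    child.wait()
    child.close()

    # This snippet sits inside the task function
    return None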
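
To illustrate why switching to pexpect helps, here is a minimal standard-library sketch of the same idea: the child is attached to a pseudo-terminal rather than a pipe, so programs that only line-buffer when they see a terminal emit each line as it is printed. The function name `stream_via_pty` and the demo command are made up for illustration and do not come from the answer above. This is not pexpect's own implementation, only the same underlying mechanism, and it assumes a POSIX system such as the Linux host shown in the question.

```python
import os
import pty
import subprocess
import sys


def stream_via_pty(args):
    """Run args with stdout/stderr on a pseudo-terminal; yield output lines as they arrive.

    Hypothetical helper for illustration only (POSIX systems).
    """
    master_fd, slave_fd = pty.openpty()
    try:
        process = subprocess.Popen(
            args, stdin=subprocess.DEVNULL, stdout=slave_fd, stderr=slave_fd
        )
    except OSError:
        os.close(master_fd)
        raise
    finally:
        # The parent only reads from the master end
        os.close(slave_fd)

    buffer = b""
    try:
        while True:
            try:
                chunk = os.read(master_fd, 1024)
            except OSError:
                # On Linux, reading the master fails with EIO once the child has exited
                break
            if not chunk:
                break
            buffer += chunk
            # Emit every complete line received so far
            while b"\n" in buffer:
                line, buffer = buffer.split(b"\n", 1)
                # The terminal layer turns "\n" into "\r\n", so drop the "\r"
                yield line.decode("utf-8", errors="replace").rstrip("\r")
        if buffer:
            # Trailing output that was not terminated by a newline
            yield buffer.decode("utf-8", errors="replace").rstrip("\r")
    finally:
        os.close(master_fd)
        process.wait()


if __name__ == "__main__":
    # Demo child: prints three lines with a short pause between them
    demo = "import time\nfor i in range(3):\n    print('step', i)\n    time.sleep(0.2)"
    for text in stream_via_pty([sys.executable, "-c", demo]):
        print("received:", text)
```

Inside the Celery task, each yielded line could be passed to `channel_layer.group_send` in place of the `print` call, in the same way the question's loop does.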