pythonflasksocket.ioflask-socketio

Sending events from Celery task - Flask-SocketIO + Celery


I'm trying to send events from a Celery task to the server, so I can send data to the Client.

enter image description here

  1. A User submits a form with text
  2. The server receives the form submission
  3. The server sends the form data to the Celery task queue
  4. Celery processes the task
  5. Celery sends the processed data back to the server
  6. The server sends the data to the Client, where it's displayed on the web page

Here is what I have currently:

routes.py

@main_bp.route('/', methods=['GET', 'POST'])
def index():
    form = UserTextForm()
    if request.method == 'POST':
        request_data = request.form.to_dict()

        user_text = request_data['user_text']

        run_processing_task.apply_async(args=[user_text])

    return render_template('index.html', form=form)

@socketio.on('my_event', namespace='/user_text_task_results')
def user_test_task_results(my_event):
    # Sends data to client
    send(my_event, broadcast=True)
celery_tasks.py

@celery.task(bind=True)
def run_processing_task(self, user_text):
    print('Celery Function kicked off')

    local_socketio = SocketIO(message_queue='redis://localhost:6379/0')

    time.sleep(3)
    user_text_reverse = user_text[::-1]
    local_socketio.emit('my_event', {'data': user_text_reverse}, namespace='/user_text_task_results')

    print('Task Completed')

    return 'Finished'
index.html

<head>
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>

    <h1>Welcome</h1>
</head>


<form method="POST" action="{{ url_for('main_bp.index') }}" class="col-6">
  {{ form.hidden_tag() }}
    <div class="form-group">
    {{ form.user_text(class="form-control", id="user-text-area") }}
  </div>
  <button type="submit" class="btn btn-primary">Submit</button>
</form>

<div id="test-app">
</div>

<script>
    $(document).ready(function() {
        var socket = io.connect('http://127.0.0.1:5000');

        socket.on('connect', function() {
            socket.send('User Connected')
        });

        socket.on('message', function(results){
            var reversed_text = results['data']
            $("$test-app").append('<p>'+reversed_text+</p>)
        });
    });
</script>

When I run the above, the task is processed, but I don't think the server receives the data local_socketio.emit(...). I can see that the Celery task finishes successfully, but I don't see anything to indicate the data is flowing through to the server and ultimately the client.

I tried installing gevent and gevent-websocket, but then the index won't event load for the initial GET request (it just hangs in a loading state)


Solution

  • Celery workers typically run the same code as the Flask app, but they are not running as flask servers, so websockets from celery to flask aren't easily a thing. (I've never seen it done, but maybe someone has ironed out the tricky parts.)

    Assuming that you want to avoid having the client or app poll for task completion, an alternative is to have the worker signal completion by making an HTTP request to the app. I.e., something like a POST to

    /tasks/complete/<task_id>
    

    Then it's a app issue to do the bookkeeping to associate a task_id with a particular websocket.

    Adding:

    Celery has a post_run signal that is supposed to serve the same purpose. I didn't have luck with it a few years ago, but am now thinking I did something dumb.