
Issues streaming a response from flask-smorest


I'm having trouble streaming a response from flask-smorest. I'm following the Flask guidance at https://flask.palletsprojects.com/en/2.3.x/patterns/streaming/ to stream responses from my flask-smorest application. Below is a minimal reproducible example (MRE) of my code. Suppose my application fetches foreign exchange rates for the past 1000 days for whichever currency the end user requests.

This is the version without streaming. It works and returns a JSON list:

from flask import request, Response
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from marshmallow import Schema, fields
import asyncio

class CurrencySchema(Schema):
    name = fields.Str()
    rate = fields.Str()
    date = fields.Str()
    source = fields.Str()

blp = Blueprint("test",__name__, description="test")

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        return results

When I run this, it returns a list of JSON objects in my browser, like:

[{'name': 'USD', 'rate': '1.2333', 'date': 'Mar 21, 2024', 'source': 'currency.com'}, 
 {'name': 'USD', 'rate': '1.2121', 'date': 'Mar 22, 2024', 'source': 'currency.com'}, 
 ... and so on, up to 1000 entries]

Now comes the part where I try to stream the response. I make the following changes to my code:

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        def generate_rates():
            batch_size = 100
            for i in range(0, len(results), batch_size):
                yield results[i:i+batch_size]  # each item is a list of up to 100 dicts
        return generate_rates()

Strangely, this returns a list of 50 empty JSON objects:

[{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {},
 {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {},
 {}, {}, {}, {}, {}, {}, {}, {}]

I also tried wrapping the generator in a Response (code below). The result was the same, i.e. a list of empty JSON objects, and in addition I got this error:

AssertionError: applications must write bytes

The error appears to be raised from werkzeug's serving.py.

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        def generate_rates():
            batch_size = 100
            for i in range(0, len(results), batch_size):
                yield results[i:i+batch_size]  # each item is a list of up to 100 dicts
        return Response(generate_rates(), mimetype='application/json')
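
For context, the assertion comes from the WSGI layer, which requires every chunk of a streamed body to be bytes (Werkzeug encodes str chunks itself, but a Python list is passed through unchanged). The sketch below is one way to turn each batch into a string before it is yielded. `generate_batches` is a hypothetical helper name, not part of Flask or flask-smorest, and the output is newline-delimited JSON rather than a single JSON array, so a client would have to parse it line by line.

```python
import json


def generate_batches(results, batch_size=100):
    """Yield each batch as one JSON-encoded line (a str), not as a Python list."""
    for i in range(0, len(results), batch_size):
        batch = results[i:i + batch_size]
        # json.dumps turns the list of dicts into text that the server can encode
        yield json.dumps(batch) + "\n"
```

Passing `generate_batches(results)` to `Response(...)` in place of the list-yielding generator should avoid the assertion, assuming the view returns the Response object directly.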

The rest of my application is ready, and this is the last piece that is giving me trouble. I want to stream the response, and something in flask-smorest seems to be getting in the way. I would really appreciate your help. Thanks!


Solution

  • I was finally able to resolve this. The root cause was the @blp.response decorator, which does not accept a generator as the return value. I ended up removing the decorator and serializing the results manually. Here's the final code:

    from flask import Response, request, stream_with_context

    @blp.route("/test")
    class Test(MethodView):
        # @blp.response(200, CurrencySchema(many=True))  # decorator removed
        def get(self):
            currency = request.args.get('currency')
            schema = CurrencySchema()

            results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
                currency))  # returns a list of dictionaries

            @stream_with_context
            def generate_results():
                # Emit a single JSON array piece by piece
                yield '['
                for index, result in enumerate(results):
                    if index:
                        yield ', '  # separator before every item except the first
                    yield schema.dumps(result)
                yield ']'  # an empty result list still produces a valid "[]"

            return Response(generate_results(), mimetype='application/json')
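
The array-assembly logic can also be pulled out into a small framework-independent helper, which makes it easy to check without a running server. This is only a sketch: `iter_json_array` is a hypothetical name, and `json.dumps` stands in as the default serializer, on the assumption that `schema.dumps` would be passed instead inside the view.

```python
import json


def iter_json_array(items, dumps=json.dumps):
    """Yield the pieces of one JSON array, serializing one item at a time."""
    yield "["
    first = True
    for item in items:
        if not first:
            yield ", "  # separator goes before every item except the first
        yield dumps(item)
        first = False
    yield "]"  # an empty iterable still yields a valid "[]"
```

Inside the view this could be used as `Response(stream_with_context(iter_json_array(results, schema.dumps)), mimetype='application/json')`. Because the helper only iterates over `items`, it also accepts a generator, so the rows would not all have to be in memory at once.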