python, apache-kafka, faust

"encoding without a string argument" error in Faust agent that sends data to a sink?


I'm trying to send data from one Kafka topic to another using a sink in a Faust agent, and I keep getting this error:

File "/.../venv/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 49, in <lambda>
    return lambda record, fp: schemaless_writer(fp, avro_schema.schema, record)
Crashed reason=TypeError('encoding without a string argument')

I can see in the debugger that it happens somewhere inside the schemaless_writer function in fastavro, but the implementation isn't visible in the source file I found, so I'm not sure what exactly is happening there: https://github.com/fastavro/fastavro/blob/357543aebb94a4fd593b035f8501b20ac66d3b17/fastavro/_write.pyi

My guess is that it tries to “encode” every field in the data being sent, and that this fails for any field that isn't a string (I have some numbers, booleans, a datetime and a dict in there). I tried converting every field to a string before sending it, but that didn't work either, because it conflicts with my schema. For example:
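As a side note, the message text is the one Python itself raises from bytes(value, encoding) when value is not a str. The sketch below only demonstrates that built-in behaviour. It assumes, without confirming it in the fastavro source, that the writer does something equivalent for fields the schema declares as strings. The field names and values are hypothetical stand-ins for the kinds of data described above.

```python
from datetime import datetime

# Hypothetical sample values; the real payment fields are not shown in the question.
samples = {
    "currency": "EUR",                # a real str: encodes fine
    "amount": 12.5,                   # number
    "paid": True,                     # boolean
    "created": datetime(2021, 1, 1),  # datetime
    "meta": {"source": "web"},        # dict
}


def try_encode(value):
    """Return the encoded bytes, or the TypeError message if encoding fails."""
    try:
        return bytes(value, "utf-8")
    except TypeError as exc:
        return str(exc)


results = {name: try_encode(value) for name, value in samples.items()}

for name, outcome in results.items():
    print(f"{name}: {outcome!r}")
```

If that assumption holds, the error would point at a non-string value ending up in a field the schema types as a string, rather than at every non-string field in the payload.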

Crashed reason=ValueError("'True' (type <class 'str'>) do not match ['null', 'boolean']") 

I also tried to do this with

await converted_payment_topic.send(value=result)

but got the same result.

Here is my current code:

import faust

app = faust.App('testConvertedPayments',
          version=1,
          key_serializer='raw',
          value_serializer='raw',
          broker=KAFKA_URL,
          web_port=9090)

payment_topic = app.topic(payment_topic)
converted_payment_topic = app.topic(converted_payment_topic)

@app.agent(payment_topic, sink=[converted_payment_topic], concurrency=3)
async def payment_stream(payment):
  async for part in payment.take(1000, within=30):
    for x in part:
      result = make_payment_row(x)  # just some data manipulation that builds a dict
      result = {k: v for k, v in result.items() if v is not None}
      yield result

This is similar to all the examples I saw in the documentation; the only difference is that my data contains more than just strings. Is there a way around this problem without changing my schema and converting all data types to strings?

Thank you!


Solution

  • Figured it out: I needed to send a faust.Record model, not a dict. The from_dict method of the faust.Record class can be used to generate a record from a dict.