I have an app written with the Twisted framework that sends various data (mostly log entries in JSON format) to a variety of logging servers using output plug-ins. I would like it to be able to send the data to a Kafka server too, but I am hitting a problem that I don't know how to solve.
If I send data to the Kafka server in a straightforward way using Python and the kafka-python module, everything works just fine:
from json import dumps

from kafka import KafkaProducer

site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
topic = 'test'

producer = KafkaProducer(
    bootstrap_servers='{}:{}'.format(site, port),
    value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username=username,
    sasl_plain_password=password
)

event = {
    'message': 'Test message'
}

try:
    producer.send(topic, event)
    producer.flush()
    print('Message sent.')
except Exception as e:
    print('Error producing message: {}'.format(e))
finally:
    producer.close()
However, if I try to send it from my actual Twisted app, using pretty much the same code, it hangs:
from json import dumps

from core import output
from kafka import KafkaProducer
from twisted.python.log import msg


class Output(output.Output):

    def start(self):
        site = '<KAFKA SERVER>'
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        self.topic = 'test'
        self.producer = KafkaProducer(
            bootstrap_servers='{}:{}'.format(site, port),
            value_serializer=lambda v: bytes(str(dumps(v)).encode('utf-8')),
            sasl_mechanism='SCRAM-SHA-256',
            security_protocol='SASL_SSL',
            sasl_plain_username=username,
            sasl_plain_password=password
        )

    def stop(self):
        self.producer.flush()
        self.producer.close()

    def write(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))
(This won't run out of the box; it's just the plug-in, and it uses a generic Output class defined elsewhere.)
In particular, it hangs at the self.producer.send(self.topic, event) line.
I think that the problem comes from the fact that the Kafka producer in kafka-python is synchronous (blocking) and Twisted requires asynchronous (non-blocking) code. There is an asynchronous Kafka module called afkak, but it doesn't seem to provide authentication with the Kafka server, so it is not suitable for my needs.
As I understand it, the way around such problems in Twisted is to use deferreds. However, I have been unable to work out exactly how to do it. If I rewrite the write method like this
    # Requires: from twisted.internet import reactor, threads

    def write(self, event):
        d = threads.deferToThread(self.postentry, event)
        d.addCallback(self.postentryCallback)
        return d

    def postentryCallback(self):
        reactor.stop()

    def postentry(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))
it no longer hangs when trying to send data to the Kafka server, but it hangs when the application terminates, and nothing is sent to the Kafka server anyway (which I can verify with a separate consumer script written in pure Python).
Any ideas what I am doing wrong and how to fix it?
Twisted has asynchronous implementations of some operations that typically block, like making GET/POST requests to a web server, but Kafka is not a web server (it runs on port 9092), so I don't think that I can use them.
aiokafka is supposed to be a Python module for asynchronous communication with a Kafka server, but I couldn't make it work.
Eventually, I solved my problem with the confluent-kafka module. Unfortunately, it is not a drop-in replacement for the kafka-python module, so the code had to be rewritten a bit:
from json import dumps

from core import output
from confluent_kafka import Producer
from twisted.python.log import msg


class Output(output.Output):

    def start(self):
        site = '<KAFKA SERVER>'
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        self.topic = 'test'
        self.producer = Producer({
            'bootstrap.servers': '{}:{}'.format(site, port),
            'sasl.mechanism': 'SCRAM-SHA-256',
            'security.protocol': 'SASL_SSL',
            'sasl.username': username,
            'sasl.password': password
        })

    def stop(self):
        # Wait for any messages still queued before shutting down.
        self.producer.flush()

    def write(self, event):
        self.postentry(event)

    def delivery_callback(self, err, message):
        # Called from poll()/flush() once per produced message.
        if err:
            msg('Kafka error: {}'.format(err))

    def postentry(self, event):
        try:
            self.producer.produce(
                self.topic,
                bytes(str(dumps(event)).encode('utf-8')),
                callback=self.delivery_callback
            )
            # Serve pending delivery callbacks without blocking.
            self.producer.poll(0)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))
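A side note on the serialization expression used in the code above: dumps already returns a str and encode already returns bytes, so the str(...) and bytes(...) wrappers should be no-ops. The following is a small check of that using only the standard library; serialize_original and serialize_short are hypothetical helper names introduced just for the comparison.

```python
from json import dumps, loads


def serialize_original(event):
    # The expression as written in the code above.
    return bytes(str(dumps(event)).encode('utf-8'))


def serialize_short(event):
    # dumps() returns str and encode() returns bytes, so no wrappers are needed.
    return dumps(event).encode('utf-8')


event = {'message': 'Test message'}
original = serialize_original(event)
short = serialize_short(event)

print(short)              # b'{"message": "Test message"}'
print(original == short)  # True
# Round trip: the payload decodes back to the same event.
print(loads(short.decode('utf-8')) == event)  # True
```

If that holds for your events as well, the shorter form can be passed to produce() (or used as the value_serializer with kafka-python) without changing what is sent.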