Let's consider this piece of code:
```python
from twisted.web import server, resource
from twisted.internet import reactor, threads
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaError
import json


# Function to handle the Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)
            if msg is None:
                return
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    return
                else:
                    return
            else:
                print("message", msg, msg.value())
                consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a separate thread
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5)

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }

    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # <-- is it a blocking call??
    start_loop()


# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')


if __name__ == '__main__':
    reactor.callWhenRunning(kafka_consumer)
    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)
    reactor.run()
```
Here I'm instantiating the `Consumer` object from confluent-kafka in the reactor thread, and then leaving the subsequent `poll()` calls to `deferToThread()`.
I have a few questions about this:

1. Is `Consumer.subscribe()` a blocking call? Should I wrap it in `deferToThread()` when invoking it?
2. Is it OK to run `poll_kafka` using `deferToThread()`? As I understand it, each `deferToThread()` call takes a thread from the thread pool, so there is no guarantee that the same thread is used every time.

NB: the code is written in Python 2. It's an integration with a legacy system; porting the whole thing isn't possible at the moment, and most of the other available libraries only support Python 3+.
If you want to defer things to a thread that is not the reactor thread, I would recommend using `deferToThreadPool` (https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool) with a custom `ThreadPool` (https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html) that you manage yourself, created with `minthreads=1, maxthreads=1`, so that every call you defer to it runs on that one thread.
You do have to do a bit of annoying lifecycle management of this thread pool, but in this simple example that would just be calling `.start()` in `kafka_consumer` and `.stop()` in something added with `reactor.addSystemEventTrigger("before", "shutdown", ...)`.