I'm trying to wrap the Confluent Kafka REST Proxy API in a single class that handles both producing and consuming.
Following the docs at https://docs.confluent.io/platform/current/kafka-rest/api.html, I implemented send as follows:
def send(self, topic, data):
    try:
        r = requests.post(self._url('/topics/' + topic), json=data, headers=headers_v2)
        if not r.ok:
            raise Exception("Error: ", r.reason)
    except Exception as e:
        print(" ")
        print('Event streams send request failed')
        print(Exception, e)
        print(" ")
        return e
However, I ended up working with two versions of the API (v2 and v3), because I couldn't find some of the APIs I needed in one version, and vice versa.
For example, I couldn't find how to create a topic in v2, so I implemented that with v3.
My issue now is with the send method: I'm getting an Internal Server Error and I can't find out why!
Maybe it's because the topic was created with v3 and I'm trying to produce messages with v2.
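One way to narrow down a 500 like this is to look at the response body rather than only r.reason, since the REST Proxy normally answers errors with a JSON object carrying error_code and message fields. Below is a minimal sketch under that assumption; describe_error is a hypothetical helper name, not part of requests or the proxy, and it falls back to the raw text if the body has a different shape.

```python
import json


def describe_error(status_code, body_text):
    """Build a readable message from an error response body.

    Assumes a JSON body like {"error_code": 50002, "message": "..."};
    falls back to the raw text when the body is not in that shape.
    """
    try:
        payload = json.loads(body_text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        payload = None
    if isinstance(payload, dict) and "message" in payload:
        code = payload.get("error_code", status_code)
        return f"HTTP {status_code} (error_code {code}): {payload['message']}"
    return f"HTTP {status_code}: {body_text.strip() or '<empty body>'}"
```

Inside send, this could replace the r.reason argument, for example raise Exception(describe_error(r.status_code, r.text)).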
I changed the data payload for send to look like this:
data = {"records": [{"value": data}]}
and send succeeded. Poll also succeeded when using:
r = requests.get(
    self._url('/consumers/' + self.consumer_group + '/instances/' + self.consumer + '/records'),
    headers={'Accept': 'application/vnd.kafka.json.v2+json'},
)
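The two request shapes that worked can be sketched as small pure helpers: each produced value is wrapped in a "records" envelope, and the consumer records path is read with the JSON v2 Accept header. The helper names (build_produce_request, build_poll_request) are hypothetical, and the Content-Type value is the v2 JSON embedded-format media type, which is an assumption about what headers_v2 should contain.

```python
JSON_V2 = "application/vnd.kafka.json.v2+json"


def build_produce_request(base_url, topic, values):
    """Return (url, headers, body) for a v2 produce call.

    Each value is wrapped as {"value": ...} inside a "records" list,
    the same envelope that made send succeed above.
    """
    url = f"{base_url.rstrip('/')}/topics/{topic}"
    headers = {"Content-Type": JSON_V2}
    body = {"records": [{"value": v} for v in values]}
    return url, headers, body


def build_poll_request(base_url, group, instance):
    """Return (url, headers) for fetching records from a consumer instance."""
    url = f"{base_url.rstrip('/')}/consumers/{group}/instances/{instance}/records"
    headers = {"Accept": JSON_V2}
    return url, headers
```

With requests, the results would be passed straight through, for example url, headers, body = build_produce_request(base, topic, [data]) followed by requests.post(url, json=body, headers=headers), where base is whatever address the proxy listens on.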