I started Memgraph together with the Confluent Kafka Platform using the docker-compose.yml file. After that, I created a Kafka topic called myTopic, to which the data is streamed. Now I want to implement a consumer called my_stream with a transformation module, following the documentation. The stream settings look like this:
When I add my transformation module, attach it to the stream, and try to connect the stream, the following error is thrown:
Transformation Failed
Failed to initialize Kafka consumer my_stream : Local: Broker transport failure
A more detailed description of the error from the terminal where the docker-compose.yml file is running:
Looking at the Confluent Cluster Overview, the broker is running just fine. Does anyone know what the problem is? My transformation module does not seem to be the cause, since the movielens-stream from the documentation connects with the same transformation module without problems (obviously an error is thrown as soon as the transformation starts, but the stream can be connected and runs). For the sake of completeness, here is my transformation module:
import mgp
import json

@mgp.transformation
def transfer(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        transfer_dict = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=(
                    """
                    MERGE (fromAccount:Account {accountId: $fromId, createTime: $fCreateTime, isBlocked: $fIsBlocked, accountType: $fAccountType, nickname: $fNickname, phonenum: $fPhonenum,
                        email: $fEmail, freqLoginType: $fFreqLoginType, lastLoginTime: $fLastLoginTime, accountLevel: $fAccountLevel})
                    MERGE (toAccount:Account {accountId: $toId, createTime: $tCreateTime, isBlocked: $tIsBlocked, accountType: $tAccountType, nickname: $tNickname, phonenum: $tPhonenum,
                        email: $tEmail, freqLoginType: $tFreqLoginType, lastLoginTime: $tLastLoginTime, accountLevel: $tAccountLevel})
                    MERGE (fromAccount)-[:TRANSFER {amount: $amount, createTime: $eCreateTime, orderNum: $orderNum, comment: $comment, payType: $payType, goodsType: $goodsType,
                        insertTime: $insertTime, exportTime: timestamp()}]->(toAccount)
                    """),
                parameters={
                    "fromId": transfer_dict["fromAccount"]["fromId"], "fCreateTime": transfer_dict["fromAccount"]["createTime"], "fIsBlocked": transfer_dict["fromAccount"]["isBlocked"],
                    "fAccountType": transfer_dict["fromAccount"]["accountType"], "fNickname": transfer_dict["fromAccount"]["nickname"], "fPhonenum": transfer_dict["fromAccount"]["phonenum"],
                    "fEmail": transfer_dict["fromAccount"]["email"], "fFreqLoginType": transfer_dict["fromAccount"]["freqLoginType"], "fLastLoginTime": transfer_dict["fromAccount"]["lastLoginTime"],
                    "fAccountLevel": transfer_dict["fromAccount"]["accountLevel"],
                    "toId": transfer_dict["toAccount"]["toId"], "tCreateTime": transfer_dict["toAccount"]["createTime"], "tIsBlocked": transfer_dict["toAccount"]["isBlocked"],
                    "tAccountType": transfer_dict["toAccount"]["accountType"], "tNickname": transfer_dict["toAccount"]["nickname"], "tPhonenum": transfer_dict["toAccount"]["phonenum"],
                    "tEmail": transfer_dict["toAccount"]["email"], "tFreqLoginType": transfer_dict["toAccount"]["freqLoginType"], "tLastLoginTime": transfer_dict["toAccount"]["lastLoginTime"],
                    "tAccountLevel": transfer_dict["toAccount"]["accountLevel"],
                    "amount": transfer_dict["amount"], "eCreateTime": transfer_dict["createTime"], "orderNum": transfer_dict["orderNum"], "comment": transfer_dict["comment"],
                    "payType": transfer_dict["payType"], "goodsType": transfer_dict["goodsType"], "insertTime": transfer_dict["insertTime"]
                }
            )
        )
    return result_queries
It turned out that the "Broker transport failure" was thrown because the Memgraph Docker container could not reach port 9092 on localhost. Inside a container, localhost refers to the container itself, not to the broker. The docker-compose.yml file creates its own network, in which each container can be reached by its name, as described in the Docker documentation. In the docker-compose.yml from the Memgraph docs, the following line
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
defines that the broker is reachable at broker:29092 inside the compose network and at localhost:9092 on the host machine. So the correct server address to configure the stream from the Memgraph UI is
broker:29092
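To make the mapping between listener names and addresses explicit, here is a minimal Python sketch that splits the KAFKA_ADVERTISED_LISTENERS value quoted above into its parts. The helper name parse_advertised_listeners is my own and is not part of Kafka or Memgraph; it only illustrates how to read that line.

```python
def parse_advertised_listeners(value: str) -> dict:
    """Map each listener name to its advertised host:port (illustrative helper)."""
    listeners = {}
    for entry in value.split(","):
        # Each entry has the form NAME://host:port
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


# Value copied from the docker-compose.yml line above.
advertised = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
listeners = parse_advertised_listeners(advertised)

print(listeners["PLAINTEXT"])       # broker:29092   (for containers in the compose network)
print(listeners["PLAINTEXT_HOST"])  # localhost:9092 (for clients on the host machine)
```

Memgraph runs as a container in the compose network, so it needs the PLAINTEXT address, not the PLAINTEXT_HOST one.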
If you are using plain Docker (not docker-compose!) and a Kafka cluster is running locally on the host, the following command lets the Memgraph container access Kafka through localhost:9092:
docker run -p 7687:7687 -p 7444:7444 -p 3000:3000 --network="host" --name memgraph memgraph/memgraph-platform
Note that the Docker container and the Docker host now share the same network, as described here (in host network mode, Docker discards the published ports given with -p). So if Kafka runs locally, the Docker container can access port 9092 on localhost.
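If you are unsure which address is reachable from a given environment, a plain TCP check can help narrow it down before touching the stream settings. The following is a minimal sketch using only the Python standard library; the function name can_reach is hypothetical, and it assumes Python is available wherever you run it (for example inside the Memgraph container). It only tests whether the port accepts connections, not whether Kafka itself is healthy.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False
```

Calling can_reach("broker", 29092) from inside the compose network and can_reach("localhost", 9092) from the host (or from a container started with --network="host") should show which of the two addresses is usable in each setup.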