apache-kafkamemgraphdbmemgraph

Memgraph Broker Transport Failure registering Kafka Stream Consumer


I started Memgraph with the Confluent Kafka Platform according to the docker-compose.yml file. After that, I created a Kafka Topic called myTopic, to which the data is streamed. Now I want to implement a consumer called my_stream with a transformation module according to the documentation. The stream settings look like this Stream Settings

When I add my transformation module, attach it to the stream and try to connect the stream, the following Error is thrown:

Transformation Failed
Failed to initialize Kafka consumer my_stream : Local: Broker transport failure 

A more detailed description of the error from the terminal where the docker-compose.yml file is running:

MemgraphError

Looking at the Confluent Cluster Overview, the broker is running just fine. Anyone knows what the problem is? It doesn't seem like my transformation module is the problem, since the movielens-stream from the documentation connects with the same transformation module without problems (obviously there is an error thrown as soon as the transformation starts, but the stream can be connected and runs). But for the sake of completeness, here is my transformation module:

    import mgp 
    import json
    
    @mgp.transformation
    def transfer(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
      result_queries = []
    
      for i in range(messages.total_messages()):
        message = messages.message_at(i)
        transfer_dict = json.loads(message.payload().decode('utf8'))
        result_queries.append(
          mgp.Record(
            query=(
              """
            MERGE (fromAccount:Account {accountId: $fromId, createTime: $fCreateTime, isBlocked: $fIsBlocked, accountType: $fAccountType, nickname: $fNickname, phonenum: $fPhonenum, 
            email: $fEmail, freqLoginType: $fFreqLoginType, lastLoginTime: $fLastLoginTime, accountLevel: $fAccountLevel})
            MERGE (toAccount:Account {accountId: $toId, createTime: $tCreateTime, isBlocked: $tIsBlocked, accountType: $tAccountType, nickname: $tNickname, phonenum: $tPhonenum, 
            email: $tEmail, freqLoginType: $tFreqLoginType, lastLoginTime: $tLastLoginTime, accountLevel: $tAccountLevel})
            MERGE (fromAccount)-[:TRANSFER {amount: $amount, createTime: $eCreateTime, orderNum: $orderNum, comment: $comment, payType: $payType, goodsType: $goodsType, 
            insertTime: $insertTime, exportTime: timestamp()}]->(toAccount)
            """),
            parameters={
              "fromId": transfer_dict["fromAccount"]["fromId"], "fCreateTime": transfer_dict["fromAccount"]["createTime"], "fIsBlocked": transfer_dict["fromAccount"]["isBlocked"],
              "fAccountType": transfer_dict["fromAccount"]["accountType"], "fNickname": transfer_dict["fromAccount"]["nickname"], "fPhonenum": transfer_dict["fromAccount"]["phonenum"], 
              "fEmail": transfer_dict["fromAccount"]["email"], "fFreqLoginType": transfer_dict["fromAccount"]["freqLoginType"], "fLastLoginTime": transfer_dict["fromAccount"]["lastLoginTime"], 
              "fAccountLevel": transfer_dict["fromAccount"]["accountLevel"],
              "toId": transfer_dict["toAccount"]["toId"], "tCreateTime": transfer_dict["toAccount"]["createTime"], "tIsBlocked": transfer_dict["toAccount"]["isBlocked"],
              "tAccountType": transfer_dict["toAccount"]["accountType"], "tNickname": transfer_dict["toAccount"]["nickname"], "tPhonenum": transfer_dict["toAccount"]["phonenum"],
              "tEmail": transfer_dict["toAccount"]["email"], "tFreqLoginType": transfer_dict["toAccount"]["freqLoginType"], "tLastLoginTime": transfer_dict["toAccount"]["lastLoginTime"],
              "tAccountLevel": transfer_dict["toAccount"]["accountLevel"],
              "amount": transfer_dict["amount"], "eCreateTime": transfer_dict["createTime"], "orderNum": transfer_dict["orderNum"], "comment": transfer_dict["comment"], 
              "payType": transfer_dict["payType"], "goodsType": transfer_dict["goodsType"], "insertTime": transfer_dict["insertTime"]
            }
          )
        )
        return result_queries


Solution

  • So it turned out, that the Broker Transport Failure was thrown, because the docker container could not reach the port 9092 at localhost. The point is, that the docker-compose.yml file creates an own network, where each container can be reached through its name, as described in the docker documentation. In the docker-compose.yml from the memgraph docs , the following line

    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    

    defines, that the broker is listening at address broker:29092 in the compose network, and at localhost:9092 on the host machine. So the correct server address to configure the stream from the memgraph UI would be

    broker:29092

    If you are using docker (not docker-compose!), then the following command works, to access kafka through localhost:9092 within the memgraph docker, if a local kafka cluster is running.

    docker run -p 7687:7687 -p 7444:7444 -p 3000:3000 --network="host" --name memgraph memgraph/memgraph-platform
    

    Note, that now the docker container and the docker host will share the same network, as described here. So if Kafka runs locally the docker container can access port 9092 on localhost