apache-kafka, debezium, mongodb-kafka-connector

How to generate a Kafka key from the mongoID in the MongoDB Source Connector


I want to create a MongoDB source connector and an Elasticsearch sink connector. The problem is that I can't find the right configuration to get the MongoDB source connector and the Elasticsearch sink connector working together. FYI, I'm using io.confluent.connect.elasticsearch.ElasticsearchSinkConnector and com.mongodb.kafka.connect.MongoSourceConnector.

MongoDB Source Connector

{
    "name": "ais-mongodb-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "publish.full.document.only": "true",
        "database": "ais-user",
        "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
        "offset.partition.name": "ais-user.1",
        "output.format.value": "json",
        "tasks.max": "1",
        "connection.uri": "",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "change.stream.full.document": "updateLookup"
    }
}

Elasticsearch Sink Connector

{
    "name": "ais-es-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "topics": "ais-user.administrator",
        "tasks.max": "1",
        "key.ignore": "true",
        "schema.ignore": "true",
        "key.converter.schemas.enable": "false",
        "name": "ais-es-sink-connector",
        "value.converter.schemas.enable": "false",
        "connection.url": "http://es01:9200",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

I got this error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed -> Response status: BAD_REQUEST,\n Index: ais-user.administrator,\n Document Id: ais-user.administrator+0+12

I think this is caused by the index on Elasticsearch.

I want to publish the key in mongoID format, so I tried a transform, but it failed with an error along the lines of "Only MAP Supported".


Solution

  • I'm using output.format.key to publish the mongoID as the key in the Kafka topic:

    "output.format.key": "schema",
    "output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}"
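
The escaped schema string above is easy to get wrong by hand, so here is a minimal Python sketch that generates it from a plain dictionary. The helper name `key_settings` is hypothetical and only for illustration; the two property names and the schema itself are taken from the solution above.

```python
import json

# Record schema for the Kafka key; the field name "documentKey._id"
# is copied from the solution above.
key_schema = {
    "type": "record",
    "name": "keySchema",
    "fields": [{"name": "documentKey._id", "type": "string"}],
}


def key_settings(schema: dict) -> dict:
    """Return the two source-connector properties that control the key.

    Hypothetical helper, not part of any connector API.
    """
    return {
        "output.format.key": "schema",
        # The schema is passed as a JSON string, not as a nested object,
        # so it is serialized compactly here.
        "output.schema.key": json.dumps(schema, separators=(",", ":")),
    }


settings = key_settings(key_schema)

# Dumping the settings as JSON produces the backslash-escaped form
# that can be pasted into the connector "config" block.
print(json.dumps(settings, indent=4))
```

The printed properties can be merged into the "config" object of the source connector. Whether the sink then picks up the key as the document ID also depends on the sink's key.ignore and converter settings, which this sketch does not cover.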