So i want create mongodb source connector and elasticsearch sink connector, the problem is i can't find right configuration for mongodb source connector or elasticsearch sink connector got connected. FYI, im using io.confluent.connect.elasticsearch.ElasticsearchSinkConnector and com.mongodb.kafka.connect.MongoSourceConnector
MongoDB Source Connector
{
"name": "ais-mongodb-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"publish.full.document.only": "true",
"database": "ais-user",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"offset.partition.name": "ais-user.1",
"output.format.value": "json",
"tasks.max": "1",
"connection.uri": "",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"change.stream.full.document": "updateLookup"
}
}
Elasticsearch Sink Connector
{
"name": "ais-es-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"topics": "ais-user.administrator",
"tasks.max": "1",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter.schemas.enable": "false",
"name": "ais-es-sink-connector",
"value.converter.schemas.enable": "false",
"connection.url": "http://es01:9200",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
I got this error
Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed -> Response status: BAD_REQUEST,\n Index: ais-user.administrator,\n Document Id: ais-user.administrator+0+12
I think this is caused by index on elasticsearch
I want publish key using mongoID format, so i use transform but got an error somekind of Only MAP Supported
Im using output.format.key
to publish mongoID as a key in kafka topic:
"output.format.key": "schema",
"output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}"