Tags: elasticsearch, apache-kafka, apache-kafka-connect

Kafka Elasticsearch Sink Connector: Connection Error


I am new to Kafka and Kafka Connect. I have been trying to use Elasticsearch as a sink for data streamed from my application through Kafka Connect. I can see the messages in Kafka, but my connector keeps throwing the error below:

    ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
    org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error:
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:159)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: io.searchbox.client.config.exception.CouldNotConnectException: Could not connect to http://elasticsearch:9200
        at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:73)
        at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:247)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:151)
        ... 12 more
    Caused by: org.apache.http.conn.HttpHostConnectException: Connect to elasticsearch:9200 [elasticsearch/172.20.0.7] failed: Connection refused (Connection refused)
        at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
        at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
        at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:136)
        at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
        ... 15 more
    Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
        at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
        ... 26 more
    [2020-09-22 08:16:32,656] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

I have dockerised everything. Besides ZooKeeper, I have four containers: one for my application, one for Elasticsearch, one for Kafka and one for the connector, as shown below:

    zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper:latest
        env_file:
          - ".env"
        ports:
          - 2181:2181
        networks:
          - "main_net"

    kafka:
        container_name: kafka
        image: wurstmeister/kafka:2.11-1.0.2
        env_file:
          - ".env"
        depends_on:
          - zookeeper
        links:
          - zookeeper
        ports:
          - 9092:9092
          - 9094:9094
        networks:
          - "main_net"

    connector_standalone:
        container_name: container_standalone
        build:
          context: kafka/
          dockerfile: Dockerfile
        depends_on:
          - kafka
        ports:
          - 8083:8083
        networks:
          - "main_net"

    elasticsearch:
        container_name: elasticsearch
        build:
          context: elasticsearch/
        ports:
          - "9200:9200"
          - "9300:9300"
        env_file:
          - ".env"
        environment:
          discovery.type: single-node
        networks:
          - "main_net"

    website:
        container_name: "application"
        build: "./"
        command: >
        volumes:
          - "./application:/app"
        ports:
          - "8000:8000"
        networks:
          - "main_net"

As far as I understand, I have set the connection URL to the host it should connect to, which is the elasticsearch container, so I can't work out from the error what is wrong. Below is my connector configuration file:

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=vehicle
    topic.index=vehicles
    connection.url=http://elasticsearch:9200/
    connection.user=elastic
    connection.password=changeme
    type.name=log
    key.ignore=true
    schema.ignore=true
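Before the connector sends any HTTP request, it has to open a TCP connection to the host and port in connection.url, and the innermost "Caused by" in the trace shows that TCP step failing. Below is a minimal stdlib-only Python sketch that runs the same check and separates the outcomes that all surface as a generic "connection error". The `probe` helper is hypothetical and not part of Kafka Connect. Running it in the connector container assumes that image includes a Python interpreter.

```python
import socket
from urllib.parse import urlsplit


def probe(url: str, timeout: float = 3.0) -> str:
    """Open a plain TCP connection to the host/port in `url`.

    Returns "open", "refused", "unresolved" or "timeout" so the
    different failure modes can be told apart.
    """
    parts = urlsplit(url)
    host = parts.hostname
    # Fall back to the scheme's default port when the URL has none.
    port = parts.port or (443 if parts.scheme == "https" else 80)
    try:
        # A failure here means the name (e.g. the Compose service) is unknown.
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "unresolved"
    family, socktype, proto, _, addr = infos[0]
    with socket.socket(family, socktype, proto) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect(addr)
        except ConnectionRefusedError:
            # Host reachable, but nothing is listening on that port.
            return "refused"
        except socket.timeout:
            # No answer at all, e.g. packets dropped on the way.
            return "timeout"
    return "open"
```

Inside the connector container, `probe("http://elasticsearch:9200/")` returning "refused" would correspond to the trace above. A result of "unresolved" would instead point to the Docker network or the service name.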

Solution

  • It's failing because of this:

    Connect to elasticsearch:9200 [elasticsearch/172.20.0.7] failed: Connection refused
    

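    The hostname resolved to 172.20.0.7, so the elasticsearch container is reachable on the Docker network, but nothing accepted the connection on port 9200. So either Elasticsearch is not up yet, or it is running but not accepting connections.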

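    Before creating your Kafka Connect connector, make sure that Elasticsearch is reachable from your Kafka Connect worker by running the following (docker exec expects the container name, which in your Compose file is container_standalone rather than the service name connector_standalone):

    docker exec container_standalone curl -sS elasticsearch:9200/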
    

    You should get back something like this:

    {
      "name" : "216261a864bd",
      "cluster_name" : "docker-cluster",
      "cluster_uuid" : "tGHje8KSTPiafT7CLt77uQ",
      "version" : {
        "number" : "7.6.2",
        "build_flavor" : "default",
        "build_type" : "docker",
        "build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
        "build_date" : "2020-03-26T06:34:37.794943Z",
        "build_snapshot" : false,
        "lucene_version" : "8.4.0",
        "minimum_wire_compatibility_version" : "6.8.0",
        "minimum_index_compatibility_version" : "6.0.0-beta1"
      },
      "tagline" : "You Know, for Search"
    }
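Compose's depends_on only controls the order in which containers start. It does not wait until a service is ready, and in the file above connector_standalone does not list elasticsearch at all. A standalone worker that starts while Elasticsearch is still booting can therefore hit exactly this "Connection refused". Below is a minimal Python sketch that repeats the curl check until the node returns its info document. The `wait_for_elasticsearch` helper is hypothetical. Hooking it in before the worker starts (for example from the connector image's entrypoint) is an assumption, because the Dockerfile isn't shown, and it also assumes Python is available in that image.

```python
import base64
import json
import time
import urllib.request


def wait_for_elasticsearch(url, user=None, password=None, timeout=120.0, interval=2.0):
    """Poll `url` until Elasticsearch answers with its info document.

    Returns the parsed JSON (the same document the curl check prints).
    Raises TimeoutError if no valid answer arrives within `timeout` seconds.
    """
    request = urllib.request.Request(url)
    if user is not None:
        # Optional HTTP basic auth, e.g. the user/password pair from the config.
        token = base64.b64encode(f"{user}:{password}".encode()).decode()
        request.add_header("Authorization", f"Basic {token}")
    # Ignore HTTP(S)_PROXY settings: the node is reached directly on the network.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))

    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            with opener.open(request, timeout=interval) as resp:
                body = json.load(resp)
            # A running node reports its version; treat that as "ready".
            if isinstance(body, dict) and "version" in body:
                return body
            last_error = ValueError("unexpected response body")
        except (OSError, ValueError) as exc:
            # URLError (connection refused while the node is still starting,
            # or an HTTP error status such as 401) subclasses OSError;
            # ValueError covers a body that is not valid JSON.
            last_error = exc
        time.sleep(interval)
    raise TimeoutError(f"{url} not ready after {timeout}s: {last_error}")
```

Call it as `wait_for_elasticsearch("http://elasticsearch:9200/", user="elastic", password="changeme")` and start the worker once it returns. According to the log, a task that has already failed "will not recover until manually restarted". You can restart it through the Kafka Connect REST API on port 8083, for example with `POST /connectors/elasticsearch-sink/tasks/0/restart`.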