elasticsearch, apache-kafka, apache-flink, kibana

Real-time data doesn't show up in Elasticsearch/Kibana


I'm building a real-time data streaming project using Kafka, Elasticsearch, Kibana, and Postgres with Docker Compose, plus Flink.
My data flows like this:
kafka -> flink -> elasticsearch and postgres.

When I write Kafka stream data into Elasticsearch, I can't find the new data in the Kibana dev tools console (GET index/_search or GET index) until I cancel the Flink job.

start Flink job -> can't find new data in Kibana -> cancel Flink job -> now I can see the new data in Kibana.

Here is the relevant part of my code:

// Read transactions from the Kafka source (no watermarks needed here)
DataStream<Transaction> transactionStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

// Index every transaction into the "transactions" index in Elasticsearch
transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                    String json = convertTransactionToJson(transaction);

                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("transactions")
                            .id(transaction.getTransactionId())
                            .source(json, XContentType.JSON);
                    requestIndexer.add(indexRequest);
                })
                .build()
).name("Elasticsearch Sink");

The Postgres DB update works fine.

I'm on a Mac with:
Java version: 11
Flink: 1.18.0
flink-connector-kafka: 3.0.1-1.18
flink-sql-connector-elasticsearch7: 3.0.1-1.17

What I tried:

  1. Attach the setBulkFlushInterval(30000) option (how I attached it is sketched after this list), because I found this log:

WARN org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter - Writer was closed before all records were acknowledged by Elasticsearch.

But then another error occurs:
Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}

  2. Clone the original code repository

My code is exactly the same as this repository: https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM

So I cloned it and ran it, but the same problem happens. In his YouTube video this problem doesn't occur.
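
For reference, this is roughly how I attached the option to the same sink builder as above (a sketch; everything besides the added setBulkFlushInterval call is unchanged from my code):

transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setBulkFlushInterval(30000) // flush buffered requests at least every 30s
                .setEmitter((transaction, runtimeContext, requestIndexer) ->
                        requestIndexer.add(Requests.indexRequest()
                                .index("transactions")
                                .id(transaction.getTransactionId())
                                .source(convertTransactionToJson(transaction), XContentType.JSON)))
                .build()
).name("Elasticsearch Sink");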

What can I do to fix this?


Solution

  • The Elasticsearch sink flushes its buffered requests either when a checkpoint completes or once the buffer reaches the bulk-flush threshold (1000 actions by default). Enabling checkpointing is probably the best solution; a minimal sketch follows the link below.

    https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#elasticsearch-sinks-and-fault-tolerance
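
    A minimal sketch of what that looks like, assuming the job's existing StreamExecutionEnvironment (the 5-second interval is just an example value):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 5 seconds. On each checkpoint the Elasticsearch sink
    // waits for its pending bulk requests to be acknowledged by Elasticsearch,
    // so documents become searchable in Kibana while the job keeps running.
    env.enableCheckpointing(5000);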