Apache Beam ElasticsearchIO is not working with the latest Elasticsearch


I've been trying to use the ElasticsearchIO API in an Apache Beam pipeline, but I'm unable to connect to Elasticsearch. Any help would be great.

My JAR versions:

org.apache.beam:beam-sdks-java-core:2.37.0

org.apache.beam:beam-sdks-java-io-elasticsearch:2.37.0

My Elasticsearch version is 8.1.2.

The documentation says that only Elasticsearch versions up to 2.x are supported:

Link: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html

Is it not possible to use the latest version of Elasticsearch? Has anyone tried it and gotten it working with the latest version?
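For context, ElasticsearchIO looks up the cluster's version when its writer starts (the stack trace below shows getBackendVersion being called from DocToBulkFn.setup), and an Elasticsearch node reports its version as version.number in the body of a GET / request. Note also that the link above points at the Beam 2.0.0 Javadoc, not 2.37.0. The sketch below is a hypothetical pre-flight check, not part of Beam: it extracts the major version from a GET / response body and compares it with SUPPORTED_MAJORS, a set that is purely an assumption you would replace with the versions listed in the Javadoc of the Beam release you actually depend on. Fetching the response (HTTP client, credentials) is deliberately left out, and it is written to compile on Java 8.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-flight check, not part of Beam or Elasticsearch.
public class EsVersionCheck {

    // ASSUMPTION: replace with the major versions listed in the Javadoc of your Beam release.
    static final Set<Integer> SUPPORTED_MAJORS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(5, 6, 7)));

    // Captures the leading digits of "number": "X.Y.Z" in a GET / response body.
    private static final Pattern VERSION_NUMBER =
            Pattern.compile("\"number\"\\s*:\\s*\"(\\d+)\\.");

    static int majorVersion(String rootResponseJson) {
        Matcher m = VERSION_NUMBER.matcher(rootResponseJson);
        if (!m.find()) {
            throw new IllegalArgumentException("No version.number in response");
        }
        return Integer.parseInt(m.group(1));
    }

    static boolean isSupported(String rootResponseJson) {
        return SUPPORTED_MAJORS.contains(majorVersion(rootResponseJson));
    }

    public static void main(String[] args) {
        // Trimmed-down response bodies for the two versions mentioned in this post.
        String es8 = "{\"version\":{\"number\":\"8.1.2\"}}";
        String es7 = "{\"version\":{\"number\":\"7.13.2\"}}";
        System.out.println(majorVersion(es8) + " supported? " + isSupported(es8));
        System.out.println(majorVersion(es7) + " supported? " + isSupported(es7));
    }
}
```

Matching with a regex keeps the sketch dependency-free; a real check would more likely parse the body with a JSON library and read version.number directly.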

My Pipeline:

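import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// EndsWithFilter and DummyFileProcessor are my own classes (not shown).
public class Test {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        // Re-check the file pattern every 10 seconds for new matches, indefinitely.
        PCollection<FileIO.ReadableFile> files = pipeline
                .apply(FileIO.match()
                        .filepattern("/path_to_files/**")
                        .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
                .apply(FileIO.readMatches());

        PCollection<FileIO.ReadableFile> filteredFiles =
                files.apply(Filter.by(new EndsWithFilter("Dummy.txt")));

        // DummyFileProcessor must output JSON documents as Strings for ElasticsearchIO.write().
        filteredFiles
                .apply(ParDo.of(new DummyFileProcessor()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration
                                .create(new String[] {"http://****:9200"}, "***")
                                .withUsername("***")
                                .withPassword("***")));

        pipeline.run().waitUntilFinish();
    }
}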
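ElasticsearchIO.write() consumes a PCollection<String> in which each element is one JSON document, so DummyFileProcessor has to emit JSON strings. The class below is a hypothetical helper that such a DoFn could call for each line it reads; the class name and the field names file, line and text are illustrative assumptions, not anything ElasticsearchIO requires. It is plain Java so it can be tried on its own.

```java
// Hypothetical helper: turns one line of an input file into a JSON document string.
public class JsonDocs {

    // Escapes a string for use inside a JSON string literal.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become six-character hex escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Field names "file", "line" and "text" are illustrative assumptions.
    static String toJsonDocument(String fileName, int lineNumber, String text) {
        return "{\"file\":\"" + escapeJson(fileName) + "\",\"line\":" + lineNumber
                + ",\"text\":\"" + escapeJson(text) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonDocument("Dummy.txt", 1, "say \"hi\""));
    }
}
```

In a real pipeline a JSON library such as Jackson would normally do the serialization; hand-rolled escaping is shown only to make the required output shape explicit.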

Here is the stack trace, which says that the Elasticsearch version cannot be retrieved:

Exception in thread "main" org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2592)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:839)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2579)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more

> Task :IO-TextIO-TextIO_Read:Test.main() FAILED

Execution failed for task ':IO-TextIO-TextIO_Read:Test.main()'.
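The trace is easiest to read from the inside out: the outermost exceptions are Guava and Beam wrappers, "Cannot get Elasticsearch version" is ElasticsearchIO's own message from getBackendVersion, and the innermost cause is org.apache.http.ConnectionClosedException: Connection is closed. In other words, the request to the cluster failed at the HTTP level before any version number was read. A small, generic helper for reaching that innermost cause programmatically is sketched below; the exceptions built in main are JDK stand-ins for the ones in the trace, not the real Beam or Apache HttpClient classes.

```java
// Generic helper: follows getCause() links to the innermost exception.
public class RootCause {

    // Assumes the cause chain has no cycles (true for normally constructed exceptions).
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Throwable.getCause() returns null when there is no further cause.
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // JDK stand-ins mimicking the shape of the chain in the trace above.
        Throwable closed = new java.io.IOException("Connection is closed");
        Throwable version =
                new IllegalArgumentException("Cannot get Elasticsearch version", closed);
        Throwable wrapper = new RuntimeException("wrapper (UserCodeException stand-in)", version);
        System.out.println(rootCause(wrapper).getMessage());
    }
}
```

If Guava is already on the classpath, its Throwables.getRootCause does the same job.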

Solution

  • I just found this issue, which reports that ElasticsearchIO doesn't work with Elasticsearch 8.x:

    https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/367

    Once I downgraded Elasticsearch to 7.13.2, it worked just fine.