java, spring-boot, logstash, elasticsearch-5, kibana-6

How to index a CSV file into Elasticsearch using Java?


Is there any way to index a CSV file into Elasticsearch using Java? I was doing it with Logstash earlier, but now I need to code it in Java and run it dynamically. I tried the Index API, but it doesn't work for my case. Can someone help me with that?

My CSV data looks something like this (this is just a sample; I have these records in bulk):

 id  profile_id  hier_name       attri_name     item
  1   1          CUSTOMER        CUSTOMER        C001
  2   1          CUSTOMER        CUSTOMER        C002
  3   1          CUSTOMER        CUSTOMER        C003

This is what I was trying for bulk insertion, but it doesn't seem to work with my current version of Elasticsearch, 7.12.0:

package com.javadeveloperzone;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class ESBulkIndexingExample {

    String indexName, indexTypeName;
    TransportClient client = null;

    public static void main(String[] args) {
        ESBulkIndexingExample esExample = new ESBulkIndexingExample();
        try {
            esExample.initEStransportClinet();
            System.out.println("init done");
            esExample.CSVbulkImport(true);
            System.out.println("bulkimport done");
            esExample.refreshIndices();

            esExample.search();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            esExample.closeTransportClient(); // close transport client
        }
    }

    public ESBulkIndexingExample() {
        indexName = "document";
        indexTypeName = "bulkindexing";
    }

    public boolean initEStransportClinet() {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }

    public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        File file = new File(
                "/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");
        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

        String line = null;
        int count = 0, noOfBatch = 1;
        if (bufferedReader != null && isHeaderIncluded) {
            bufferedReader.readLine();
        }
        while ((line = bufferedReader.readLine()) != null) {

            if (line.trim().length() == 0) {
                continue;
            }
            String data[] = line.split(",");
            if (data.length == 3) {

                try {
                    XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0])
                            .field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3])
                            .endObject();

                    BulkRequestBuilder add = bulkRequest
                            .add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder));

                    System.out.println(add);
                    if ((count + 1) % 500 == 0) {
                        count = 0;
                        addDocumentToESCluser(bulkRequest, noOfBatch, count);
                        noOfBatch++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("Invalid data : " + line);
            }
            count++;
        }
        bufferedReader.close();
        addDocumentToESCluser(bulkRequest, noOfBatch, count);

    }

    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {

        if (count == 0) {
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);

            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    public void refreshIndices() {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    public void search() {

        SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}

I am getting this error:

org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)

Can someone help me with this?


Solution

  • My data in the CSV file looks something like this:

     id  profile_id  hier_name        attri_name     item
      1   1          CUSTOMER         CUSTOMER        C001
      2   1          CUSTOMER         CUSTOMER        C002
      3   1          CUSTOMER         CUSTOMER        C003
    
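    The root cause of the "no requests added" error, by the way: every row of this CSV has five comma-separated columns, but the original code only adds an index request when data.length == 3. That condition never matches, so nothing is ever added to the bulk request, and executing the empty request fails validation. A quick check illustrates this:

        String[] data = "1,1,CUSTOMER,CUSTOMER,C001".split(",");
        System.out.println(data.length); // prints 5, so the data.length == 3 branch never runs
        System.out.println(data[4]);     // "C001" -> the item column, indexed below as item_pk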

    The dependencies that need to be added are:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>7.12.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.12.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
    <dependency>
        <groupId>org.elasticsearch.plugin</groupId>
        <artifactId>transport-netty4-client</artifactId>
        <version>7.12.1</version>
    </dependency>
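    One thing to watch with the TransportClient: built with Settings.EMPTY, it expects the cluster to have the default name "elasticsearch" and will reject nodes with a different cluster name. If yours is named differently, pass the name explicitly. A minimal sketch, where "my-cluster" is a hypothetical placeholder that must match cluster.name in your elasticsearch.yml:

        Settings settings = Settings.builder()
                .put("cluster.name", "my-cluster") // hypothetical; must match elasticsearch.yml
                .build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

    The full working class, with all five CSV columns mapped and the batching fixed, is below: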
    
    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Iterator;
    import java.util.concurrent.ExecutionException;
    
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ESBulkIndexing {
    
        String indexName, indexTypeName;
        TransportClient client = null;
    
        public ESBulkIndexing() {
    
            indexTypeName = "bulkindexing";
        }
    
        public boolean initEStransportClinet() throws UnknownHostException {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return true;
        }
    
        public void CSVbulkImport(boolean isHeaderIncluded, String index, String filename)
                throws IOException, ExecutionException, InterruptedException {
            BulkRequestBuilder bulkRequest = client.prepareBulk();

            // "filename" is the directory that contains the CSV file
            File file = new File(filename + "/elastic.csv");
            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

            String line = null;
            int count = 0, noOfBatch = 1;
            if (isHeaderIncluded) {
                bufferedReader.readLine(); // skip the header row
            }
            while ((line = bufferedReader.readLine()) != null) {

                if (line.trim().length() == 0) {
                    continue; // skip blank lines
                }
                String[] data = line.split(",");
                try {
                    // Column layout: id, profile_id, hier_name, attri_name, item
                    XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[1])
                            .field("hierarchy_name", data[2]).field("attribute_name", data[3]).field("item_pk", data[4])
                            .endObject();
                    bulkRequest.add(client.prepareIndex(index, indexTypeName, data[0]).setSource(xContentBuilder));
                    count++;

                    // Flush every 500 documents, then start a fresh bulk request
                    if (count % 500 == 0) {
                        addDocumentToESCluser(bulkRequest, noOfBatch, count);
                        bulkRequest = client.prepareBulk();
                        noOfBatch++;
                        count = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            bufferedReader.close();
            // Flush whatever is left in the final partial batch
            addDocumentToESCluser(bulkRequest, noOfBatch, count);
        }
    
        public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {

            if (count == 0) {
                return; // nothing staged in this batch
            }
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                System.out.println("Bulk Indexing failed for batch : " + noOfBatch);

                // Count the individual documents that failed inside the bulk response
                int numberOfDocFailed = 0;
                Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
                while (iterator.hasNext()) {
                    BulkItemResponse response = iterator.next();
                    if (response.isFailed()) {
                        numberOfDocFailed++;
                    }
                }
                System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
                System.out.println(bulkResponse.buildFailureMessage());
            } else {
                System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
            }
        }
    
        public void refreshIndices(String index) {
            client.admin().indices().prepareRefresh(index).get();
        }
    
        public void closeTransportClient() {
            if (client != null) {
                client.close();
            }
        }
    }
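    To drive the component, the calls go in the same order as in the original main method. A minimal sketch, where the index name and directory path are placeholders (CSVbulkImport reads elastic.csv from the given directory):

        public static void main(String[] args) throws Exception {
            ESBulkIndexing indexer = new ESBulkIndexing();
            try {
                indexer.initEStransportClinet();
                indexer.CSVbulkImport(true, "document", "/path/to/data"); // reads /path/to/data/elastic.csv
                indexer.refreshIndices("document");
            } finally {
                indexer.closeTransportClient();
            }
        }

    Note also that the TransportClient is deprecated since Elasticsearch 7.0 and removed in 8.0, so on 7.12 the Java high-level REST client is the recommended route. Below is a minimal sketch of the same bulk insert with it, assuming the org.elasticsearch.client:elasticsearch-rest-high-level-client:7.12.1 artifact; the class name and document values are illustrative:

        import org.apache.http.HttpHost;
        import org.elasticsearch.action.bulk.BulkRequest;
        import org.elasticsearch.action.bulk.BulkResponse;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.RequestOptions;
        import org.elasticsearch.client.RestClient;
        import org.elasticsearch.client.RestHighLevelClient;
        import org.elasticsearch.common.xcontent.XContentType;

        public class RestClientBulkSketch {
            public static void main(String[] args) throws Exception {
                // The REST client talks HTTP on port 9200 instead of the transport protocol on 9300
                try (RestHighLevelClient client = new RestHighLevelClient(
                        RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
                    BulkRequest bulk = new BulkRequest();
                    // One IndexRequest per CSV row; values hard-coded here for brevity
                    bulk.add(new IndexRequest("document").id("1").source(XContentType.JSON,
                            "tenant_id", "1",
                            "hierarchy_name", "CUSTOMER",
                            "attribute_name", "CUSTOMER",
                            "item_pk", "C001"));
                    BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
                    System.out.println("Bulk had failures? " + response.hasFailures());
                }
            }
        }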