Is there any way to index our CSV file into Elastic Search using Java , I was doing it using logstash earlier but now I need to code it in Java and run dynamically .. I tried with Index API but it doesn't work for my condition Can someone help me with that .. My CSV Data looks something like this this is just a sample I have this object in bulk ..
sample CSV Data is something like this ..
id profile_id hier_name attri_name item
1 1 CUSTOMER CUSTOMER C001
2 1 CUSTOMER CUSTOMER C002
3 1 CUSTOMER CUSTOMER C003
This is what I was trying for Bulk Insertion but it doesnt seem to be working with my current version of Elastic Search 7.12.0 ,
package com.javadeveloperzone;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
public class ESBulkIndexingExample {
String indexName, indexTypeName;
TransportClient client = null;
public static void main(String[] args) {
ESBulkIndexingExample esExample = new ESBulkIndexingExample();
try {
esExample.initEStransportClinet();
System.out.println("init done");
esExample.CSVbulkImport(true);
System.out.println("bulkimport done");
esExample.refreshIndices();
esExample.search();
} catch (Exception e) {
e.printStackTrace();
} finally {
esExample.closeTransportClient(); // close transport client
}
}
public ESBulkIndexingExample() {
indexName = "document";
indexTypeName = "bulkindexing";
}
public boolean initEStransportClinet() {
try {
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
return true;
} catch (Exception ex) {
ex.printStackTrace();
return false;
}
}
public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {
BulkRequestBuilder bulkRequest = client.prepareBulk();
File file = new File(
"/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
int count = 0, noOfBatch = 1;
if (bufferedReader != null && isHeaderIncluded) {
bufferedReader.readLine();
}
while ((line = bufferedReader.readLine()) != null) {
if (line.trim().length() == 0) {
continue;
}
String data[] = line.split(",");
if (data.length == 3) {
try {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0])
.field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3])
.endObject();
BulkRequestBuilder add = bulkRequest
.add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder));
System.out.println(add);
if ((count + 1) % 500 == 0) {
count = 0;
addDocumentToESCluser(bulkRequest, noOfBatch, count);
noOfBatch++;
}
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Invalid data : " + line);
}
count++;
}
bufferedReader.close();
addDocumentToESCluser(bulkRequest, noOfBatch, count);
}
public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
if (count == 0) {
return;
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
int numberOfDocFailed = 0;
Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
while (iterator.hasNext()) {
BulkItemResponse response = iterator.next();
if (response.isFailed()) {
numberOfDocFailed++;
}
}
System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
System.out.println(bulkResponse.buildFailureMessage());
} else {
System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
}
}
public void refreshIndices() {
client.admin().indices().prepareRefresh(indexName).get();
}
public void search() {
SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();
System.out.println("Total Hits : " + response.getHits().getTotalHits());
System.out.println(response);
}
public void closeTransportClient() {
if (client != null) {
client.close();
}
}
}
getting here error as
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)
can someone help me with this ?
My data in csv file looks something like this :-
id profile_id hier_name attri_name item
1 1 CUSTOMER CUSTOMER C001
2 1 CUSTOMER CUSTOMER C002
3 1 CUSTOMER CUSTOMER C003
The dependency which can be added is
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>7.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>7.12.1</version>
</dependency>
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.stereotype.Component;
@Component
public class ESBulkIndexing {
String indexName, indexTypeName;
TransportClient client = null;
public ESBulkIndexing() {
indexTypeName = "bulkindexing";
}
public boolean initEStransportClinet() throws UnknownHostException {
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
return true;
}
public void CSVbulkImport(boolean isHeaderIncluded, String index, String filename)
throws IOException, ExecutionException, InterruptedException {
BulkRequestBuilder bulkRequest = client.prepareBulk();
File file = new File(filename + "/elastic.csv");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
int count = 0, noOfBatch = 1;
if (bufferedReader != null && isHeaderIncluded) {
bufferedReader.readLine();
}
while ((line = bufferedReader.readLine()) != null) {
if (line.trim().length() == 0) {
continue;
}
String data[] = line.split(",");
try {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[1])
.field("hierarchy_name", data[2]).field("attribute_name", data[3]).field("item_pk", data[4])
.endObject();
bulkRequest.add(client.prepareIndex(index, indexTypeName, data[0]).setSource(xContentBuilder));
addDocumentToESCluser(bulkRequest, noOfBatch, count);
noOfBatch++;
} catch (Exception e) {
e.printStackTrace();
}
addDocumentToESCluser(bulkRequest, noOfBatch, count);
count++;
}
bufferedReader.close();
addDocumentToESCluser(bulkRequest, noOfBatch, count);
}
public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
if (count == 0) {
return;
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
int numberOfDocFailed = 0;
Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
while (iterator.hasNext()) {
BulkItemResponse response = iterator.next();
if (response.isFailed()) {
numberOfDocFailed++;
}
}
System.out.println(bulkResponse.buildFailureMessage());
} else {
System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
}
}
public void refreshIndices(String index) {
client.admin().indices().prepareRefresh(index).get();
}
public void closeTransportClient() {
if (client != null) {
client.close();
}
}
}