Bulk insert into Elasticsearch using elasticsearch-java


I'm trying to bulk insert documents into Elasticsearch from Java using the elasticsearch-java client.

I checked the official elasticsearch-java documentation and found the section on bulk indexing, and with it I'm able to bulk insert data into a regular index: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op           
        .index(idx -> idx            
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}

But when I try to insert data into a data stream using this code, I get the following error: only write ops with an op type of create are allowed in data-streams

class Product
{
   String name;
   Map<String, Object> info;
}

How can I bulk insert data into a data stream from Java objects?

Thanks in advance.

Solution

  • Could you try replacing index with create like this?

    br.operations(op -> op
        .create(c -> c
            .index("products")
            .id(product.getSku())
            .document(product)
        )
    );
    

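    According to the Bulk API documentation on data streams, the index action adds a document or replaces an existing one with the same ID, which is probably why it is rejected: data streams are append-only. The create action only adds new documents and fails if a document with the same ID already exists, which is what a data stream expects.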
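
To make the difference concrete, here is a minimal sketch of the newline-delimited JSON body that a bulk request to a data stream carries. It uses only the Java standard library, not the elasticsearch-java client, so it is an illustration of the wire format rather than the client's own implementation. The class name DataStreamBulkBody and its helper methods are hypothetical, and the documents are reduced to a single name field. The @timestamp field is included on the assumption that the target is a data stream, which requires a timestamp on every document. The target name is assumed to be given in the request path (for example POST /products/_bulk), so each action line is just {"create":{}}.

```java
import java.time.Instant;
import java.util.List;

// Sketch only: builds the NDJSON body of a bulk request aimed at a data stream.
// DataStreamBulkBody, quote and build are hypothetical names for illustration.
final class DataStreamBulkBody {

    // Minimal JSON string escaping (backslash and double quote only),
    // which is enough for this illustration but not for arbitrary input.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Emits one "create" action line followed by one document line per name.
    static String build(List<String> names, Instant timestamp) {
        StringBuilder body = new StringBuilder();
        for (String name : names) {
            // Action line: "create" instead of "index", as data streams require.
            body.append("{\"create\":{}}\n");
            // Source line: assumed document shape with @timestamp and name.
            body.append("{\"@timestamp\":").append(quote(timestamp.toString()))
                .append(",\"name\":").append(quote(name))
                .append("}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-01T00:00:00Z");
        System.out.print(build(List.of("keyboard", "mouse"), ts));
    }
}
```

Every document line is preceded by a create action line. With index in that position, a data stream rejects the item with the error quoted in the question. If the Product class has no @timestamp field, one would most likely need to be added before a data stream accepts the documents.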