Spring Data Elasticsearch Bulk Index/Delete - Millions of Records


I'm using Spring Data Elasticsearch 4.2.5. We have a job that does ETL (extract, transform, load) into a particular database table, and I index the data in Elasticsearch while the job is running. The data set runs to millions of records or more. Currently I index one document per iteration, which I have read can be slow. I would like to use something like bulk indexing, but for that I need to add an IndexQuery object to a List, and collecting millions of records in a list before doing a single bulk index may cause memory problems.

I need a similar process for deletion. When records are deleted based on a common ID, I need to delete the related Elasticsearch documents, and these will also number in the millions or more.

Is there any way to make indexing and deleting fast for this requirement? Any help is much appreciated, and please correct me if my understanding is wrong.

INDEXING

for (Map.Entry<Integer, ObjectDetails> key : objectDetailsHashMap.entrySet()) {
    indexDocument(elasticsearchOperations, key, oPath);
    // other code to insert data in db table...
}

private void indexDocument(ElasticsearchOperations elasticsearchOperations,
                              Map.Entry<Integer, ObjectDetails> key, String oPath) {
    String docId = "" + key.getValue().getCatalogId() + key.getValue().getObjectId();

    byte[] nameBytes = key.getValue().getName();
    byte[] physicalNameBytes = key.getValue().getPhysicalName();
    byte[] definitionBytes =  key.getValue().getDefinition();
    byte[] commentBytes = key.getValue().getComment();

    IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(docId)
            .withObject(new MetadataSearch(
                    key.getValue().getObjectId(),
                    key.getValue().getCatalogId(),
                    key.getValue().getParentId(),
                    key.getValue().getTypeCode(),
                    key.getValue().getStartVersion(),
                    key.getValue().getEndVersion(),
                    nameBytes != null ? new String(nameBytes, StandardCharsets.UTF_8) : "-",
                    physicalNameBytes != null ? new String(physicalNameBytes, StandardCharsets.UTF_8) : "-",
                    definitionBytes != null ? new String(definitionBytes, StandardCharsets.UTF_8) : "-",
                    commentBytes != null ? new String(commentBytes, StandardCharsets.UTF_8) : "-",
                    oPath
            ))
            .build();

    elasticsearchOperations.index(indexQuery, IndexCoordinates.of("portal_idx"));
}

DELETING

private void deleteElasticDocuments(String catalogId) {
    String queryText = martServerContext.getQueryCacheInstance().getQuery(QUERY_PORTAL_GET_OBJECTS_IN_PORTAL_BY_MODEL);
    MapSqlParameterSource mapSqlParameterSource = new MapSqlParameterSource();
    mapSqlParameterSource.addValue("cId", Integer.parseInt(catalogId));
    namedParameterJdbcTemplate.query(queryText, mapSqlParameterSource, (resultSet -> {
        int objectId = resultSet.getInt(O_ID);
        String docId = catalogId + objectId;
        elasticsearchOperations.delete(docId, IndexCoordinates.of("portal_idx"));
    }));
}

Solution

  • For adding the documents, you could use bulk indexing: collect the documents to index in a list or array and, whenever a predefined size is reached (for example 500 entries), do a bulk insert of that batch and start a new one.

    For deleting there is no bulk operation, but you could again collect the ids to delete in a list or array with a maximum size, use ElasticsearchOperations.idsQuery(List<String>) to create a query for these ids, and pass that query into the delete(query) method.

    Edit 29.09.2021:

    the idsQuery method was only just added in the 4.3 branch, so on 4.2.x you would build the equivalent query yourself. It is implemented like this (https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java#L193-L200):

    @Override
    public Query idsQuery(List<String> ids) {
    
        Assert.notNull(ids, "ids must not be null");
    
        return new NativeSearchQueryBuilder().withQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[] {})))
                .build();
    }
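
The batching idea described above, for both indexing and deleting, can be sketched in plain Java. This is only a minimal sketch, not part of Spring Data Elasticsearch: the class name BatchBuffer and the batch size of 3 in the demo are made up for illustration, and the println sink stands in for the real Elasticsearch call. In the actual job the sink would presumably call elasticsearchOperations.bulkIndex(batch, IndexCoordinates.of("portal_idx")) for a batch of IndexQuery objects, or build an ids query from a batch of document ids and pass it to delete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: collects items and hands them to a sink in
 * fixed-size batches, so at most batchSize items are buffered at once.
 */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer;

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Adds one item and flushes automatically once the batch is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends any buffered items to the sink; call once after the loop for the remainder. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer); // copy, so the sink may keep the list
        buffer.clear();
        sink.accept(batch);
    }

    public static void main(String[] args) {
        // Stand-in sink: the real job would issue one Elasticsearch request per batch here
        // (assumption: bulkIndex for IndexQuery batches, an ids query + delete for id batches).
        BatchBuffer<String> ids = new BatchBuffer<>(3,
                batch -> System.out.println("flushing " + batch.size() + " ids: " + batch));
        for (int objectId = 1; objectId <= 7; objectId++) {
            ids.add("42" + objectId); // docId built as catalogId + objectId, as in the question
        }
        ids.flush(); // do not forget the last, partially filled batch
    }
}
```

With this shape, the indexing loop would call add(indexQuery) instead of index(...), and the JDBC row callback in deleteElasticDocuments would call add(docId) instead of delete(...). In both cases flush() has to be called once after the loop. Memory use stays bounded by the batch size rather than by the total number of records.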