amazon-web-services elasticsearch aws-lambda amazon-kinesis-firehose

How can AWS Kinesis Firehose lambda send update and delete requests to ElasticSearch?


I'm not seeing how an AWS Kinesis Firehose lambda can send update and delete requests to ElasticSearch (AWS OpenSearch service).

The Elasticsearch document APIs provide CRUD operations: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html

The examples I've found deal with the create case but don't show how to make delete or update requests: https://aws.amazon.com/blogs/big-data/ingest-streaming-data-into-amazon-elasticsearch-service-within-the-privacy-of-your-vpc-with-amazon-kinesis-data-firehose/ https://github.com/amazon-archives/serverless-app-examples/blob/master/python/kinesis-firehose-process-record-python/lambda_function.py

The output format in the examples does not show a way to specify create, update or delete requests:

       output_record = {
           'recordId': record['recordId'],
           'result': 'Ok',
           'data': base64.b64encode(payload)
       }

Apart from the examples, I can't find a definition of the output format that the Kinesis Firehose Lambda handler should return.


Solution

  • Firehose invokes the Lambda function only to transform records before they are delivered to the destination (in your case OpenSearch/ES). The function can change the structure of the data, but it cannot influence which CRUD action is performed: Firehose can only insert records into a specific index. If you just need a simple way to remove records from an ES index after a certain period of time, have a look at the "Index rotation" option when specifying the destination for your Firehose stream.

    If you want to perform CRUD actions against ES and keep using Firehose, I would suggest sending the records to an S3 bucket in raw format and then triggering a Lambda function on the object upload event. That function can perform the appropriate CRUD action depending on fields in your payload.

    A good example of performing CRUD actions against ES from a Lambda function: https://github.com/chankh/ddb-elasticsearch/blob/master/src/lambda_function.py

    This particular example is built to send data from DynamoDB streams into ES, but it should be a good starting point for you.
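
To make the S3-triggered approach more concrete, here is a minimal sketch of how such a function might turn the raw records into a request body for the Elasticsearch `_bulk` API. Several things here are assumptions rather than anything Firehose or the linked example prescribes: the index name `my-index`, the payload fields `op`, `id` and `doc`, and the idea that the producer writes one JSON object per line (Firehose does not add delimiters itself, and the object may be compressed depending on your stream settings). Actually sending the body to the cluster, including request signing, is left out.

```python
import json
from urllib.parse import unquote_plus

INDEX = "my-index"  # hypothetical index name


def parse_records(raw):
    """Parse newline-delimited JSON (the assumed layout of the S3 object)."""
    return [json.loads(line) for line in raw.splitlines() if line.strip()]


def build_bulk_body(records, index=INDEX):
    """Build an NDJSON body for the _bulk API from the parsed records.

    Each record is assumed to carry hypothetical fields:
      op  - "index", "update" or "delete" (defaults to "index")
      id  - the document id
      doc - the document, or the partial document for an update
    """
    lines = []
    for rec in records:
        op = rec.get("op", "index")
        meta = {"_index": index, "_id": rec["id"]}
        if op == "delete":
            # A delete action has no source line.
            lines.append(json.dumps({"delete": meta}))
        elif op == "update":
            lines.append(json.dumps({"update": meta}))
            lines.append(json.dumps({"doc": rec["doc"]}))
        elif op == "index":
            lines.append(json.dumps({"index": meta}))
            lines.append(json.dumps(rec["doc"]))
        else:
            raise ValueError("unsupported op: %r" % op)
    # The bulk body must end with a newline.
    return "\n".join(lines) + "\n" if lines else ""


def lambda_handler(event, context):
    # Imported here so the helpers above can be used without AWS libraries.
    import boto3

    s3 = boto3.client("s3")
    bodies = []
    for item in event["Records"]:
        bucket = item["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(item["s3"]["object"]["key"])
        raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        bodies.append(build_bulk_body(parse_records(raw)))
    # POSTing each body to the cluster's _bulk endpoint is omitted here.
    return bodies
```

With this layout, a producer that wants a document removed would put something like `{"op": "delete", "id": "42"}` on the stream, and the function would translate it into a `delete` action in the bulk request.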