elasticsearch confluent-cloud

How can I filter data in the Elasticsearch Sink Connector?


I am consuming data from the Confluent audit log cluster and indexing it into Elasticsearch with the Elasticsearch sink connector. Right now I receive events from every environment of my internal clusters. I want to filter at the connector level so that only records matching a cluster ID that I set in the connector configuration are indexed.

Does anyone know how I can apply a filter in the Elasticsearch sink connector?

Thank you!


Solution

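  • I eventually figured this out. You can use a transform (SMT) to filter the records, with a regex in the filter condition that matches your data. With filter.type set to include, only records that match the condition are passed on to the sink; the rest are dropped. Replace YourClusterID with your own cluster ID: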

    "transforms": "dataFilterByClusterId",
    "transforms.dataFilterByClusterId.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.dataFilterByClusterId.filter.condition": "$[?(@.subject =~ /.*YourClusterID/)]",
    "transforms.dataFilterByClusterId.filter.type": "include"
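
To check a pattern against sample data before deploying the connector, the include filter can be approximated in plain Python. This is only a rough sketch, not the connector's implementation: the `make_include_filter` helper, the sample `subject` values, and the `lkc-abc123` cluster ID are all hypothetical, and it assumes the `=~` operator requires the regex to match the whole `subject` string, which is why the pattern in the config starts with `.*`.

```python
import re


def make_include_filter(cluster_id):
    """Build a predicate approximating the condition
    $[?(@.subject =~ /.*<cluster_id>/)] with filter.type = include.

    Assumption: the regex must match the entire subject value,
    so only subjects that END with the cluster ID are kept.
    """
    # Escape the ID so characters such as '.' are matched literally.
    pattern = re.compile(".*" + re.escape(cluster_id))

    def keep(record):
        subject = record.get("subject")
        # Records without a string "subject" are dropped in this sketch;
        # how the real transform treats them is configured separately.
        return isinstance(subject, str) and pattern.fullmatch(subject) is not None

    return keep


# Hypothetical audit-log-like records, for illustration only.
records = [
    {"subject": "crn://confluent.cloud/kafka=lkc-abc123"},
    {"subject": "crn://confluent.cloud/kafka=lkc-zzz999"},
    {"id": "record-without-subject"},
]

keep = make_include_filter("lkc-abc123")
kept = [r for r in records if keep(r)]
print(kept)
```

If the cluster ID can appear in the middle of the `subject` value rather than at the end, the pattern would also need a trailing `.*`, for example `/.*YourClusterID.*/`.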