apache-nifi

Apache NiFi PutElasticsearchJson 2.0.0 id is missing


I am trying to sync MySQL 8.x data to Elasticsearch 8.x using a NiFi flow. The first sync works as expected with IndexOperation type insert. However, if I run the flow again, new documents are created instead of the existing ones being updated, even with IndexOperation type upsert. My records do have an id field; a sample JSON record is below:

{
  "id" : 1,
  "name" : "Atelier graphique",
  "phone" : "40.32.2555",
  "address" : null,
  "city" : "Nantes",
  "state" : null
}

My docker-compose file is below:

name: apache-nifi-es-kibana
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    volumes:
      - ./es-data:/usr/share/elasticsearch/data/
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:9200:9200
      - 127.0.0.1:9300:9300
    environment:
      - 'ES_JAVA_OPTS=-Xms256m -Xmx256m'
      - 'discovery.type=single-node'
      - 'xpack.security.enabled=false'
#      - 'log4j.rootLogger=ERROR, stdout'
    healthcheck:
      test: ['CMD', 'curl', '-f', 'http://localhost:9200/_cluster/health?wait_for_status=green&timeout=10s']
      interval: 5s
      timeout: 10s
      retries: 10
  kibana:
    depends_on:
      - elasticsearch
    image: docker.elastic.co/kibana/kibana:8.13.4
    ports:
      - 127.0.0.1:5601:5601
    environment:
      - ELASTICSEARCH_HOSTS=["http://elasticsearch:9200"]
#    volumes:
#      - ./kibana-config.yml:/usr/share/kibana/config/kibana.yml
  mysql-8:
    image: mysql:8.4.0
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:3306:3306
    environment:
      - 'MYSQL_ROOT_PASSWORD=qwerty'
  nifi:
    depends_on:
      - elasticsearch
      - mysql-8
    image: apache/nifi:latest
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:8443:8443
    environment:
      - 'SINGLE_USER_CREDENTIALS_USERNAME=admin'
      - 'SINGLE_USER_CREDENTIALS_PASSWORD=qwerty123456'
    volumes:
      - type: bind
        source: mysql-connector/
        target: /opt/nifi/drivers

A screenshot of my NiFi 2.0.0 processor configuration is attached.



Solution

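  • PutElasticsearchJson does not take the document ID from the JSON content itself. You need to extract the id from the JSON message into a FlowFile attribute (for example elastic-id) using the EvaluateJsonPath processor, and then pass that attribute to the PutElasticsearchJson processor.

    EvaluateJsonPath: set Destination to flowfile-attribute and add a dynamic property named elastic-id with the value $.id.

    PutElasticsearchJson: set Identifier Attribute to elastic-id and keep Index Operation set to upsert.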
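
To see why the explicit identifier matters, here is a minimal Python sketch of the kind of Elasticsearch _bulk request body an upsert keyed on the record's id corresponds to. It is an illustration only, not NiFi's actual implementation: the function name bulk_upsert_body and the index name "customers" are hypothetical, and the use of doc_as_upsert is an assumption about how the upsert is expressed. Without an _id in the action line, Elasticsearch has nothing to match on and generates a fresh ID for every document, which is what produces duplicates on a second run.

```python
import json


def bulk_upsert_body(index, record, id_field="id"):
    """Build the two NDJSON lines of a _bulk upsert for one record.

    Hypothetical helper for illustration; NiFi builds its own request.
    """
    doc_id = record.get(id_field)
    if doc_id is None:
        # An update/upsert action cannot target a document without an _id.
        raise ValueError(f"record has no '{id_field}' field to use as _id")
    # Action line: tells Elasticsearch which document to update.
    action = {"update": {"_index": index, "_id": str(doc_id)}}
    # Payload line: insert the doc if it is missing, otherwise merge it.
    payload = {"doc": record, "doc_as_upsert": True}
    return json.dumps(action) + "\n" + json.dumps(payload) + "\n"


# Sample record from the question; "customers" is an assumed index name.
record = {
    "id": 1,
    "name": "Atelier graphique",
    "phone": "40.32.2555",
    "address": None,
    "city": "Nantes",
    "state": None,
}
body = bulk_upsert_body("customers", record)
print(body)
```

Running the flow twice with the same id then targets the same document both times, so the second run updates it rather than creating a new one.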