I am trying to sync MySQL 8.x data to Elasticsearch 8.x using a NiFi flow.
The initial sync works as expected with IndexOperation type insert. However, if I run the flow again, new documents are created instead of the existing ones being updated, for example with IndexOperation type upsert.
My records do have an id field. Sample JSON as below:
{
  "id" : 1,
  "name" : "Atelier graphique",
  "phone" : "40.32.2555",
  "address" : null,
  "city" : "Nantes",
  "state" : null
}
docker-compose as below:
name: apache-nifi-es-kibana
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    volumes:
      - ./es-data:/usr/share/elasticsearch/data/
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:9200:9200
      - 127.0.0.1:9300:9300
    environment:
      - 'ES_JAVA_OPTS=-Xms256m -Xmx256m'
      - 'discovery.type=single-node'
      - 'xpack.security.enabled=false'
      # - 'log4j.rootLogger=ERROR, stdout'
    healthcheck:
      test: ['CMD', 'curl', '-f', 'http://localhost:9200/_cluster/health?wait_for_status=green&timeout=10s']
      interval: 5s
      timeout: 10s
      retries: 10
  kibana:
    depends_on:
      - elasticsearch
    image: docker.elastic.co/kibana/kibana:8.13.4
    ports:
      - 127.0.0.1:5601:5601
    environment:
      - ELASTICSEARCH_HOSTS=["http://elasticsearch:9200"]
    # volumes:
    #   - ./kibana-config.yml:/usr/share/kibana/config/kibana.yml
  mysql-8:
    image: mysql:8.4.0
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:3306:3306
    environment:
      - 'MYSQL_ROOT_PASSWORD=qwerty'
  nifi:
    depends_on:
      - elasticsearch
      - mysql-8
    image: apache/nifi:latest
    # If you want to expose these ports outside your dev PC,
    # remove the "127.0.0.1:" prefix
    ports:
      - 127.0.0.1:8443:8443
    environment:
      - 'SINGLE_USER_CREDENTIALS_USERNAME=admin'
      - 'SINGLE_USER_CREDENTIALS_PASSWORD=qwerty123456'
    volumes:
      - type: bind
        source: mysql-connector/
        target: /opt/nifi/drivers
Attached is an image of the NiFi 2.0.0 configuration.
You need to extract the ID from the JSON message into an attribute (elastic-id) using the EvaluateJsonPath processor, and then pass that attribute to the PutElasticsearchJson processor. When no document ID is supplied, Elasticsearch generates a new _id for every indexed document, which is why each run creates new docs.
EvaluateJsonPath:
  Destination: flowfile-attribute
  elastic-id (dynamic property): $.id
PutElasticsearchJson:
  Identifier Attribute: elastic-id
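
To illustrate why the identifier matters, here is a minimal Python sketch of the two NDJSON lines that an Elasticsearch _bulk request carries per record. It is only an illustration of the Bulk API format, not a claim about the exact payload NiFi sends. The function name bulk_action and the index name customers are hypothetical.

```python
import json


def bulk_action(record, index, operation="index", id_field="id"):
    """Build the action line and body line of a _bulk request for one record.

    The record's id field (if present) is used as the document _id, which is
    the role the elastic-id attribute plays in the flow above.
    """
    doc_id = record.get(id_field)
    meta = {"_index": index}
    if doc_id is not None:
        # With an explicit _id, re-sending the record targets the same document.
        meta["_id"] = str(doc_id)

    if operation == "index":
        # Without _id, Elasticsearch auto-generates one and stores a new doc.
        return json.dumps({"index": meta}) + "\n" + json.dumps(record) + "\n"

    if operation == "upsert":
        if doc_id is None:
            raise ValueError("upsert needs a document id")
        body = {"doc": record, "doc_as_upsert": True}
        return json.dumps({"update": meta}) + "\n" + json.dumps(body) + "\n"

    raise ValueError(f"unsupported operation: {operation}")


if __name__ == "__main__":
    sample = {"id": 1, "name": "Atelier graphique", "city": "Nantes"}
    print(bulk_action(sample, "customers", operation="upsert"), end="")
```

Running the same record through this twice yields the same _id both times, so the second request updates the existing document rather than adding another one.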