mysqlelasticsearchkibana-4fluentd

Push Mysql data to Elasticsearch using fluentd mysql-replicator Plugin


I am working on Fluentd, ElasticSearch and Kibana to push my mysql database data into elasticsearch I'm using plugin fluentd mysql-replicator plugin , But my fluntd throw following error:

  2016-06-08 15:43:56 +0530 [warn]: super was not called in #start: called it forcedly plugin=Fluent::MysqlReplicatorInput
  2016-06-08 15:43:56 +0530 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
  2016-06-08 15:43:56 +0530 [info]: listening fluent socket on 0.0.0.0:24224
  2016-06-08 15:43:57 +0530 [error]: mysql_replicator: missing primary_key. :tag=>replicator.livechat.chat_chennaibox.${event}.${primary_key} :primary_key=>chat_id
  2016-06-08 15:43:57 +0530 [error]: mysql_replicator: failed to execute query.
  2016-06-08 15:43:57 +0530 [error]: error: bad value for range
  2016-06-08 15:43:57 +0530 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-mysql-replicator-0.5.2/lib/fluent/plugin/in_mysql_replicator.rb:105:in `block in poll'

This is my configuration File for fluentd:

####
## Output descriptions:
##

# Treasure Data (http://www.treasure-data.com/) provides cloud based data
# analytics platform, which easily stores and processes data from td-agent.
# FREE plan is also provided.
# @see http://docs.fluentd.org/articles/http-to-td
#
# This section matches events whose tag is td.DATABASE.TABLE
#<match td.*.*>
#  type tdlog
#  apikey YOUR_API_KEY

#  auto_create_table
#  buffer_type file
#  buffer_path /var/log/td-agent/buffer/td

#  <secondary>
#    type file
#    path /var/log/td-agent/failed_records
#  </secondary>
#</match>

## match tag=debug.** and dump to console
#<match debug.**>
#  type stdout
#</match>

####
## Source descriptions:
##

## built-in TCP input
## @see http://docs.fluentd.org/articles/in_forward
<source>
  type forward
</source>

## built-in UNIX socket input
#<source>
#  type unix
#</source>

# HTTP input
# POST http://localhost:8888/<tag>?json=<json>
# POST http://localhost:8888/td.myapp.login?json={"user"%3A"me"}
# @see http://docs.fluentd.org/articles/in_http
<source>
  type http
  port 8888
</source>

## live debugging agent
<source>
  type debug_agent
  bind 127.0.0.1
  port 24230
</source>

####
## Examples:
##

## File input
## read apache logs continuously and tags td.apache.access
#<source>
#  type tail
#  format apache
#  path /var/log/httpdaccess.log
#  tag td.apache.access
#</source>

## File output
## match tag=local.** and write to file
#<match local.**>
#  type file
#  path /var/log/td-agent/apache.log
#</match>

## Forwarding
## match tag=system.** and forward to another td-agent server
#<match system.**>
#  type forward
#  host 192.168.0.11
#  # secondary host is optional
#  <secondary>
#    host 192.168.0.12
#  </secondary>
#</match>

## Multiple output
## match tag=td.*.* and output to Treasure Data AND file
#<match td.*.*>
#  type copy
#  <store>
#    type tdlog
#    apikey API_KEY
#    auto_create_table
#    buffer_type file
#    buffer_path /var/log/td-agent/buffer/td
#  </store>
#  <store>
#    type file
#    path /var/log/td-agent/td-%Y-%m-%d/%H.log
#  </store>
#</match>
#<source>
#  @type tail
#  format apache
#  tag apache.access
#  path /var/log/td-agent/apache_log/ssl_access_log.1
#  read_from_head true
# pos_file /var/log/httpd/access_log.pos
#</source>
#<match apache.access*>
#  type stdout
#</match>

#<source>
#  @type tail
#  format magento_system 
#  tag magento.access
#  path /var/log/td-agent/Magento_log/system.log
#  pos_file /tmp/fluentd_magento_system.pos
#  read_from_head true 
#</source>

#<match apache.access
#  type stdout
#</match>
#<source>
#  @type http
#  port 8080
#  bind localhost
#  body_size_limit 32m
#  keepalive_timeout 10s
#</source>
#<match magento.access*>
# type stdout
#</match>
#<match magento.access*>
#  @type elasticsearch
#  logstash_format true
#  host localhost
# port 9200
#</match>
<source>
  type mysql_replicator
  host 127.0.0.1
  username root
  password gworks.mobi2
  database livechat
  query select chat_name from chat_chennaibox;
  #query SELECT t2.firstname,t2.lastname, t1.* FROM status t1 INNER JOIN student_detail t2 ON t1.number = t2.number;
  primary_key chat_id # specify unique key (default: id)
  interval 10s  # execute query interval (default: 1m)
  enable_delete yes
  tag replicator.livechat.chat_chennaibox.${event}.${primary_key}
</source>
#<match replicator.**>
#type stdout
#</match>

<match replicator.**>
  type mysql_replicator_elasticsearch
  host localhost
  port 9200
  tag_format (?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$
  flush_interval 5s
  max_retry_wait 1800
  flush_at_shutdown yes
  buffer_type file
  buffer_path /var/log/td-agent/buffer/mysql_replicator_elasticsearch.*
</match>

If there anyone who faced the same issue please suggests to me How to solve this problem.


Solution

  • I solve This Problem. My Problem is Missing primary_key in Query.

    wrong Query:

      <source>
      type mysql_replicator
      host 127.0.0.1
      username root
      password xxxxxxx
      database livechat
      query select chat_name from chat_chennaibox;
      primary_key chat_id
      interval 10s  # execute query interval (default: 1m)
      enable_delete yes
      tag replicator.livechat.chat_chennaibox.${event}.${primary_key}
      </source>
    

    In this Query i Mentioned primary_key is chat_id But, doesn't mention in query so, I got Error in mysql_replicator: missing primary_key. :tag=>replicator.livechat.chat_chennaibox.${event}.${primary_key} :primary_key=>chat_id

    Right Query:

       <source>
        @type mysql_replicator
        host localhost
        username root
        password xxxxxxx
        database livechat
        query SELECT chat_id, chat_name FROM chat_chennaibox
        primary_key chat_id 
        interval 10s  
        enable_delete yes
        tag replicator.livechat.chat_chennaibox.${event}.${primary_key}
      </source>
    

    You need to Mention primary_key in Query Field

     query SELECT chat_id, chat_name FROM chat_chennaibox
    

    Its Working For me.