elasticsearch, logstash, logstash-configuration, logstash-filter

Injecting json string into different Elasticsearch indices using Logstash


I have the following sample data stored in a PostgreSQL events table:

{"playhead":0,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.295Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Transaction\\\\Domain\\\\Event\\\\TransactionWasRegistered\",\"payload\":{\"transactionId\":\"91d83147-0280-481e-a984-c899e9720ec2\",\"transactionData\":{\"documentNumber\":\"not-matched\",\"purchasePlace\":\"wroclaw\",\"purchaseDate\":1582622706,\"documentType\":\"sell\"},\"customerData\":{\"name\":\"Jan Nowak\",\"email\":\"not_matched@example.com\",\"nip\":\"aaa\",\"phone\":\"+0954730076810\",\"loyaltyCardNumber\":\"not_matched_card_number\",\"address\":{\"street\":\"Ko\\u015bciuszki\",\"address1\":\"12\",\"city\":\"Warsaw\",\"country\":\"PL\",\"province\":\"Mazowieckie\",\"postal\":\"00-800\"}},\"items\":[{\"sku\":{\"code\":\"SKU1\"},\"name\":\"item 1\",\"quantity\":1,\"grossValue\":1,\"category\":\"aaa\",\"labels\":[{\"key\":\"test\",\"value\":\"label\"},{\"key\":\"test\",\"value\":\"label2\"}],\"maker\":\"sss\"},{\"sku\":{\"code\":\"SKU2\"},\"name\":\"item 2\",\"quantity\":2,\"grossValue\":2,\"category\":\"bbb\",\"labels\":[],\"maker\":\"ccc\"}],\"posId\":null,\"excludedDeliverySKUs\":null,\"excludedLevelSKUs\":null,\"excludedLevelCategories\":null,\"revisedDocument\":null,\"labels\":[]}}","@version":"1","id":1,"type":"OpenLoyalty.Component.Transaction.Domain.Event.TransactionWasRegistered","uuid":"91d83147-0280-481e-a984-c899e9720ec2","recorded_on":"2020-02-24T09:25:06.222986+00:00"}

 {"playhead":1,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Customer\\\\Domain\\\\Event\\\\AssignedAccountToCustomer\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"accountId\":\"3c067099-486a-41a8-87ca-343305126c5e\"}}","@version":"1","id":80,"type":"OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.012550+00:00"}

{"playhead":2,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Customer\\\\Domain\\\\Event\\\\CustomerWasMovedToLevel\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"levelId\":\"e82c96cf-32a3-43bd-9034-4df343e50000\",\"oldLevelId\":null,\"updatedAt\":1582536320,\"manually\":false,\"removeLevelManually\":false}}","@version":"1","id":81,"type":"OpenLoyalty.Component.Customer.Domain.Event.CustomerWasMovedToLevel","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.014810+00:00"}

There are many other rows with different type values. What I'm looking for is a way to insert the payload of each row into its appropriate Elasticsearch index. For instance, data with type = OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer has to go into the oloy.account_details index, and the indexed document should look like this:

    {
      "_index": "oloy.account_details",
      "_type": "OpenLoyalty\\Component\\Account\\Domain\\ReadModel\\AccountDetails",
      "_id": "AXGjvhDQUCC0J4ATWiCP",
      "_version": 1,
      "_score": 1,
      "_source": {
        "accountId": "5a9bdb83-f7e8-442c-b06f-387e1b1e95a7",
        "customerId": "11111111-0000-474c-b092-b0dd880c07e1"
      }
    }

I wonder what configuration I should use for Logstash. So far I have this:

input {
    jdbc {
        # PostgreSQL JDBC driver: location of the jar and the driver class to load.
        jdbc_driver_library => "C:\logstash\postgresql-42.2.12.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        # Database to connect to and the credentials used for the connection.
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/openloyalty"
        jdbc_user => "openloyalty"
        jdbc_password => "openloyalty"
        # Query executed against the events table.
        statement => "SELECT * from events"
    }
}

output {
    elasticsearch {
        # Elasticsearch node that receives the documents.
        hosts => ["localhost:9200"]
        # The three "???" values are the open part of the question: the target
        # index, document type and document id still have to be determined.
        index => "???"
        document_type => "???"
        document_id => "???"
    }
}

Solution

  • Your answer is here:

    input {
        # Read the application log file from disk, starting at the beginning.
        file {
            path => "C:/Users/Khatere Tajfar/Desktop/Log/application.log"
            start_position => "beginning"
            # NOTE(review): the effect of 0 here differs between versions of the
            # file input plugin; confirm against the installed version.
            ignore_older => 0
        }
    }
    filter {
        # Split each log line into the text before/after " INFO " (part1, part2)
        # and the four values of the trailing object: indexType -> type,
        # indexName -> index, model -> jsonpart, id -> id.
        grok {
            match => { "message" => "%{GREEDYDATA:part1} INFO %{GREEDYDATA:part2}\{\"indexType\"\:\"%{GREEDYDATA:type}\"\,\"indexName\"\:\"%{GREEDYDATA:index}\"\,\"model\"\:\"%{GREEDYDATA:jsonpart}\"\,\"id\"\:\"%{GREEDYDATA:id}\"\}" }
        }
        mutate {
            # Strip the backslashes from the embedded JSON string in one step.
            # The previous version replaced them with "X" and then deleted every
            # "X", which also removed legitimate "X" characters from the payload.
            # NOTE(review): removing every backslash also drops escapes that may
            # be meaningful inside the model (e.g. \" or \uXXXX) - confirm the
            # shape of the logged model.
            gsub => [ "jsonpart", "[\\]+", "" ]
            # Drop the helper captures and the bookkeeping fields.
            remove_field => [ "part1", "part2", "message", "path", "host", "@timestamp", "@version" ]
        }
        # Parse the cleaned JSON string; no target is set on this filter.
        json {
            source => "jsonpart"
        }
        # Copy the routing values under [@metadata] for the output section to
        # reference; they are removed from the event itself in the next step.
        mutate {
            add_field => {
                "[@metadata][type]" => "%{type}"
                "[@metadata][index]" => "%{index}"
                "[@metadata][id]" => "%{id}"
            }
        }
        # Remove the raw JSON string and the now-copied routing fields.
        mutate {
            remove_field => [ "jsonpart", "type", "index", "id" ]
        }
    }
    output {
        # Print every event to the console using the json_lines codec.
        stdout { codec => json_lines }
        # Index, document type and document id are taken from the [@metadata]
        # fields populated in the filter section.
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "%{[@metadata][index]}"
            document_type => "%{[@metadata][type]}"
            document_id => "%{[@metadata][id]}"
        }
    }