Tags: maven, elasticsearch, apache-kafka, apache-storm

Why is my Storm topology not acking when I send tuples to Elasticsearch?


I'm new to Storm. I've just started a Data Architect training course, and it's in this context that I'm facing the problem that brings me here today.

I'm receiving messages from Kafka via a KafkaSpout named CurrentPriceSpout. So far, everything works. Then, in my CurrentPriceBolt, I re-emit a tuple so that my data gets written to Elasticsearch by the EsCurrentPriceBolt. And here is the problem: my data is not written to Elasticsearch as it arrives; it only shows up when I kill the topology.
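The wiring of the topology is roughly as follows (a simplified sketch of my own classes, reduced to the essentials):

    import org.apache.storm.topology.TopologyBuilder;

    TopologyBuilder topologyBuilder = new TopologyBuilder();

    // Kafka spout feeding the pipeline
    topologyBuilder.setSpout("CurrentPriceSpout", new CurrentPriceSpout());

    // intermediate bolt that re-emits a tuple per message
    topologyBuilder.setBolt("CurrentPriceBolt", new CurrentPriceBolt())
                   .shuffleGrouping("CurrentPriceSpout");

    // final bolt that indexes the tuple into Elasticsearch
    topologyBuilder.setBolt("EsCurrentPriceBolt", new EsCurrentPriceBolt())
                   .shuffleGrouping("CurrentPriceBolt");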

Is there a Storm parameter that can force tuples to be flushed (written out) while still getting them acknowledged?

I tried adding the option ".addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)": the tuples do get written to Elasticsearch, but they are never acknowledged, so Storm re-sends them indefinitely.
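From what I understand, Storm only considers a tuple fully processed once the bolt acks it. A hand-written ES bolt would have to follow roughly this pattern (an illustrative sketch only, not my actual EsCurrentPriceBolt; the indexing call is a placeholder):

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class AckingEsBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                indexIntoElasticsearch(tuple);  // placeholder for the real indexing call
                collector.ack(tuple);           // tell Storm the tuple is fully processed
            } catch (Exception e) {
                collector.fail(tuple);          // let Storm replay the tuple
            }
        }

        private void indexIntoElasticsearch(Tuple tuple) { /* ... */ }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt, no output fields
        }
    }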

Thanks for your help.

Thierry


Solution

  • I managed to find the answer to my problem. The root cause was that Elasticsearch is not designed to ingest data as slowly as this study project produces it: by default, ES writes data in batches of 1000 entries. The project generates one record every 30 seconds, so a batch of 1000 only fills up every 500 minutes (8h20).

    So I reviewed the configuration of my topology in detail and played with the options you can see in getTopologyConfig() below, in particular topology.transfer.batch.size and topology.producer.batch.size.

    The submission code now looks like this:

    ...
    ...
    
    public class App 
    {
        ...    
        ...    
    
        public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
        {
            ...
            ...
    
        StormTopology topology  = topologyBuilder.createTopology();                 // build the Storm topology
        String topologyName     = properties.getProperty("storm.topology.name");    // name the topology
        StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology);               // submit the topology to the Storm cluster
            System.out.println( "Topology on remote cluster : Started!" );              
        }
    
    
        private static Config getTopologyConfig(Properties properties)
        {
            Config stormConfig = new Config();
            stormConfig.put("topology.workers",                 Integer.parseInt(properties.getProperty("topology.workers")));
            stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
            stormConfig.put("topology.message.timeout.secs",    Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
            stormConfig.put("topology.transfer.batch.size",     Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
            stormConfig.put("topology.producer.batch.size",     Integer.parseInt(properties.getProperty("topology.producer.batch.size")));      
            return stormConfig;
        }
    
        ...    
        ...    
        ...    
    }
    

    And now it works!!!
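
    For completeness, the matching entries in my properties file look like this (the values below are illustrative, not a recommendation; with one message every 30 seconds, batch sizes of 1 make sure nothing sits in a buffer waiting to be flushed):

        storm.topology.name=current-price-topology
        topology.workers=1
        topology.enable.message.timeouts=true
        topology.message.timeout.secs=60
        topology.transfer.batch.size=1
        topology.producer.batch.size=1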