apache-stormreal-time-datareal-time-systems

(Twitter) Storm's Window On Aggregation


I'm playing around with Storm, and I'm wondering where Storm specifies (if possible) the (tumbling/sliding) window size upon an aggregation. E.g. If we want to find the trending topics for the previous hour on Twitter. How do we specify that a bolt should return results for every hour? Is this done programatically inside each bolt? Or is it some way to specify a "window" ?


Solution

  • Disclaimer: I wrote the Trending Topics with Storm article referenced by gakhov in his answer above.

    I'd say the best practice is to use the so-called tick tuples in Storm 0.8+. With these you can configure your own spouts/bolts to be notified at certain time intervals (say, every ten seconds or every minute).

    Here's a simple example that configures the component in question to receive tick tuples every ten seconds:

    // in your spout/bolt
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        int tickFrequencyInSeconds = 10;
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
        return conf;
    }
    

    You can then use a conditional switch in your spout/bolt's execute() method to distinguish "normal" incoming tuples from the special tick tuples. For instance:

    // in your spout/bolt
    @Override
    public void execute(Tuple tuple) {
        if (isTickTuple(tuple)) {
            // now you can trigger e.g. a periodic activity
        }
        else {
            // do something with the normal tuple
        }
    }
    
    private static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }
    

    Again, I wrote a pretty detailed blog post about doing this in Storm a few days ago as gakhov pointed out (shameless plug!).