hadoop flume sentiment-analysis flume-twitter

How to change the Apache Flume configuration file through Java code?


I am currently working on a big data project for sentiment analysis of Twitter's trending topics. I followed Cloudera's tutorial and understood how to get tweets into Hadoop through Flume.

http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/

flume.conf:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Now, to extend this to my application, I need the keywords property in Flume's configuration file to contain the trending topics. I have worked out the Java code to get the trending topics, but I don't know how to connect this code to the Flume configuration file, or how to generate a new file with the real-time trending topics in the keywords property. I have searched a lot online for this. As I am a beginner in this field, it would be a great help if you could provide some information, or at least an alternative approach.
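
For illustration, the second idea (writing a new file with the trending topics in the keywords property) might look roughly like the sketch below. It only uses the standard java.nio.file API. The class name FlumeConfKeywords, its method names, and the sample topics are hypothetical; the property key is the one from the flume.conf above. The list of topics is assumed to come from the existing trending-topics code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: rewrites the keywords property of a flume.conf file.
public class FlumeConfKeywords {
    // Property key as it appears in the flume.conf shown above.
    public static final String KEYWORDS_KEY = "TwitterAgent.sources.Twitter.keywords";

    /** Returns a copy of the config lines with the keywords property replaced, or appended if absent. */
    public static List<String> withKeywords(List<String> lines, List<String> topics) {
        String newLine = KEYWORDS_KEY + " = " + String.join(", ", topics);
        List<String> out = new ArrayList<>();
        boolean replaced = false;
        for (String line : lines) {
            String trimmed = line.trim();
            // Match "key = ..." or "key=..." only; comment lines never start with the key.
            boolean isKeywordsLine = trimmed.startsWith(KEYWORDS_KEY)
                    && trimmed.substring(KEYWORDS_KEY.length()).trim().startsWith("=");
            if (isKeywordsLine) {
                out.add(newLine);
                replaced = true;
            } else {
                out.add(line);
            }
        }
        if (!replaced) {
            out.add(newLine);
        }
        return out;
    }

    /** Reads the source config, swaps in the topics, and writes the result to the target path. */
    public static void rewrite(Path source, Path target, List<String> topics) throws IOException {
        List<String> lines = Files.readAllLines(source, StandardCharsets.UTF_8);
        Files.write(target, withKeywords(lines, topics), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: FlumeConfKeywords <source.conf> <target.conf>");
            return;
        }
        // Placeholder topics; in practice these would come from the trending-topics code.
        List<String> topics = Arrays.asList("#SampleTopic", "another topic");
        rewrite(Paths.get(args[0]), Paths.get(args[1]), topics);
    }
}
```

The agent would then be started (or restarted) with the rewritten file.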


Solution

  • A very interesting problem!

    I agree with the comment made by @cricket_007 - editing the configuration without restarting the Flume agent is not achievable.

    I can't say much, as I haven't seen your Java code for getting the trending-topic keywords. However, with the information you've supplied, there is one alternative (or rather a workaround) I can think of, though I haven't tried it myself.

    You could potentially modify the TwitterSource.java class like this:

    public void configure(Context context) {
      consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
      consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
      accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
      accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

      // MODIFY THE FOLLOWING PORTION
      String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
      if (keywordString.trim().length() == 0) {
        keywords = new String[0];
      } else {
        keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
          keywords[i] = keywords[i].trim();
        }
      }
      // UNTIL THIS POINT

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setOAuthConsumerKey(consumerKey);
      cb.setOAuthConsumerSecret(consumerSecret);
      cb.setOAuthAccessToken(accessToken);
      cb.setOAuthAccessTokenSecret(accessTokenSecret);
      cb.setJSONStoreEnabled(true);
      cb.setIncludeEntitiesEnabled(true);

      twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
    }
    

    As marked by the comments above, at the point where the keywordString variable is initialised you could invoke your own Java code (I'm assuming it is a method that returns a comma-separated string of keywords) instead of reading the value from the context populated from flume.conf. In other words, replace the context.getString() call with a call to your method.
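
    A minimal sketch of that replacement, kept separate from TwitterSource so it can be tried on its own. The class KeywordLoader and its methods are hypothetical names, and the Supplier stands in for your trending-topics method, which I am assuming returns a comma-separated string. Unlike the original loop, this version also drops empty entries, and it falls back to a second string if the supplier returns nothing; both are my own choices, not part of the Cloudera code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper illustrating how the marked portion of configure() could be replaced.
public class KeywordLoader {

    /** Splits a comma-separated string, trims each entry, and drops empty entries. */
    public static String[] parse(String keywordString) {
        List<String> result = new ArrayList<>();
        if (keywordString != null) {
            for (String part : keywordString.split(",")) {
                String keyword = part.trim();
                if (!keyword.isEmpty()) {
                    result.add(keyword);
                }
            }
        }
        return result.toArray(new String[0]);
    }

    /**
     * Asks the supplier (assumed to wrap the trending-topics code) for keywords.
     * If it throws or yields no keywords, the fallback string is parsed instead.
     */
    public static String[] resolve(Supplier<String> trendingTopics, String fallback) {
        String fromSupplier;
        try {
            fromSupplier = trendingTopics.get();
        } catch (RuntimeException e) {
            fromSupplier = null; // treat a failed lookup as "no keywords"
        }
        String[] keywords = parse(fromSupplier);
        return keywords.length > 0 ? keywords : parse(fallback);
    }
}
```

    Inside configure(), the marked portion would then reduce to something like keywords = KeywordLoader.resolve(() -> yourTrendingTopicsMethod(), ""); where yourTrendingTopicsMethod() is a placeholder for your own code.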

    Along with that just remove the following statement from flume.conf:

    TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
    

    I hope this helps.