I am currently working on a big data project for sentiment analysis of Twitter's trending topics. I followed the Cloudera tutorial and understood how to get tweets into Hadoop through Flume.
http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/
flume.conf:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
Now, to extend this to my application, I need the keywords section of Flume's configuration file to contain the current trending topics. I have figured out Java code to get the trending topics, but I don't know how to connect this code to the Flume configuration file, or how to generate a new file with the real-time trending topics inserted in the keywords section. I have searched a lot online for this. As I am a beginner in this field, it would be of great help if you could provide some information, or at least some alternative approach.
A very interesting problem!
I agree with @cricket_007's comment: editing the configuration without restarting the Flume agent is not achievable.
I can't say much, as I haven't seen your Java code for getting the trending-topic keywords. However, with the information you've supplied, there is one alternative (or rather a workaround) I can think of, though I haven't tried it myself.
You could potentially modify the TwitterSource.java class like this:
public void configure(Context context) {
    consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
    consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
    accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
    accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

    // MODIFY THE FOLLOWING PORTION
    String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
    if (keywordString.trim().length() == 0) {
        keywords = new String[0];
    } else {
        keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = keywords[i].trim();
        }
    }
    // UNTIL THIS POINT

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setOAuthConsumerKey(consumerKey);
    cb.setOAuthConsumerSecret(consumerSecret);
    cb.setOAuthAccessToken(accessToken);
    cb.setOAuthAccessTokenSecret(accessTokenSecret);
    cb.setJSONStoreEnabled(true);
    cb.setIncludeEntitiesEnabled(true);

    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
}
I have marked with comments the portion where the keywordString variable is initialised. Instead of extracting the keywords from the context (i.e. from flume.conf), you could invoke your Java code there (I'm assuming it is a method that returns a comma-separated string of keywords); just remove the context.getString() part.
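For illustration, here is a minimal sketch of what that portion could look like. The helper name fetchTrendingKeywords() is a placeholder for your own code, not part of the Cloudera source; the sketch assumes it uses Twitter4J's getPlaceTrends() (WOEID 1 = worldwide) and reuses the OAuth fields that configure() sets just above. You would also need the corresponding Twitter4J imports (Twitter, TwitterFactory, Trends, Trend, TwitterException).

    // Inside configure(), replacing the context.getString(...) lookup:
    String keywordString = fetchTrendingKeywords();
    if (keywordString.trim().length() == 0) {
        keywords = new String[0];
    } else {
        keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = keywords[i].trim();
        }
    }

    // Hypothetical helper: builds a comma-separated keyword string from the
    // current worldwide trending topics using Twitter4J.
    private String fetchTrendingKeywords() {
        try {
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setOAuthConsumerKey(consumerKey);
            cb.setOAuthConsumerSecret(consumerSecret);
            cb.setOAuthAccessToken(accessToken);
            cb.setOAuthAccessTokenSecret(accessTokenSecret);
            Twitter twitter = new TwitterFactory(cb.build()).getInstance();
            Trends trends = twitter.getPlaceTrends(1); // WOEID 1 = worldwide trends
            StringBuilder sb = new StringBuilder();
            for (Trend trend : trends.getTrends()) {
                if (sb.length() > 0) {
                    sb.append(",");
                }
                sb.append(trend.getName());
            }
            return sb.toString();
        } catch (TwitterException e) {
            return ""; // fall back to an empty keyword list if the lookup fails
        }
    }

Note that this only picks up the trending topics once, when the source is configured; the agent would still need to be restarted to refresh them.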
Along with that, just remove the following statement from flume.conf:
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
I hope this helps.