I am currently working on a big data project for sentiment analysis of Twitter's trending topics. I followed Cloudera's tutorial and understood how to get tweets into Hadoop through Flume.

http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/

flume.conf:


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = 
TwitterAgent.sources.Twitter.consumerSecret = 
TwitterAgent.sources.Twitter.accessToken = 
TwitterAgent.sources.Twitter.accessTokenSecret = 
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Now, to extend this to my application, I need the keywords section of Flume's configuration file to contain trending topics. I have figured out Java code to get the trending topics, but I don't know how to connect this code to the Flume configuration file, or how to generate a new configuration file with the real-time trending topics added to the keywords section. I have searched a lot online for this. As I am a beginner in this field, it would be of great help if you could provide some information, or at least some alternative approach.
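For reference, here is a minimal sketch of how the trending topics could be turned into the comma-separated keyword string that flume.conf expects. The Twitter4J calls that would actually fetch the trends (shown only in comments, since they need live credentials) are an assumption based on that library's API; the formatting logic itself is plain Java:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TrendKeywords {

    // With Twitter4J, the trend names would come from something like:
    //   Trends trends = twitter.trends().getPlaceTrends(1); // 1 = worldwide WOEID
    //   for (Trend t : trends.getTrends()) names.add(t.getName());
    // Here we only show how to format them for the keywords property.

    // Join trend names into the "kw1, kw2, ..." format used in flume.conf
    public static String toKeywordString(List<String> trendNames) {
        return trendNames.stream()
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("hadoop", " big data ", "mapreduce");
        System.out.println(toKeywordString(names)); // prints "hadoop, big data, mapreduce"
    }
}
```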

1 Answer

A very interesting problem!

I agree with the comment made by @cricket_007: editing the configuration without restarting the Flume agent is not achievable.

I can't say much without seeing your Java code for fetching the trending topics. However, with the information you've supplied, there is one alternative (or rather, a workaround) I can think of, though I haven't tried it myself.

You could potentially modify the TwitterSource.java class like this:

public void configure(Context context) {
    consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
    consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
    accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
    accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);

    // MODIFY THE FOLLOWING PORTION
    String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
    if (keywordString.trim().length() == 0) {
        keywords = new String[0];
    } else {
        keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = keywords[i].trim();
        }
    }
    // UNTIL THIS POINT

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setOAuthConsumerKey(consumerKey);
    cb.setOAuthConsumerSecret(consumerSecret);
    cb.setOAuthAccessToken(accessToken);
    cb.setOAuthAccessTokenSecret(accessTokenSecret);
    cb.setJSONStoreEnabled(true);
    cb.setIncludeEntitiesEnabled(true);

    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
}

In the portion marked by the comments above, where the keywordString variable is initialised, you could invoke your own Java code (I'm assuming it is a method that returns a comma-separated string of keywords) instead of extracting the value from the context supplied by flume.conf; just replace the context.getString() call.
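Concretely, the swap could look like the sketch below. Here getTrendingKeywords() is a hypothetical stand-in for your own trend-fetching code (which would return a comma-separated string); the split-and-trim logic mirrors what TwitterSource.configure() already does:

```java
import java.util.Arrays;

public class KeywordLoader {

    // Hypothetical stand-in for your Twitter4J trend-fetching code,
    // which would return a comma-separated string of trending topics.
    static String getTrendingKeywords() {
        return "hadoop, big data, mapreduce";
    }

    // Same split-and-trim logic as the marked portion of configure(),
    // but sourcing the string from code instead of context.getString(...)
    static String[] loadKeywords() {
        String keywordString = getTrendingKeywords(); // was: context.getString(KEYWORDS_KEY, "")
        if (keywordString.trim().isEmpty()) {
            return new String[0];
        }
        String[] keywords = keywordString.split(",");
        for (int i = 0; i < keywords.length; i++) {
            keywords[i] = keywords[i].trim();
        }
        return keywords;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(loadKeywords())); // prints "[hadoop, big data, mapreduce]"
    }
}
```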

Along with that, just remove the following line from flume.conf:

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

I hope this helps.

Lalit
  • I tried this by following this code to get trends https://milindjagre.wordpress.com/2016/10/19/top-10-twitter-trending-topics-using-java-twitter4j-api/ but failed to build the package: the getTrends() API may throw TwitterException, which has to be handled, but the exception can't be declared in configure() because it is an overriding method. To get trends, I have to somehow handle the exception, but due to the overriding I can't. – Mohammed Zubair Khan Apr 05 '18 at 03:13
  • Ahh, OK. I didn't think of this. But is your build failing even if you use a try-catch block, or just on the throws declaration? I just ran an experiment with my own code, added a dummy try-catch block, and was able to build the package. One more note: I am using the doConfigure() method from AbstractEventDrivenSource as opposed to the configure() method in the Configurable interface. – Lalit Apr 05 '18 at 11:30
  • Thanks, it worked by adding a try-catch block, and I am able to get the JSON of tweets for trending topics into the Hadoop file system. Now I want to carry out sentiment analysis on these tweets; do you have any suggestions for that? – Mohammed Zubair Khan Apr 05 '18 at 19:24
  • I'm glad it worked for you. :) I haven't really tried sentiment analysis myself yet, but I've observed that there are quite a lot of references available on the subject. Since you're doing this in Java, here's one API - https://www.lexalytics.com/support - that offers a Java SDK for this purpose. So I would recommend you just make a start and post another question if you run into any issues. The community here is very helpful, and you'll certainly manage to get through it. – Lalit Apr 05 '18 at 22:59