55import twitter4j .*;
66import twitter4j .conf .*;
77import java .util .List ;
8- import backtype .storm .tuple .Fields ;
9- import backtype .storm .tuple .Values ;
8+ import org . apache .storm .tuple .Fields ;
9+ import org . apache .storm .tuple .Values ;
1010
11- import backtype .storm .task .OutputCollector ;
12- import backtype .storm .task .TopologyContext ;
13- import backtype .storm .topology .IRichBolt ;
14- import backtype .storm .topology .OutputFieldsDeclarer ;
15- import backtype .storm .tuple .Tuple ;
11+ import org . apache .storm .task .OutputCollector ;
12+ import org . apache .storm .task .TopologyContext ;
13+ import org . apache .storm .topology .IRichBolt ;
14+ import org . apache .storm .topology .OutputFieldsDeclarer ;
15+ import org . apache .storm .tuple .Tuple ;
1616
1717import java .io .FileOutputStream ;
1818import java .io .OutputStream ;
@@ -29,7 +29,7 @@ public class TwitterCleanerBolt implements IRichBolt {
2929 boolean useTopicSelector = false ;
3030 String language = new String ("en" );
3131 List <String > topics = Arrays .asList ("politics" ,"entertainment" ,
32- "world" ,"us" ,"business" ,"opinion" ,"tech" ,"science" ,"health" ,
32+ "world" ,"us" ,"business" ,"opinion" ,"tech" ,"science" ,"health" ,
3333 "sports" , "art" , "style" , "food" , "travel" );
3434
3535 /**
@@ -56,7 +56,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector
5656 public void execute (Tuple tuple ) {
5757
5858 Status tweet = (Status ) tuple .getValueByField ("tweet" );
59-
59+
6060 // return criterion
6161 if (!tweet .getLang ().equals (language ))
6262 return ;
@@ -71,15 +71,13 @@ public void execute(Tuple tuple) {
7171 txt = this .removeUrl (txt );
7272 txt = txt .replace ("\n " , "" );
7373 txt = txt .toLowerCase ();
74-
75-
74+
75+
7676 // extract hashtags
7777 String hasht = "\n hashtags: " ;
7878 boolean keep = false ;
7979 for (HashtagEntity hashtage : tweet .getHashtagEntities ()) {
80- this .collector .emit (new Values (hashtage .getText ()));
81-
82- // only select tweets that have hashtags/topics co-occurrence.
80+ // only select tweets that have hashtags/topics co-occurrence.
8381 if (this .useTopicSelector == true ){
8482
8583 for (String s :this .topics ) {
@@ -100,15 +98,17 @@ public void execute(Tuple tuple) {
10098
10199 //removes multiple whitespace, hashtag entries, and tag entries
102100 String finaltext = txt .replaceAll ("#[^\\ s]+" ,"" ).replaceAll ("@[^\\ s]+" ,"" ).replaceAll ("( )+" , " " );
103-
101+
104102 //remove characters we don't want
105- finaltext = preserveASCII (finaltext );
106-
103+ finaltext = preserveASCII (finaltext );
104+
105+ //emit the cleaned text followed by the hashtag summary to the downstream kafka bolt
106+ this .collector .emit (new Values (finaltext + hasht ));
107+
107108 finaltext = "\n \n text: " + finaltext ;
108-
109+
109110 if (finaltext .length ()<60 )
110111 return ;
111-
112112 try {
113113 oStream = new FileOutputStream (System .getProperty ("user.home" )+"/tweetnet/data/dump.txt" , true );
114114 oStream .write (finaltext .getBytes ());
@@ -118,7 +118,6 @@ public void execute(Tuple tuple) {
118118 // TODO Auto-generated catch block
119119 e .printStackTrace ();
120120 }
121-
122121 }
123122
124123 /**
@@ -134,7 +133,7 @@ public void cleanup() {
134133 **/
135134 @ Override
136135 public void declareOutputFields (OutputFieldsDeclarer declarer ) {
137- declarer .declare (new Fields ("postCleanedTweets " ));
136+ declarer .declare (new Fields ("message " ));
138137 }
139138
140139 /**
@@ -181,10 +180,10 @@ public static char[] removeChar( char[] original, int removeLocation) {
181180 public static String removeUrl (String tweet ) {
182181 try {
183182 String urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http|https):((//)|(\\ \\ ))+[\\ w\\ d:#@%/;$()~_?\\ +-=\\ \\ \\ .&]*)" ;
184-
183+
185184 Pattern p = Pattern .compile (urlPattern ,Pattern .CASE_INSENSITIVE );
186185 Matcher m = p .matcher (tweet );
187-
186+
188187 // re-assigns str while removing URLs
189188 int i = 0 ;
190189 while (m .find ()) {
@@ -199,4 +198,4 @@ public static String removeUrl(String tweet) {
199198 }
200199 }
201200
202- }
201+ }
0 commit comments