Commit 7b8e2c4

Updated POM file to include Kafka dependencies
1 parent e28f9de commit 7b8e2c4

7 files changed (+254, -72 lines)

scripts/startNimbus.sh

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 #!/bin/bash
-sudo ~/apache-storm-0.9.5/bin/storm nimbus
+sudo ~/apache-storm-1.0.3/bin/storm nimbus

scripts/startStormUI.sh

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 #!/bin/bash
-sudo ~/apache-storm-0.9.5/bin/storm ui
+sudo ~/apache-storm-1.0.3/bin/storm ui

scripts/startSupervisor.sh

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 #!/bin/bash
-sudo ~/apache-storm-0.9.5/bin/storm supervisor
+sudo ~/apache-storm-1.0.3/bin/storm supervisor

src/storm/TwitterCleanerBolt.java

Lines changed: 23 additions & 24 deletions
@@ -5,14 +5,14 @@
 import twitter4j.*;
 import twitter4j.conf.*;
 import java.util.List;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;

-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;

 import java.io.FileOutputStream;
 import java.io.OutputStream;
@@ -29,7 +29,7 @@ public class TwitterCleanerBolt implements IRichBolt {
    boolean useTopicSelector = false;
    String language = new String("en");
    List<String> topics = Arrays.asList("politics","entertainment",
-       "world","us","business","opinion","tech","science","health",
+       "world","us","business","opinion","tech","science","health",
        "sports", "art", "style", "food", "travel");

    /**
@@ -56,7 +56,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector
    public void execute(Tuple tuple) {

        Status tweet = (Status) tuple.getValueByField("tweet");
-
+
        // return criterion
        if(!tweet.getLang().equals(language))
            return;
@@ -71,15 +71,13 @@ public void execute(Tuple tuple) {
        txt = this.removeUrl(txt);
        txt = txt.replace("\n", "");
        txt = txt.toLowerCase();
-
-
+
+
        // extract hashtags
        String hasht = "\nhashtags: ";
        boolean keep = false;
        for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
-           this.collector.emit(new Values(hashtage.getText()));
-
-           // only select tweets that have hashtags/topics co-occurrence.
+           // only select tweets that have hashtags/topics co-occurrence.
            if(this.useTopicSelector == true){

                for(String s:this.topics) {
@@ -100,15 +98,17 @@ public void execute(Tuple tuple) {

        //removes multiple whitespace, hashtag entries, and tag entries
        String finaltext = txt.replaceAll("#[^\\s]+","").replaceAll("@[^\\s]+","").replaceAll("( )+", " ");
-
+
        //remove characters we don't want
-       finaltext = preserveASCII(finaltext);
-
+       finaltext = preserveASCII(finaltext);
+
+       //emit onto the kafka bolt
+       this.collector.emit(new Values(finaltext + hasht));
+
        finaltext = "\n\ntext: " + finaltext;
-
+
        if(finaltext.length()<60)
            return;
-
        try {
            oStream = new FileOutputStream(System.getProperty("user.home")+"/tweetnet/data/dump.txt", true);
            oStream.write(finaltext.getBytes());
@@ -118,7 +118,6 @@ public void execute(Tuple tuple) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
-
    }

    /**
@@ -134,7 +133,7 @@ public void cleanup() {
    **/
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-       declarer.declare(new Fields("postCleanedTweets"));
+       declarer.declare(new Fields("message"));
    }

    /**
@@ -181,10 +180,10 @@ public static char[] removeChar( char[] original, int removeLocation) {
    public static String removeUrl(String tweet) {
        try{
            String urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http|https):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*)";
-
+
            Pattern p = Pattern.compile(urlPattern,Pattern.CASE_INSENSITIVE);
            Matcher m = p.matcher(tweet);
-
+
            // re-assigns str while removing URLs
            int i = 0;
            while (m.find()) {
@@ -199,4 +198,4 @@ public static String removeUrl(String tweet) {
        }
    }

-}
+}
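Note on the field rename above: the declared output field changes from "postCleanedTweets" to "message" because the KafkaBolt wired up in TwitterStorm.java (next file) uses FieldNameBasedTupleToKafkaMapper, whose default tuple field names are "key" and "message", so the cleaned text is picked up as the Kafka record value. A minimal sketch of the equivalent explicit wiring follows; the field names are just the mapper's defaults spelled out, not something this commit adds, and props refers to the producer Properties from the TwitterStorm.java diff:

    // Sketch only: same behaviour as the no-arg mapper constructor, with the
    // field names made explicit. The bolt declares only "message"; the mapper
    // falls back to a null record key when the tuple has no "key" field.
    KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
            .withProducerProperties(props)
            .withTopicSelector(new DefaultTopicSelector("twitterstorm"))
            .withTupleToKafkaMapper(
                    new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));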

src/storm/TwitterStorm.java

Lines changed: 35 additions & 14 deletions
@@ -1,14 +1,18 @@

 import java.util.*;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.TopologyBuilder;
-
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.kafka.bolt.KafkaBolt;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.mapper.*;
 /**
- * Main class for storm topology.
+ * Main class for storm topology.
 */
+
+
 public class TwitterStorm {

    /**
@@ -29,19 +33,36 @@ public static void main(String[] args) throws Exception{
        String[] arguments = args.clone();
        String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

-       //create a new Storm configuration.
+       //create a new Storm configuration.
        Config config = new Config();
        config.setDebug(true);

-       //create a new topology.
+       //create a new topology.
        TopologyBuilder builder = new TopologyBuilder();

-       builder.setSpout("streamSpout", new TwitterStreamSpout(
-           consumerKey,consumerSecret, accessToken, accessTokenSecret, keyWords));
+       TwitterStreamSpout streamSpout = new TwitterStreamSpout(
+           consumerKey,consumerSecret, accessToken, accessTokenSecret, keyWords);
+
+       // streamSpout.scheme = new SchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
+       builder.setSpout("streamSpout", streamSpout);
+
+       TwitterCleanerBolt cleanerBolt = new TwitterCleanerBolt();
+
+       builder.setBolt("cleanerBolt", cleanerBolt).shuffleGrouping("streamSpout");
+
+       Properties props = new Properties();
+       props.put("bootstrap.servers", "localhost:9092");
+       props.put("acks", "1");
+       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+       KafkaBolt kafkaBolt = new KafkaBolt()
+               .withProducerProperties(props)
+               .withTopicSelector(new DefaultTopicSelector("twitterstorm"))
+               .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+
+       builder.setBolt("forwardToKafka", kafkaBolt).shuffleGrouping("cleanerBolt");

-       builder.setBolt("cleanerBolt", new TwitterCleanerBolt())
-           .shuffleGrouping("streamSpout");
-
        //submit topology to local cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TwitterHashtagStorm", config,
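One way to sanity-check the new Kafka leg of the topology is to read the "twitterstorm" topic back with a standalone consumer against the same localhost:9092 broker the KafkaBolt produces to. The sketch below is not part of the commit; it assumes a kafka-clients 2.x (or later) jar on the classpath, and the class name and consumer group are made up for illustration:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Hypothetical verification helper, not part of this repository.
    public class TwitterStormConsumerCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // same broker as the KafkaBolt producer
            props.put("group.id", "twitterstorm-check");      // arbitrary consumer group for the check
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");       // read the topic from the beginning

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("twitterstorm"));
                for (int i = 0; i < 30; i++) {                // poll for roughly 30 seconds, then exit
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());   // cleaned tweet text plus the hashtag suffix
                    }
                }
            }
        }
    }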

src/storm/TwitterStreamSpout.java

Lines changed: 31 additions & 31 deletions
@@ -1,4 +1,4 @@
-
+
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;

@@ -13,16 +13,16 @@
 import twitter4j.auth.AccessToken;
 import twitter4j.conf.ConfigurationBuilder;

-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;

-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;

-import backtype.storm.utils.Utils;
+import org.apache.storm.utils.Utils;

 /**
 * this class talks directly the the twitterAPI using the user credentials
@@ -34,13 +34,13 @@ public class TwitterStreamSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    LinkedBlockingQueue<Status> queue = null;
    TwitterStream _twitterStream;
-
+
    String consumerKey;
    String consumerSecret;
    String accessToken;
    String accessTokenSecret;
    String[] keyWords;
-
+
    /**
    * Constructor.
    * @param consumerKey Twitter API credential
@@ -58,14 +58,14 @@ public TwitterStreamSpout(String consumerKey, String consumerSecret,
        this.accessTokenSecret = accessTokenSecret;
        this.keyWords = keyWords;
    }
-
+
    /**
    * TO DO: default constructor is a stub.
    */
    public TwitterStreamSpout() {
        // TODO Auto-generated constructor stub
    }
-
+
    /**
    * creates a new status blockingQueue and a statusListener.
    * @param conf Storm configuration
@@ -85,66 +85,66 @@ public void open(Map conf, TopologyContext context,
            public void onStatus(Status status) {
                queue.offer(status);
            }
-
+
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
-
+
            @Override
            public void onTrackLimitationNotice(int i) {}
-
+
            @Override
            public void onScrubGeo(long l, long l1) {}
-
+
            @Override
            public void onException(Exception ex) {}
-
+
            @Override
            public void onStallWarning(StallWarning arg0) {
                // TODO Auto-generated method stub
            }
        };
-
-       ConfigurationBuilder cb = new ConfigurationBuilder();
-
+
+       ConfigurationBuilder cb = new ConfigurationBuilder();
+
       cb.setDebugEnabled(true)
           .setOAuthConsumerKey(consumerKey)
           .setOAuthConsumerSecret(consumerSecret)
           .setOAuthAccessToken(accessToken)
           .setOAuthAccessTokenSecret(accessTokenSecret);
-
+
       _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
       _twitterStream.addListener(listener);
-
+
       if (keyWords.length == 0) {
           _twitterStream.sample();
       }else {
           FilterQuery query = new FilterQuery().track(keyWords);
           _twitterStream.filter(query);
       }
   }
-
+
   /**
   * polls from the blocking queue to get next status.
   */
   @Override
   public void nextTuple() {
       Status ret = queue.poll();
-
+
       if (ret == null) {
           Utils.sleep(50);
       } else {
           _collector.emit(new Values(ret));
       }
   }
-
+
   /**
-  * closes twitter stream.
+  * closes twitter stream.
   */
   @Override
   public void close() {
       _twitterStream.shutdown();
   }
-
+
   /**
   * worker node configurator. Default set to 1 local machine.
   */
@@ -154,13 +154,13 @@ public Map<String, Object> getComponentConfiguration() {
       ret.setMaxTaskParallelism(1);
       return ret;
   }
-
+
   @Override
   public void ack(Object id) {}
-
+
   @Override
   public void fail(Object id) {}
-
+
   /**
   * Declare output field type.
   */
