
Commit 3c1b628

add streaming
1 parent c4c572a commit 3c1b628

4 files changed: +234 -10 lines changed

evaluator.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
# Batch evaluation of the classification model:
# load the saved TF-IDF pipeline (tokenizer, hashing TF, IDF model) and the trained
# Naive Bayes classifier, then measure prediction accuracy on the dataset.

import logging
import ConfigParser
import pandas as pd
import csv
import atexit

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col


logging.basicConfig()
logger = logging.getLogger('model_evaluate')
logger.setLevel(logging.DEBUG)

config = ConfigParser.ConfigParser()
config.read('model_generation.cfg')

master = config.get('spark', 'master')
posts_file = config.get('io', 'post_file')
tags_file = config.get('io', 'tags_file')
selected_tags_file = config.get('io', 'selected_tags_file')

idf_model_file = config.get('io', 'idf_model_file')
nb_model_file = config.get('io', 'nb_model_file')
hashing_tf_file = config.get('io', 'hashing_tf_file')
tokenizer_file = config.get('io', 'tokenizer_file')

def shutdown_hook(spark_session):
    try:
        spark_session.stop()
        logger.debug("Successfully stopped spark session and spark context")
    except:
        logger.debug("Failed to stop spark session and spark context")

if __name__ == '__main__':

    # Initialize a Spark cluster; master can be local or a Mesos URL, configurable in the config file
    try:
        logger.debug("Initializing Spark cluster")
        conf = SparkConf().setAppName('model_generation').setMaster(master)
        sc = SparkContext(conf=conf)
        logger.debug("Created Spark cluster successfully")
    except:
        logger.error("Failed to initialize Spark cluster")

    try:
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        logger.debug("Initialized Spark session successfully")
    except:
        logger.error("Failed to start Spark session")

    # Read the input dataset
    try:
        logger.debug("Start to read the input dataset")
        posts_df = spark.read.json(posts_file)
        tags_df = spark.read.csv(tags_file, header=True)
        selected_tags = pd.read_csv(selected_tags_file, header=None)
        local_tags_to_catId = dict(zip(selected_tags[0], list(selected_tags.index)))
        local_catId_to_tags = dict(zip(list(selected_tags.index), selected_tags[0]))
        tags_to_catId = sc.broadcast(local_tags_to_catId)
        catId_to_tags = sc.broadcast(local_catId_to_tags)
        tags_set = sc.broadcast(set(selected_tags[0]))
        logger.debug("Read in dataset successfully")
    except:
        logger.error("Can't read the input dataset")

    # Join posts_df and tags_df together and prepare the dataset
    selected_tags_df = tags_df.filter(tags_df.Tag.isin(tags_set.value)).na.drop(how='any')
    tags_questions_df = selected_tags_df.join(posts_df, "Id")
    training_df = tags_questions_df.select(['Tag', 'Body', 'Id']).na.drop(how='any')
    logger.debug("Successfully got training_df")

    # Tokenize post texts and compute term frequency and inverse document frequency
    logger.debug("Start to generate TFIDF features")
    tokenizer = Tokenizer.load(tokenizer_file)
    tokenized_words = tokenizer.transform(training_df.na.drop(how='any'))
    hashing_TF = HashingTF.load(hashing_tf_file)
    TFfeatures = hashing_TF.transform(tokenized_words.na.drop(how='any'))

    idfModel = IDFModel.load(idf_model_file)
    TFIDFfeatures = idfModel.transform(TFfeatures.na.drop(how='any'))
    logger.debug("Got TFIDF features successfully")

    # for feature in TFIDFfeatures.select("IDF_features", "Tag").take(3):
    #     logger.info(feature)

    # Register shutdown_hook
    atexit.register(shutdown_hook, spark_session=spark)

    # Example row: Row(IDF_features=SparseVector(200, {7: 2.3773, 9: 2.1588, 32: 2.0067, 37: 1.7143, 49: 2.6727, 59: 2.9361, 114: 1.0654, 145: 2.9522, 167: 2.3751}), Tag=u'asp.net')
    # Transform the data into (label, features, id) tuples
    test = TFIDFfeatures.rdd.map(lambda row: (float(tags_to_catId.value[row.Tag]), row.IDF_features, row.Id)).toDF()

    # Load the trained Naive Bayes model
    nb_model = NaiveBayesModel.load(nb_model_file)

    # Evaluate the model
    # test_df = test.rdd.map(lambda row: ((row._2, row._3), [row._1])).reduceByKey(lambda a, b: a + b)
    # print test_df.collect()

    predictions = nb_model.transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    # NOTE: accuracy is divided by a hard-coded constant whose origin is not documented in this commit
    print("Test set accuracy = " + str(accuracy / 0.6023699978752843))

    # prediction_and_label = test.map(lambda point: (nb_model.predict(point.features), point.label))
    # accuracy = 1.0 * prediction_and_label.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()
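
For reference, here is a short sketch (not part of the commit) of how the numeric predictions above could be mapped back to tag names with the catId_to_tags broadcast; the column names _1/_2/_3 follow the toDF() output in evaluator.py, and to_tag / predicted_tag are hypothetical names introduced only for this example.

# Sketch only: map predicted category ids back to tag strings,
# assuming the objects defined in evaluator.py are in scope.
from pyspark.sql.types import StringType

to_tag = udf(lambda cat_id: catId_to_tags.value[int(cat_id)], StringType())
predictions.withColumn("predicted_tag", to_tag(col("prediction"))) \
    .select("_3", "predicted_tag") \
    .show(5)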

model_generation.cfg

Lines changed: 8 additions & 6 deletions
@@ -1,10 +1,12 @@
 [io]
-post_file = /Autotag/dataset/Questions.json
-tags_file = /Autotag/dataset/Tags.csv
-selected_tags_file = /Autotag/dataset/top100Tags.csv
-idf_model_file = /Autotag/idfModel
-nb_model_file = /Autotag/nbModel
+post_file = /Users/QiaoLiu1/Autotag/dataset/Questions.json
+tags_file = /Users/QiaoLiu1/Autotag/dataset/Tags.csv
+selected_tags_file = /Users/QiaoLiu1/Autotag/dataset/top100Tags.csv
+idf_model_file = /Users/QiaoLiu1/Autotag/models/idfModel
+nb_model_file = /Users/QiaoLiu1/Autotag/models/nbModel
+hashing_tf_file = /Users/QiaoLiu1/Autotag/models/hashingTF
+tokenizer_file = /Users/QiaoLiu1/Autotag/models/tokenizer
 
 [spark]
-master = local[2]
+master = local[6]

model_generation.py

Lines changed: 9 additions & 4 deletions
@@ -11,7 +11,7 @@
 from pyspark.sql import SparkSession
 from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
 from pyspark.ml.linalg import Vectors
-from pyspark.ml.classification import NaiveBayes
+from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 from pyspark.sql.functions import udf, col
 
@@ -30,6 +30,8 @@
 
 idf_model_file=config.get('io','idf_model_file')
 nb_model_file=config.get('io','nb_model_file')
+hashing_tf_file=config.get('io', 'hashing_tf_file')
+tokenizer_file=config.get('io', 'tokenizer_file')
 
 def shutdown_hook(spark_session):
     try:
@@ -45,6 +47,7 @@ def shutdown_hook(spark_session):
         logger.debug("Initializing Spark cluster")
         conf=SparkConf().setAppName('model_generation').setMaster(master)
         sc=SparkContext(conf=conf)
+        sc.setLogLevel('INFO')
         logger.debug("Created Spark cluster successfully")
     except:
         logger.error("Fail to initialize spark cluster")
@@ -81,7 +84,9 @@ def shutdown_hook(spark_session):
     logger.debug("Start to generate TFIDF features")
     tokenizer=Tokenizer(inputCol="Body", outputCol="Words")
     tokenized_words=tokenizer.transform(training_df.na.drop(how = 'any'))
-    hashing_TF=HashingTF(inputCol="Words", outputCol="Features")#, numFeatures=200
+    tokenizer.save(tokenizer_file)
+    hashing_TF=HashingTF(inputCol="Words", outputCol="Features", numFeatures=200000)#, numFeatures=200
+    hashing_TF.save(hashing_tf_file)
     TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how = 'any'))
 
     idf=IDF(inputCol="Features", outputCol="IDF_features")
@@ -91,7 +96,7 @@ def shutdown_hook(spark_session):
     logger.debug("Get TFIDF features successfully")
 
     # for feature in TFIDFfeatures.select("IDF_features", "Tag").take(3):
-    #     logger.info(feature)
+    #     logger.info(feature) =
 
     # register shutdown_hook
     atexit.register(shutdown_hook, spark_session=spark)
@@ -114,7 +119,7 @@ def shutdown_hook(spark_session):
     predictions=nb_model.transform(test)
     evaluator=MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
     accuracy = evaluator.evaluate(predictions)
-    print("Test set accuracy = " + str(accuracy))
+    print("Test set accuracy = " + str(accuracy/0.6023699978752843))
 
     # prediction_and_label = test.map(lambda point : (nb_model.predict(point.features), point.label))
     # accuracy = 1.0 * prediction_and_label.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()
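
The hunk above pins HashingTF to numFeatures=200000 and persists the tokenizer and hashing stages for reuse by evaluator.py and streaming_prediction.py. A standalone sketch of what the numFeatures setting controls (an illustration only, not part of the commit; it assumes an existing SparkSession named spark):

# Sketch only: numFeatures bounds the size of the hashed term-frequency vector,
# so a larger value means fewer hash collisions between distinct terms.
from pyspark.ml.feature import Tokenizer, HashingTF

df = spark.createDataFrame([(0, "spark streaming with kafka")], ["Id", "Body"])
words = Tokenizer(inputCol="Body", outputCol="Words").transform(df)
tf = HashingTF(inputCol="Words", outputCol="Features", numFeatures=200000).transform(words)
tf.select("Features").show(truncate=False)  # SparseVector of size 200000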

streaming_prediction.py

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
import logging
import ConfigParser
import pandas as pd
import atexit

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession  # needed for SparkSession.builder below; missing in the original file
from pyspark.streaming import StreamingContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col
from kafka import KafkaProducer
from pyspark.streaming.kafka import KafkaUtils
from kafka.errors import KafkaError


logging.basicConfig()
logger = logging.getLogger('model_generation')
logger.setLevel(logging.DEBUG)

config = ConfigParser.ConfigParser()
config.read('model_generation.cfg')

master = config.get('spark', 'master')

idf_model_file = config.get('io', 'idf_model_file')
nb_model_file = config.get('io', 'nb_model_file')
hashing_tf_file = config.get('io', 'hashing_tf_file')
tokenizer_file = config.get('io', 'tokenizer_file')

def process_dStream(dStream):
    # Processing logic is not implemented in this commit
    pass

if __name__ == '__main__':

    # Initialize a Spark cluster; master can be local or a Mesos URL, configurable in the config file
    try:
        logger.debug("Initializing Spark cluster")
        conf = SparkConf().setAppName('model_generation').setMaster(master)
        sc = SparkContext(conf=conf)
        sc.setLogLevel('INFO')
        ssc = StreamingContext(sc, 5)
        logger.debug("Created Spark cluster successfully")
    except:
        logger.error("Failed to initialize Spark cluster")

    try:
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        logger.debug("Initialized Spark session successfully")
    except:
        logger.error("Failed to start Spark session")

    try:
        # Create a DStream from multiple Kafka topics; a micro-batch is created every 5 seconds
        # READ_TOPICS and BROKER are not defined anywhere in this commit (presumably meant to come from config)
        directKafkaStream = KafkaUtils.createDirectStream(ssc, READ_TOPICS, {'metadata.broker.list': BROKER})
        logger.info('Created Spark direct stream successfully')
    except:
        logger.debug('Failed to create direct stream')

    logger.info('Start to process data')
    process_dStream(directKafkaStream)

    try:
        # Create Kafka producer
        KAFKA_PRODUCER = KafkaProducer(bootstrap_servers=BROKER)
        logger.info('Created Kafka producer successfully')
    except KafkaError as ke:
        logger.debug('Failed to create Kafka producer, caused by %s' % ke.message)
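
process_dStream is left empty here and the StreamingContext is never started. A minimal sketch of one possible implementation follows, under these assumptions (none of which are part of the commit): each Kafka record value is a JSON post with Id and Body fields, the trained classifier expects its features column under the name _2 (matching the toDF() output in model_generation.py), and SEND_TOPIC, KAFKA_PRODUCER and spark are defined as in the file above.

import json

def process_dStream(dStream):
    # Load the persisted feature pipeline and classifier once, on the driver
    tokenizer = Tokenizer.load(tokenizer_file)
    hashing_TF = HashingTF.load(hashing_tf_file)
    idf_model = IDFModel.load(idf_model_file)
    nb_model = NaiveBayesModel.load(nb_model_file)

    def predict_batch(rdd):
        if rdd.isEmpty():
            return
        # Kafka records arrive as (key, value) pairs; the value is assumed to be a JSON post
        posts_df = spark.read.json(rdd.map(lambda kv: kv[1]))
        features = idf_model.transform(hashing_TF.transform(tokenizer.transform(posts_df)))
        # Rename to the column name the classifier was fitted with (assumed to be "_2")
        predictions = nb_model.transform(features.withColumnRenamed("IDF_features", "_2"))
        for row in predictions.select("Id", "prediction").collect():
            # SEND_TOPIC is a hypothetical output topic; kafka-python expects bytes values
            KAFKA_PRODUCER.send(SEND_TOPIC, json.dumps({'Id': row.Id, 'prediction': row.prediction}).encode('utf-8'))

    dStream.foreachRDD(predict_batch)

Loading the saved models once on the driver avoids re-reading them for every micro-batch; the main block would also need ssc.start() and ssc.awaitTermination() after wiring the stream for any of this to run.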
