
Commit 849b718

update streaming process
1 parent 3c1b628 commit 849b718

6 files changed: +360 −87 lines changed

evaluator.py

Lines changed: 2 additions & 15 deletions
@@ -81,10 +81,10 @@ def shutdown_hook(spark_session):
 
     # tokenize post texts and get term frequency and inverted document frequency
     logger.debug("Start to generate TFIDF features")
-    tokenizer=Tokenizer.load(tokenizer_file)
+    tokenizer=Tokenizer.load(tokenizer_file)
     tokenized_words=tokenizer.transform(training_df.na.drop(how = 'any'))
     hashing_TF=HashingTF.load(hashing_tf_file)
-    TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how = 'any'))
+    TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how = 'any'))
 
     idfModel=IDFModel.load(idf_model_file)
     TFIDFfeatures=idfModel.transform(TFfeatures.na.drop(how = 'any'))
@@ -118,16 +118,3 @@ def shutdown_hook(spark_session):
(13 trailing blank lines removed)

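For context, the hunk above replays the feature pipeline that model_generation.py fits and saves: a Tokenizer splits the post body into words, HashingTF turns the words into term-frequency vectors, and IDFModel rescales them into TF-IDF features. A minimal sketch of that load-and-transform chain, assuming a DataFrame named posts_df with a Body column (posts_df is illustrative; the *_file paths come from model_generation.cfg):

from pyspark.ml.feature import Tokenizer, HashingTF, IDFModel

# Reload the stages saved during training (paths from model_generation.cfg)
tokenizer = Tokenizer.load(tokenizer_file)
hashing_tf = HashingTF.load(hashing_tf_file)
idf_model = IDFModel.load(idf_model_file)

# Replay the training-time transforms on a DataFrame with a "Body" column
words_df = tokenizer.transform(posts_df.na.drop(how='any'))   # Body -> Words
tf_df = hashing_tf.transform(words_df)                        # Words -> Features (term counts)
tfidf_df = idf_model.transform(tf_df)                         # Features -> IDF_features (TF-IDF)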
model_generation.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 [io]
-post_file = /Users/QiaoLiu1/Autotag/dataset/Questions.json
+post_file = /Users/QiaoLiu1/Autotag/test/Questions.json
 tags_file = /Users/QiaoLiu1/Autotag/dataset/Tags.csv
 selected_tags_file = /Users/QiaoLiu1/Autotag/dataset/top100Tags.csv
 idf_model_file = /Users/QiaoLiu1/Autotag/models/idfModel

model_generation.py

Lines changed: 8 additions & 17 deletions
@@ -14,6 +14,7 @@
 from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 from pyspark.sql.functions import udf, col
+from pyspark.sql.types import FloatType, StringType
 
 
 logging.basicConfig()
@@ -85,7 +86,7 @@ def shutdown_hook(spark_session):
     tokenizer=Tokenizer(inputCol="Body", outputCol="Words")
     tokenized_words=tokenizer.transform(training_df.na.drop(how = 'any'))
     tokenizer.save(tokenizer_file)
-    hashing_TF=HashingTF(inputCol="Words", outputCol="Features", numFeatures=200000)
+    hashing_TF=HashingTF(inputCol="Words", outputCol="Features", numFeatures=200)
     hashing_TF.save(hashing_tf_file)
     TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how = 'any'))
@@ -104,11 +105,14 @@ def shutdown_hook(spark_session):
     # Row(IDF_features=SparseVector(200, {7: 2.3773, 9: 2.1588, 32: 2.0067, 37: 1.7143, 49: 2.6727, 59: 2.9361, 114: 1.0654, 145: 2.9522, 167: 2.3751}), Tag=u'asp.net')
     # Transfer data to be in labeled point format
 
-    labeled_points=TFIDFfeatures.rdd.map(lambda row: (float(tags_to_catId.value[row.Tag]), row.IDF_features, row.Id)).toDF()
+    tags_to_catId_transform = udf(lambda tag: float(tags_to_catId.value[tag]), FloatType())
+    catId_to_tags_transform = udf(lambda catId: catId_to_tags.value[int(catId)], StringType())
+
+    labeled_points=TFIDFfeatures.withColumn('CatId', tags_to_catId_transform('Tag'))
     training, test=labeled_points.randomSplit([0.7, 0.3], seed=0)
 
     # Train Naive Bayes model
-    nb=NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol='_1', featuresCol='_2')
+    nb=NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol='CatId', featuresCol='IDF_features')
     nb_model=nb.fit(training)
     nb_model.save(nb_model_file)
@@ -117,7 +121,7 @@ def shutdown_hook(spark_session):
     # print test_df.collect()
 
     predictions=nb_model.transform(test)
-    evaluator=MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
+    evaluator=MulticlassClassificationEvaluator(labelCol="CatId", predictionCol="prediction", metricName="accuracy")
     accuracy = evaluator.evaluate(predictions)
     print("Test set accuracy = " + str(accuracy/0.6023699978752843))
@@ -126,16 +130,3 @@ def shutdown_hook(spark_session):
(13 trailing blank lines removed)

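The main change above swaps the RDD-based labeled-point construction for withColumn calls backed by udfs over broadcast lookup dicts: tags_to_catId maps each tag string to a float label for NaiveBayes, and catId_to_tags maps a predicted id back to its tag. A small self-contained sketch of that pattern (the toy tags and DataFrame below are illustrative only, not the repo's data):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, StringType

spark = SparkSession.builder.master('local[1]').appName('tag_mapping_sketch').getOrCreate()
sc = spark.sparkContext

# Toy data standing in for the TF-IDF DataFrame's Tag column
df = spark.createDataFrame([('how to join two tables', 'sql'),
                            ('center a div', 'css')], ['Body', 'Tag'])

# Broadcast small lookup dicts so every executor can translate tag <-> id cheaply
tags = ['sql', 'css']
tags_to_catId = sc.broadcast({t: i for i, t in enumerate(tags)})
catId_to_tags = sc.broadcast({i: t for i, t in enumerate(tags)})

# Column-level translations, mirroring tags_to_catId_transform / catId_to_tags_transform
to_id = udf(lambda tag: float(tags_to_catId.value[tag]), FloatType())
to_tag = udf(lambda cat_id: catId_to_tags.value[int(cat_id)], StringType())

labeled = df.withColumn('CatId', to_id('Tag'))               # numeric label column for training
restored = labeled.withColumn('TagAgain', to_tag('CatId'))   # reverse mapping, as used at prediction time
restored.show()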
streaming_prediction.cfg

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+[spark]
+master =
+
+[io]
+tokenizer_file =
+hashing_tf_file =
+idf_model_file =
+nb_model_file =
+
+[kafka]
+broker_ip =
+kafka_topic =
+kafka_output_topic =

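The new config file is committed as an empty template. A filled-in example of what a local run might use (every value below is a hypothetical placeholder; only idf_model_file and selected_tags_file mirror paths already present in model_generation.cfg, and the extra selected_tags_file key is an assumption, since streaming_prediction.py reads a selected-tags CSV that the template does not list):

[spark]
master = local[2]

[io]
tokenizer_file = /Users/QiaoLiu1/Autotag/models/tokenizer
hashing_tf_file = /Users/QiaoLiu1/Autotag/models/hashingTF
idf_model_file = /Users/QiaoLiu1/Autotag/models/idfModel
nb_model_file = /Users/QiaoLiu1/Autotag/models/nbModel
selected_tags_file = /Users/QiaoLiu1/Autotag/dataset/top100Tags.csv

[kafka]
broker_ip = 127.0.0.1:9092
kafka_topic = questions
kafka_output_topic = tagged_questions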
streaming_prediction.py

Lines changed: 145 additions & 54 deletions
@@ -1,84 +1,175 @@
 import logging
 import ConfigParser
-import pandas as pd
 import atexit
+import json
+import pandas as pd
 
 from pyspark import SparkContext, SparkConf
 from pyspark.streaming import StreamingContext
-from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
-from pyspark.ml.linalg import Vectors
-from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
-from pyspark.ml.evaluation import MulticlassClassificationEvaluator
-from pyspark.sql.functions import udf, col
+from pyspark.sql import SparkSession
 from kafka import KafkaProducer
 from pyspark.streaming.kafka import KafkaUtils
 from kafka.errors import KafkaError
-
-
-logging.basicConfig()
-logger=logging.getLogger('model_generation')
+from pyspark.ml.feature import Tokenizer, HashingTF, IDFModel
+from pyspark.ml.classification import NaiveBayesModel
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType, FloatType
+
+# Set up logger
+logging.basicConfig()
+logger = logging.getLogger('Streaming_prediction')
 logger.setLevel(logging.DEBUG)
 
-config=ConfigParser.ConfigParser()
-config.read('model_generation.cfg')
-
-master=config.get('spark','master')
-
-idf_model_file=config.get('io','idf_model_file')
-nb_model_file=config.get('io','nb_model_file')
-hashing_tf_file=config.get('io', 'hashing_tf_file')
-tokenizer_file=config.get('io', 'tokenizer_file')
+# Set up configuration file parser
+config = ConfigParser.ConfigParser()
+config.read('streaming_prediction.cfg')
+
+master = config.get('spark', 'master')
+broker_ip = config.get('kafka', 'broker_ip')
+kafka_topic = config.get('kafka', 'kafka_topic')
+kafka_output_topic = config.get('kafka', 'kafka_output_topic')
+
+tokenizer_file = config.get('io', 'tokenizer_file')
+hashing_tf_file = config.get('io', 'hashing_tf_file')
+idf_model_file = config.get('io', 'idf_model_file')
+nb_model_file = config.get('io', 'nb_model_file')
+
+idf_model = None
+nb_model = None
+hashing_tf = None
+tokenizer = None
+
+tags_to_catId_transform = None
+catId_to_tags_transform = None
+
+
+def process_data(dStream, kafka_producer):
+
+    def features_extraction(df):
+        # Extract features
+        try:
+            logger.debug('Extracting features from data')
+            words_df = tokenizer.transform(df)
+            tf_features_df = hashing_tf.transform(words_df)
+            tf_idf_features_df = idf_model.transform(tf_features_df)
+            logger.debug('Extract features successfully')
+            return tf_idf_features_df
+        except:
+            logger.warn('Fail to extract features from Questions')
+
+    def predict_tag(df):
+        # Predict the tags according to extracted features
+        try:
+            logger.debug('Predicting data tag')
+            post_data = df.withColumn('CatId', tags_to_catId_transform('Tag'))
+            prediction = nb_model.transform(post_data)
+            output_data = prediction.withColumn('Predicted_tag', catId_to_tags_transform('prediction'))
+            logger.debug('Predicted tags are generated')
+            return output_data
+        except:
+            logger.warn('Fail to predict tags')
+
+    # Write data back to Kafka producer
+    def persist_data(row):
+        tagged_data = json.dumps(row.asDict())
+        try:
+            logger.debug('Writing data to Kafka topic %s' % kafka_output_topic)
+            kafka_producer.send(kafka_output_topic, value=tagged_data)
+            logger.info('sent data successfully')
+        except:
+            logger.debug('Fail to send tagged data %s' % tagged_data)
+
+    stream_df = spark.read.json(dStream)
+    features_df = features_extraction(stream_df)
+    predictions = predict_tag(features_df)
+    predictions.foreach(persist_data)
+
+
+# Create shut down hook
+def shutdown_hook(kafka_producer, spark):
+    # Shut down kafka producer
+    try:
+        logger.debug('Closing kafka producer')
+        kafka_producer.flush(10)
+        kafka_producer.close()
+        logger.debug('Stop kafka producer successfully')
+    except KafkaError as ke:
+        logger.warn('Fail to stop kafka producer, caused by %s' % ke.message)
 
-def process_dStream(dStream):
+    # Shut down spark
+    try:
+        logger.debug('Shut down spark context')
+        spark.stop()
+        logger.debug('Stop spark successfully')
+    except:
+        logger.warn('Fail to stop spark')
 
 
 if __name__ == '__main__':
-
-    # Try to initialize a spark cluster with master, master can be local or mesos URL, which is configurable in config file
+
+    # Build spark context and spark streaming context
     try:
-        logger.debug("Initializing Spark cluster")
-        conf=SparkConf().setAppName('model_generation').setMaster(master)
-        sc=SparkContext(conf=conf)
+        logger.debug('Set up sparkcontext and sparkstreamingcontext')
+        conf = SparkConf().setAppName('Streaming_prediction').setMaster(master)
+        sc = SparkContext(conf=conf)
         sc.setLogLevel('INFO')
-        ssc=StreamingContext(sc, 5)
-        logger.debug("Created Spark cluster successfully")
-    except:
-        logger.error("Fail to initialize spark cluster")
-
+        ssc = StreamingContext(sc, 5)
+        logger.debug('Initialize spark context and sparkstreamingcontext successfully')
+    except Exception:
+        logger.debug('Fail to start spark context and sparkstreamingcontext')
+        raise
+
+    # Start a sparksession
     try:
-        spark=SparkSession.builder.config(conf=conf).getOrCreate()
-        logger.debug("Initialized spark session successfully")
+        logger.debug('Set up SparkSession')
+        spark = SparkSession.builder.getOrCreate()
+        logger.debug('Start spark session successfully')
    except:
-        logger.error("Fail to start spark session")
+        logger.debug('Fail to start sparksession')
 
+    # Connect to Kafka broker
     try:
-        # Cread Dstream from multiple kafka topics and create a microbatch every 5 seconds
-        directKafkaStream=KafkaUtils.createDirectStream(ssc, READ_TOPICS, {'metadata.broker.list':BROKER})
-        logger.info('Create spark direct stream successfully')
+        # Create kafka producer
+        logger.debug('Initialize kafka producer')
+        kafka_producer = KafkaProducer(bootstrap_servers=broker_ip)
+        logger.debug('Start kafka producer successfully')
+    except KafkaError as ke:
+        logger.debug('Fail to start kafka producer, caused by %s' % ke.message)
 
+    try:
+        # Create dstream from kafka topic
+        directKafkaStream = KafkaUtils.createDirectStream(ssc, [kafka_topic], {'metadata.broker.list': broker_ip})
+        logger.debug('Create direct dstream from kafka successfully')
     except:
-        logger.debug('Fail to create direct stream')
+        logger.debug('Unable to create dstream from kafka')
 
-    logger.info('Start to process data')
-    process_dStream(directKafkaStream)
+    atexit.register(shutdown_hook, kafka_producer, spark)
 
+    # Load in tokenizer, hashing_tf, idf_model, nb_model and the tag <-> catId maps
     try:
-        # Create Kafka producer
-        KAFKA_PRODUCER=KafkaProducer(bootstrap_servers=BROKER)
-        logger.info('Create kafka producer successfully')
-
-    except KafkaError as ke:
-        logger.debug('Fail to create kafka producer, caused by %s' % ke.message)
+        logger.debug('Loading models')
+        tokenizer = Tokenizer.load(tokenizer_file)
+        hashing_tf = HashingTF.load(hashing_tf_file)
+        idf_model = IDFModel.load(idf_model_file)
+        nb_model = NaiveBayesModel.load(nb_model_file)
+        selected_tags = pd.read_csv(selected_tags_file, header=None)
+        local_catId_to_tags = dict(zip(list(selected_tags.index), selected_tags[0]))
+        local_tags_to_catId = dict(zip(selected_tags[0], list(selected_tags.index)))
+        catId_to_tags = sc.broadcast(local_catId_to_tags)
+        tags_to_catId = sc.broadcast(local_tags_to_catId)
+        tags_to_catId_transform = udf(lambda tag: float(tags_to_catId.value[tag]), FloatType())
+        catId_to_tags_transform = udf(lambda catId: catId_to_tags.value[int(catId)], StringType())
+        logger.debug('loaded models successfully')
+    except:
+        logger.debug('Fail to load models')
 
+    logger.debug('Start to process data')
+    process_data(directKafkaStream, kafka_producer)
+    ssc.start()
+    ssc.awaitTermination()
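One caveat on process_data as committed: spark.read.json() takes a path or an RDD of JSON strings, not a DStream, and selected_tags_file is used without being read from the config, so the per-batch wiring would normally go through foreachRDD. A rough sketch of that glue under the same design (it assumes incoming Kafka values are JSON question records with Body and Id fields and reuses the globals loaded in __main__; nothing here is the committed implementation):

def process_data(dstream, kafka_producer):
    # The direct Kafka stream yields (key, value) pairs; keep the JSON value only
    json_stream = dstream.map(lambda kv: kv[1])

    def handle_batch(rdd):
        if rdd.isEmpty():
            return
        # Turn the micro-batch into a DataFrame, then replay the fitted pipeline
        batch_df = spark.read.json(rdd)
        words_df = tokenizer.transform(batch_df)
        tf_df = hashing_tf.transform(words_df)
        tfidf_df = idf_model.transform(tf_df)
        predictions = nb_model.transform(tfidf_df)
        tagged_df = predictions.withColumn('Predicted_tag',
                                           catId_to_tags_transform('prediction'))
        # KafkaProducer lives on the driver and is not serializable,
        # so collect the (small) tagged batch and publish from the driver
        for row in tagged_df.select('Id', 'Predicted_tag').collect():
            kafka_producer.send(kafka_output_topic, value=json.dumps(row.asDict()))

    json_stream.foreachRDD(handle_batch)

With this shape, each 5-second micro-batch is tokenized, hashed, and IDF-weighted with the saved models, classified by the loaded NaiveBayesModel, and the predicted tag for each question id is written back to the output topic.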