 import logging
 import ConfigParser
 import pandas as pd
+import csv
+import atexit

 from pyspark import SparkContext, SparkConf
 from pyspark.sql import SparkSession
-from pyspark.ml.feature import HashingTF, IDF, Tokenizer
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib.linalg import SparseVector
+from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
+from pyspark.ml.linalg import Vectors
 from pyspark.ml.classification import NaiveBayes
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.sql.functions import udf, col


 logging.basicConfig()
...

 tags_file=config.get('io', 'tags_file')
 selected_tags_file=config.get('io', 'selected_tags_file')

+idf_model_file=config.get('io', 'idf_model_file')
+nb_model_file=config.get('io', 'nb_model_file')
+
+def shutdown_hook(spark_session):
+    try:
+        spark_session.stop()  # stop() shuts down the session and its SparkContext
+        logger.debug("Successfully stopped the Spark session and Spark context")
+    except:
+        logger.debug("Failed to stop the Spark session and Spark context")

 if __name__ == '__main__':

...
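
The config.get('io', ...) calls imply an INI-style file read with ConfigParser. A minimal sketch of the expected [io] section, with placeholder paths; the posts_file key is assumed from its use in the main block:

    [io]
    posts_file = /path/to/posts.json
    tags_file = /path/to/tags.csv
    selected_tags_file = /path/to/selected_tags.csv
    idf_model_file = /path/to/idf_model
    nb_model_file = /path/to/nb_model
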
     # Input the dataset
     try:
         logger.debug("Start to read the input dataset")
-        posts_df=spark.read.csv(posts_file, header=True)
+        posts_df=spark.read.json(posts_file)
         tags_df=spark.read.csv(tags_file, header=True)
         selected_tags=pd.read_csv(selected_tags_file, header=None)
         local_tags_to_catId=dict(zip(selected_tags[0], list(selected_tags.index)))
...
         catId_to_tags=sc.broadcast(local_catId_to_tags)
         tags_set=sc.broadcast(set(selected_tags[0]))
         logger.debug("Read in dataset successfully")
+
     except:
         logger.error("Can't input dataset")

     # Join posts_df and tags_df together and prepare training dataset
-    selected_tags_df=tags_df.filter(tags_df.Tag.isin(tags_set.value))
-    tags_questions_df=posts_df.join(selected_tags_df, posts_df.Id==selected_tags_df.Id)
-    training_df=tags_questions_df.select(['Tag', 'Body'])
+    selected_tags_df=tags_df.filter(tags_df.Tag.isin(tags_set.value)).na.drop(how='any')
+    tags_questions_df=selected_tags_df.join(posts_df, "Id")
+    training_df=tags_questions_df.select(['Tag', 'Body', 'Id']).na.drop(how='any')
+    logger.debug("Successfully built training_df")

     # Tokenize post texts and get term frequency and inverse document frequency
+    logger.debug("Start to generate TF-IDF features")
     tokenizer=Tokenizer(inputCol="Body", outputCol="Words")
-    tokenized_words=tokenizer.transform(training_df)
-    hashing_TF=HashingTF(inputCol="Words", outputCol="Features", numFeatures=200)
-    TFfeatures=hashing_TF.transform(tokenized_words)
+    tokenized_words=tokenizer.transform(training_df.na.drop(how='any'))
+    hashing_TF=HashingTF(inputCol="Words", outputCol="Features")  # numFeatures now left at its default
+    TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how='any'))

     idf=IDF(inputCol="Features", outputCol="IDF_features")
-    idfModel=idf.fit(TFfeatures)
-    TFIDFfeatures=idfModel.transform(TFfeatures)
+    idfModel=idf.fit(TFfeatures.na.drop())
+    idfModel.save(idf_model_file)
+    TFIDFfeatures=idfModel.transform(TFfeatures.na.drop(how='any'))
+    logger.debug("Got TF-IDF features successfully")

-    for feature in TFIDFfeatures.select("IDF_features", "Tag").take(3):
-        logger.info(feature)
+    # for feature in TFIDFfeatures.select("IDF_features", "Tag").take(3):
+    #     logger.info(feature)
+
+    # register shutdown_hook
+    atexit.register(shutdown_hook, spark_session=spark)

     # Row(IDF_features=SparseVector(200, {7: 2.3773, 9: 2.1588, 32: 2.0067, 37: 1.7143, 49: 2.6727, 59: 2.9361, 114: 1.0654, 145: 2.9522, 167: 2.3751}), Tag=u'asp.net')
     # Transfer data to be in labeled-point format
-    labeled_points=TFIDFfeatures.rdd.map(lambda row: LabeledPoint(label=tags_to_catId.value[row.Tag], features=SparseVector(row.IDF_features.size, row.IDF_features.indices, row.IDF_features.values)))
+
+    labeled_points=TFIDFfeatures.rdd.map(lambda row: (float(tags_to_catId.value[row.Tag]), row.IDF_features, row.Id)).toDF()
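    # (note on column names: toDF() without arguments gives the tuple columns the default
    # names _1, _2 and _3, i.e. label, feature vector and post Id, which is why the
    # classifier below uses labelCol='_1' and featuresCol='_2'; toDF(["label", "features", "Id"])
    # would be an equivalent, more readable alternative)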
     training, test=labeled_points.randomSplit([0.7, 0.3], seed=0)

     # Train Naive Bayes model
-    print training.take(3)
-    nb=NaiveBayes(smoothing=1.0, modelType="multinomial")#
+    nb=NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol='_1', featuresCol='_2')
     nb_model=nb.fit(training)
+    nb_model.save(nb_model_file)
+
+    # Evaluate the model
+    # test_df=test.rdd.map(lambda row: ((row._2, row._3), [row._1])).reduceByKey(lambda a, b: a+b)
+    # print test_df.collect()

-    # Evaluation the model
     predictions=nb_model.transform(test)
-    print predictions.take(10)
-    # evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
+    evaluator=MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
+    accuracy = evaluator.evaluate(predictions)
+    print("Test set accuracy = " + str(accuracy))
+
     # prediction_and_label = test.map(lambda point : (nb_model.predict(point.features), point.label))
     # accuracy = 1.0 * prediction_and_label.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()
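
Since the script now persists both fitted models, a minimal sketch of how they might be reloaded later to score new posts; it reuses the config values and the SparkSession from the script above, and renames the feature column because the classifier was trained with featuresCol='_2':

from pyspark.ml.feature import Tokenizer, HashingTF, IDFModel
from pyspark.ml.classification import NaiveBayesModel

idf_model = IDFModel.load(idf_model_file)
nb_model = NaiveBayesModel.load(nb_model_file)

# featurize new posts exactly as during training (same column names, default numFeatures)
new_posts_df = spark.read.json(posts_file).na.drop(subset=["Body"])
words_df = Tokenizer(inputCol="Body", outputCol="Words").transform(new_posts_df)
tf_df = HashingTF(inputCol="Words", outputCol="Features").transform(words_df)
tfidf_df = idf_model.transform(tf_df)

# 'prediction' holds the numeric category id, which maps back to a tag name via catId_to_tags
predictions = nb_model.transform(tfidf_df.withColumnRenamed("IDF_features", "_2"))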