File tree Expand file tree Collapse file tree 1 file changed +50
-0
lines changed Expand file tree Collapse file tree 1 file changed +50
-0
lines changed Original file line number Diff line number Diff line change
1
+ # -*- coding: utf-8 -*-
2
+ """
3
+ Created on Thu Mar 15 10:59:40 2018
4
+
5
+ @author: VB
6
+ """
7
+
8
+ from pyspark import SparkContext ,SparkConf
9
+
10
def extract_movies(lines):
    """Return the second whitespace-separated field of a single input line.

    Despite the plural parameter name, ``lines`` is one line of text. In this
    script it is mapped over the rows of ``u.data``, and the returned field is
    used as the movie-ID key (it is kept as a string, not converted to int).

    Raises:
        IndexError: if the line has fewer than two fields.
    """
    fields = lines.split()
    return fields[1]
12
+
13
def flip(x):
    """Return a 2-tuple holding the first two elements of ``x`` in swapped order.

    Used to turn ``(key, value)`` pairs into ``(value, key)`` so an RDD can be
    sorted by what was originally the value. Any elements of ``x`` beyond the
    second are ignored.
    """
    second = x[1]
    first = x[0]
    return (second, first)
15
+
16
def load_movie_names(path='../datasets/ml-100k/u.item', encoding='ISO-8859-1'):
    """Build a movie-ID -> movie-title lookup from a pipe-delimited item file.

    Each line is split on ``'|'``; the first field becomes the key and the
    second field the value. Keys are kept as strings.

    Args:
        path: Location of the item file. Defaults to the relative path the
            script has always used, so existing no-argument calls are
            unaffected (it is still resolved against the current working
            directory).
        encoding: Text encoding used to decode the file. The MovieLens 100k
            item file contains accented titles that are not valid UTF-8, so
            relying on the platform default can raise ``UnicodeDecodeError``;
            a Latin-1 decode accepts every byte.

    Returns:
        dict mapping movie-ID strings to title strings.
    """
    # Local import keeps this block self-contained; io.open accepts
    # ``encoding`` on both Python 2 and Python 3.
    import io

    movie_names = {}
    with io.open(path, encoding=encoding) as f:
        for line in f:
            fields = line.split('|')
            # Blank or malformed lines have no title field; skip them rather
            # than failing the whole load with an IndexError.
            if len(fields) < 2:
                continue
            movie_names[fields[0]] = fields[1]

    return movie_names
24
+
25
+
26
+
27
# Driver: count how many times each movie ID appears in u.data and print
# (title, count) pairs ordered by count.
conf = SparkConf().setMaster("local").setAppName("MostPopularMovie")
sc = SparkContext(conf=conf)

try:
    # Ship the movie-ID -> title dict to the executors as a broadcast variable
    # so the lookup in the final map does not need a join.
    name_dict = sc.broadcast(load_movie_names())

    lines_rdd = sc.textFile("file:///Github_Projects/Taming_Apache_Spark_With_Python/datasets/ml-100k/u.data")

    # (movie_id, 1) for every input line, then sum the ones per movie.
    movies = lines_rdd.map(extract_movies).map(lambda x: (x, 1))
    movies_count = movies.reduceByKey(lambda x, y: x + y)

    # Flip to (count, movie_id) so sortByKey orders by count. sortByKey sorts
    # ascending by default, so the most frequent movie ends up last.
    movies_sorted = movies_count.map(flip).sortByKey()

    # Replace each movie ID with its title: (title, count).
    # NOTE(review): raises KeyError inside the task if an ID from u.data is
    # missing from the item file -- confirm the two files always agree.
    movies_sorted_with_names = movies_sorted.map(lambda x: (name_dict.value[x[1]], x[0]))

    results = movies_sorted_with_names.collect()
finally:
    # Always release the SparkContext, even when the job fails; otherwise a
    # rerun in the same interpreter cannot create a new context.
    sc.stop()

print(results)
43
+
44
+
45
+
46
+
47
+
48
+
49
+
50
+
You can’t perform that action at this time.
0 commit comments