File tree Expand file tree Collapse file tree 2 files changed +45
-0
lines changed
Expand file tree Collapse file tree 2 files changed +45
-0
lines changed Original file line number Diff line number Diff line change 1+ from pyspark .sql import SparkSession
2+
3+ # Constants
4+ PATH = "../data/data1.csv"
5+
6+ def pprint (ll ):
7+ print ()
8+ for i , l in enumerate (ll ):
9+ print (f"{ i } :" , ", " .join ([str (li ) for li in l ]))
10+ print ()
11+
12+ def preprocess (row ):
13+ prow = row .split ("," )[1 :] # remove first column
14+ prow = list (filter (len , prow )) # remove empty columns
15+ return prow
16+
17+ def cross (row ):
18+ cp = [(i1 , i2 ) for i1 in row for i2 in row if i1 != i2 ]
19+ return cp
20+
21+ if __name__ == "__main__" :
22+
23+ # Start
24+ spark = SparkSession .builder .appName ("q1" ).getOrCreate ()
25+
26+ # Read and preprocess data
27+ g = spark .sparkContext .textFile (PATH )
28+ g = g .map (lambda row : preprocess (row ))
29+
30+ # Remove header
31+ header = g .first ()
32+ g = g .filter (lambda row : row != header )
33+
34+ pprint (g .take (5 ))
35+
36+ # Cross-product per row
37+ ga = g .map (lambda row : cross (row ))
38+ pprint (ga .take (5 ))
39+
40+ # Flatten
41+ gaf = ga .flatMap (lambda row : row )
42+ pprint (gaf .take (5 ))
43+
44+ # Stop
45+ spark .stop ()
You can’t perform that action at this time.
0 commit comments