Skip to content

Commit e6e38e3

Browse files
committed
improves & fixes runner
1 parent 1341331 commit e6e38e3

File tree

1 file changed

+28
-14
lines changed

1 file changed

+28
-14
lines changed

pkg_popularity_pipeline.py renamed to pkg_popularity_pipeline_local.py

Lines changed: 28 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -4,9 +4,9 @@
44
import apache_beam as beam
55
import argparse
66

7-
def find_matching_lines(line, search_query):
7+
def find_matching_lines(line, keyword):
88

9-
if line.startswith(term):
9+
if line.startswith(keyword):
1010

1111
yield line
1212

@@ -56,26 +56,40 @@ def compare_by_value(kv1, kv2):
5656

5757
return value1 < value2
5858

59-
if __name__ == '__main__':
59+
def run():
60+
6061
parser = argparse.ArgumentParser(description='Find the most used Java packages')
61-
parser.add_argument('--output_prefix', default='/tmp/output', help='Output prefix')
62-
parser.add_argument('--input', default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/', help='Input directory')
62+
63+
parser.add_argument('--output_prefix',
64+
default='/tmp/output',
65+
help='Output prefix')
66+
67+
parser.add_argument('--input',
68+
default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/',
69+
help='Input files location directory')
6370

6471
options, pipeline_args = parser.parse_known_args()
65-
p = beam.Pipeline(argv=pipeline_args)
72+
73+
pipeline = beam.Pipeline(argv=pipeline_args)
6674

6775
input = '{0}*.java'.format(options.input)
76+
6877
output_prefix = options.output_prefix
78+
6979
keyword = 'import'
7080

7181
# find most used packages
72-
(p
73-
| 'GetJava' >> beam.io.ReadFromText(input)
74-
| 'GetImports' >> beam.FlatMap(lambda line: find_matching_lines(line, keyword))
75-
| 'PackageUse' >> beam.FlatMap(lambda line: resolve_package_usage(line, keyword))
76-
| 'TotalUse' >> beam.CombinePerKey(sum)
77-
| 'Top_5' >> beam.transforms.combiners.Top.Of(5, compare_by_value)
78-
| 'write' >> beam.io.WriteToText(output_prefix)
82+
(pipeline
83+
| 'GetInput' >> beam.io.ReadFromText(input)
84+
| 'Imports' >> beam.FlatMap(lambda line: find_matching_lines(line, keyword))
85+
| 'PackageUsage' >> beam.FlatMap(lambda line: resolve_package_usage(line, keyword))
86+
| 'TotalPackageUse' >> beam.CombinePerKey(sum)
87+
| 'Top5PackageUsage' >> beam.transforms.combiners.Top.Of(5, compare_by_value)
88+
| 'WriteOutput' >> beam.io.WriteToText(output_prefix)
7989
)
8090

81-
p.run().wait_until_finish()
91+
pipeline.run().wait_until_finish()
92+
93+
if __name__ == '__main__':
94+
95+
run()

0 commit comments

Comments
 (0)