Commit aa7a83b — "Finished tutorial 1." (1 parent: c39052a; 6 files changed, +20326 −1 lines)

1_word_count/README.md
Main resources used in this tutorial:

* [WordCount.java](https://github.com/achmand/flink-java-tutorials/blob/master/1_word_count/src/main/java/tutorial/WordCount.java)
* [pom.xml](https://github.com/achmand/flink-java-tutorials/blob/master/1_word_count/pom.xml)

## Tutorial

### Step 1: Setting up a new Maven project.

In your IDE, create a new Maven project. Once the project is created, create a new package in `src/main/java` and name it `tutorial`. Inside the newly created package, create a new class and name it `WordCount`.

Once you have added these dependencies, clean and install the Maven project.

### Step 3: Implementing the WordCount class.

Open the `WordCount` class and import the following packages.
```
// importing packages
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
```
After importing the packages, write the `main(String[] args)` function. This is the entry point for our Java program.

```
public static void main(String[] args) throws Exception {}
```
#### Step 3.1: Setting up Execution Environment & Global Job Params.

Inside the `main(String[] args)` function, include the following lines of code. The comments explain the use of each line; if they are not enough to understand the code, we suggest reading through the following sources.

* [ExecutionEnvironment](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/ExecutionEnvironment.html)
* [ParameterTool](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/utils/ParameterTool.html)
* [setGlobalJobParameters](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setGlobalJobParameters-org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters-)
```
// set up the batch execution environment
final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

// parse the arguments passed to the program
final ParameterTool parameters = ParameterTool.fromArgs(args);

// register parameters globally so it can be available for each node in the cluster
environment.getConfig().setGlobalJobParameters(parameters);
```
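To get a feel for what `ParameterTool.fromArgs` does with the command line, here is a rough standalone sketch of the same `--key value` parsing idea. `ArgsSketch` and its `fromArgs` helper are hypothetical names for illustration only, not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    // hypothetical helper mimicking the --key value parsing idea of ParameterTool.fromArgs
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                // strip the leading "--" and store the following token as the value
                params.put(args[i].substring(2), args[i + 1]);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = fromArgs(new String[]{"--input", "in.txt", "--output", "out"});
        System.out.println(params.get("input"));  // in.txt
        System.out.println(params.get("output")); // out
    }
}
```

This is why the program can later look up `parameters.get("input")` and `parameters.get("output")` by name.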
#### Step 3.2: Dataset Transformations.

The next step is to read the text file from the given input path. One of the parameters passed to the program is `input`, which holds the input path of our text file. The following code reads the text file from the specified `input` path and converts it into a `DataSet<String>` instance.

```
// read the text file from the parameter 'input' passed in args,
// line-by-line, and return the lines as Strings
DataSet<String> textLines = environment.readTextFile(parameters.get("input"));
```

Now we need to take the `DataSet<String>` instance and tokenize each word into a tuple of `(word, 1)`. To do so we apply the `flatMap` operation to the `DataSet<String>` instance. The `flatMap` operation takes a class which implements the `FlatMapFunction` interface as an argument, so inside the `WordCount` class we implement a nested class called `Tokenizer` which implements this interface.

*NOTE: We use the `FlatMapFunction` interface rather than `MapFunction`, since operations that produce multiple result elements from a single input element are implemented with `FlatMapFunction`, while `MapFunction` produces exactly one result element per input element.*

For more information on the `MapFunction` interface take a look at this [source](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/MapFunction.html) and for the `FlatMapFunction` interface take a look at this [source](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/FlatMapFunction.html).
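If the one-to-one versus one-to-many distinction is new to you, plain Java streams (no Flink needed) show the same idea: `map` emits exactly one result per input element, while `flatMap` can emit zero or more. The class name `MapVsFlatMap` is just an illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMap {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "or not to be");

        // map: one result per input element (here, one length per line)
        List<Integer> lengths = lines.stream()
                .map(String::length)
                .collect(Collectors.toList());
        System.out.println(lengths); // [5, 12]

        // flatMap: zero or more results per input element (here, all words of all lines)
        List<String> words = lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                .collect(Collectors.toList());
        System.out.println(words); // [to, be, or, not, to, be]
    }
}
```

In our word count, each input line produces many `(word, 1)` tuples, which is exactly the `flatMap` shape.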
```
// user-defined tokenizer, splits the lines into words and emits a tuple of (word, 1)
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize (convert to lower case) and split the line into words
        String[] words = value.toLowerCase().split("\\W+");

        // emit the pairs (word, 1)
        for (String word : words) {
            // check that the length is greater than 0
            if (word.length() > 0) {
                // append tuple (word, 1)
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```
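As a quick standalone check of the `split("\\W+")` call used above: when a line starts with a non-word character, the split produces a leading empty string, which is exactly what the `word.length() > 0` guard filters out. The class name `SplitDemo` is just an illustration.

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // a line starting with punctuation yields a leading empty string
        String[] words = "\"My dear Mr. Bennet,\"".toLowerCase().split("\\W+");
        System.out.println(Arrays.toString(words)); // [, my, dear, mr, bennet]

        // the guard from the Tokenizer drops the empty string
        for (String word : words) {
            if (word.length() > 0) {
                System.out.println(word);
            }
        }
    }
}
```

Note that trailing empty strings are removed by `split`, so only the leading one needs filtering here.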
Now we call the `flatMap` operation and pass an instance of the newly created `Tokenizer` class as an argument. Then we group by the word and sum up the counts, as shown in the following code.

```
// tokenize the lines from textLines into (word, 1), then group and count
DataSet<Tuple2<String, Integer>> counts =
        // split up the lines into pairs (2-tuples) containing: (word, 1)
        textLines.flatMap(new Tokenizer())
        // group by the tuple field "0" which is the word
        .groupBy(0)
        // sum up tuple field "1"
        .sum(1);
```
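Conceptually, `groupBy(0).sum(1)` merges all tuples sharing the same word and adds up their counts. A plain-Java sketch of that semantics on a local map (the class name `GroupSumSketch` is hypothetical, and this is not Flink's distributed implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class GroupSumSketch {
    public static void main(String[] args) {
        // pretend these are the words emitted by the Tokenizer, each standing for a (word, 1) tuple
        String[] tokens = {"to", "be", "or", "not", "to", "be"};

        // group by the word and sum up the per-word counts of 1
        Map<String, Integer> counts = new HashMap<>();
        for (String word : tokens) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts.get("to")); // 2
        System.out.println(counts.get("or")); // 1
    }
}
```

Flink performs this same aggregation in parallel, partitioned by the grouping key.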
#### Step 3.3: Output the Result.

The final bit of this implementation is to output the result.

```
// output the final result
// check that the argument 'output' was passed, to save in that path
if (parameters.has("output")) {
    // write the result as CSV; the row delimiter is a line break, the field delimiter is a space
    counts.writeAsCsv(parameters.get("output"), "\n", " ");

    // trigger program execution, 'Tutorial_1' is the job name
    environment.execute("Tutorial_1");
}
```
### Step 4: Building the JAR and Executing on Flink.

To build the JAR using IntelliJ follow these steps:
* File -> Project Structure -> Project Settings -> Artifacts -> Click the green plus sign -> Jar -> From modules with dependencies...
* Select 'Extract to the target Jar'
* Click on 'OK'
* Build -> Build Artifact

Once the JAR is built, we need a `txt` file to read as an input. A file located in this repository called `pride_and_prejudice.txt` will be used as the input. This file can be downloaded from [Project Gutenberg](https://www.gutenberg.org/ebooks/1342).

Now open a terminal and execute the following commands to run the JAR file on Apache Flink. First start a new cluster by typing `flink/build-target/bin/start-cluster.sh` in the terminal. Then open a browser and go to `http://localhost:8081/` to make sure the cluster is running.

*NOTE: Make sure the paths match the locations on your machine.*

Once the cluster is running, execute the JAR file with the following command (replace the placeholders with your paths): `FLINKPATH run -c MAINCLASS JARPATH --input INPUTPATH --output OUTPUTPATH`.

On our machine this command was as follows:
`flink/build-target/bin/flink run -c tutorial.WordCount flink-java-tutorials/1_word_count/out/artifacts/1_word_count_jar/1_word_count.jar --input flink-java-tutorials/1_word_count/pride_and_prejudice.txt --output flink-java-tutorials/1_word_count/ouput`.

Once the job is finished, the following output is shown in the terminal and the result is saved in the path passed in `--output`.

```
Starting execution of program
Program execution finished
Job with JobID 7a64e43b07fa0711fbb6285fb95837e2 has finished.
Job Runtime: 2174 ms
```

To stop a running cluster, execute the following command: `flink/build-target/bin/stop-cluster.sh`.

## Exercises

* Re-run the code but provide another input file; you can download another source from [Project Gutenberg](https://www.gutenberg.org/ebooks/1342).
* Clean the words (numbers, punctuation, stop-words, cases, etc.).
* Count only words starting with a specific letter; this letter must be passed as an argument similar to the arguments passed for the input/output paths.