Skip to content

Commit dc44746

Browse files
committed
changes
1 parent 38fa050 commit dc44746

File tree

9 files changed

+242
-6
lines changed

9 files changed

+242
-6
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
.idea
55
target
6-
*/META-INF/*
6+
META-INF
77

88
# User-specific stuff
99
.idea/**/workspace.xml

1_word_count/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Now open a terminal and execute the following commands to run the JAR file on Ap
148148

149149
*NOTE: Make sure the paths match the locations on your machine.*
150150

151-
Once the cluster is running on the machine execute the JAR file by executing the following (replace with your paths) `FLINKPATH RUN -c MAINCLASS JARPATH --input INPUTPATH --output OUTPUTPATH`.
151+
Once the cluster is running on the machine, run the JAR file with the following command (replace the placeholders with your own paths): `FLINKPATH run -c MAINCLASS JARPATH --input INPUTPATH --output OUTPUTPATH`.
152152

153153
On our machine this command was as follows:
154154
`flink/build-target/bin/flink run -c tutorial.WordCount flink-java-tutorials/1_word_count/out/artifacts/1_word_count_jar/1_word_count.jar --input flink-java-tutorials/1_word_count/pride_and_prejudice.txt --output flink-java-tutorials/1_word_count/ouput`.

1_word_count/src/main/java/tutorial/WordCount.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tutorial;
22

33
// importing packages
4+
45
import org.apache.flink.api.common.functions.FlatMapFunction;
56
import org.apache.flink.api.java.DataSet;
67
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -13,7 +14,7 @@
1314
*/
1415
public class WordCount {
1516

16-
public static void main(String[] args) throws Exception{
17+
public static void main(String[] args) throws Exception {
1718

1819
// returns the execution environment (the context 'Local or Remote' in which a program is executed)
1920
// LocalEnvironment will cause execution in the current JVM
@@ -42,7 +43,7 @@ public static void main(String[] args) throws Exception{
4243

4344
// output the final result
4445
// check that the argument 'output' was passed to save in that path
45-
if(parameters.has("output")){
46+
if (parameters.has("output")) {
4647
// write result as CSV row delimiter is a line break, field delimiter is a space
4748
result.writeAsCsv(parameters.get("output"), "\n", " ");
4849

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ sudo tar -xvzf ~/Downloads/apache-maven-3.2.5-bin.tar.gz
6868
sudo nano /etc/environment
6969
7070
# add the following environment variable
71-
M2_HOME="/opt/apache-maven-3.6.0"
71+
M2_HOME="/opt/apache-maven-3.2.5"
7272
7373
# append the bin directory to the PATH variable
74-
/opt/apache-maven-3.6.0/bin
74+
/opt/apache-maven-3.2.5/bin
7575
7676
# so the result should be something similar to the below
7777
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/opt/apache-maven-3.2.5/bin"

graph_api/ouput

Whitespace-only changes.

graph_api/pom.xml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Gelly (graph API) tutorial module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gg</groupId>
    <artifactId>gg</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!--
                        The Flink 1.8.0 artifacts declared below require Java 8, and
                        recent JDKs no longer accept source/target level 6, so the
                        module is compiled for Java 8.
                    -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- JSON parsing of the input records -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- Flink batch (DataSet) API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.8.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- Gelly graph API (Graph, Edge, Vertex, SingleSourceShortestPaths) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-gelly_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
</project>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package tutorial;
2+
3+
public class Author {
4+
5+
public String name;
6+
public String org;
7+
8+
public String getName() {
9+
return name;
10+
}
11+
12+
public void setName(String name) {
13+
this.name = name;
14+
}
15+
16+
public String getOrg() {
17+
return org;
18+
}
19+
20+
public void setOrg(String org) {
21+
this.org = org;
22+
}
23+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package tutorial;
2+
3+
// importing packages
4+
5+
import com.google.gson.Gson;
6+
import org.apache.flink.api.common.functions.FilterFunction;
7+
import org.apache.flink.api.common.functions.FlatMapFunction;
8+
import org.apache.flink.api.common.functions.MapFunction;
9+
import org.apache.flink.api.java.DataSet;
10+
import org.apache.flink.api.java.ExecutionEnvironment;
11+
import org.apache.flink.api.java.utils.ParameterTool;
12+
import org.apache.flink.core.fs.FileSystem;
13+
import org.apache.flink.graph.Edge;
14+
import org.apache.flink.graph.Graph;
15+
import org.apache.flink.graph.Vertex;
16+
import org.apache.flink.graph.library.SingleSourceShortestPaths;
17+
import org.apache.flink.types.NullValue;
18+
import org.apache.flink.util.Collector;
19+
import scala.Tuple2;
20+
21+
import java.util.ArrayList;
22+
23+
// bipartite graph -> use projection -> for recommendations
24+
25+
/*
26+
Implementing Degree of Separation using Flink's Gelly Graph API
27+
*/
28+
public class DegreeSeparation {
29+
final static Gson gson = new Gson();
30+
31+
public static void main(String[] args) throws Exception {
32+
33+
// returns the execution environment (the context 'Local or Remote' in which a program is executed)
34+
// LocalEnvironment will cause execution in the current JVM
35+
// RemoteEnvironment will cause execution on a remote setup
36+
final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
37+
38+
// provides utility methods for reading and parsing the program arguments
39+
// in this tutorial we will have to provide the input file and the output file as arguments
40+
final ParameterTool parameters = ParameterTool.fromArgs(args);
41+
42+
// register parameters globally so it can be available for each node in the cluster
43+
environment.getConfig().setGlobalJobParameters(parameters);
44+
45+
// read text file from the parameter 'input' passed in args
46+
// line-by-line and returns them as Strings
47+
DataSet<String> textLines = environment.readTextFile(parameters.get("input"));
48+
49+
// Author -> Collaborating Author
50+
DataSet<Tuple2<String, String>> authors = textLines.flatMap(new Tokenizer());
51+
52+
// convert the dataset to edges in a graph
53+
DataSet<Edge<String, NullValue>> edges = authors.map(new MapFunction<Tuple2<String, String>, Edge<String, NullValue>>() {
54+
@Override
55+
public Edge<String, NullValue> map(Tuple2<String, String> value) {
56+
Edge<String, NullValue> edge = new Edge();
57+
edge.setSource(value._1()); // author
58+
edge.setTarget(value._2()); // collaboration
59+
return edge;
60+
}
61+
});
62+
63+
// creates graph from the edges generated
64+
Graph<String, NullValue, NullValue> collaborationGraph = Graph.fromDataSet(edges, environment);
65+
66+
// we need to add weights since we will apply SingleSourceShortestPaths
67+
Graph<String, NullValue, Double> wCollaborationGraph = collaborationGraph.mapEdges(new MapFunction<Edge<String, NullValue>, Double>() {
68+
@Override
69+
public Double map(Edge<String, NullValue> stringNullValueEdge) {
70+
return 1.0;
71+
}
72+
});
73+
74+
// use the SingleSourceShortestPaths to get all the collaboration authors for the collaboration authors
75+
// for a specified authors (similar to friends of friends)
76+
SingleSourceShortestPaths<String, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<String, NullValue>(parameters.get("author"), 1000);
77+
DataSet<Vertex<String, Double>> result = singleSourceShortestPaths.run(wCollaborationGraph);
78+
79+
System.out.println(result.count());
80+
81+
// the collaboration authors for the collaboration authors for a specified authors (similar to friends of friends)
82+
DataSet<Vertex<String, Double>> resultAuthor = result.filter(new FilterFunction<Vertex<String, Double>>() {
83+
@Override
84+
public boolean filter(Vertex<String, Double> value) {
85+
if (value.f1 == 2.0) {
86+
return true;
87+
} else {
88+
return false;
89+
}
90+
}
91+
});
92+
93+
// output the final result
94+
// check that the argument 'output' was passed to save in that path
95+
if (parameters.has("output")) {
96+
resultAuthor.writeAsText(parameters.get("output"), FileSystem.WriteMode.OVERWRITE);
97+
environment.execute("Graph API Tutorial");
98+
}
99+
}
100+
101+
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, String>> {
102+
103+
@Override
104+
public void flatMap(String value, Collector<Tuple2<String, String>> out) {
105+
Publication publication = gson.fromJson(value, Publication.class);
106+
ArrayList<Author> authors = publication.getAuthors();
107+
108+
// no collaboration (one author)
109+
if (authors.size() <= 1) {
110+
return;
111+
}
112+
113+
for (int i = 0; i < authors.size() - 1; i++) {
114+
String currentAuthor = authors.get(i).name;
115+
for (int j = i + 1; j < authors.size(); j++) {
116+
String collaboration = authors.get(j).name;
117+
118+
// must output two tuples since we need to create
119+
// two edges for an undirected edge
120+
out.collect(new Tuple2<String, String>(currentAuthor, collaboration));
121+
out.collect(new Tuple2<String, String>(collaboration, currentAuthor));
122+
}
123+
}
124+
}
125+
}
126+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package tutorial;
2+
3+
import java.util.ArrayList;
4+
5+
public class Publication {
6+
7+
private String id;
8+
private String title;
9+
private ArrayList<Author> authors;
10+
11+
public String getId() {
12+
return id;
13+
}
14+
15+
public void setId(String id) {
16+
this.id = id;
17+
}
18+
19+
public String getTitle() {
20+
return title;
21+
}
22+
23+
public void setTitle(String title) {
24+
this.title = title;
25+
}
26+
27+
public ArrayList<Author> getAuthors() {
28+
return authors;
29+
}
30+
31+
public void setAuthors(ArrayList<Author> authors) {
32+
this.authors = authors;
33+
}
34+
35+
}

0 commit comments

Comments
 (0)