55import java .util .List ;
66import java .util .Map ;
77import java .util .Map .Entry ;
8+ import java .util .Optional ;
9+ import java .util .concurrent .ForkJoinPool ;
810import java .util .function .Function ;
911import java .util .function .Predicate ;
12+ import java .util .function .BiFunction ;
1013import java .util .function .BinaryOperator ;
1114
1215import java .util .stream .Collector ;
1316
1417import static java .lang .System .out ;
1518
1619import static java .util .stream .Collectors .toList ;
20+
1721import static java .util .stream .Collectors .counting ;
1822import static java .util .stream .Collectors .groupingBy ;
1923
2024public class StreamsBasics {
2125
22- static List < String > namesOfTheCitizensInTheCity (List <Citizen > data , String city ) {
26+ public static void namesOfTheCitizensInTheCity (List <Citizen > data , String city ) {
2327Predicate <Citizen > filterPredicate = x -> x .getCity ().equals (city );
2428Function <? super Citizen , ? extends String > mapFunction = Citizen ::getName ;
2529Collector <String , ?, List <String >> listCollector = toList ();
26- return data .parallelStream ().filter (filterPredicate ).map (mapFunction ).collect (listCollector );
30+ List <String > names = data .parallelStream ().filter (filterPredicate ).map (mapFunction ).collect (listCollector );
31+ names .forEach ((el ) -> out .println ("from " + city + ": " + el ));
2732}
2833
29- static long maxNumberOfCitizensInCity (List <Citizen > data ) {
34+ public static void maxNumberOfCitizensInCity (List <Citizen > data ) {
3035Map <String , Long > map = data .parallelStream ().collect (groupingBy (Citizen ::getCity , counting ()));
31- return map .values ().parallelStream ().reduce ((x , y ) -> Long .max (x , y )).get ();
36+ Long maxNumber = map .values ().parallelStream ().reduce ((x , y ) -> Long .max (x , y )).get ();
37+ out .println ("max number of citizens in one city: " + maxNumber );
3238}
3339
34- static void mostCommonNameAndTheCity (List <Citizen > citizens ) {
40+ public static void mostCommonNameAndTheCity (List <Citizen > citizens ) {
3541Map <String , Long > collect = citizens .parallelStream ().collect (groupingBy (Citizen ::toKey , counting ()));
3642BinaryOperator <Entry <String , Long >> reduceFunction = (e1 , e2 ) -> e1 .getValue () > e2 .getValue () ? e1 : e2 ;
3743Entry <String , Long > entry = collect .entrySet ().parallelStream ().reduce (reduceFunction ).get ();
@@ -40,22 +46,63 @@ static void mostCommonNameAndTheCity(List<Citizen> citizens) {
4046out .println ("most common name: " + name + ", city: " + city + " (" + entry .getValue () + ")" );
4147}
4248
49+ public static void parallelism (List <Citizen > citizens ) {
50+
51+ ForkJoinPool commonPool = ForkJoinPool .commonPool ();
52+
53+ Predicate <Citizen > notNullFilter = c -> {
54+ logWithThread ("filter " + c );
55+ return Optional .of (c ).isPresent ();
56+ };
57+
58+ Function <Citizen , String > cityMapper = c -> {
59+ logWithThread ("mapper " + c );
60+ return c .getCity ();
61+ };
62+
63+ BiFunction <String , String , String > cityAccumulator = (l , c ) -> {
64+ logWithThread ("accumulator " + c );
65+ return l + "," + c ;
66+ };
67+
68+ BinaryOperator <String > cityCombiner = (l1 , l2 ) -> {
69+ logWithThread ("combiner " + l1 + " / " + l2 );
70+ return l1 + "," + l2 ;
71+ };
72+
73+ String reduce = citizens .parallelStream ().filter (notNullFilter ).map (cityMapper ).distinct ().reduce ("" ,
74+ cityAccumulator , cityCombiner );
75+
76+ reduce = reduce .startsWith ("," ) ? reduce .substring (1 ) : reduce ;
77+ reduce = reduce .endsWith ("," ) ? reduce .substring (0 , reduce .length () - 1 ) : reduce ;
78+ reduce = reduce .replace (",," , "," );
79+
80+ out .println ("\n parallelism:" + commonPool .getParallelism () + "\n " );
81+ out .println ("\n all city names: " + reduce );
82+ }
83+
84+ public static void logWithThread (String logLine ) {
85+ out .printf ("%s\t %s" , "(thread: " + Thread .currentThread ().getId () + ")" , logLine );
86+ }
87+
4388public static void main (String [] args ) {
4489
4590Citizen citizen1 = new Citizen ("mark" , "newyork" );
4691Citizen citizen2 = new Citizen ("john" , "london" );
4792Citizen citizen3 = new Citizen ("tom" , "berlin" );
48- Citizen citizen3_2 = new Citizen ("tom" , "berlin" );
49- Citizen citizen4 = new Citizen ("carl" , "london" );
50- Citizen citizen5 = new Citizen ("sharlote" , "london" );
93+ Citizen citizen4 = new Citizen ("tom" , "berlin" );
94+ Citizen citizen5 = new Citizen ("carl" , "london" );
95+ Citizen citizen6 = new Citizen ("sharlote" , "london" );
5196
52- List <Citizen > citizenData = Arrays .asList (citizen1 , citizen2 , citizen3 , citizen3_2 , citizen4 , citizen5 );
97+ List <Citizen > citizenData = Arrays .asList (citizen1 , citizen2 , citizen3 , citizen4 , citizen5 , citizen6 );
5398
54- StreamsBasics .namesOfTheCitizensInTheCity (citizenData , "london" ). forEach (( el ) -> out . println ( "from london: " + el )) ;
99+ StreamsBasics .namesOfTheCitizensInTheCity (citizenData , "london" );
55100
56- System . out . println ( "max number of citizens in one city: " + StreamsBasics .maxNumberOfCitizensInCity (citizenData ) );
101+ StreamsBasics .maxNumberOfCitizensInCity (citizenData );
57102
58103StreamsBasics .mostCommonNameAndTheCity (citizenData );
104+
105+ StreamsBasics .parallelism (citizenData );
59106}
60107
61108/**
@@ -83,6 +130,11 @@ public String toKey() {
83130return name + "_" + city ;
84131}
85132
133+ @ Override
134+ public String toString () {
135+ return name + " (" + city + ")" ;
136+ }
137+
86138}
87139
88140}
0 commit comments