Skip to content

Commit 227522c

Browse files
committed
improve reactive labs
1 parent 8fbff54 commit 227522c

File tree

4 files changed

+122
-47
lines changed

4 files changed

+122
-47
lines changed

intro-labs/reactive-playground/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ dependencies {
2323
testImplementation('org.assertj:assertj-core:3.16.1')
2424
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
2525
testImplementation("org.junit.jupiter:junit-jupiter-params:${junitVersion}")
26+
testImplementation("io.projectreactor.tools:blockhound:1.0.4.RELEASE")
2627
testRuntime("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
2728
}

intro-labs/reactive-playground/src/test/java/com/example/BasicReactivePlayground.java

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package com.example;
22

3+
import org.junit.jupiter.api.BeforeAll;
34
import org.junit.jupiter.api.DisplayName;
45
import org.junit.jupiter.api.Test;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
8+
import reactor.blockhound.BlockHound;
79
import reactor.core.publisher.Flux;
8-
import reactor.core.publisher.GroupedFlux;
910
import reactor.core.publisher.Mono;
11+
import reactor.core.publisher.ParallelFlux;
12+
import reactor.core.scheduler.Schedulers;
1013
import reactor.test.StepVerifier;
1114

1215
import java.time.Duration;
1316
import java.util.Arrays;
17+
import java.util.Objects;
1418
import java.util.Optional;
1519
import java.util.stream.Collectors;
1620
import java.util.stream.Stream;
@@ -20,47 +24,94 @@ class BasicReactivePlayground {
2024

2125
private static final Logger LOGGER = LoggerFactory.getLogger(BasicReactivePlayground.class);
2226

27+
@BeforeAll
28+
static void init() {
29+
BlockHound.install();
30+
}
31+
2332
@DisplayName("create Mono")
2433
@Test
2534
void testCreateMono() {
2635
StepVerifier.create(Mono.empty()).expectNextCount(0).verifyComplete();
2736
StepVerifier.create(Mono.just("Hello")).expectNext("Hello").verifyComplete();
2837
StepVerifier.create(Mono.defer(() -> Mono.just("Hello"))).expectNext("Hello").verifyComplete();
29-
StepVerifier.create(Mono.create(sink -> sink.success("Hello"))).expectNext("Hello").verifyComplete();
30-
StepVerifier.create(Mono.justOrEmpty(Optional.of("Hello"))).expectNext("Hello").verifyComplete();
31-
StepVerifier.create(Mono.error(new IllegalArgumentException("error"))).expectError(IllegalArgumentException.class).verify();
38+
StepVerifier.create(Mono.create(sink -> sink.success("Hello")))
39+
.expectNext("Hello")
40+
.verifyComplete();
41+
StepVerifier.create(Mono.justOrEmpty(Optional.of("Hello")))
42+
.expectNext("Hello")
43+
.verifyComplete();
44+
StepVerifier.create(Mono.error(new IllegalArgumentException("error")))
45+
.expectError(IllegalArgumentException.class)
46+
.verify();
3247
}
3348

3449
@DisplayName("create Flux")
3550
@Test
3651
void testCreateFlux() {
3752
StepVerifier.create(Flux.empty()).expectNextCount(0).verifyComplete();
38-
StepVerifier.create(Flux.just("Hello", "World")).expectNext("Hello").expectNext("World").verifyComplete();
53+
StepVerifier.create(Flux.just("Hello", "World"))
54+
.expectNext("Hello")
55+
.expectNext("World")
56+
.verifyComplete();
3957
StepVerifier.create(Flux.defer(() -> Mono.just("Hello"))).expectNext("Hello").verifyComplete();
40-
StepVerifier.create(Flux.create(sink -> { sink.next("Hello"); sink.complete(); })).expectNext("Hello").verifyComplete();
41-
StepVerifier.create(Flux.fromArray(new String[] {"Hello"})).expectNext("Hello").verifyComplete();
42-
StepVerifier.create(Flux.fromIterable(Arrays.asList("Hello", "World"))).expectNext("Hello").expectNext("World").verifyComplete();
43-
StepVerifier.create(Flux.generate(sink -> {sink.next("Hello"); sink.complete();})).expectNext("Hello").verifyComplete();
44-
StepVerifier.create(Flux.fromStream(Stream.of("Hello", "World"))).expectNext("Hello").expectNext("World").verifyComplete();
58+
StepVerifier.create(
59+
Flux.create(
60+
sink -> {
61+
sink.next("Hello");
62+
sink.complete();
63+
}))
64+
.expectNext("Hello")
65+
.verifyComplete();
66+
StepVerifier.create(Flux.fromArray(new String[] {"Hello"}))
67+
.expectNext("Hello")
68+
.verifyComplete();
69+
StepVerifier.create(Flux.fromIterable(Arrays.asList("Hello", "World")))
70+
.expectNext("Hello")
71+
.expectNext("World")
72+
.verifyComplete();
73+
StepVerifier.create(
74+
Flux.generate(
75+
sink -> {
76+
sink.next("Hello");
77+
sink.complete();
78+
}))
79+
.expectNext("Hello")
80+
.verifyComplete();
81+
StepVerifier.create(Flux.fromStream(Stream.of("Hello", "World")))
82+
.expectNext("Hello")
83+
.expectNext("World")
84+
.verifyComplete();
4585
StepVerifier.create(Flux.range(1, 5)).expectNextCount(5).verifyComplete();
46-
StepVerifier.create(Flux.error(new IllegalArgumentException("error"))).expectError(IllegalArgumentException.class).verify();
86+
StepVerifier.create(Flux.error(new IllegalArgumentException("error")))
87+
.expectError(IllegalArgumentException.class)
88+
.verify();
4789
}
4890

4991
@DisplayName("transformations")
5092
@Test
5193
void testTransform() {
52-
Mono<String> helloMono = Flux.fromStream(Stream.of("Hello", "World"))
53-
.collect(Collectors.joining(" "));
94+
Mono<String> helloMono =
95+
Flux.fromStream(Stream.of("Hello", "World")).collect(Collectors.joining(" "));
5496
StepVerifier.create(helloMono).expectNext("Hello World").verifyComplete();
5597

5698
StepVerifier.create(Flux.range(1, 10).count()).expectNext(10L).verifyComplete();
5799
StepVerifier.create(Flux.range(1, 5).reduce(1, Integer::sum)).expectNext(16).verifyComplete();
58-
StepVerifier.create(Flux.range(1, 5).handle((a,b) -> {if (a > 3) b.next(a);})).expectNextCount(2).verifyComplete();
59-
StepVerifier.create(Flux.range(1, 10).log()
60-
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
61-
.flatMap(s -> Flux.just(s.key())))
62-
.expectNextCount(2).verifyComplete();
63-
100+
StepVerifier.create(
101+
Flux.range(1, 5)
102+
.handle(
103+
(a, b) -> {
104+
if (a > 3) b.next(a);
105+
}))
106+
.expectNextCount(2)
107+
.verifyComplete();
108+
StepVerifier.create(
109+
Flux.range(1, 10)
110+
.log()
111+
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
112+
.flatMap(s -> Flux.just(Objects.requireNonNull(s.key()))))
113+
.expectNextCount(2)
114+
.verifyComplete();
64115
}
65116

66117
@DisplayName("subscription")
@@ -104,4 +155,37 @@ void testDelayWithStepVerifier() {
104155

105156
StepVerifier.create(rangeFlux).expectNextCount(7).verifyComplete();
106157
}
158+
159+
@Test
160+
void testBlockhound() {
161+
Mono.delay(Duration.ofSeconds(1))
162+
.doOnNext(
163+
it -> {
164+
Thread.yield();
165+
/*
166+
try {
167+
Thread.sleep(10);
168+
}
169+
catch (InterruptedException e) {
170+
throw new RuntimeException(e);
171+
}*/
172+
})
173+
.block();
174+
}
175+
176+
@DisplayName("with strings")
177+
@Test
178+
void testWithString() throws InterruptedException {
179+
180+
ParallelFlux<String> hello_reactive = Flux.just("Hello Reactive")
181+
.delayElements(Duration.ofMillis(100))
182+
.parallel()
183+
.runOn(Schedulers.newParallel("test", 5))
184+
.log()
185+
.map(String::toUpperCase)
186+
.flatMap(s -> Flux.just(s.split("")));
187+
hello_reactive.subscribe(System.out::println);
188+
189+
StepVerifier.create(hello_reactive).expectNext("H").expectNextCount(13).verifyComplete();
190+
}
107191
}

intro-labs/reactive-playground/src/test/java/com/example/ImperativeVsReactivePlayground.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import reactor.core.publisher.Mono;
66
import reactor.test.StepVerifier;
77

8-
import java.time.Duration;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.Stream;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
912

1013
@DisplayName("Imperative versus reactive playground")
1114
class ImperativeVsReactivePlayground {
@@ -16,20 +19,25 @@ void testImperative() {
1619
String msg = "World";
1720
String upperCaseMsg = msg.toUpperCase();
1821
String greeting = "Hello " + upperCaseMsg + "!";
19-
System.out.println(greeting);
22+
assertThat(greeting).isEqualTo("Hello WORLD!");
2023
}
2124

22-
@DisplayName("reactive code")
25+
@DisplayName("functional code")
2326
@Test
24-
void testReactive() {
25-
Mono<String> mono = Mono.just("World")
27+
void testFunctional() {
28+
String greeting =
29+
Stream.of("World")
2630
.map(String::toUpperCase)
2731
.map(um -> "Hello " + um + "!")
28-
.map(um -> {System.out.println(um); return um;});
29-
30-
mono.subscribe();
31-
32-
//StepVerifier.create(mono).expectNext("Hello WORLD!").verifyComplete();
32+
.collect(Collectors.joining());
33+
assertThat(greeting).isEqualTo("Hello WORLD!");
34+
}
3335

36+
@DisplayName("reactive code")
37+
@Test
38+
void testReactive() {
39+
Mono<String> greeting =
40+
Mono.just("World").map(String::toUpperCase).map(um -> "Hello " + um + "!");
41+
StepVerifier.create(greeting).expectNext("Hello WORLD!").verifyComplete();
3442
}
3543
}

intro-labs/reactive-playground/src/test/java/com/example/PersonReactivePlayground.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,24 +48,6 @@ void initializeTestObjects() {
4848
personSet.add(person);
4949
}
5050

51-
@DisplayName("with strings")
52-
@Test
53-
void testWithString() throws InterruptedException {
54-
55-
ParallelFlux<String> hello_reactive = Flux.just("Hello Reactive")
56-
.delayElements(Duration.ofMillis(100))
57-
.parallel()
58-
.runOn(Schedulers.newParallel("test", 5))
59-
.log()
60-
.map(String::toUpperCase)
61-
.flatMap(s -> Flux.just(s.split("")));
62-
hello_reactive.subscribe(System.out::println);
63-
64-
Thread.sleep(500);
65-
66-
//StepVerifier.create(hello_reactive).expectNext("H").expectNextCount(13).verifyComplete();
67-
}
68-
6951
@DisplayName("with persons")
7052
@Test
7153
void testWithPersons() {

0 commit comments

Comments
 (0)