Skip to content

Commit 76370b7

Browse files
sazzerpivovarit
authored andcommitted
Examples of Quasar in Kotlin (eugenp#7045)
1 parent 823cd0b commit 76370b7

File tree

7 files changed

+477
-0
lines changed

7 files changed

+477
-0
lines changed

kotlin-quasar/pom.xml

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<groupId>com.baeldung</groupId>
6+
<artifactId>kotlin-quasar</artifactId>
7+
<version>1.0.0-SNAPSHOT</version>
8+
<name>kotlin-quasar</name>
9+
<packaging>jar</packaging>
10+
11+
<dependencies>
12+
<dependency>
13+
<groupId>org.jetbrains.kotlin</groupId>
14+
<artifactId>kotlin-stdlib-jdk8</artifactId>
15+
<version>${kotlin.version}</version>
16+
</dependency>
17+
<dependency>
18+
<groupId>org.jetbrains.kotlin</groupId>
19+
<artifactId>kotlin-test</artifactId>
20+
<version>${kotlin.version}</version>
21+
<scope>test</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>co.paralleluniverse</groupId>
25+
<artifactId>quasar-core</artifactId>
26+
<version>${quasar.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>co.paralleluniverse</groupId>
30+
<artifactId>quasar-actors</artifactId>
31+
<version>${quasar.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>co.paralleluniverse</groupId>
35+
<artifactId>quasar-reactive-streams</artifactId>
36+
<version>${quasar.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>co.paralleluniverse</groupId>
40+
<artifactId>quasar-kotlin</artifactId>
41+
<version>${quasar.version}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>junit</groupId>
45+
<artifactId>junit</artifactId>
46+
<version>4.12</version>
47+
</dependency>
48+
</dependencies>
49+
50+
<build>
51+
<sourceDirectory>src/main/kotlin</sourceDirectory>
52+
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
53+
<plugins>
54+
<plugin>
55+
<groupId>org.jetbrains.kotlin</groupId>
56+
<artifactId>kotlin-maven-plugin</artifactId>
57+
<version>${kotlin.version}</version>
58+
<executions>
59+
<execution>
60+
<id>compile</id>
61+
<phase>compile</phase>
62+
<goals>
63+
<goal>compile</goal>
64+
</goals>
65+
</execution>
66+
<execution>
67+
<id>test-compile</id>
68+
<phase>test-compile</phase>
69+
<goals>
70+
<goal>test-compile</goal>
71+
</goals>
72+
</execution>
73+
</executions>
74+
<configuration>
75+
<jvmTarget>1.8</jvmTarget>
76+
</configuration>
77+
</plugin>
78+
<plugin>
79+
<artifactId>maven-dependency-plugin</artifactId>
80+
<version>3.1.1</version>
81+
<executions>
82+
<execution>
83+
<id>getClasspathFilenames</id>
84+
<goals>
85+
<goal>properties</goal>
86+
</goals>
87+
</execution>
88+
</executions>
89+
</plugin>
90+
<plugin>
91+
<groupId>org.apache.maven.plugins</groupId>
92+
<artifactId>maven-surefire-plugin</artifactId>
93+
<version>2.22.1</version>
94+
<configuration>
95+
<argLine>-Dco.paralleluniverse.fibers.verifyInstrumentation=true</argLine>
96+
<argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
97+
</configuration>
98+
</plugin>
99+
<plugin>
100+
<groupId>org.codehaus.mojo</groupId>
101+
<artifactId>exec-maven-plugin</artifactId>
102+
<version>1.3.2</version>
103+
<configuration>
104+
<workingDirectory>target/classes</workingDirectory>
105+
<executable>echo</executable>
106+
<arguments>
107+
<argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
108+
<argument>-classpath</argument> <classpath/>
109+
<argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
110+
</arguments>
111+
</configuration>
112+
</plugin>
113+
</plugins>
114+
</build>
115+
116+
<properties>
117+
<quasar.version>0.8.0</quasar.version>
118+
<kotlin.version>1.3.31</kotlin.version>
119+
</properties>
120+
</project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.baeldung.quasar
2+
3+
import co.paralleluniverse.fibers.Fiber
4+
import co.paralleluniverse.strands.SuspendableRunnable
5+
6+
7+
/**
8+
* Entrypoint into the application
9+
*/
10+
fun main(args: Array<String>) {
11+
class Runnable : SuspendableRunnable {
12+
override fun run() {
13+
println("Hello")
14+
}
15+
}
16+
val result = Fiber<Void>(Runnable()).start()
17+
result.join()
18+
println("World")
19+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package com.baeldung.quasar
2+
3+
import co.paralleluniverse.fibers.Suspendable
4+
import co.paralleluniverse.kotlin.fiber
5+
import co.paralleluniverse.strands.channels.Channels
6+
import co.paralleluniverse.strands.channels.Selector
7+
import com.google.common.base.Function
8+
import org.junit.Test
9+
10+
class ChannelsTest {
11+
@Test
12+
fun createChannel() {
13+
Channels.newChannel<String>(0, // The size of the channel buffer
14+
Channels.OverflowPolicy.BLOCK, // The policy for when the buffer is full
15+
true, // Whether we should optimize for a single message producer
16+
true) // Whether we should optimize for a single message consumer
17+
}
18+
19+
@Test
20+
fun blockOnMessage() {
21+
val channel = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
22+
23+
fiber @Suspendable {
24+
while (!channel.isClosed) {
25+
val message = channel.receive()
26+
println("Received: $message")
27+
}
28+
println("Stopped receiving messages")
29+
}
30+
31+
channel.send("Hello")
32+
channel.send("World")
33+
34+
channel.close()
35+
}
36+
37+
@Test
38+
fun selectReceiveChannels() {
39+
val channel1 = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
40+
val channel2 = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
41+
42+
fiber @Suspendable {
43+
while (!channel1.isClosed && !channel2.isClosed) {
44+
val received = Selector.select(Selector.receive(channel1), Selector.receive(channel2))
45+
46+
println("Received: $received")
47+
}
48+
}
49+
50+
fiber @Suspendable {
51+
for (i in 0..10) {
52+
channel1.send("Channel 1: $i")
53+
}
54+
}
55+
56+
fiber @Suspendable {
57+
for (i in 0..10) {
58+
channel2.send("Channel 2: $i")
59+
}
60+
}
61+
}
62+
63+
@Test
64+
fun selectSendChannels() {
65+
val channel1 = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
66+
val channel2 = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
67+
68+
fiber @Suspendable {
69+
for (i in 0..10) {
70+
Selector.select(
71+
Selector.send(channel1, "Channel 1: $i"),
72+
Selector.send(channel2, "Channel 2: $i")
73+
)
74+
}
75+
}
76+
77+
fiber @Suspendable {
78+
while (!channel1.isClosed) {
79+
val msg = channel1.receive()
80+
println("Read: $msg")
81+
}
82+
}
83+
84+
fiber @Suspendable {
85+
while (!channel2.isClosed) {
86+
val msg = channel2.receive()
87+
println("Read: $msg")
88+
}
89+
}
90+
}
91+
92+
@Test
93+
fun tickerChannel() {
94+
val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)
95+
96+
for (i in 0..10) {
97+
val tickerConsumer = Channels.newTickerConsumerFor(channel)
98+
fiber @Suspendable {
99+
while (!tickerConsumer.isClosed) {
100+
val message = tickerConsumer.receive()
101+
println("Received on $i: $message")
102+
}
103+
println("Stopped receiving messages on $i")
104+
}
105+
}
106+
107+
for (i in 0..50) {
108+
channel.send("Message $i")
109+
}
110+
111+
channel.close()
112+
}
113+
114+
115+
@Test
116+
fun transformOnSend() {
117+
val channel = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
118+
119+
fiber @Suspendable {
120+
while (!channel.isClosed) {
121+
val message = channel.receive()
122+
println("Received: $message")
123+
}
124+
println("Stopped receiving messages")
125+
}
126+
127+
val transformOnSend = Channels.mapSend(channel, Function<String, String> { msg: String? -> msg?.toUpperCase() })
128+
129+
transformOnSend.send("Hello")
130+
transformOnSend.send("World")
131+
132+
channel.close()
133+
}
134+
135+
@Test
136+
fun transformOnReceive() {
137+
val channel = Channels.newChannel<String>(0, Channels.OverflowPolicy.BLOCK, true, true)
138+
139+
val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })
140+
141+
fiber @Suspendable {
142+
while (!transformOnReceive.isClosed) {
143+
val message = transformOnReceive.receive()
144+
println("Received: $message")
145+
}
146+
println("Stopped receiving messages")
147+
}
148+
149+
150+
channel.send("Hello")
151+
channel.send("World")
152+
153+
channel.close()
154+
}
155+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.baeldung.quasar
2+
3+
import co.paralleluniverse.strands.dataflow.Val
4+
import co.paralleluniverse.strands.dataflow.Var
5+
import org.junit.Assert
6+
import org.junit.Test
7+
import java.util.concurrent.TimeUnit
8+
9+
class DataflowTest {
10+
@Test
11+
fun testValVar() {
12+
val a = Var<Int>()
13+
val b = Val<Int>()
14+
15+
val c = Var<Int> { a.get() + b.get() }
16+
val d = Var<Int> { a.get() * b.get() }
17+
18+
// (a*b) - (a+b)
19+
val initialResult = Val<Int> { d.get() - c.get() }
20+
val currentResult = Var<Int> { d.get() - c.get() }
21+
22+
a.set(2)
23+
b.set(4)
24+
25+
Assert.assertEquals(2, initialResult.get())
26+
Assert.assertEquals(2, currentResult.get())
27+
28+
a.set(3)
29+
30+
TimeUnit.SECONDS.sleep(1)
31+
32+
Assert.assertEquals(2, initialResult.get())
33+
Assert.assertEquals(5, currentResult.get())
34+
}
35+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.baeldung.quasar
2+
3+
import co.paralleluniverse.fibers.Fiber
4+
import co.paralleluniverse.fibers.FiberAsync
5+
import co.paralleluniverse.fibers.Suspendable
6+
import co.paralleluniverse.kotlin.fiber
7+
import co.paralleluniverse.strands.Strand
8+
import org.junit.Assert
9+
import org.junit.Test
10+
import java.math.BigDecimal
11+
import java.util.concurrent.TimeUnit
12+
13+
interface PiCallback {
14+
fun success(result: BigDecimal)
15+
fun failure(error: Exception)
16+
}
17+
18+
fun computePi(callback: PiCallback) {
19+
println("Starting calculations")
20+
TimeUnit.SECONDS.sleep(2)
21+
println("Finished calculations")
22+
callback.success(BigDecimal("3.14"))
23+
}
24+
25+
class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
26+
override fun success(result: BigDecimal) {
27+
asyncCompleted(result)
28+
}
29+
30+
override fun failure(error: Exception) {
31+
asyncFailed(error)
32+
}
33+
34+
override fun requestAsync() {
35+
computePi(this)
36+
}
37+
}
38+
39+
class PiAsyncTest {
40+
@Test
41+
fun testPi() {
42+
val result = fiber @Suspendable {
43+
val pi = PiAsync()
44+
println("Waiting to get PI on: " + Fiber.currentFiber().name)
45+
val result = pi.run()
46+
println("Got PI")
47+
48+
result
49+
}.get()
50+
51+
Assert.assertEquals(BigDecimal("3.14"), result)
52+
}
53+
}

0 commit comments

Comments
 (0)