Skip to content

Commit d2f67e1

Browse files
committed
initial prototype
- refactored package - added service.proto StatusCode MESSAGE_COUNT_EXCEEDED - added several persistence exceptions - added several service exceptions - implemented rpc service endpoint - implemented service layer logic - implemented persistence layer utilizing java.util.concurrent.ConcurrentHashMap's internal synchronizations - added basic logging - disabled log4j2 file logging - supplied Client with basic blocking gRPCs
1 parent 33f9bab commit d2f67e1

32 files changed

+909
-172
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<configuration>
5454
<archive>
5555
<manifest>
56-
<mainClass>Main</mainClass>
56+
<mainClass>de.dhbw.ravensburg.verteiltesysteme.Main</mainClass>
5757
</manifest>
5858
</archive>
5959
<descriptorRefs>
@@ -120,5 +120,12 @@
120120
<artifactId>mapstruct-processor</artifactId>
121121
<version>${org.mapstruct.version}</version>
122122
</dependency>
123+
124+
<dependency>
125+
<groupId>org.testng</groupId>
126+
<artifactId>testng</artifactId>
127+
<version>6.14.3</version>
128+
<scope>test</scope>
129+
</dependency>
123130
</dependencies>
124131
</project>

src/main/java/Client.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/main/java/Main.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/ServiceEndpoint.java

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package de.dhbw.ravensburg.verteiltesysteme;
2+
3+
import de.dhbw.ravensburg.verteiltesysteme.de.dhbw.ravensburg.verteiltesysteme.rpc.SamplingMessageGrpc;
4+
import de.dhbw.ravensburg.verteiltesysteme.de.dhbw.ravensburg.verteiltesysteme.rpc.SamplingMessageGrpcService;
5+
import io.grpc.ManagedChannel;
6+
import io.grpc.ManagedChannelBuilder;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
@Slf4j
10+
public class Client {
11+
12+
//TODO; implement Apache CLI https://stackoverflow.com/a/367714/876724
13+
public static void main(String[] args) throws InterruptedException {
14+
ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext();
15+
ManagedChannel managedChannel = managedChannelBuilder.build();
16+
SamplingMessageGrpc.SamplingMessageStub samplingMessageStub = SamplingMessageGrpc.newStub(managedChannel);
17+
SamplingMessageGrpc.SamplingMessageBlockingStub samplingMessageBlockingStub = SamplingMessageGrpc.newBlockingStub(managedChannel);
18+
19+
final String name = "NOLI";
20+
final String content = "bla";
21+
final long lifetime = 2;
22+
23+
SamplingMessageGrpcService.CreateSamplingMessageRequest createSamplingMessageRequest =
24+
SamplingMessageGrpcService.CreateSamplingMessageRequest.newBuilder()
25+
.setMessageName(name)
26+
.setLifetimeInSec(lifetime)
27+
.build();
28+
29+
SamplingMessageGrpcService.WriteSamplingMessageRequest writeSamplingMessageRequest =
30+
SamplingMessageGrpcService.WriteSamplingMessageRequest.newBuilder()
31+
.setMessageName(name)
32+
.setMessageContent(content)
33+
.build();
34+
35+
SamplingMessageGrpcService.ReadSamplingMessageRequest readSamplingMessageRequest =
36+
SamplingMessageGrpcService.ReadSamplingMessageRequest.newBuilder()
37+
.setMessageName(name)
38+
.build();
39+
40+
41+
//SamplingMessageGrpcService.WriteSamplingMessageRequest writeSamplingMessageRequest = SamplingMessageGrpcService.WriteSamplingMessageRequest.newBuilder().setMessageContent("BLA").setMessageName("BLA").build();
42+
/**
43+
samplingMessageStub.writeSamplingMessage(writeSamplingMessageRequest, new StreamObserver<SamplingMessageGrpcService.WriteSamplingMessageResponse>() {
44+
@Override public void onNext(SamplingMessageGrpcService.WriteSamplingMessageResponse writeSamplingMessageResponse) {
45+
log.info(writeSamplingMessageResponse.toString());
46+
}
47+
48+
@Override public void onError(Throwable throwable) {
49+
50+
}
51+
52+
@Override public void onCompleted() {
53+
log.info("DONE writeSamplingMessage");
54+
}
55+
});
56+
try {
57+
managedChannel.awaitTermination(10, TimeUnit.SECONDS);
58+
} catch (InterruptedException e) {
59+
e.printStackTrace();
60+
}**/
61+
/*System.out.println(System.currentTimeMillis());
62+
for(int i = 0; i < 10; ++i){
63+
samplingMessageStub.createSamplingMessage(createSamplingMessageRequest, new StreamObserver<SamplingMessageGrpcService.CreateSamplingMessageResponse>() {
64+
@Override public void onNext(SamplingMessageGrpcService.CreateSamplingMessageResponse createSamplingMessageResponse) {
65+
log.info(createSamplingMessageResponse.getStatusCode().name());
66+
System.out.println(System.currentTimeMillis());
67+
}
68+
69+
@Override public void onError(Throwable throwable) {
70+
log.error(throwable.getMessage());
71+
}
72+
73+
@Override public void onCompleted() {
74+
}
75+
});
76+
}
77+
Thread.sleep(200000);
78+
try {
79+
managedChannel.awaitTermination(100, TimeUnit.SECONDS);
80+
} catch (InterruptedException e) {
81+
e.printStackTrace();
82+
}
83+
managedChannel.shutdown();
84+
System.exit(0);*/
85+
86+
SamplingMessageGrpcService.CreateSamplingMessageResponse createSamplingMessageResponse = samplingMessageBlockingStub.createSamplingMessage(createSamplingMessageRequest);
87+
log.info("createSamplingMessageResponse Status Code: " + createSamplingMessageResponse.getStatusCode().name());
88+
89+
SamplingMessageGrpcService.WriteSamplingMessageResponse writeSamplingMessageResponse = samplingMessageBlockingStub.writeSamplingMessage(writeSamplingMessageRequest);
90+
log.info("writeSamplingMessageResponse Status Code: " + writeSamplingMessageResponse.getStatusCode().name());
91+
92+
93+
SamplingMessageGrpcService.ReadSamplingMessageResponse readSamplingMessageResponse = samplingMessageBlockingStub.readSamplingMessage(readSamplingMessageRequest);
94+
log.info("readSamplingMessageResponse Status Code: " + readSamplingMessageResponse.getStatusCode().name());
95+
log.info("readSamplingMessageResponse Content: " + readSamplingMessageResponse.getMessageContent());
96+
log.info("readSamplingMessageResponse Valid: " + readSamplingMessageResponse.getMessageIsValid());
97+
98+
99+
SamplingMessageGrpcService.CreateSamplingMessageResponse createDuplicateSamplingMessageResponse = samplingMessageBlockingStub.createSamplingMessage(createSamplingMessageRequest);
100+
log.info("createSamplingMessageResponse Status Code: " + createDuplicateSamplingMessageResponse.getStatusCode().name());
101+
102+
Thread.sleep(lifetime * 1000);
103+
104+
SamplingMessageGrpcService.ReadSamplingMessageResponse readSamplingMessageResponse3 = samplingMessageBlockingStub.readSamplingMessage(readSamplingMessageRequest);
105+
log.info("readSamplingMessageResponse Status Code: " + readSamplingMessageResponse3.getStatusCode().name());
106+
log.info("readSamplingMessageResponse Content: " + readSamplingMessageResponse3.getMessageContent());
107+
log.info("readSamplingMessageResponse Valid: " + readSamplingMessageResponse3.getMessageIsValid());
108+
109+
managedChannel.shutdown();
110+
}
111+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package de.dhbw.ravensburg.verteiltesysteme;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
5+
@Slf4j
6+
public class Main {
7+
8+
//TODO; implement Apache CLI https://stackoverflow.com/a/367714/876724
9+
public static void main(String[] args) {
10+
ServiceEndpoint serviceEndpoint = new ServiceEndpoint();
11+
12+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
13+
log.info("Shutting Hook received.");
14+
serviceEndpoint.shutdown();
15+
log.info("Bye bye...");
16+
}));
17+
18+
serviceEndpoint.init();
19+
}
20+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package de.dhbw.ravensburg.verteiltesysteme;
2+
3+
import de.dhbw.ravensburg.verteiltesysteme.persistence.DatabaseAccessObjectImpl;
4+
import de.dhbw.ravensburg.verteiltesysteme.persistence.FakePersistence;
5+
import de.dhbw.ravensburg.verteiltesysteme.rpc.RpcService;
6+
import de.dhbw.ravensburg.verteiltesysteme.service.SamplingMessageServiceImpl;
7+
import io.grpc.*;
8+
import lombok.extern.slf4j.Slf4j;
9+
10+
import java.io.IOException;
11+
import java.net.SocketAddress;
12+
13+
@Slf4j
14+
public class ServiceEndpoint {
15+
private final Server server;
16+
17+
//TODO; replace with ServerConfig
18+
private final Integer port;
19+
20+
//TODO; supply the constructor with a ServerConfig object
21+
ServiceEndpoint() {
22+
log.info("Preparing Service Endpoint");
23+
//TODO; remove env var parameterization
24+
this.port = Integer.parseInt(System.getenv().getOrDefault("port", "8080"));
25+
26+
final RpcService rpcService = new RpcService(new SamplingMessageServiceImpl(new DatabaseAccessObjectImpl(new FakePersistence<>())));
27+
28+
this.server = ServerBuilder
29+
.forPort(this.port)
30+
.addService(rpcService)
31+
.intercept(socketAddressLoggingServerInterceptor())
32+
.build();
33+
}
34+
35+
private static ServerInterceptor socketAddressLoggingServerInterceptor() {
36+
return new ServerInterceptor() {
37+
@Override
38+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
39+
final SocketAddress socketAddress = serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
40+
log.info(String.format("Receiving request from: %s", socketAddress == null ? "Unavailable" : socketAddress.toString()));
41+
return serverCallHandler.startCall(serverCall, metadata);
42+
}
43+
};
44+
}
45+
46+
public void init() {
47+
try {
48+
log.info("Starting Service Endpoint");
49+
this.server.start();
50+
log.info(String.format("Server listening on TCP port: %s", this.port));
51+
this.server.awaitTermination();
52+
} catch (IOException e) {
53+
log.error(e.getMessage());
54+
} catch (InterruptedException e) {
55+
log.error(e.getMessage());
56+
}
57+
}
58+
59+
public void shutdown() {
60+
log.info("Shutting down gRPC Server");
61+
this.server.shutdownNow();
62+
log.info("gRPC Server shut down.");
63+
}
64+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package de.dhbw.ravensburg.verteiltesysteme.persistence;
2+
3+
import de.dhbw.ravensburg.verteiltesysteme.persistence.exception.DatabaseSamplingMessageAlreadyExistsException;
4+
import de.dhbw.ravensburg.verteiltesysteme.persistence.exception.DatabaseSamplingMessageNotFoundException;
5+
import de.dhbw.ravensburg.verteiltesysteme.persistence.model.DatabaseSamplingMessage;
6+
import lombok.NonNull;
7+
8+
import java.time.Instant;
9+
10+
public interface DatabaseAccessObject {
11+
DatabaseSamplingMessage getSamplingMessage(@NonNull String messageName) throws DatabaseSamplingMessageNotFoundException;
12+
13+
void createSamplingMessage(@NonNull String messageName, @NonNull DatabaseSamplingMessage databaseSamplingMessage) throws DatabaseSamplingMessageAlreadyExistsException;
14+
15+
void writeSamplingMessageContentAndTimestamp(@NonNull final String messageName, @NonNull final String messageContent, @NonNull final Instant updateTimestamp) throws DatabaseSamplingMessageNotFoundException;
16+
17+
void deleteSamplingMessage(@NonNull String messageName) throws DatabaseSamplingMessageNotFoundException;
18+
19+
long getTotalMessageCount();
20+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package de.dhbw.ravensburg.verteiltesysteme.persistence;
2+
3+
import de.dhbw.ravensburg.verteiltesysteme.persistence.exception.DatabaseSamplingMessageAlreadyExistsException;
4+
import de.dhbw.ravensburg.verteiltesysteme.persistence.exception.DatabaseSamplingMessageNotFoundException;
5+
import de.dhbw.ravensburg.verteiltesysteme.persistence.model.DatabaseSamplingMessage;
6+
import lombok.NonNull;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
import java.time.Instant;
10+
11+
@Slf4j
12+
public class DatabaseAccessObjectImpl implements DatabaseAccessObject {
13+
private final FakePersistence<String, DatabaseSamplingMessage> fakePersistence;
14+
15+
public DatabaseAccessObjectImpl(final @NonNull FakePersistence<String, DatabaseSamplingMessage> fakePersistence) {
16+
this.fakePersistence = fakePersistence;
17+
}
18+
19+
@Override
20+
public DatabaseSamplingMessage getSamplingMessage(@NonNull final String messageName) throws DatabaseSamplingMessageNotFoundException {
21+
final DatabaseSamplingMessage databaseSamplingMessage = fakePersistence.get(messageName);
22+
if (databaseSamplingMessage == null) {
23+
throw new DatabaseSamplingMessageNotFoundException(String.format("DatabaseSamplingMessage with the messageName: %s not found.", messageName));
24+
}
25+
return databaseSamplingMessage;
26+
}
27+
28+
@Override
29+
public void createSamplingMessage(@NonNull final String messageName, @NonNull final DatabaseSamplingMessage databaseSamplingMessage) throws DatabaseSamplingMessageAlreadyExistsException {
30+
if (fakePersistence.putIfAbsent(messageName, databaseSamplingMessage) != null) {
31+
throw new DatabaseSamplingMessageAlreadyExistsException(String.format("DatabaseSamplingMessage with the messageName: %s already exists.", messageName));
32+
}
33+
}
34+
35+
public void writeSamplingMessageContentAndTimestamp(@NonNull final String messageName, @NonNull final String messageContent, @NonNull final Instant updateTimestamp) throws DatabaseSamplingMessageNotFoundException {
36+
if (fakePersistence.computeIfPresent(messageName, (key, value) -> value.setMessageContent(messageContent).setMessageUpdateTimestamp(updateTimestamp)) == null) {
37+
throw new DatabaseSamplingMessageNotFoundException(String.format("DatabaseSamplingMessage with the messageName: %s not found.", messageName));
38+
}
39+
log.info(fakePersistence.get(messageName).toString());
40+
}
41+
42+
@Override
43+
public void deleteSamplingMessage(@NonNull final String messageName) throws DatabaseSamplingMessageNotFoundException {
44+
if (fakePersistence.remove(messageName) == null) {
45+
throw new DatabaseSamplingMessageNotFoundException(String.format("DatabaseSamplingMessage with the messageName: %s not found.", messageName));
46+
}
47+
}
48+
49+
@Override
50+
public long getTotalMessageCount() {
51+
return fakePersistence.size();
52+
}
53+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package de.dhbw.ravensburg.verteiltesysteme.persistence;
2+
3+
import java.util.concurrent.ConcurrentHashMap;
4+
5+
/**
6+
* Fake persistence class
7+
* Mocking a partially synchronized persistence key-value storage by being nothing else than a simple java.util.concurrent.ConcurrentHashMap
8+
*
9+
* @param <K> Key generic type
10+
* @param <V> Value generic type
11+
*/
12+
public class FakePersistence<K, V> extends ConcurrentHashMap<K, V> {
13+
}

0 commit comments

Comments
 (0)