Skip to content

Commit ee32c9f

Browse files
committed
server skips pub confirmation to prevent blocking io
1 parent 8a69904 commit ee32c9f

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

GeoFaaS/src/main/kotlin/geofaas/ServerGBClient.kt

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class ServerGBClient(loc: Location, debug: Boolean, host: String = "localhost",
2929
logger.debug("incoming Pub added to pubQueue. dump: {}", newMessage)
3030
receivedPubs.incrementAndGet()
3131
}
32+
is Payload.PUBACKPayload -> {
33+
logger.debug("drop incoming PUBACKPayload on the server")
34+
receivedHandshakes.incrementAndGet()
35+
}
3236
else -> {
3337
ackQueue.add(newMessage)
3438
logger.debug("incoming msg added to ackQueue. dump: {}", newMessage)
@@ -42,34 +46,34 @@ class ServerGBClient(loc: Location, debug: Boolean, host: String = "localhost",
4246
val type = if(isOffload) TypeCode.OFFLOAD else TypeCode.NORMAL
4347
val responseTopicFence = ResponseInfoPatched(id, null, brokerArea.toJson()) // null topic = not listening for any response
4448
val message = FunctionMessage(reqId, funcName, FunctionAction.RESULT, res, type, reqId.clientId, responseTopicFence)
45-
basicClient.send(Payload.PUBLISHPayload(Topic("functions/$funcName/result"), clientFence, gson.toJson(message))) // Json.encodeToString(FunctionMessage.serializer(), message)
46-
val pubStatus = listenForPubAckAndProcess(FunctionAction.RESULT, funcName, 8000)
47-
if (pubStatus.first != StatusCode.Success)
48-
logger.error("Unsuccessful sending Result to ${reqId.clientId}. StatusCode: {}", pubStatus.first)
49+
basicClient.send(Payload.PUBLISHPayload(Topic("functions/$funcName/result"), clientFence, gson.toJson(message)))
50+
// val pubStatus = listenForPubAckAndProcess(FunctionAction.RESULT, funcName, 8000)
51+
// if (pubStatus.first != StatusCode.Success)
52+
// logger.error("Unsuccessful sending Result to ${reqId.clientId}. StatusCode: {}", pubStatus.first)
4953
}
5054
// publishes an Acknowledgement for receiving a client's request. the client listening for it
5155
fun sendAck(funcName: String, clientFence: Geofence, reqId: RequestID) {
5256
val responseTopicFence = ResponseInfoPatched(id,null, brokerArea.toJson())
5357
val message = FunctionMessage(reqId, funcName, FunctionAction.ACK, "", TypeCode.NORMAL, reqId.clientId, responseTopicFence)
5458
basicClient.send(Payload.PUBLISHPayload(Topic("functions/$funcName/ack"), clientFence, gson.toJson(message)))
55-
val pubStatus = listenForPubAckAndProcess(FunctionAction.ACK, funcName, 8000)
56-
if (pubStatus.first != StatusCode.Success)
57-
logger.error("Unsuccessful sending Ack to ${reqId.clientId}. StatusCode: {}", pubStatus.first)
59+
// val pubStatus = listenForPubAckAndProcess(FunctionAction.ACK, funcName, 8000)
60+
// if (pubStatus.first != StatusCode.Success)
61+
// logger.error("Unsuccessful sending Ack to ${reqId.clientId}. StatusCode: {}", pubStatus.first)
5862
}
5963

6064
// publishes a NotAck to offload to the cloud. the cloud listening for it
6165
fun sendNack(funcName: String, data: String, clientFence: Geofence, reqId: RequestID, cloudId: String, cloudFence: Geofence = Geofence.world()) {
6266
val responseTopicFence = ResponseInfoPatched(reqId.clientId, Topic("functions/$funcName/result"), clientFence.toJson())
6367
val message = FunctionMessage(reqId, funcName, FunctionAction.NACK, data, TypeCode.NORMAL, cloudId, responseTopicFence)
6468
basicClient.send(Payload.PUBLISHPayload(Topic("functions/$funcName/nack"), cloudFence, gson.toJson(message)))
65-
val pubStatus = listenForPubAckAndProcess(FunctionAction.NACK, funcName, 8000)
66-
if (pubStatus.first == StatusCode.Failure) logger.error("failed to offload $funcName call by ${reqId.clientId}. Is $cloudId online?")
67-
else if(pubStatus.first != StatusCode.Success)
68-
logger.error("Unsuccessful sending Nack to ${reqId.clientId}. StatusCode: {}", pubStatus)
69+
// val pubStatus = listenForPubAckAndProcess(FunctionAction.NACK, funcName, 8000)
70+
// if (pubStatus.first == StatusCode.Failure) logger.error("failed to offload $funcName call by ${reqId.clientId}. Is $cloudId online?")
71+
// else if(pubStatus.first != StatusCode.Success)
72+
// logger.error("Unsuccessful sending Nack to ${reqId.clientId}. StatusCode: {}", pubStatus)
6973
}
7074

7175
fun registerFunctions(functions: Set<GeoFaaSFunction>, fence: Geofence): StatusCode { //FIXME: should update CALL subscriptions in geoBroker when remote FaaS added/removed serving function
72-
val subscriptions = subscribedFunctionsList()
76+
val subscriptions: Map<String, List<String>> = subscribedFunctionsList()
7377
val callSubs = subscriptions.filter { it.value.contains("call") } // assume Cloud either subscribed to both Nack & Call or none
7478
// val nackSubs = subscriptions.filter { it.value.contains("nack") }
7579

0 commit comments

Comments
 (0)