Skip to content

Commit 9af040b

Browse files
authored
refactor: Add debug logging (#741)
1 parent 92d0246 commit 9af040b

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
2929
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
3030
import com.google.common.annotations.VisibleForTesting;
31+
import com.google.common.flogger.GoogleLogger;
3132
import com.google.errorprone.annotations.concurrent.GuardedBy;
3233
import java.util.HashSet;
3334
import java.util.Set;
3435

3536
public class AssignerImpl extends TrivialProxyService
3637
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
38+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
39+
3740
private final PartitionAssignmentRequest initialRequest;
3841

3942
private final CloseableMonitor monitor = new CloseableMonitor();
@@ -90,7 +93,9 @@ private static Set<Partition> toSet(PartitionAssignment assignment) throws ApiEx
9093
@Override
9194
public void onClientResponse(PartitionAssignment value) throws CheckedApiException {
9295
try (CloseableMonitor.Hold h = monitor.enter()) {
93-
receiver.handleAssignment(toSet(value));
96+
Set<Partition> partitions = toSet(value);
97+
receiver.handleAssignment(partitions);
98+
logger.atInfo().log("Subscribed to partitions: %s", partitions);
9499
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
95100
}
96101
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
4242
import com.google.common.annotations.VisibleForTesting;
4343
import com.google.common.base.Preconditions;
44+
import com.google.common.flogger.GoogleLogger;
4445
import com.google.common.util.concurrent.Monitor;
4546
import java.util.ArrayDeque;
4647
import java.util.Collection;
@@ -54,6 +55,8 @@
5455

5556
public final class PublisherImpl extends ProxyService
5657
implements Publisher<Offset>, RetryingConnectionObserver<Offset> {
58+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
59+
5760
private final BatchingSettings batchingSettings;
5861
private final PublishRequest initialRequest;
5962
private Future<?> alarmFuture;
@@ -135,6 +138,7 @@ private void rebatchForRestart() {
135138
messages.add(UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i)));
136139
}
137140
}
141+
logger.atFiner().log("Re-publishing %s messages after reconnection", messages.size());
138142
long size = 0;
139143
int count = 0;
140144
Queue<UnbatchedMessage> currentBatch = new ArrayDeque<>();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ public void reinitialize(StreamRequestT initialRequest) {
102102
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
103103
if (completed) return;
104104
lastInitialRequest = initialRequest;
105+
logger.atFiner().log("Start initializing connection for %s", streamDescription());
105106
currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest);
107+
logger.atFiner().log("Initialized connection for %s", streamDescription());
106108
}
107109
}
108110

@@ -111,18 +113,15 @@ protected void doStop() {
111113
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
112114
if (completed) return;
113115
completed = true;
114-
logger.atFine().log(
115-
String.format("Terminating connection with initial request %s.", streamDescription()));
116+
logger.atFine().log("Terminating connection for %s", streamDescription());
116117
currentConnection.close();
117118
} catch (Throwable t) {
118119
logger.atWarning().withCause(t).log(
119-
String.format(
120-
"Failed while terminating connection with initial request %s.", streamDescription()));
120+
"Failed while terminating connection for %s", streamDescription());
121121
notifyFailed(t);
122122
return;
123123
}
124-
logger.atFine().log(
125-
String.format("Terminated connection with initial request %s.", streamDescription()));
124+
logger.atFine().log("Terminated connection for %s", streamDescription());
126125
notifyStopped();
127126
}
128127

@@ -196,7 +195,8 @@ public final void onError(Throwable t) {
196195
return;
197196
}
198197
logger.atFine().withCause(t).log(
199-
"Stream disconnected attempting retry, after %s milliseconds", backoffTime);
198+
"Stream disconnected attempting retry, after %s milliseconds for %s",
199+
backoffTime, streamDescription());
200200
ScheduledFuture<?> retry =
201201
SystemExecutors.getAlarmExecutor()
202202
.schedule(
@@ -214,7 +214,7 @@ public final void onError(Throwable t) {
214214

215215
@Override
216216
public final void onComplete() {
217-
logger.atFine().log("Stream completed for %s.", streamDescription());
217+
logger.atFine().log("Stream completed for %s", streamDescription());
218218
boolean expectedCompletion;
219219
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
220220
expectedCompletion = completed;
@@ -227,7 +227,7 @@ public final void onComplete() {
227227

228228
private String streamDescription() {
229229
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
230-
return lastInitialRequest.toString();
230+
return lastInitialRequest.getClass().getSimpleName() + ": " + lastInitialRequest.toString();
231231
}
232232
}
233233
}

0 commit comments

Comments
 (0)