Skip to content

Commit 27ce955

Browse files
authored
[fix][broker]Leaving orphan schemas and topic-level policies after partitioned topic is deleted by GC (#24971)
1 parent 4bf335a commit 27ce955

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@
9999
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateDataConflictResolver;
100100
import org.apache.pulsar.broker.namespace.NamespaceService;
101101
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
102+
import org.apache.pulsar.broker.resources.PulsarResources;
103+
import org.apache.pulsar.broker.resources.TopicResources;
102104
import org.apache.pulsar.broker.service.AbstractReplicator;
103105
import org.apache.pulsar.broker.service.AbstractTopic;
104106
import org.apache.pulsar.broker.service.BrokerService;
@@ -185,6 +187,7 @@
185187
import org.apache.pulsar.common.protocol.Commands;
186188
import org.apache.pulsar.common.protocol.Markers;
187189
import org.apache.pulsar.common.protocol.schema.SchemaData;
190+
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
188191
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
189192
import org.apache.pulsar.common.schema.SchemaType;
190193
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@@ -3453,6 +3456,34 @@ public void checkGC() {
34533456
}
34543457
}
34553458

3459+
private CompletableFuture<Void> deleteSchemaAndPoliciesIfAllPartitionsDeleted() {
3460+
if (!TopicName.get(topic).isPartitioned()) {
3461+
return CompletableFuture.completedFuture(null);
3462+
}
3463+
TopicName pTopicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
3464+
final BrokerService broker = getBrokerService();
3465+
final PulsarResources pulsarResources = broker.pulsar().getPulsarResources();
3466+
final TopicResources topicResources = pulsarResources.getTopicResources();
3467+
final TopicPoliciesService topicPoliciesService = broker.getPulsar().getTopicPoliciesService();
3468+
final SchemaStorage schemaStorage = broker.getPulsar().getSchemaStorage();
3469+
return topicResources.listPersistentTopicsAsync(pTopicName.getNamespaceObject()).thenApply(list -> {
3470+
for (String s : list) {
3471+
TopicName item = TopicName.get(s);
3472+
if (item.isPartitioned() && item.getPartitionedTopicName().equals(pTopicName.toString())) {
3473+
return true;
3474+
}
3475+
}
3476+
return false;
3477+
}).thenCompose(partitionExists -> {
3478+
if (partitionExists) {
3479+
return CompletableFuture.completedFuture(null);
3480+
}
3481+
return schemaStorage.delete(pTopicName.getSchemaName()).thenCompose(__ -> {
3482+
return topicPoliciesService.deleteTopicPoliciesAsync(pTopicName, false);
3483+
});
3484+
});
3485+
}
3486+
34563487
private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
34573488
if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
34583489
return CompletableFuture.completedFuture(null);
@@ -3464,7 +3495,7 @@ private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
34643495
return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
34653496
.thenCompose(partitionedTopicExist -> {
34663497
if (!partitionedTopicExist) {
3467-
return CompletableFuture.completedFuture(null);
3498+
return deleteSchemaAndPoliciesIfAllPartitionsDeleted();
34683499
} else {
34693500
return getBrokerService().pulsar().getPulsarResources().getNamespaceResources()
34703501
.getPartitionedTopicResources().runWithMarkDeleteAsync(topicName, () ->

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ public void cleanup() throws Exception {
4343

4444
@Test(dataProvider = "topicTypes")
4545
public void testTopicGC(TopicType topicType) throws Exception {
46-
if (topicType.equals(TopicType.PARTITIONED)) {
47-
// Pulsar does not support the feature "brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
48-
// Geo-Replication with Global ZK.
49-
return;
50-
}
5146
super.testTopicGC(topicType);
5247
}
5348

0 commit comments

Comments
 (0)