Skip to content

Commit 3aaf345

Browse files
authored
[feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace (#24833)
1 parent 9944ab0 commit 3aaf345

File tree

39 files changed

+4944
-281
lines changed

39 files changed

+4944
-281
lines changed

conf/broker.conf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,30 @@ allowOverrideEntryFilters=false
567567
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
568568
maxConcurrentLookupRequest=50000
569569

570+
# Maximum heap memory for inflight topic list operations (MB).
571+
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
572+
maxTopicListInFlightHeapMemSizeMB=100
573+
574+
# Maximum direct memory for inflight topic list responses (MB).
575+
# Default: 100 MB (network buffers for serialized responses)
576+
maxTopicListInFlightDirectMemSizeMB=100
577+
578+
# Timeout for acquiring heap memory permits (milliseconds).
579+
# Default: 25000 (25 seconds)
580+
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
581+
582+
# Maximum queue size for heap memory permit requests.
583+
# Default: 10000 (prevent unbounded queueing)
584+
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
585+
586+
# Timeout for acquiring direct memory permits (milliseconds).
587+
# Default: 25000 (25 seconds)
588+
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
589+
590+
# Maximum queue size for direct memory permit requests.
591+
# Default: 10000 (prevent unbounded queueing)
592+
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000
593+
570594
# Max number of concurrent topic loading request broker allows to control number of zk-operations
571595
maxConcurrentTopicLoadRequest=5000
572596

conf/proxy.conf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,30 @@ maxConcurrentInboundConnectionsPerIp=0
228228
# Max concurrent outbound connections. The proxy will error out requests beyond that.
229229
maxConcurrentLookupRequests=50000
230230

231+
# Maximum heap memory for inflight topic list operations (MB).
232+
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
233+
maxTopicListInFlightHeapMemSizeMB=100
234+
235+
# Maximum direct memory for inflight topic list responses (MB).
236+
# Default: 100 MB (network buffers for serialized responses)
237+
maxTopicListInFlightDirectMemSizeMB=100
238+
239+
# Timeout for acquiring heap memory permits (milliseconds).
240+
# Default: 25000 (25 seconds)
241+
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
242+
243+
# Maximum queue size for heap memory permit requests.
244+
# Default: 10000 (prevent unbounded queueing)
245+
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
246+
247+
# Timeout for acquiring direct memory permits (milliseconds).
248+
# Default: 25000 (25 seconds)
249+
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
250+
251+
# Maximum queue size for direct memory permit requests.
252+
# Default: 10000 (prevent unbounded queueing)
253+
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000
254+
231255
##### --- TLS --- #####
232256

233257
# Deprecated - use servicePortTls and webServicePortTls instead

conf/standalone.conf

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,30 @@ preciseDispatcherFlowControl=false
320320
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
321321
maxConcurrentLookupRequest=50000
322322

323+
# Maximum heap memory for inflight topic list operations (MB).
324+
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
325+
maxTopicListInFlightHeapMemSizeMB=100
326+
327+
# Maximum direct memory for inflight topic list responses (MB).
328+
# Default: 100 MB (network buffers for serialized responses)
329+
maxTopicListInFlightDirectMemSizeMB=100
330+
331+
# Timeout for acquiring heap memory permits (milliseconds).
332+
# Default: 25000 (25 seconds)
333+
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
334+
335+
# Maximum queue size for heap memory permit requests.
336+
# Default: 10000 (prevent unbounded queueing)
337+
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
338+
339+
# Timeout for acquiring direct memory permits (milliseconds).
340+
# Default: 25000 (25 seconds)
341+
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
342+
343+
# Maximum queue size for direct memory permit requests.
344+
# Default: 10000 (prevent unbounded queueing)
345+
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000
346+
323347
# Max number of concurrent topic loading request broker allows to control number of zk-operations
324348
maxConcurrentTopicLoadRequest=5000
325349

pip/pip-442.md

Lines changed: 401 additions & 199 deletions
Large diffs are not rendered by default.

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,42 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
13911391
doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
13921392
private int maxConcurrentLookupRequest = 50000;
13931393

1394+
@FieldContext(
1395+
category = CATEGORY_SERVER,
1396+
doc = "Maximum heap memory for inflight topic list operations (MB).\n"
1397+
+ "Default: 100 MB (supports ~1M topic names assuming 100 bytes each)")
1398+
private int maxTopicListInFlightHeapMemSizeMB = 100;
1399+
1400+
@FieldContext(
1401+
category = CATEGORY_SERVER,
1402+
doc = "Maximum direct memory for inflight topic list responses (MB).\n"
1403+
+ "Default: 100 MB (network buffers for serialized responses)")
1404+
private int maxTopicListInFlightDirectMemSizeMB = 100;
1405+
1406+
@FieldContext(
1407+
category = CATEGORY_SERVER,
1408+
doc = "Timeout for acquiring heap memory permits (milliseconds).\n"
1409+
+ "Default: 25000 (25 seconds)")
1410+
private int maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis = 25000;
1411+
1412+
@FieldContext(
1413+
category = CATEGORY_SERVER,
1414+
doc = "Maximum queue size for heap memory permit requests.\n"
1415+
+ "Default: 10000 (prevent unbounded queueing)")
1416+
private int maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize = 10000;
1417+
1418+
@FieldContext(
1419+
category = CATEGORY_SERVER,
1420+
doc = "Timeout for acquiring direct memory permits (milliseconds).\n"
1421+
+ "Default: 25000 (25 seconds)")
1422+
private int maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis = 25000;
1423+
1424+
@FieldContext(
1425+
category = CATEGORY_SERVER,
1426+
doc = "Maximum queue size for direct memory permit requests.\n"
1427+
+ "Default: 10000 (prevent unbounded queueing)")
1428+
private int maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize = 10000;
1429+
13941430
@FieldContext(
13951431
dynamic = true,
13961432
category = CATEGORY_SERVER,

0 commit comments

Comments
 (0)