Skip to content

Commit 11815e0

Browse files
committed
Fix error propagation rule for Python's C API
1 parent 765795c commit 11815e0

File tree

6 files changed

+94
-1
lines changed

6 files changed

+94
-1
lines changed

src/confluent_kafka/src/Consumer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,6 +1575,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
15751575
if (result)
15761576
Py_DECREF(result);
15771577
else {
1578+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
15781579
CallState_crash(cs);
15791580
rd_kafka_yield(rk);
15801581
}

src/confluent_kafka/src/Producer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
163163
if (result)
164164
Py_DECREF(result);
165165
else {
166+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
166167
CallState_crash(cs);
167168
rd_kafka_yield(rk);
168169
}

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1755,6 +1755,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
17551755
if (result)
17561756
Py_DECREF(result);
17571757
else {
1758+
1759+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
17581760
crash:
17591761
CallState_crash(cs);
17601762
rd_kafka_yield(h->rk);
@@ -1810,6 +1812,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker
18101812
goto done;
18111813
}
18121814

1815+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
1816+
18131817
/**
18141818
* Stop callback dispatcher, return err to application
18151819
* fall-through to unlock GIL
@@ -1839,6 +1843,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
18391843
if (result)
18401844
Py_DECREF(result);
18411845
else {
1846+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
18421847
CallState_crash(cs);
18431848
rd_kafka_yield(h->rk);
18441849
}
@@ -1874,6 +1879,7 @@ static void log_cb (const rd_kafka_t *rk, int level,
18741879
if (result)
18751880
Py_DECREF(result);
18761881
else {
1882+
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
18771883
CallState_crash(cs);
18781884
rd_kafka_yield(h->rk);
18791885
}
@@ -2572,6 +2578,9 @@ void CallState_begin (Handle *h, CallState *cs) {
25722578
cs->thread_state = PyEval_SaveThread();
25732579
assert(cs->thread_state != NULL);
25742580
cs->crashed = 0;
2581+
cs->exception_type = NULL;
2582+
cs->exception_value = NULL;
2583+
cs->exception_traceback = NULL;
25752584
#ifdef WITH_PY_TSS
25762585
PyThread_tss_set(&h->tlskey, cs);
25772586
#else
@@ -2592,8 +2601,19 @@ int CallState_end (Handle *h, CallState *cs) {
25922601

25932602
PyEval_RestoreThread(cs->thread_state);
25942603

2595-
if (PyErr_CheckSignals() == -1 || cs->crashed)
2604+
if (PyErr_CheckSignals() == -1)
2605+
return 0;
2606+
2607+
if (cs->crashed) {
2608+
/* Restore the saved exception if we have one */
2609+
if (cs->exception_type) {
2610+
PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback);
2611+
cs->exception_type = NULL;
2612+
cs->exception_value = NULL;
2613+
cs->exception_traceback = NULL;
2614+
}
25962615
return 0;
2616+
}
25972617

25982618
return 1;
25992619
}

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg);
275275
typedef struct {
276276
PyThreadState *thread_state;
277277
int crashed; /* Callback crashed */
278+
PyObject *exception_type; /* Stored exception type */
279+
PyObject *exception_value; /* Stored exception value */
280+
PyObject *exception_traceback; /* Stored exception traceback */
278281
} CallState;
279282

280283
/**

tests/test_Consumer.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,38 @@ def test_consumer_without_groupid():
324324
with pytest.raises(ValueError) as ex:
325325
TestConsumer({'bootstrap.servers': "mybroker:9092"})
326326
assert ex.match('group.id must be set')
327+
328+
329+
def test_callback_exception_no_system_error():
330+
331+
exception_raised = []
332+
333+
def error_cb_that_raises(error):
334+
"""Error callback that raises an exception"""
335+
exception_raised.append(error)
336+
raise RuntimeError("Test exception from error_cb")
337+
338+
# Create consumer with error callback that raises exception
339+
consumer = TestConsumer({
340+
'group.id': 'test-callback-systemerror-fix',
341+
'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error
342+
'socket.timeout.ms': 100,
343+
'session.timeout.ms': 1000,
344+
'error_cb': error_cb_that_raises
345+
})
346+
347+
consumer.subscribe(['test-topic'])
348+
349+
# This should trigger the error callback due to connection failure
350+
# Before fix: Would get RuntimeError + SystemError
351+
# After fix: Should only get RuntimeError (no SystemError)
352+
with pytest.raises(RuntimeError) as exc_info:
353+
consumer.consume(timeout=0.1)
354+
355+
# Verify we got the expected exception message
356+
assert "Test exception from error_cb" in str(exc_info.value)
357+
358+
# Verify the error callback was actually called
359+
assert len(exception_raised) > 0
360+
361+
consumer.close()

tests/test_Producer.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,36 @@ def test_producer_bool_value():
283283

284284
p = Producer({})
285285
assert bool(p)
286+
287+
288+
def test_callback_exception_no_system_error():
289+
delivery_reports = []
290+
291+
def delivery_cb_that_raises(err, msg):
292+
"""Delivery report callback that raises an exception"""
293+
delivery_reports.append((err, msg))
294+
raise RuntimeError("Test exception from delivery_cb")
295+
296+
producer = Producer({
297+
'bootstrap.servers': 'nonexistent-broker:9092', # Will cause delivery failures
298+
'socket.timeout.ms': 100,
299+
'message.timeout.ms': 1000,
300+
'on_delivery': delivery_cb_that_raises
301+
})
302+
303+
# Produce a message - this will trigger delivery report callback when it fails
304+
producer.produce('test-topic', value='test-message')
305+
306+
# Flush to ensure delivery reports are processed
307+
# Before fix: Would get RuntimeError + SystemError
308+
# After fix: Should only get RuntimeError (no SystemError)
309+
with pytest.raises(RuntimeError) as exc_info:
310+
producer.flush(timeout=1.0)
311+
312+
# Verify we got an exception from our callback
313+
assert "Test exception from delivery_cb" in str(exc_info.value)
314+
315+
# Verify the delivery callback was actually called
316+
assert len(delivery_reports) > 0
317+
318+
producer.close()

0 commit comments

Comments
 (0)