Skip to content

Commit 34292de

Browse files
author
Fei Wang
committed
lib: os: mpsc_pbuf: fix potential semaphore wait forever
One thread calls mpsc_pbuf_alloc to produce data, which invokes add_skip_item and steps into k_sem_take. Another thread calls mpsc_pbuf_claim to consume data. In this condition, mpsc_pbuf_claim has only small remaining space and needs to call rd_idx_inc to reserve space, but there is still no data available. The consumer should call k_sem_give to wake mpsc_pbuf_alloc again, so the producer can allocate space and continue producing data. Without this wake-up, the producer thread may wait forever in k_sem_take, leading to a deadlock situation. Signed-off-by: Fei Wang <fei.wang@jaguarmicro.com>
1 parent e438b57 commit 34292de

File tree

2 files changed

+156
-0
lines changed

2 files changed

+156
-0
lines changed

lib/os/mpsc_pbuf.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
537537
{
538538
union mpsc_pbuf_generic *item;
539539
bool cont;
540+
#ifdef CONFIG_MULTITHREADING
541+
bool need_post = false;
542+
#endif
540543

541544
do {
542545
uint32_t a;
@@ -562,6 +565,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
562565
idx_inc(buffer, buffer->tmp_rd_idx, inc);
563566
rd_idx_inc(buffer, inc);
564567
cont = true;
568+
#ifdef CONFIG_MULTITHREADING
569+
need_post = true;
570+
#endif
565571
} else {
566572
item->hdr.busy = 1;
567573
buffer->tmp_rd_idx =
@@ -576,6 +582,11 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
576582
k_spin_unlock(&buffer->lock, key);
577583
} while (cont);
578584

585+
#ifdef CONFIG_MULTITHREADING
586+
if (need_post && item == NULL) {
587+
k_sem_give(&buffer->sem);
588+
}
589+
#endif
579590
return item;
580591
}
581592

tests/lib/mpsc_pbuf/src/main.c

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,151 @@ void start_threads(struct mpsc_pbuf_buffer *buffer)
10341034
}
10351035
}
10361036

1037+
static uint32_t buf32_1[512];
1038+
struct test_msg_data {
1039+
union mpsc_pbuf_generic hdr;
1040+
uint32_t buf_len; /*contain buf_len and buf*/
1041+
uint8_t buf[sizeof(buf32_1)];
1042+
};
1043+
1044+
static struct mpsc_pbuf_buffer mpsc_buffer;
1045+
1046+
static uint32_t test_data_product_max_cnt = 2;
1047+
static uint32_t test_data_product_cnt;
1048+
static uint32_t test_data_consumer_cnt;
1049+
K_THREAD_STACK_DEFINE(t3_stack, 1024);
1050+
static struct k_thread threads_t3;
1051+
static k_tid_t tid3;
1052+
1053+
void
1054+
test_mpsc_notify_drop(const struct mpsc_pbuf_buffer *buffer,
1055+
const union mpsc_pbuf_generic *packet)
1056+
{
1057+
struct test_msg_data *msg_data = NULL;
1058+
1059+
msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);
1060+
1061+
PRINT("msg data len = %d\n", msg_data->buf_len);
1062+
/*never drop*/
1063+
}
1064+
1065+
uint32_t
1066+
test_mpsc_get_used_len(const union mpsc_pbuf_generic *packet)
1067+
{
1068+
uint32_t size = 0;
1069+
struct test_msg_data *msg_data = NULL;
1070+
1071+
msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);
1072+
1073+
size = msg_data->buf_len + sizeof(union mpsc_pbuf_generic);
1074+
size = ROUND_UP(size, sizeof(uint32_t)); /*return number of uint32_t */
1075+
size = size / sizeof(uint32_t);
1076+
return size;
1077+
}
1078+
1079+
void t_data_consumer_entry(void *p0, void *p1, void *p2)
1080+
{
1081+
uint32_t read_cnt = 0;
1082+
bool wait = true;
1083+
const union mpsc_pbuf_generic *msg = NULL;
1084+
const struct test_msg_data *test_data = NULL;
1085+
struct mpsc_pbuf_buffer *buffer = (struct mpsc_pbuf_buffer *)p0;
1086+
1087+
while (1) {
1088+
if (msg == NULL) {
1089+
msg = mpsc_pbuf_claim(buffer);
1090+
}
1091+
test_data = (const struct test_msg_data *)msg;
1092+
if (test_data == NULL) {
1093+
continue;
1094+
}
1095+
if (test_data && buffer->wr_idx == 0) {
1096+
wait = false;
1097+
}
1098+
if (wait == true) {
1099+
continue;
1100+
}
1101+
read_cnt++;
1102+
mpsc_pbuf_free(buffer, msg);
1103+
msg = NULL;
1104+
if (read_cnt == test_data_product_max_cnt) {
1105+
break;
1106+
}
1107+
1108+
}
1109+
test_data_consumer_cnt = read_cnt;
1110+
}
1111+
1112+
/* test mpsc_pbuf_alloc can get sem_take
1113+
* one thread product data, one thread consumer data
1114+
* requirement:
1115+
* consumer slow process data
1116+
* step:
1117+
* 1:product data len is 0.75 of cfg.size
1118+
* 2:run product times
1119+
*
1120+
*/
1121+
ZTEST(log_buffer, test_sema_lock)
1122+
{
1123+
struct test_msg_data test_data;
1124+
struct test_msg_data *item = NULL;
1125+
struct mpsc_pbuf_buffer_config cfg;
1126+
uint32_t loop = 0;
1127+
size_t wlen = 0;
1128+
bool fist_wait = true;
1129+
1130+
if (CONFIG_SYS_CLOCK_TICKS_PER_SEC < 10000) {
1131+
ztest_test_skip();
1132+
}
1133+
1134+
cfg.buf = buf32_1;
1135+
cfg.size = ARRAY_SIZE(buf32_1);
1136+
cfg.get_wlen = test_mpsc_get_used_len;
1137+
cfg.notify_drop = test_mpsc_notify_drop;
1138+
1139+
memset(&mpsc_buffer, 0, sizeof(mpsc_buffer));
1140+
mpsc_pbuf_init(&mpsc_buffer, &cfg);
1141+
1142+
tid3 = k_thread_create(&threads_t3, t3_stack, 1024,
1143+
t_data_consumer_entry,
1144+
&mpsc_buffer, NULL, NULL,
1145+
10, 0, K_NO_WAIT);
1146+
k_thread_name_set(&threads_t3, "test_mpsc_consumer");
1147+
for (loop = 0; loop < test_data_product_max_cnt; loop++) {
1148+
if (loop == 0) {
1149+
test_data.buf_len = 1600;
1150+
} else {
1151+
test_data.buf_len = 1800;
1152+
}
1153+
test_data.buf_len = ROUND_UP(test_data.buf_len, sizeof(uint32_t));
1154+
memset(test_data.buf, loop + 1, test_data.buf_len);
1155+
1156+
wlen = test_data.buf_len + sizeof(test_data.buf_len) +
1157+
sizeof(test_data.hdr);
1158+
wlen = ROUND_UP(wlen, sizeof(uint32_t));
1159+
wlen = wlen / sizeof(uint32_t);
1160+
1161+
if (fist_wait && test_data.buf_len == 1800) {
1162+
PRINT(" mpsc sema wait\n");
1163+
}
1164+
item = (struct test_msg_data *)mpsc_pbuf_alloc(&mpsc_buffer,
1165+
wlen, K_FOREVER);
1166+
item->hdr.raw = 0;
1167+
memcpy(item->buf, test_data.buf, test_data.buf_len);
1168+
item->buf_len = test_data.buf_len + sizeof(test_data.buf_len);
1169+
mpsc_pbuf_commit(&mpsc_buffer, &item->hdr);
1170+
if (fist_wait && test_data.buf_len == 1800) {
1171+
PRINT(" mpsc sema wake\n");
1172+
fist_wait = false;
1173+
}
1174+
test_data_product_cnt++;
1175+
}
1176+
k_thread_join(tid3, K_FOREVER);
1177+
zassert_equal(test_data_product_cnt,
1178+
test_data_product_max_cnt, "product %d consume %d",
1179+
test_data_product_cnt, test_data_consumer_cnt);
1180+
1181+
}
10371182
/* Test creates two threads which pends on the buffer until there is a space
10381183
* available. When enough buffers is released threads are woken up and they
10391184
* allocate packets.

0 commit comments

Comments
 (0)