Skip to content

Commit 4dafc06

Browse files
author
Fei Wang
committed
lib: os: mpsc_pbuf: fix potential semaphore wait forever
One thread calls mpsc_pbuf_alloc to produce data, which invokes add_skip_item and steps into k_sem_take. Another thread calls mpsc_pbuf_claim to consume data. In this condition, mpsc_pbuf_claim has only small remaining space and needs to call rd_idx_inc to reserve space, but there is still no data available. The consumer should call k_sem_give to wake mpsc_pbuf_alloc again, so the producer can allocate space and continue producing data. Without this wake-up, the producer thread may wait forever in k_sem_take, leading to a deadlock situation. Signed-off-by: Fei Wang <fei.wang@jaguarmicro.com>
1 parent e438b57 commit 4dafc06

File tree

2 files changed

+149
-0
lines changed

2 files changed

+149
-0
lines changed

lib/os/mpsc_pbuf.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
537537
{
538538
union mpsc_pbuf_generic *item;
539539
bool cont;
540+
#ifdef CONFIG_MULTITHREADING
541+
bool need_post = false;
542+
#endif
540543

541544
do {
542545
uint32_t a;
@@ -562,6 +565,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
562565
idx_inc(buffer, buffer->tmp_rd_idx, inc);
563566
rd_idx_inc(buffer, inc);
564567
cont = true;
568+
#ifdef CONFIG_MULTITHREADING
569+
need_post = true;
570+
#endif
565571
} else {
566572
item->hdr.busy = 1;
567573
buffer->tmp_rd_idx =
@@ -576,6 +582,11 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
576582
k_spin_unlock(&buffer->lock, key);
577583
} while (cont);
578584

585+
#ifdef CONFIG_MULTITHREADING
586+
if (need_post && item == NULL) {
587+
k_sem_give(&buffer->sem);
588+
}
589+
#endif
579590
return item;
580591
}
581592

tests/lib/mpsc_pbuf/src/main.c

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,144 @@ void start_threads(struct mpsc_pbuf_buffer *buffer)
10341034
}
10351035
}
10361036

1037+
static uint32_t buf32_1[512];
1038+
struct test_msg_data {
1039+
union mpsc_pbuf_generic hdr;
1040+
uint32_t buf_len; /*contain buf_len and buf*/
1041+
uint8_t buf[sizeof(buf32_1)];
1042+
};
1043+
1044+
static struct mpsc_pbuf_buffer mpsc_buffer;
1045+
1046+
static uint32_t test_data_product_max_cnt = 2;
1047+
static uint32_t test_data_product_cnt;
1048+
static uint32_t test_data_consumer_cnt;
1049+
K_THREAD_STACK_DEFINE(t3_stack, 1024);
1050+
static struct k_thread threads_t3;
1051+
static k_tid_t tid3;
1052+
1053+
void
1054+
test_mpsc_notify_drop(const struct mpsc_pbuf_buffer *buffer,
1055+
const union mpsc_pbuf_generic *packet)
1056+
{
1057+
struct test_msg_data *msg_data = NULL;
1058+
1059+
msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);
1060+
1061+
PRINT("msg data len = %d\n", msg_data->buf_len);
1062+
/*never drop*/
1063+
}
1064+
1065+
uint32_t
1066+
test_mpsc_get_used_len(const union mpsc_pbuf_generic *packet)
1067+
{
1068+
uint32_t size = 0;
1069+
struct test_msg_data *msg_data = NULL;
1070+
1071+
msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);
1072+
1073+
size = msg_data->buf_len + sizeof(union mpsc_pbuf_generic);
1074+
size = ROUND_UP(size, sizeof(uint32_t)); /*return number of uint32_t */
1075+
size = size / sizeof(uint32_t);
1076+
return size;
1077+
}
1078+
1079+
void t_data_consumer_entry(void *p0, void *p1, void *p2)
1080+
{
1081+
uint32_t read_cnt = 0;
1082+
bool wait = true;
1083+
const union mpsc_pbuf_generic *msg = NULL;
1084+
const struct test_msg_data *test_data = NULL;
1085+
struct mpsc_pbuf_buffer *buffer = (struct mpsc_pbuf_buffer *)p0;
1086+
1087+
while (1) {
1088+
msg = mpsc_pbuf_claim(buffer);
1089+
test_data = (const struct test_msg_data *)msg;
1090+
if (test_data == NULL) {
1091+
continue;
1092+
}
1093+
if (test_data && buffer->wr_idx == 0) {
1094+
wait = false;
1095+
}
1096+
if (wait == true) {
1097+
continue;
1098+
}
1099+
read_cnt++;
1100+
mpsc_pbuf_free(buffer, msg);
1101+
if (read_cnt == test_data_product_max_cnt) {
1102+
break;
1103+
}
1104+
1105+
}
1106+
test_data_consumer_cnt = read_cnt;
1107+
}
1108+
1109+
/* test mpsc_pbuf_alloc can get sem_take
1110+
* one thread product data, one thread consumer data
1111+
* requirement:
1112+
* consumer slow process data
1113+
* step:
1114+
* 1:product data len is 0.75 of cfg.size
1115+
* 2:run product times
1116+
*
1117+
*/
1118+
ZTEST(log_buffer, test_sema_lock)
1119+
{
1120+
struct test_msg_data test_data;
1121+
struct test_msg_data *item = NULL;
1122+
struct mpsc_pbuf_buffer_config cfg;
1123+
uint32_t loop = 0;
1124+
size_t wlen = 0;
1125+
bool fist_wait = true;
1126+
1127+
cfg.buf = buf32_1;
1128+
cfg.size = ARRAY_SIZE(buf32_1);
1129+
cfg.get_wlen = test_mpsc_get_used_len;
1130+
cfg.notify_drop = test_mpsc_notify_drop;
1131+
1132+
memset(&mpsc_buffer, 0, sizeof(mpsc_buffer));
1133+
mpsc_pbuf_init(&mpsc_buffer, &cfg);
1134+
1135+
tid3 = k_thread_create(&threads_t3, t3_stack, 1024,
1136+
t_data_consumer_entry,
1137+
&mpsc_buffer, NULL, NULL,
1138+
10, 0, K_NO_WAIT);
1139+
k_thread_name_set(&threads_t3, "test_mpsc_consumer");
1140+
for (loop = 0; loop < test_data_product_max_cnt; loop++) {
1141+
if (loop == 0) {
1142+
test_data.buf_len = 1600;
1143+
} else {
1144+
test_data.buf_len = 1800;
1145+
}
1146+
test_data.buf_len = ROUND_UP(test_data.buf_len, sizeof(uint32_t));
1147+
memset(test_data.buf, loop + 1, test_data.buf_len);
1148+
1149+
wlen = test_data.buf_len + sizeof(test_data.buf_len) +
1150+
sizeof(test_data.hdr);
1151+
wlen = ROUND_UP(wlen, sizeof(uint32_t));
1152+
wlen = wlen / sizeof(uint32_t);
1153+
1154+
if (fist_wait && test_data.buf_len == 1800) {
1155+
PRINT(" mpsc sema wait\n");
1156+
}
1157+
item = (struct test_msg_data *)mpsc_pbuf_alloc(&mpsc_buffer,
1158+
wlen, K_FOREVER);
1159+
item->hdr.raw = 0;
1160+
memcpy(item->buf, test_data.buf, test_data.buf_len);
1161+
item->buf_len = test_data.buf_len + sizeof(test_data.buf_len);
1162+
mpsc_pbuf_commit(&mpsc_buffer, &item->hdr);
1163+
if (fist_wait && test_data.buf_len == 1800) {
1164+
PRINT(" mpsc sema wake\n");
1165+
fist_wait = false;
1166+
}
1167+
test_data_product_cnt++;
1168+
}
1169+
k_thread_join(tid3, K_FOREVER);
1170+
zassert_equal(test_data_product_cnt,
1171+
test_data_product_max_cnt, "product %d consume %d",
1172+
test_data_product_cnt, test_data_consumer_cnt);
1173+
1174+
}
10371175
/* Test creates two threads which pends on the buffer until there is a space
10381176
* available. When enough buffers is released threads are woken up and they
10391177
* allocate packets.

0 commit comments

Comments
 (0)