Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/os/mpsc_pbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
{
union mpsc_pbuf_generic *item;
bool cont;
#ifdef CONFIG_MULTITHREADING
bool need_post = false;
#endif

do {
uint32_t a;
Expand All @@ -562,6 +565,9 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
idx_inc(buffer, buffer->tmp_rd_idx, inc);
rd_idx_inc(buffer, inc);
cont = true;
#ifdef CONFIG_MULTITHREADING
need_post = true;
#endif
} else {
item->hdr.busy = 1;
buffer->tmp_rd_idx =
Expand All @@ -576,6 +582,11 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
k_spin_unlock(&buffer->lock, key);
} while (cont);

#ifdef CONFIG_MULTITHREADING
if (need_post && item == NULL) {
k_sem_give(&buffer->sem);
}
#endif
return item;
}

Expand Down
145 changes: 145 additions & 0 deletions tests/lib/mpsc_pbuf/src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,151 @@ void start_threads(struct mpsc_pbuf_buffer *buffer)
}
}

static uint32_t buf32_1[512];
struct test_msg_data {
union mpsc_pbuf_generic hdr;
uint32_t buf_len; /*contain buf_len and buf*/
uint8_t buf[sizeof(buf32_1)];
};

static struct mpsc_pbuf_buffer mpsc_buffer;

static uint32_t test_data_product_max_cnt = 2;
static uint32_t test_data_product_cnt;
static uint32_t test_data_consumer_cnt;
K_THREAD_STACK_DEFINE(t3_stack, 1024);
static struct k_thread threads_t3;
static k_tid_t tid3;

void
test_mpsc_notify_drop(const struct mpsc_pbuf_buffer *buffer,
const union mpsc_pbuf_generic *packet)
{
struct test_msg_data *msg_data = NULL;

msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);

PRINT("msg data len = %d\n", msg_data->buf_len);
/*never drop*/
}

uint32_t
test_mpsc_get_used_len(const union mpsc_pbuf_generic *packet)
{
uint32_t size = 0;
struct test_msg_data *msg_data = NULL;

msg_data = CONTAINER_OF(packet, struct test_msg_data, hdr);

size = msg_data->buf_len + sizeof(union mpsc_pbuf_generic);
size = ROUND_UP(size, sizeof(uint32_t)); /*return number of uint32_t */
size = size / sizeof(uint32_t);
return size;
}

void t_data_consumer_entry(void *p0, void *p1, void *p2)
{
uint32_t read_cnt = 0;
bool wait = true;
const union mpsc_pbuf_generic *msg = NULL;
const struct test_msg_data *test_data = NULL;
struct mpsc_pbuf_buffer *buffer = (struct mpsc_pbuf_buffer *)p0;

while (1) {
if (msg == NULL) {
msg = mpsc_pbuf_claim(buffer);
}
test_data = (const struct test_msg_data *)msg;
if (test_data == NULL) {
continue;
}
if (test_data && buffer->wr_idx == 0) {
wait = false;
}
if (wait == true) {
continue;
}
read_cnt++;
mpsc_pbuf_free(buffer, msg);
msg = NULL;
if (read_cnt == test_data_product_max_cnt) {
break;
}

}
test_data_consumer_cnt = read_cnt;
}

/* test mpsc_pbuf_alloc can get sem_take
* one thread product data, one thread consumer data
* requirement:
* consumer slow process data
* step:
* 1:product data len is 0.75 of cfg.size
* 2:run product times
*
*/
ZTEST(log_buffer, test_sema_lock)
{
struct test_msg_data test_data;
struct test_msg_data *item = NULL;
struct mpsc_pbuf_buffer_config cfg;
uint32_t loop = 0;
size_t wlen = 0;
bool fist_wait = true;

if (CONFIG_SYS_CLOCK_TICKS_PER_SEC < 10000) {
ztest_test_skip();
}

cfg.buf = buf32_1;
cfg.size = ARRAY_SIZE(buf32_1);
cfg.get_wlen = test_mpsc_get_used_len;
cfg.notify_drop = test_mpsc_notify_drop;

memset(&mpsc_buffer, 0, sizeof(mpsc_buffer));
mpsc_pbuf_init(&mpsc_buffer, &cfg);

tid3 = k_thread_create(&threads_t3, t3_stack, 1024,
t_data_consumer_entry,
&mpsc_buffer, NULL, NULL,
10, 0, K_NO_WAIT);
k_thread_name_set(&threads_t3, "test_mpsc_consumer");
for (loop = 0; loop < test_data_product_max_cnt; loop++) {
if (loop == 0) {
test_data.buf_len = 1600;
} else {
test_data.buf_len = 1800;
}
test_data.buf_len = ROUND_UP(test_data.buf_len, sizeof(uint32_t));
memset(test_data.buf, loop + 1, test_data.buf_len);

wlen = test_data.buf_len + sizeof(test_data.buf_len) +
sizeof(test_data.hdr);
wlen = ROUND_UP(wlen, sizeof(uint32_t));
wlen = wlen / sizeof(uint32_t);

if (fist_wait && test_data.buf_len == 1800) {
PRINT(" mpsc sema wait\n");
}
item = (struct test_msg_data *)mpsc_pbuf_alloc(&mpsc_buffer,
wlen, K_FOREVER);
item->hdr.raw = 0;
memcpy(item->buf, test_data.buf, test_data.buf_len);
item->buf_len = test_data.buf_len + sizeof(test_data.buf_len);
mpsc_pbuf_commit(&mpsc_buffer, &item->hdr);
if (fist_wait && test_data.buf_len == 1800) {
PRINT(" mpsc sema wake\n");
fist_wait = false;
}
test_data_product_cnt++;
}
k_thread_join(tid3, K_FOREVER);
zassert_equal(test_data_product_cnt,
test_data_product_max_cnt, "product %d consume %d",
test_data_product_cnt, test_data_consumer_cnt);

}
/* Test creates two threads which pends on the buffer until there is a space
* available. When enough buffers is released threads are woken up and they
* allocate packets.
Expand Down
Loading