Zephyr API Documentation 4.4.99
A Scalable Open Source RTOS
Loading...
Searching...
No Matches
rtio.h
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: Copyright (c) 2022 Intel Corporation
3 * SPDX-FileCopyrightText: <text>Copyright (c) 2026 Infineon Technologies AG,
4 * or an affiliate of Infineon Technologies AG. All rights reserved.</text>
5 *
6 * SPDX-License-Identifier: Apache-2.0
7 */
8
27
28#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_H_
29#define ZEPHYR_INCLUDE_RTIO_RTIO_H_
30
31#include <string.h>
32
34#include <zephyr/device.h>
35#include <zephyr/kernel.h>
36#include <zephyr/sys/__assert.h>
37#include <zephyr/sys/atomic.h>
39#include <zephyr/sys/util.h>
41#include <zephyr/rtio/sqe.h>
42#include <zephyr/rtio/cqe.h>
43#include <zephyr/rtio/iodev.h>
44
45#ifdef __cplusplus
46extern "C" {
47#endif
48
49
58
70struct rtio {
71#ifdef CONFIG_RTIO_SUBMIT_SEM
72 /* A wait semaphore which may suspend the calling thread
73 * to wait for some number of completions when calling submit
74 */
75 struct k_sem *submit_sem;
76
77 uint32_t submit_count;
78#endif
79
80#ifdef CONFIG_RTIO_CONSUME_SEM
81 /* A wait semaphore which may suspend the calling thread
82 * to wait for some number of completions while consuming
83 * them from the completion queue
84 */
85 struct k_sem *consume_sem;
86#endif
87
88 /* Total number of completions */
90
91 /* Number of completions that were unable to be submitted with results
92 * due to the cq spsc being full
93 */
95
96 /* Submission queue object pool with free list */
97 struct rtio_sqe_pool *sqe_pool;
98
99 /* Complete queue object pool with free list */
100 struct rtio_cqe_pool *cqe_pool;
101
102#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
103 /* Mem block pool */
104 struct sys_mem_blocks *block_pool;
105#endif
106
107 /* Submission queue */
108 struct mpsc sq;
109
110 /* Completion queue */
111 struct mpsc cq;
112};
113
114/* @cond ignore */
/* Define the optional per-context semaphores, then the iterable struct rtio
 * called `name`, initialised with zeroed counters, the given pools and empty
 * submission/completion queues.
 */
#define Z_RTIO_DEFINE(name, _sqe_pool, _cqe_pool, _block_pool) \
	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \
		   (static K_SEM_DEFINE(CONCAT(_submit_sem_, name), 0, K_SEM_MAX_LIMIT))) \
	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \
		   (static K_SEM_DEFINE(CONCAT(_consume_sem_, name), 0, K_SEM_MAX_LIMIT))) \
	STRUCT_SECTION_ITERABLE(rtio, name) = { \
		IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \
			   (.submit_sem = &CONCAT(_submit_sem_, name), .submit_count = 0,)) \
		IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \
			   (.consume_sem = &CONCAT(_consume_sem_, name),)) \
		.cq_count = ATOMIC_INIT(0), \
		.xcqcnt = ATOMIC_INIT(0), \
		.sqe_pool = _sqe_pool, \
		.cqe_pool = _cqe_pool, \
		IF_ENABLED(CONFIG_RTIO_SYS_MEM_BLOCKS, (.block_pool = _block_pool,)) \
		.sq = MPSC_INIT((name.sq)), \
		.cq = MPSC_INIT((name.cq)), \
	}
132/* @endcond */
133
/**
 * @brief Statically define and initialize an RTIO context without a block pool.
 *
 * Expands to the SQE pool, the CQE pool and the context itself; the context's
 * block pool pointer is NULL.
 *
 * @param name  Name of the RTIO context
 * @param sq_sz Size passed to the submission queue entry pool definition
 * @param cq_sz Size passed to the completion queue entry pool definition
 */
#define RTIO_DEFINE(name, sq_sz, cq_sz) \
	Z_RTIO_SQE_POOL_DEFINE(CONCAT(name, _sqe_pool), sq_sz); \
	Z_RTIO_CQE_POOL_DEFINE(CONCAT(name, _cqe_pool), cq_sz); \
	Z_RTIO_DEFINE(name, \
		      &CONCAT(name, _sqe_pool), \
		      &CONCAT(name, _cqe_pool), \
		      NULL)
146
147
/**
 * @brief Get the mempool block size of the RTIO context.
 *
 * @param r RTIO context, may be NULL
 *
 * @return Block size in bytes, or 0 when block pools are compiled out, @p r is
 *         NULL, or the context has no block pool
 */
static inline size_t rtio_mempool_block_size(const struct rtio *r)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
	if ((r != NULL) && (r->block_pool != NULL)) {
		/* Block sizes are stored as a power-of-two shift */
		return BIT(r->block_pool->info.blk_sz_shift);
	}

	return 0;
#else
	ARG_UNUSED(r);
	return 0;
#endif
}
167
#if defined(CONFIG_RTIO_SYS_MEM_BLOCKS) || defined(__DOXYGEN__)
/**
 * @brief Compute the index of the mempool block that contains an address.
 *
 * @param r   RTIO context
 * @param ptr Address to look up
 *
 * @return Index of the block containing @p ptr, or UINT16_MAX when the context
 *         has no block pool or @p ptr lies outside the pool buffer
 */
static inline uint16_t __rtio_compute_mempool_block_index(const struct rtio *r, const void *ptr)
{
	const size_t block_size = rtio_mempool_block_size(r);

	/* rtio_mempool_block_size() reports 0 for a NULL context or a context
	 * without a pool; bail out before dereferencing the pool or dividing.
	 */
	if (block_size == 0) {
		return UINT16_MAX;
	}

	const struct sys_mem_blocks *mem_pool = r->block_pool;
	const uintptr_t addr = (uintptr_t)ptr;
	const uintptr_t buff = (uintptr_t)mem_pool->buffer;
	/* Computed in uintptr_t so large pools cannot wrap a 32-bit product */
	const uintptr_t buff_size = (uintptr_t)mem_pool->info.num_blocks * block_size;

	if (addr < buff || addr >= buff + buff_size) {
		return UINT16_MAX;
	}

	return (uint16_t)((addr - buff) / block_size);
}
#endif
191
/**
 * @brief Allocate a contiguous buffer from the context's block pool.
 *
 * Starts by requesting enough blocks for @p max_sz and, on failure, retries
 * with one block less each time for as long as the request stays at or above
 * @p min_sz.
 *
 * @param r       RTIO context
 * @param min_sz  Smallest acceptable size in bytes
 * @param max_sz  Preferred size in bytes
 * @param buf     Receives the allocated buffer
 * @param buf_len Receives the allocated length (a whole number of blocks)
 *
 * @retval 0        Buffer allocated
 * @retval -ENOMEM  No block pool, or no suitable run of blocks available
 * @retval -ENOTSUP Block pools are compiled out
 */
static inline int rtio_block_pool_alloc(struct rtio *r, size_t min_sz,
					size_t max_sz, uint8_t **buf, uint32_t *buf_len)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
	const uint32_t block_size = rtio_mempool_block_size(r);
	uint32_t want = max_sz;

	/* Contexts without a block pool report a block size of 0 */
	if (block_size == 0) {
		return -ENOMEM;
	}

	for (;;) {
		size_t blocks = DIV_ROUND_UP(want, block_size);

		if (sys_mem_blocks_alloc_contiguous(r->block_pool, blocks, (void **)buf) == 0) {
			*buf_len = blocks * block_size;
			return 0;
		}

		/* Already down to a single block: nothing smaller to try */
		if (want <= block_size) {
			return -ENOMEM;
		}

		want -= block_size;
		if (want < min_sz) {
			return -ENOMEM;
		}
	}
#else
	ARG_UNUSED(r);
	ARG_UNUSED(min_sz);
	ARG_UNUSED(max_sz);
	ARG_UNUSED(buf);
	ARG_UNUSED(buf_len);
	return -ENOTSUP;
#endif
}
232
/**
 * @brief Return a contiguous buffer to the context's block pool.
 *
 * Does not check @p r or its block pool; callers must pass a context that has
 * one. A no-op when block pools are compiled out.
 *
 * @param r       RTIO context the buffer was allocated from
 * @param buf     Start of the buffer
 * @param buf_len Length of the buffer in bytes
 */
static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_len)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
	struct sys_mem_blocks *pool = r->block_pool;
	/* Convert the byte length back into a block count */
	size_t blocks = buf_len >> pool->info.blk_sz_shift;

	sys_mem_blocks_free_contiguous(r->block_pool, buf, blocks);
#else
	ARG_UNUSED(r);
	ARG_UNUSED(buf);
	ARG_UNUSED(buf_len);
#endif
}
245
246
248extern struct k_mem_partition rtio_partition;
249
250
251/* Do not try and reformat the macros */
252
/*
 * Placement and storage-class prefix applied to the RTIO block pool buffer.
 * Selected by Kconfig: the ".dtcm_bss" section, __nocache, or (by default)
 * the rtio_partition when CONFIG_USERSPACE is enabled and plain static
 * storage otherwise.
 */
#if defined(CONFIG_RTIO_BLOCK_POOL_PLACEMENT_DTCM)
#define RTIO_BMEM Z_GENERIC_SECTION(".dtcm_bss") static
#elif defined(CONFIG_RTIO_BLOCK_POOL_PLACEMENT_NOCACHE)
#define RTIO_BMEM __nocache static
#else
#define RTIO_BMEM COND_CODE_1(CONFIG_USERSPACE, (K_APP_BMEM(rtio_partition) static), (static))
#endif
269
/*
 * Placement and storage-class prefix applied to the RTIO block pool's
 * sys_mem_blocks bookkeeping object. Selected by Kconfig: the ".dtcm_data"
 * section, __nocache_load, or (by default) the rtio_partition when
 * CONFIG_USERSPACE is enabled and plain static storage otherwise.
 */
#if defined(CONFIG_RTIO_BLOCK_POOL_PLACEMENT_DTCM)
#define RTIO_DMEM Z_GENERIC_SECTION(".dtcm_data") static
#elif defined(CONFIG_RTIO_BLOCK_POOL_PLACEMENT_NOCACHE)
#define RTIO_DMEM __nocache_load static
#else
#define RTIO_DMEM COND_CODE_1(CONFIG_USERSPACE, (K_APP_DMEM(rtio_partition) static), (static))
#endif
286
287/* clang-format off */
288/* @cond ignore */
/* Define the backing byte array for a block pool and the sys_mem_blocks
 * object that manages it. Block size and alignment are rounded up with
 * WB_UP(). blk_cnt is parenthesised so expression arguments such as `a + b`
 * size the array correctly.
 */
#define Z_RTIO_BLOCK_POOL_DEFINE(name, blk_sz, blk_cnt, blk_align) \
	RTIO_BMEM uint8_t __aligned(WB_UP(blk_align)) \
	CONCAT(_block_pool_, name)[(blk_cnt) * WB_UP(blk_sz)]; \
	_SYS_MEM_BLOCKS_DEFINE_WITH_EXT_BUF(name, WB_UP(blk_sz), blk_cnt, \
					    CONCAT(_block_pool_, name), RTIO_DMEM)
294
295/* @endcond */
296
297
298/* clang-format on */
299
/**
 * @brief Statically define and initialize an RTIO context with a block pool.
 *
 * Expands to the SQE pool, the CQE pool, the memory block pool and the
 * context that references all three.
 *
 * @param name     Name of the RTIO context
 * @param sq_sz    Size passed to the submission queue entry pool definition
 * @param cq_sz    Size passed to the completion queue entry pool definition
 * @param num_blks Number of blocks in the memory pool
 * @param blk_size Size of each block (rounded up by the pool definition)
 * @param balign   Alignment of the pool buffer (rounded up by the pool definition)
 */
#define RTIO_DEFINE_WITH_MEMPOOL(name, sq_sz, cq_sz, num_blks, blk_size, balign) \
	Z_RTIO_SQE_POOL_DEFINE(name##_sqe_pool, sq_sz); \
	Z_RTIO_CQE_POOL_DEFINE(name##_cqe_pool, cq_sz); \
	Z_RTIO_BLOCK_POOL_DEFINE(name##_block_pool, blk_size, num_blks, balign); \
	Z_RTIO_DEFINE(name, \
		      &name##_sqe_pool, \
		      &name##_cqe_pool, \
		      &name##_block_pool)
315
316/* clang-format on */
317
325static inline uint32_t rtio_sqe_acquirable(struct rtio *r)
326{
327 return r->sqe_pool->pool_free;
328}
329
338static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
339{
340 SYS_PORT_TRACING_FUNC_ENTER(rtio, sqe_acquire, r);
341 struct rtio_iodev_sqe *iodev_sqe = rtio_sqe_pool_alloc(r->sqe_pool);
342
343 if (iodev_sqe == NULL) {
344 SYS_PORT_TRACING_FUNC_EXIT(rtio, sqe_acquire, r, NULL);
345 return NULL;
346 }
347
348 mpsc_push(&r->sq, &iodev_sqe->q);
349
350 SYS_PORT_TRACING_FUNC_EXIT(rtio, sqe_acquire, r, &iodev_sqe->sqe);
351 return &iodev_sqe->sqe;
352}
353
366static inline int rtio_sqe_acquire_array(struct rtio *r, size_t n, struct rtio_sqe **sqes)
367{
368 struct rtio_iodev_sqe *iodev_sqe;
369 size_t i;
370
371 for (i = 0; i < n; i++) {
372 iodev_sqe = rtio_sqe_pool_alloc(r->sqe_pool);
373 if (iodev_sqe == NULL) {
374 break;
375 }
376 sqes[i] = &iodev_sqe->sqe;
377 }
378
379 /* Not enough SQEs in the pool */
380 if (i < n) {
381 while (i > 0) {
382 i--;
383 iodev_sqe = CONTAINER_OF(sqes[i], struct rtio_iodev_sqe, sqe);
384 rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
385 sqes[i] = NULL;
386 }
387
388 return -ENOMEM;
389 }
390
391 for (i = 0; i < n; i++) {
392 iodev_sqe = CONTAINER_OF(sqes[i], struct rtio_iodev_sqe, sqe);
393 mpsc_push(&r->sq, &iodev_sqe->q);
394 }
395
396 return 0;
397}
398
404static inline void rtio_sqe_drop_all(struct rtio *r)
405{
406 struct rtio_iodev_sqe *iodev_sqe;
407 struct mpsc_node *node = mpsc_pop(&r->sq);
408
409 while (node != NULL) {
410 iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
411 rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
412 node = mpsc_pop(&r->sq);
413 }
414}
415
419static inline struct rtio_cqe *rtio_cqe_acquire(struct rtio *r)
420{
421 SYS_PORT_TRACING_FUNC_ENTER(rtio, cqe_acquire, r);
422 struct rtio_cqe *cqe = rtio_cqe_pool_alloc(r->cqe_pool);
423
424 if (cqe == NULL) {
425 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_acquire, r, NULL);
426 return NULL;
427 }
428
429 memset(cqe, 0, sizeof(struct rtio_cqe));
430
431 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_acquire, r, cqe);
432 return cqe;
433}
434
438static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
439{
440 mpsc_push(&r->cq, &cqe->q);
441}
442
454static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
455{
456 SYS_PORT_TRACING_FUNC_ENTER(rtio, cqe_consume, r);
457 struct mpsc_node *node;
458 struct rtio_cqe *cqe = NULL;
459
460#ifdef CONFIG_RTIO_CONSUME_SEM
461 if (k_sem_take(r->consume_sem, K_NO_WAIT) != 0) {
462 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_consume, r, NULL);
463 return NULL;
464 }
465#endif
466
467 node = mpsc_pop(&r->cq);
468 if (node == NULL) {
469 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_consume, r, NULL);
470 return NULL;
471 }
472 cqe = CONTAINER_OF(node, struct rtio_cqe, q);
473
474 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_consume, r, cqe);
475 return cqe;
476}
477
488static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
489{
490 struct mpsc_node *node;
491 struct rtio_cqe *cqe;
492
493#ifdef CONFIG_RTIO_CONSUME_SEM
494 k_sem_take(r->consume_sem, K_FOREVER);
495#endif
496 node = mpsc_pop(&r->cq);
497 while (node == NULL) {
498 Z_SPIN_DELAY(1);
499 node = mpsc_pop(&r->cq);
500 }
501 cqe = CONTAINER_OF(node, struct rtio_cqe, q);
502
503 return cqe;
504}
505
512static inline void rtio_cqe_release(struct rtio *r, struct rtio_cqe *cqe)
513{
514 SYS_PORT_TRACING_FUNC(rtio, cqe_release, r, cqe);
515 rtio_cqe_pool_free(r->cqe_pool, cqe);
516}
517
526static inline int rtio_flush_completion_queue(struct rtio *r)
527{
528 struct rtio_cqe *cqe;
529 int res = 0;
530
531 do {
532 cqe = rtio_cqe_consume(r);
533 if (cqe != NULL) {
534 if ((cqe->result < 0) && (res == 0)) {
535 res = cqe->result;
536 }
537 rtio_cqe_release(r, cqe);
538 }
539 } while (cqe != NULL);
540
541 return res;
542}
543
555__syscall void rtio_sqe_signal(struct rtio_sqe *sqe);
556
557static inline void z_impl_rtio_sqe_signal(struct rtio_sqe *sqe)
558{
559 struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(sqe, struct rtio_iodev_sqe, sqe);
560
561 if (!atomic_cas(&iodev_sqe->sqe.await.ok, 0, 1)) {
562 iodev_sqe->sqe.await.callback(iodev_sqe, iodev_sqe->sqe.await.userdata);
563 }
564}
565
/**
 * @brief Compute the CQE flags from the rtio_iodev_sqe entry.
 *
 * For an RX submission flagged RTIO_SQE_MEMPOOL_BUFFER the flags encode the
 * index of the buffer's first block in the context's pool and its block
 * count (both 0 when no buffer is attached). All other submissions, and all
 * builds without block pools, yield 0.
 *
 * @param iodev_sqe The submission to derive flags from
 *
 * @return Flags to store in the completion event
 */
static inline uint32_t rtio_cqe_compute_flags(struct rtio_iodev_sqe *iodev_sqe)
{
#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
	const bool is_rx = (iodev_sqe->sqe.op == RTIO_OP_RX);
	const bool from_pool = (iodev_sqe->sqe.flags & RTIO_SQE_MEMPOOL_BUFFER) != 0;

	if (!(is_rx && from_pool)) {
		return 0;
	}

	struct rtio *r = iodev_sqe->r;
	struct sys_mem_blocks *mem_pool = r->block_pool;
	unsigned int first_blk = 0;
	unsigned int n_blks = 0;

	if (iodev_sqe->sqe.rx.buf) {
		/* Byte offsets and lengths become block units via the size shift */
		first_blk = (iodev_sqe->sqe.rx.buf - mem_pool->buffer) >>
			    mem_pool->info.blk_sz_shift;
		n_blks = iodev_sqe->sqe.rx.buf_len >> mem_pool->info.blk_sz_shift;
	}

	return RTIO_CQE_FLAG_PREP_MEMPOOL(first_blk, n_blks);
#else
	ARG_UNUSED(iodev_sqe);

	return 0;
#endif
}
596
612__syscall int rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe,
613 uint8_t **buff, uint32_t *buff_len);
614
615static inline int z_impl_rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe,
616 uint8_t **buff, uint32_t *buff_len)
617{
618#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
620 unsigned int blk_idx = RTIO_CQE_FLAG_MEMPOOL_GET_BLK_IDX(cqe->flags);
621 unsigned int blk_count = RTIO_CQE_FLAG_MEMPOOL_GET_BLK_CNT(cqe->flags);
622 uint32_t blk_size = rtio_mempool_block_size(r);
623
624 *buff_len = blk_count * blk_size;
625
626 if (blk_count > 0) {
627 *buff = r->block_pool->buffer + blk_idx * blk_size;
628
629 __ASSERT_NO_MSG(*buff >= r->block_pool->buffer);
630 __ASSERT_NO_MSG(*buff <
631 r->block_pool->buffer + blk_size * r->block_pool->info.num_blocks);
632 } else {
633 *buff = NULL;
634 }
635 return 0;
636 }
637 return -EINVAL;
638#else
639 ARG_UNUSED(r);
640 ARG_UNUSED(cqe);
641 ARG_UNUSED(buff);
642 ARG_UNUSED(buff_len);
643
644 return -ENOTSUP;
645#endif
646}
647
649void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result);
650void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result);
651
/**
 * @brief Inform the executor of a submission completion with success.
 *
 * Thin wrapper that forwards to rtio_executor_ok().
 *
 * @param iodev_sqe The submission that completed
 * @param result    Result value passed on to the executor
 */
static inline void rtio_iodev_sqe_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_ok(iodev_sqe, result);
}
664
/**
 * @brief Inform the executor of a submission's completion with error.
 *
 * Thin wrapper that forwards to rtio_executor_err().
 *
 * @param iodev_sqe The submission that failed
 * @param result    Result value passed on to the executor
 */
static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_err(iodev_sqe, result);
}
677
689static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata, uint32_t flags)
690{
691 SYS_PORT_TRACING_FUNC_ENTER(rtio, cqe_submit, r, result, flags);
692 struct rtio_cqe *cqe = rtio_cqe_acquire(r);
693
694 if (cqe == NULL) {
695 atomic_inc(&r->xcqcnt);
696 } else {
697 cqe->result = result;
698 cqe->userdata = userdata;
699 cqe->flags = flags;
700 rtio_cqe_produce(r, cqe);
701#ifdef CONFIG_RTIO_CONSUME_SEM
702 k_sem_give(r->consume_sem);
703#endif
704 }
705
706 /* atomic_t isn't guaranteed to wrap correctly as it could be signed, so
707 * we must resort to a cas loop.
708 */
709 atomic_t val, new_val;
710
711 do {
712 val = atomic_get(&r->cq_count);
713 new_val = (atomic_t)((uintptr_t)val + 1);
714 } while (!atomic_cas(&r->cq_count, val, new_val));
715
716#ifdef CONFIG_RTIO_SUBMIT_SEM
717 if (r->submit_count > 0) {
718 r->submit_count--;
719 if (r->submit_count == 0) {
720 k_sem_give(r->submit_sem);
721 }
722 }
723#endif
724 SYS_PORT_TRACING_FUNC_EXIT(rtio, cqe_submit, r);
725}
726
/* Number of blk_size-sized blocks needed to hold num_bytes (rounds up) */
#define __RTIO_MEMPOOL_GET_NUM_BLKS(num_bytes, blk_size) \
	(((num_bytes) + (blk_size) - 1) / (blk_size))
728
741static inline int rtio_sqe_rx_buf(const struct rtio_iodev_sqe *iodev_sqe, uint32_t min_buf_len,
742 uint32_t max_buf_len, uint8_t **buf, uint32_t *buf_len)
743{
744 struct rtio_sqe *sqe = (struct rtio_sqe *)&iodev_sqe->sqe;
745
746#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
747 if (sqe->op == RTIO_OP_RX && sqe->flags & RTIO_SQE_MEMPOOL_BUFFER) {
748 struct rtio *r = iodev_sqe->r;
749
750 if (sqe->rx.buf != NULL) {
751 if (sqe->rx.buf_len < min_buf_len) {
752 return -ENOMEM;
753 }
754 *buf = sqe->rx.buf;
755 *buf_len = sqe->rx.buf_len;
756 return 0;
757 }
758
759 int rc = rtio_block_pool_alloc(r, min_buf_len, max_buf_len, buf, buf_len);
760 if (rc == 0) {
761 sqe->rx.buf = *buf;
762 sqe->rx.buf_len = *buf_len;
763 return 0;
764 }
765
766 return -ENOMEM;
767 }
768#else
769 ARG_UNUSED(max_buf_len);
770#endif
771
772 if (sqe->rx.buf_len < min_buf_len) {
773 return -ENOMEM;
774 }
775
776 *buf = sqe->rx.buf;
777 *buf_len = sqe->rx.buf_len;
778 return 0;
779}
780
795__syscall void rtio_release_buffer(struct rtio *r, void *buff, uint32_t buff_len);
796
797static inline void z_impl_rtio_release_buffer(struct rtio *r, void *buff, uint32_t buff_len)
798{
799#ifdef CONFIG_RTIO_SYS_MEM_BLOCKS
800 if (r == NULL || buff == NULL || r->block_pool == NULL || buff_len == 0) {
801 return;
802 }
803
804 rtio_block_pool_free(r, buff, buff_len);
805#else
806 ARG_UNUSED(r);
807 ARG_UNUSED(buff);
808 ARG_UNUSED(buff_len);
809#endif
810}
811
/**
 * @brief Grant access to an RTIO context to a user thread.
 *
 * Grants the thread access to the context object itself and to whichever of
 * the submit/consume semaphores are compiled in.
 *
 * @param r RTIO context
 * @param t Thread to grant access to
 */
static inline void rtio_access_grant(struct rtio *r, struct k_thread *t)
{
	k_object_access_grant(r, t);

#ifdef CONFIG_RTIO_SUBMIT_SEM
	k_object_access_grant(r->submit_sem, t);
#endif

#ifdef CONFIG_RTIO_CONSUME_SEM
	k_object_access_grant(r->consume_sem, t);
#endif
}
830
831
/**
 * @brief Revoke access to an RTIO context from a user thread.
 *
 * Revokes the thread's access to the context object itself and to whichever
 * of the submit/consume semaphores are compiled in.
 *
 * @param r RTIO context
 * @param t Thread to revoke access from
 */
static inline void rtio_access_revoke(struct rtio *r, struct k_thread *t)
{
	k_object_access_revoke(r, t);

#ifdef CONFIG_RTIO_SUBMIT_SEM
	k_object_access_revoke(r->submit_sem, t);
#endif

#ifdef CONFIG_RTIO_CONSUME_SEM
	k_object_access_revoke(r->consume_sem, t);
#endif
}
850
861__syscall int rtio_sqe_cancel(struct rtio_sqe *sqe);
862
863static inline int z_impl_rtio_sqe_cancel(struct rtio_sqe *sqe)
864{
865 SYS_PORT_TRACING_FUNC(rtio, sqe_cancel, sqe);
866 struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(sqe, struct rtio_iodev_sqe, sqe);
867
868 do {
869 iodev_sqe->sqe.flags |= RTIO_SQE_CANCELED;
870 iodev_sqe = rtio_iodev_sqe_next(iodev_sqe);
871 } while (iodev_sqe != NULL);
872
873 return 0;
874}
875
891__syscall int rtio_sqe_copy_in_get_handles(struct rtio *r, const struct rtio_sqe *sqes,
892 struct rtio_sqe **handle, size_t sqe_count);
893
894static inline int z_impl_rtio_sqe_copy_in_get_handles(struct rtio *r, const struct rtio_sqe *sqes,
895 struct rtio_sqe **handle,
896 size_t sqe_count)
897{
898 struct rtio_sqe *sqe;
899 uint32_t acquirable = rtio_sqe_acquirable(r);
900
901 if (acquirable < sqe_count) {
902 return -ENOMEM;
903 }
904
905 for (unsigned long i = 0; i < sqe_count; i++) {
906 sqe = rtio_sqe_acquire(r);
907 __ASSERT_NO_MSG(sqe != NULL);
908 if (handle != NULL && i == 0) {
909 *handle = sqe;
910 }
911 *sqe = sqes[i];
912 }
913
914 return 0;
915}
916
/**
 * @brief Copy an array of SQEs into the queue.
 *
 * Same as rtio_sqe_copy_in_get_handles() without requesting a handle.
 *
 * @param r         RTIO context
 * @param sqes      Array of SQEs to copy in
 * @param sqe_count Number of entries in @p sqes
 *
 * @retval 0       All SQEs copied
 * @retval -ENOMEM Not enough acquirable SQEs
 */
static inline int rtio_sqe_copy_in(struct rtio *r, const struct rtio_sqe *sqes, size_t sqe_count)
{
	struct rtio_sqe **no_handle = NULL;

	return rtio_sqe_copy_in_get_handles(r, sqes, no_handle, sqe_count);
}
937
953__syscall int rtio_cqe_copy_out(struct rtio *r,
954 struct rtio_cqe *cqes,
955 size_t cqe_count,
956 k_timeout_t timeout);
957static inline int z_impl_rtio_cqe_copy_out(struct rtio *r,
958 struct rtio_cqe *cqes,
959 size_t cqe_count,
960 k_timeout_t timeout)
961{
962 size_t copied = 0;
963 struct rtio_cqe *cqe;
964 k_timepoint_t end = sys_timepoint_calc(timeout);
965
966 do {
968 : rtio_cqe_consume(r);
969 if (cqe == NULL) {
970 Z_SPIN_DELAY(25);
971 continue;
972 }
973 cqes[copied++] = *cqe;
974 rtio_cqe_release(r, cqe);
975 } while (copied < cqe_count && !sys_timepoint_expired(end));
976
977 return copied;
978}
979
995__syscall int rtio_submit(struct rtio *r, uint32_t wait_count);
996
997#ifdef CONFIG_RTIO_SUBMIT_SEM
998static inline int z_impl_rtio_submit(struct rtio *r, uint32_t wait_count)
999{
1000 SYS_PORT_TRACING_FUNC_ENTER(rtio, submit, r, wait_count);
1001 int res = 0;
1002
1003 if (wait_count > 0) {
1004 __ASSERT(!k_is_in_isr(),
1005 "expected rtio submit with wait count to be called from a thread");
1006
1007 k_sem_reset(r->submit_sem);
1008 r->submit_count = wait_count;
1009 }
1010
1012
1013 if (wait_count > 0) {
1014 res = k_sem_take(r->submit_sem, K_FOREVER);
1015 __ASSERT(res == 0,
1016 "semaphore was reset or timed out while waiting on completions!");
1017 }
1018
1019 SYS_PORT_TRACING_FUNC_EXIT(rtio, submit, r);
1020 return res;
1021}
1022#else
1023static inline int z_impl_rtio_submit(struct rtio *r, uint32_t wait_count)
1024{
1025
1026 SYS_PORT_TRACING_FUNC_ENTER(rtio, submit, r, wait_count);
1027 int res = 0;
1028 uintptr_t cq_count = (uintptr_t)atomic_get(&r->cq_count);
1029 uintptr_t cq_complete_count = cq_count + wait_count;
1030 bool wraps = cq_complete_count < cq_count;
1031
1033
1034 if (wraps) {
1035 while ((uintptr_t)atomic_get(&r->cq_count) >= cq_count) {
1036 Z_SPIN_DELAY(10);
1037 k_yield();
1038 }
1039 }
1040
1041 while ((uintptr_t)atomic_get(&r->cq_count) < cq_complete_count) {
1042 Z_SPIN_DELAY(10);
1043 k_yield();
1044 }
1045
1046 SYS_PORT_TRACING_FUNC_EXIT(rtio, submit, r);
1047 return res;
1048}
1049#endif /* CONFIG_RTIO_SUBMIT_SEM */
1050
1057
1059 struct rtio **contexts;
1060
1063};
1064
1073__syscall struct rtio *rtio_pool_acquire(struct rtio_pool *pool);
1074
1075static inline struct rtio *z_impl_rtio_pool_acquire(struct rtio_pool *pool)
1076{
1077 struct rtio *r = NULL;
1078
1079 for (size_t i = 0; i < pool->pool_size; i++) {
1080 if (atomic_test_and_set_bit(pool->used, i) == 0) {
1081 r = pool->contexts[i];
1082 break;
1083 }
1084 }
1085
1086 if (r != NULL) {
1088 }
1089
1090 return r;
1091}
1092
1099__syscall void rtio_pool_release(struct rtio_pool *pool, struct rtio *r);
1100
1101static inline void z_impl_rtio_pool_release(struct rtio_pool *pool, struct rtio *r)
1102{
1103
1104 if (k_is_user_context()) {
1106 }
1107
1108 for (size_t i = 0; i < pool->pool_size; i++) {
1109 if (pool->contexts[i] == r) {
1110 atomic_clear_bit(pool->used, i);
1111 break;
1112 }
1113 }
1114}
1115
1116/* clang-format off */
1117
1119
/* Name of the n-th context belonging to pool `name`: name_n */
#define Z_RTIO_POOL_NAME_N(n, name) name##_##n

/* Define the n-th context of pool `name` with the given queue sizes */
#define Z_RTIO_POOL_DEFINE_N(n, name, sq_sz, cq_sz) \
	RTIO_DEFINE(Z_RTIO_POOL_NAME_N(n, name), sq_sz, cq_sz)

/* Address of the n-th context of pool `name` */
#define Z_RTIO_POOL_REF_N(n, name) &Z_RTIO_POOL_NAME_N(n, name)
1128
1130
/**
 * @brief Statically define and initialize a pool of RTIO contexts.
 *
 * Expands to @p pool_sz contexts, an array of pointers to them, the usage
 * bitmap and the iterable struct rtio_pool tying them together.
 *
 * @param name    Name of the RTIO pool
 * @param pool_sz Number of RTIO contexts to allocate in the pool
 * @param sq_sz   Submission queue size of each context
 * @param cq_sz   Completion queue size of each context
 */
#define RTIO_POOL_DEFINE(name, pool_sz, sq_sz, cq_sz) \
	LISTIFY(pool_sz, Z_RTIO_POOL_DEFINE_N, (;), name, sq_sz, cq_sz); \
	static struct rtio *name##_contexts[] = { \
		LISTIFY(pool_sz, Z_RTIO_POOL_REF_N, (,), name) \
	}; \
	ATOMIC_DEFINE(name##_used, pool_sz); \
	STRUCT_SECTION_ITERABLE(rtio_pool, name) = { \
		.pool_size = pool_sz, \
		.contexts = name##_contexts, \
		.used = name##_used, \
	}
1150
1151/* clang-format on */
1152
1156
1157#ifdef __cplusplus
1158}
1159#endif
1160
1161#include <zephyr/syscalls/rtio.h>
1162
1163#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_H_ */
Header file for the Atomic operations API.
RTIO Completion Queue Events and Related Functions.
long atomic_t
Atomic integer variable.
Definition atomic_types.h:31
static void atomic_clear_bit(atomic_t *target, int bit)
Atomically clear a bit.
Definition atomic.h:224
atomic_val_t atomic_get(const atomic_t *target)
Atomic get.
static bool atomic_test_and_set_bit(atomic_t *target, int bit)
Atomically set a bit and test it.
Definition atomic.h:178
atomic_val_t atomic_inc(atomic_t *target)
Atomic increment.
bool atomic_cas(atomic_t *target, atomic_val_t old_value, atomic_val_t new_value)
Atomic compare-and-set.
#define K_FOREVER
Generate infinite timeout delay.
Definition kernel.h:1682
#define K_NO_WAIT
Generate null timeout delay.
Definition kernel.h:1572
k_timepoint_t sys_timepoint_calc(k_timeout_t timeout)
Calculate a timepoint value.
static bool sys_timepoint_expired(k_timepoint_t timepoint)
Indicates if timepoint is expired.
Definition clock.h:388
#define K_TIMEOUT_EQ(a, b)
Compare timeouts for equality.
Definition clock.h:80
bool k_is_in_isr(void)
Determine if code is running at interrupt level.
int sys_mem_blocks_free_contiguous(sys_mem_blocks_t *mem_block, void *block, size_t count)
Free contiguous multiple memory blocks.
int sys_mem_blocks_alloc_contiguous(sys_mem_blocks_t *mem_block, size_t count, void **out_block)
Allocate a contiguous set of memory blocks.
static ALWAYS_INLINE void mpsc_push(struct mpsc *q, struct mpsc_node *n)
Push a node.
Definition mpsc_lockfree.h:126
static struct mpsc_node * mpsc_pop(struct mpsc *q)
Pop a node off of the list.
Definition mpsc_lockfree.h:145
#define RTIO_CQE_FLAG_MEMPOOL_GET_BLK_CNT(flags)
Get the block count of a mempool flags.
Definition cqe.h:63
#define RTIO_CQE_FLAG_MEMPOOL_GET_BLK_IDX(flags)
Get the block index of a mempool flags.
Definition cqe.h:55
#define RTIO_CQE_FLAG_MEMPOOL_BUFFER
The entry's buffer was allocated from the RTIO's mempool.
Definition cqe.h:45
#define RTIO_CQE_FLAG_PREP_MEMPOOL(blk_idx, blk_cnt)
Prepare CQE flags for a mempool read.
Definition cqe.h:72
#define RTIO_CQE_FLAG_GET(flags)
Definition cqe.h:47
#define RTIO_OP_RX
An operation that receives (reads).
Definition sqe.h:144
#define RTIO_SQE_MEMPOOL_BUFFER
The buffer should be allocated by the RTIO mempool.
Definition sqe.h:105
#define RTIO_SQE_CANCELED
The SQE should not execute if possible.
Definition sqe.h:113
void rtio_pool_release(struct rtio_pool *pool, struct rtio *r)
Return an RTIO context to a pool.
void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
static uint32_t rtio_sqe_acquirable(struct rtio *r)
Count of acquirable submission queue events.
Definition rtio.h:325
struct rtio * rtio_pool_acquire(struct rtio_pool *pool)
Obtain an RTIO context from a pool.
static size_t rtio_mempool_block_size(const struct rtio *r)
Get the mempool block size of the RTIO context.
Definition rtio.h:155
static void rtio_cqe_submit(struct rtio *r, int result, void *userdata, uint32_t flags)
Submit a completion queue event with a given result and userdata.
Definition rtio.h:689
void rtio_release_buffer(struct rtio *r, void *buff, uint32_t buff_len)
Release memory that was allocated by the RTIO's memory pool.
static int rtio_sqe_copy_in(struct rtio *r, const struct rtio_sqe *sqes, size_t sqe_count)
Copy an array of SQEs into the queue.
Definition rtio.h:933
static void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
Produce a complete queue event if available.
Definition rtio.h:438
static uint32_t rtio_cqe_compute_flags(struct rtio_iodev_sqe *iodev_sqe)
Compute the CQE flags from the rtio_iodev_sqe entry.
Definition rtio.h:572
void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
static int rtio_block_pool_alloc(struct rtio *r, size_t min_sz, size_t max_sz, uint8_t **buf, uint32_t *buf_len)
Definition rtio.h:192
int rtio_sqe_copy_in_get_handles(struct rtio *r, const struct rtio_sqe *sqes, struct rtio_sqe **handle, size_t sqe_count)
Copy an array of SQEs into the queue and get resulting handles back.
struct k_mem_partition rtio_partition
The memory partition associated with all RTIO context information.
static struct rtio_sqe * rtio_sqe_acquire(struct rtio *r)
Acquire a single submission queue event if available.
Definition rtio.h:338
static void rtio_sqe_drop_all(struct rtio *r)
Drop all previously acquired sqe.
Definition rtio.h:404
int rtio_cqe_copy_out(struct rtio *r, struct rtio_cqe *cqes, size_t cqe_count, k_timeout_t timeout)
Copy an array of CQEs from the queue.
static int rtio_flush_completion_queue(struct rtio *r)
Flush completion queue.
Definition rtio.h:526
static void rtio_access_revoke(struct rtio *r, struct k_thread *t)
Revoke access to an RTIO context from a user thread.
Definition rtio.h:838
static void rtio_access_grant(struct rtio *r, struct k_thread *t)
Grant access to an RTIO context to a user thread.
Definition rtio.h:818
static void rtio_cqe_release(struct rtio *r, struct rtio_cqe *cqe)
Release consumed completion queue event.
Definition rtio.h:512
static int rtio_sqe_rx_buf(const struct rtio_iodev_sqe *iodev_sqe, uint32_t min_buf_len, uint32_t max_buf_len, uint8_t **buf, uint32_t *buf_len)
Get the buffer associated with the RX submission.
Definition rtio.h:741
static void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int result)
Inform the executor of a submission's completion with error.
Definition rtio.h:673
int rtio_sqe_cancel(struct rtio_sqe *sqe)
Attempt to cancel an SQE.
static void rtio_iodev_sqe_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
Inform the executor of a submission completion with success.
Definition rtio.h:660
static struct rtio_cqe * rtio_cqe_acquire(struct rtio *r)
Acquire a complete queue event if available.
Definition rtio.h:419
static struct rtio_cqe * rtio_cqe_consume(struct rtio *r)
Consume a single completion queue event if available.
Definition rtio.h:454
void rtio_sqe_signal(struct rtio_sqe *sqe)
Signal an AWAIT SQE.
static struct rtio_iodev_sqe * rtio_iodev_sqe_next(const struct rtio_iodev_sqe *iodev_sqe)
Get the next sqe in the chain or transaction.
Definition sqe.h:764
int rtio_cqe_get_mempool_buffer(const struct rtio *r, struct rtio_cqe *cqe, uint8_t **buff, uint32_t *buff_len)
Retrieve the mempool buffer that was allocated for the CQE.
void rtio_executor_submit(struct rtio *r)
static struct rtio_cqe * rtio_cqe_consume_block(struct rtio *r)
Wait for and consume a single completion queue event.
Definition rtio.h:488
static int rtio_sqe_acquire_array(struct rtio *r, size_t n, struct rtio_sqe **sqes)
Acquire a number of submission queue events if available.
Definition rtio.h:366
static void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_len)
Definition rtio.h:233
int rtio_submit(struct rtio *r, uint32_t wait_count)
Submit I/O requests to the underlying executor.
void k_sem_reset(struct k_sem *sem)
Resets a semaphore's count to zero.
void k_sem_give(struct k_sem *sem)
Give a semaphore.
int k_sem_take(struct k_sem *sem, k_timeout_t timeout)
Take a semaphore.
#define SYS_PORT_TRACING_FUNC_ENTER(type, func,...)
Tracing macro for the entry into a function that might or might not return a value.
Definition tracing_macros.h:257
#define SYS_PORT_TRACING_FUNC_EXIT(type, func,...)
Tracing macro for when a function ends its execution.
Definition tracing_macros.h:283
#define SYS_PORT_TRACING_FUNC(type, func,...)
Tracing macro for function calls which are not directly associated with a specific type of object.
Definition tracing_macros.h:244
#define BIT(n)
Unsigned integer with bit position n set (signed in assembly language).
Definition util_macro.h:44
#define CONTAINER_OF(ptr, type, field)
Get a pointer to a structure containing the element.
Definition util.h:281
#define DIV_ROUND_UP(n, d)
Divide and round up.
Definition util.h:348
#define EINVAL
Invalid argument.
Definition errno.h:60
#define ENOMEM
Not enough core.
Definition errno.h:50
#define ENOTSUP
Unsupported value.
Definition errno.h:114
void k_yield(void)
Yield the current thread.
static __attribute_const__ k_tid_t k_current_get(void)
Get thread ID of the current thread.
Definition kernel.h:836
void k_object_access_grant(const void *object, struct k_thread *thread)
Grant a thread access to a kernel object.
void k_object_access_revoke(const void *object, struct k_thread *thread)
Revoke a thread's access to a kernel object.
RTIO I/O Device and Related Functions.
Public kernel APIs.
Memory Blocks Allocator.
flags
Definition parser.h:97
RTIO Submission Queue Events and Related Functions.
__UINT32_TYPE__ uint32_t
Definition stdint.h:90
__UINT8_TYPE__ uint8_t
Definition stdint.h:88
#define UINT16_MAX
Definition stdint.h:28
__UINTPTR_TYPE__ uintptr_t
Definition stdint.h:105
__UINT16_TYPE__ uint16_t
Definition stdint.h:89
void * memset(void *buf, int c, size_t n)
Memory Partition.
Definition mem_domain.h:55
Semaphore structure.
Definition kernel.h:3703
Thread Structure.
Definition thread.h:259
Kernel timeout type.
Definition clock.h:65
Kernel timepoint type.
Definition clock.h:291
Queue member.
Definition mpsc_lockfree.h:79
MPSC Queue.
Definition mpsc_lockfree.h:86
A completion queue event.
Definition cqe.h:83
void * userdata
Associated userdata with operation.
Definition cqe.h:87
struct mpsc_node q
Definition cqe.h:84
uint32_t flags
Flags associated with the operation.
Definition cqe.h:88
int32_t result
Result from operation.
Definition cqe.h:86
IO device submission queue entry.
Definition sqe.h:394
struct rtio_sqe sqe
Definition sqe.h:395
struct rtio * r
Definition sqe.h:398
struct mpsc_node q
Definition sqe.h:396
Pool of RTIO contexts to use with dynamically created threads.
Definition rtio.h:1054
struct rtio ** contexts
Array containing contexts of the pool.
Definition rtio.h:1059
atomic_t * used
Atomic bitmap to signal a member is used/unused.
Definition rtio.h:1062
size_t pool_size
Size of the pool.
Definition rtio.h:1056
A submission queue event.
Definition sqe.h:304
void * userdata
User provided data which is returned upon operation completion.
Definition sqe.h:322
uint8_t op
Op code.
Definition sqe.h:305
struct rtio_sqe::@126267262255374054123217063150244034155174062054::@222067021034074304061254367152327164076222165070 rx
OP_RX.
atomic_t ok
Definition sqe.h:381
struct rtio_sqe::@126267262255374054123217063150244034155174062054::@236333123355174166163204241333175337261032350217 await
OP_AWAIT.
uint32_t buf_len
Length of buffer.
Definition sqe.h:328
uint16_t flags
Op Flags.
Definition sqe.h:309
const uint8_t * buf
Buffer to write from.
Definition sqe.h:329
rtio_callback_t callback
Definition sqe.h:346
An RTIO context containing what can be viewed as a pair of queues.
Definition rtio.h:70
struct rtio_cqe_pool * cqe_pool
Definition rtio.h:100
struct mpsc sq
Definition rtio.h:108
atomic_t cq_count
Definition rtio.h:89
struct rtio_sqe_pool * sqe_pool
Definition rtio.h:97
atomic_t xcqcnt
Definition rtio.h:94
struct mpsc cq
Definition rtio.h:111
Iterable sections helpers.
Misc utilities.
static __pinned_func bool k_is_user_context(void)
Indicate whether the CPU is currently in user mode.
Definition syscall.h:121