Pipes

A pipe is a kernel object that allows a thread to send a byte stream to another thread. Pipes enable efficient inter-thread communication and can be used to synchronously transfer chunks of data in whole or in part.

Concepts

Any number of pipes can be defined, limited only by available RAM. Each pipe is referenced by its memory address.

A pipe has the following key property:

  • A size that indicates the capacity of the pipe’s ring buffer.

A pipe must be initialized before it can be used. When initialized, the pipe is empty.

Threads interact with the pipe as follows:

  • Writing: Data is synchronously written, either in whole or in part, to a pipe by a thread. If the pipe’s ring buffer is full, the operation blocks until sufficient space becomes available or the specified timeout expires.

  • Reading: Data is synchronously read, either in whole or in part, from a pipe by a thread. If the pipe’s ring buffer is empty, the operation blocks until data becomes available or the specified timeout expires.

  • Resetting: A thread can reset a pipe, which resets its internal state and ends all pending read and write operations with an error code.

Pipes are well-suited for scenarios like producer-consumer patterns or streaming data between threads.

Implementation

A pipe is defined using a variable of type k_pipe and a byte buffer. The pipe must then be initialized by calling k_pipe_init().

The following code defines and initializes an empty pipe with a ring buffer capable of holding 100 bytes, aligned to a 4-byte boundary:

uint8_t __aligned(4) my_ring_buffer[100];
struct k_pipe my_pipe;

k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer));

Alternatively, a pipe can be defined and initialized at compile time using the K_PIPE_DEFINE macro, which defines both the pipe and its ring buffer:

K_PIPE_DEFINE(my_pipe, 100, 4);

This has the same effect as the code above.

Writing to a Pipe

Data is added to a pipe by calling k_pipe_write().

The following example demonstrates using a pipe to send data from a producer thread to one or more consumer threads. If the pipe’s ring buffer fills up, the producer thread waits for a specified amount of time.

struct message_header {
    size_t num_data_bytes; /* Example field */
    ...
};

void producer_thread(void)
{
    int rc;
    uint8_t *data;
    size_t total_size;
    size_t bytes_written;

    while (1) {
        /* Craft message to send in the pipe */
        make_message(data, &total_size);
        bytes_written = 0;

        /* Write data to the pipe, handling partial writes */
        while (bytes_written < total_size) {
            rc = k_pipe_write(&my_pipe, &data[bytes_written], total_size - bytes_written, K_NO_WAIT);

            if (rc < 0) {
                /* Error occurred */
                ...
                break;
            } else {
                /* Partial or full write succeeded; adjust for next iteration */
                bytes_written += rc;
            }
        }

        /* Reset bytes_written for the next message */
        bytes_written = 0;
        ...
    }
}

Reading from a Pipe

Data is retrieved from the pipe by calling k_pipe_read().

The following example builds on the producer thread example above. It shows a consumer thread that processes data generated by the producer.

struct message_header {
    size_t num_data_bytes; /* Example field */
    ...
};

void consumer_thread(void)
{
    int rc;
    uint8_t buffer[128];
    size_t bytes_read = 0;
    struct message_header *header = (struct message_header *)buffer;

    while (1) {
        /* Step 1: Read the message header */
        bytes_read = 0;
   read_header:
        while (bytes_read < sizeof(*header)) {
            rc = k_pipe_read(&my_pipe, &buffer[bytes_read], sizeof(*header) - bytes_read, &bytes_read, K_NO_WAIT);

            if (rc < 0) {
                /* Error occurred */
                ...
                goto read_header;
            }

            /* Adjust for partial reads */
            bytes_read += rc;
        }

        /* Step 2: Read the message body */
        bytes_read = 0;
        while (bytes_read < header->num_data_bytes) {
            rc = k_pipe_read(&my_pipe, &buffer[sizeof(*header) + bytes_read], header->num_data_bytes - bytes_read, K_NO_WAIT);

            if (rc < 0) {
                /* Error occurred */
                ...
                goto read_header;
            }

            /* Adjust for partial reads */
            bytes_read += rc;
        }
        /* Successfully received the complete message */
    }
}

Resetting a Pipe

The pipe can be reset by calling k_pipe_reset(). Resetting a pipe resets its internal state and ends all pending operations with an error code.

The following example demonstrates resetting a pipe in response to a critical error:

void monitor_thread(void)
{
    while (1) {
        ...
        /* Critical error detected: reset the entire pipe to reset it. */
        k_pipe_reset(&my_pipe);
        ...
    }
}

Suggested Uses

Pipes are useful for sending streams of data between threads. Typical applications include:

  • Implementing producer-consumer patterns.

  • Streaming logs or packets between threads.

  • Handling variable-length message passing in real-time systems.

API Reference

Pipe APIs