Multi-stage data-processing pipeline with optional concurrent execution.
More...
#include <pipeline.hxx>
|
|
| Pipeline () noexcept=default |
| | Default constructor Initializes an empty pipeline buffer.
|
| |
| | Pipeline (const Pipeline &other) |
| | Copy constructor Creates a new Pipeline that shares the same underlying buffer as the original.
|
| |
| | Pipeline (Pipeline &&other) noexcept=default |
| | Move constructor Moves the contents of another Pipeline into this instance.
|
| |
|
| ~Pipeline () noexcept |
| | Destructor.
|
| |
| Pipeline & | operator= (const Pipeline &other) |
| | Copy assignment operator.
|
| |
| Pipeline & | operator= (Pipeline &&other) noexcept=default |
| | Move assignment operator.
|
| |
| void | AddPipe (const PipeFunction &pipe) |
| | Add a processing stage to the pipeline.
|
| |
| void | AddPipe (PipeFunction &&pipe) |
| | Add a processing stage to the pipeline (move version).
|
| |
| void | SetError () const noexcept |
| | Mark all internal pipeline stages as errored, causing them to stop accepting writes.
|
| |
| Consumer | Process (Consumer buffer, const ExecutionMode &mode, std::shared_ptr< Logger::Log > log) const noexcept |
| | Execute the pipeline on input data.
|
| |
Multi-stage data-processing pipeline with optional concurrent execution.
- Overview
- Pipeline manages a sequence of transformation functions (PipeFunction) that move data through multiple stages. Each stage may run concurrently (Async mode) or sequentially (Sync mode). Intermediate buffers between stages are thread-safe SharedFIFO instances and are managed automatically by the pipeline.
- Pipe function signature
- A pipeline stage must be a callable with the signature:
Read-only interface for consuming data from a shared FIFO buffer.
Definition consumer.hxx:46
Producer interface for writing data to a shared FIFO buffer.
Definition producer.hxx:27
- Read from
input using Read() / Extract()
- Write processed bytes to
output using Write()
- Close or
SetError() the output when the stage finishes
- Execution modes
ExecutionMode::Async (default): each stage is launched in its own detached thread. Stages process data concurrently and communicate via SharedFIFO buffers.
ExecutionMode::Sync : stages run sequentially in the caller's thread; no detached threads are created.
- Example (conceptual, Async mode)
auto data = in.Extract(1024);
if (data && !data->empty()) {
out.Write(*data);
}
}
});
Consumer result = pipeline.
Process(input.Consumer(), ExecutionMode::Async, logger);
bool EoF() const noexcept override
Check if the reader has reached end-of-file.
Definition consumer.hxx:172
Multi-stage data-processing pipeline with optional concurrent execution.
Definition pipeline.hxx:74
Consumer Process(Consumer buffer, const ExecutionMode &mode, std::shared_ptr< Logger::Log > log) const noexcept
Execute the pipeline on input data.
void AddPipe(const PipeFunction &pipe)
Add a processing stage to the pipeline.
void Close() noexcept
Thread-safe close for further writes.
Definition producer.hxx:118
- Error handling
- Stages should catch and handle errors locally. To propagate failure, a stage may call
SetError() on its output buffer; downstream stages will observe the buffer's unreadable/closed state via EoF() and can react accordingly.
- Best practices
- Always
Close() your stage's output when finished (or call SetError() on failure).
- Prefer simple, focused transformations per stage.
- Use Sync mode for deterministic debugging; use Async for throughput on multi-core systems.
- When using Async mode, ensure any captured data remains valid for the lifetime of the detached thread (use value captures or
std::shared_ptr).
- See also
- PipeFunction, Consumer, Producer, SharedFIFO, ExecutionMode
◆ Pipeline() [1/2]
| StormByte::Buffer::Pipeline::Pipeline |
( |
const Pipeline & |
other | ) |
|
Copy constructor Creates a new Pipeline that shares the same underlying buffer as the original.
- Parameters
-
◆ Pipeline() [2/2]
| StormByte::Buffer::Pipeline::Pipeline |
( |
Pipeline && |
other | ) |
|
|
defaultnoexcept |
Move constructor Moves the contents of another Pipeline into this instance.
- Parameters
-
◆ AddPipe() [1/2]
| void StormByte::Buffer::Pipeline::AddPipe |
( |
const PipeFunction & |
pipe | ) |
|
Add a processing stage to the pipeline.
- Parameters
-
| pipe | Function to execute as a pipeline stage. |
Stages are executed in the order they are added. Each stage runs in its own thread when Process() is called.
- See also
- PipeFunction, Process()
◆ AddPipe() [2/2]
| void StormByte::Buffer::Pipeline::AddPipe |
( |
PipeFunction && |
pipe | ) |
|
Add a processing stage to the pipeline (move version).
- Parameters
-
| pipe | Function to move into the pipeline. |
More efficient than copy when passing temporary functions or lambdas.
- See also
- AddPipe(const PipeFunction&)
◆ operator=() [1/2]
Copy assignment operator.
- Parameters
-
- Returns
- Reference to the updated
Pipeline instance
◆ operator=() [2/2]
Move assignment operator.
- Parameters
-
- Returns
- Reference to the updated
Pipeline instance
◆ Process()
| Consumer StormByte::Buffer::Pipeline::Process |
( |
Consumer |
buffer, |
|
|
const ExecutionMode & |
mode, |
|
|
std::shared_ptr< Logger::Log > |
log |
|
) |
| const |
|
noexcept |
Execute the pipeline on input data.
- Parameters
-
| buffer | Consumer providing input data to the first pipeline stage. |
| mode | Execution mode: ExecutionMode::Async (concurrent detached threads) or ExecutionMode::Sync (sequential execution in caller thread). |
| log | Logger instance for logging within pipeline stages. |
- Returns
- Consumer for reading the final output from the last pipeline stage.
Async: Launches all pipeline stages concurrently in detached threads. Sync: Executes each stage sequentially; no new threads are created. Each stage:
- Reads data from the previous stage (or the input buffer for the first stage)
- Processes the data according to its transformation logic
- Writes results to a SharedFIFO buffer that feeds the next stage The returned Consumer represents final stage output and can be read as data becomes available (Async) or after preceding stages finish (Sync).
- Thread Execution
- Async: Parallel processing across stages; implicit cleanup. Sync : Deterministic ordering; no thread coordination required.
- Data Availability
- Data becomes available in the output Consumer as the pipeline processes it:
- Extract(0) returns currently available data without blocking
- Extract(count) blocks until count bytes available or buffer unreadable
- EoF() returns true when no more data can be produced (unwritable & empty)
- Multiple Invocations
- Each call creates new buffers; Async spawns threads, Sync reuses caller thread. Executing Process more than once on the same Pipeline object while a previous execution is still running is undefined behavior. Always wait until the prior run has completed (e.g., the returned Consumer reaches EoF) before invoking Process again on the same instance.
- Warning
- Async: Captured variables must remain valid for thread lifetime (use value capture/shared_ptr). Sync : Standard lifetimes apply.
- See also
- AddPipe(), Consumer, Producer, ExecutionMode
◆ SetError()
| void StormByte::Buffer::Pipeline::SetError |
( |
| ) |
const |
|
noexcept |
Mark all internal pipeline stages as errored, causing them to stop accepting writes.
This notifies every internal pipe so they transition to an error state and become unwritable; readers will observe end-of-data once existing buffered data is consumed. The method is const because it performs thread-safe signaling on shared state, not logical mutation of the pipeline object itself.
- Note
- The operation is thread-safe and may be called concurrently.
The documentation for this class was generated from the following file:
- /home/runner/work/StormByte-Buffer/StormByte-Buffer/lib/public/StormByte/buffer/pipeline.hxx