StormByte C++ Library: Buffer module 0.0.9999
StormByte-Buffer is a StormByte library module for handling buffers
StormByte::Buffer::Pipeline Class Reference (final)

Multi-stage data-processing pipeline with optional concurrent execution. More...

#include <pipeline.hxx>

Public Member Functions

 Pipeline () noexcept=default
 Default constructor. Initializes an empty pipeline buffer.
 
 Pipeline (const Pipeline &other)
 Copy constructor. Creates a new Pipeline that shares the same underlying buffer as the original.
 
 Pipeline (Pipeline &&other) noexcept=default
 Move constructor. Moves the contents of another Pipeline into this instance.
 
 ~Pipeline () noexcept
 Destructor.
 
Pipeline & operator= (const Pipeline &other)
 Copy assignment operator.
 
Pipeline & operator= (Pipeline &&other) noexcept=default
 Move assignment operator.
 
void AddPipe (const PipeFunction &pipe)
 Add a processing stage to the pipeline.
 
void AddPipe (PipeFunction &&pipe)
 Add a processing stage to the pipeline (move version).
 
void SetError () const noexcept
 Mark all internal pipeline stages as errored, causing them to stop accepting writes.
 
Consumer Process (Consumer buffer, const ExecutionMode &mode, std::shared_ptr< Logger::Log > log) const noexcept
 Execute the pipeline on input data.
 

Detailed Description

Multi-stage data-processing pipeline with optional concurrent execution.

Overview
Pipeline manages a sequence of transformation functions (PipeFunction) that move data through multiple stages. Each stage may run concurrently (Async mode) or sequentially (Sync mode). Intermediate buffers between stages are thread-safe SharedFIFO instances and are managed automatically by the pipeline.
Pipe function signature
A pipeline stage must be a callable with the signature:
void stage_fn(Consumer input, Producer output);
  • Read from input using Read() / Extract()
  • Write processed bytes to output using Write()
  • Close or SetError() the output when the stage finishes
Execution modes
  • ExecutionMode::Async (default): each stage is launched in its own detached thread. Stages process data concurrently and communicate via SharedFIFO buffers.
  • ExecutionMode::Sync : stages run sequentially in the caller's thread; no detached threads are created.
Example (conceptual, Async mode)
Pipeline pipeline;
pipeline.AddPipe([](Consumer in, Producer out) {
    while (!in.EoF()) {
        auto data = in.Extract(1024);
        if (data && !data->empty()) {
            // transform data
            out.Write(*data);
        }
    }
    out.Close();
});
// Process input -> returns Consumer for final output
Consumer result = pipeline.Process(input.Consumer(), ExecutionMode::Async, logger);
Error handling
Stages should catch and handle errors locally. To propagate failure, a stage may call SetError() on its output buffer; downstream stages will observe the buffer's unreadable/closed state via EoF() and can react accordingly.
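As a hedged sketch of this pattern (Transform() is a hypothetical helper; the Consumer/Producer calls are those documented above), a stage that propagates failure could look like:

pipeline.AddPipe([](Consumer in, Producer out) {
    try {
        while (!in.EoF()) {
            auto data = in.Extract(4096);
            if (data && !data->empty()) {
                out.Write(Transform(*data)); // Transform() is a hypothetical transformation
            }
        }
        out.Close();    // normal completion: mark the output as finished
    } catch (...) {
        out.SetError(); // failure: downstream readers observe end-of-data after draining
    }
});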
Best practices
  • Always Close() your stage's output when finished (or call SetError() on failure).
  • Prefer simple, focused transformations per stage.
  • Use Sync mode for deterministic debugging; use Async for throughput on multi-core systems.
  • When using Async mode, ensure any captured data remains valid for the lifetime of the detached thread (use value captures or std::shared_ptr).
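For the last point above, a minimal capture sketch (Settings and its chunk_size field are hypothetical; only the by-value shared_ptr capture matters):

auto settings = std::make_shared<Settings>(); // hypothetical configuration object
Pipeline pipeline;
// Capture the shared_ptr by value so the detached stage thread keeps it alive.
pipeline.AddPipe([settings](Consumer in, Producer out) {
    while (!in.EoF()) {
        auto data = in.Extract(settings->chunk_size); // hypothetical field
        if (data && !data->empty()) {
            out.Write(*data);
        }
    }
    out.Close();
});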
See also
PipeFunction, Consumer, Producer, SharedFIFO, ExecutionMode

Constructor & Destructor Documentation

◆ Pipeline() [1/2]

StormByte::Buffer::Pipeline::Pipeline ( const Pipeline & other )

Copy constructor Creates a new Pipeline that shares the same underlying buffer as the original.

Parameters
other	Pipeline to copy

◆ Pipeline() [2/2]

StormByte::Buffer::Pipeline::Pipeline ( Pipeline && other ) noexcept = default

Move constructor Moves the contents of another Pipeline into this instance.

Parameters
other	Pipeline to move

Member Function Documentation

◆ AddPipe() [1/2]

void StormByte::Buffer::Pipeline::AddPipe ( const PipeFunction &  pipe)

Add a processing stage to the pipeline.

Parameters
pipe	Function to execute as a pipeline stage.

Stages are executed in the order they are added. In Async mode each stage runs in its own detached thread when Process() is called; in Sync mode stages run sequentially in the caller's thread.

See also
PipeFunction, Process()
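A short ordering sketch (the three stage callables are hypothetical placeholders):

Pipeline pipeline;
pipeline.AddPipe(decompress_stage); // runs first, fed by the input Consumer passed to Process()
pipeline.AddPipe(parse_stage);      // runs second, fed by decompress_stage's output
pipeline.AddPipe(encode_stage);     // runs last; its output is returned by Process()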

◆ AddPipe() [2/2]

void StormByte::Buffer::Pipeline::AddPipe ( PipeFunction &&  pipe)

Add a processing stage to the pipeline (move version).

Parameters
pipe	Function to move into the pipeline.

More efficient than copy when passing temporary functions or lambdas.

See also
AddPipe(const PipeFunction&)
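Assuming PipeFunction is a std::function-like wrapper for the stage signature (consistent with the lambda usage shown in the class overview), moving avoids copying a stored callable:

PipeFunction stage = [](Consumer in, Producer out) {
    while (!in.EoF()) {
        auto data = in.Extract(1024);
        if (data && !data->empty()) {
            out.Write(*data);
        }
    }
    out.Close();
};
pipeline.AddPipe(std::move(stage)); // move overload: the callable is not copied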

◆ operator=() [1/2]

Pipeline & StormByte::Buffer::Pipeline::operator= ( const Pipeline & other )

Copy assignment operator.

Parameters
other	Pipeline instance to copy from
Returns
Reference to the updated Pipeline instance

◆ operator=() [2/2]

Pipeline & StormByte::Buffer::Pipeline::operator= ( Pipeline && other ) noexcept = default

Move assignment operator.

Parameters
other	Pipeline instance to move from
Returns
Reference to the updated Pipeline instance

◆ Process()

Consumer StormByte::Buffer::Pipeline::Process ( Consumer buffer, const ExecutionMode & mode, std::shared_ptr< Logger::Log > log ) const noexcept

Execute the pipeline on input data.

Parameters
buffer	Consumer providing input data to the first pipeline stage.
mode	Execution mode: ExecutionMode::Async (concurrent detached threads) or ExecutionMode::Sync (sequential execution in caller thread).
log	Logger instance for logging within pipeline stages.
Returns
Consumer for reading the final output from the last pipeline stage.

Async: Launches all pipeline stages concurrently in detached threads. Sync: Executes each stage sequentially; no new threads are created. Each stage:

  • Reads data from the previous stage (or the input buffer for the first stage)
  • Processes the data according to its transformation logic
  • Writes results to a SharedFIFO buffer that feeds the next stage
The returned Consumer represents the final stage's output and can be read as data becomes available (Async) or after preceding stages finish (Sync).
Thread Execution
  • Async: Parallel processing across stages; implicit cleanup.
  • Sync: Deterministic ordering; no thread coordination required.
Data Availability
Data becomes available in the output Consumer as the pipeline processes it:
  • Extract(0) returns currently available data without blocking
  • Extract(count) blocks until count bytes available or buffer unreadable
  • EoF() returns true when no more data can be produced (unwritable & empty)
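A hedged reader sketch based on the behaviour listed above (input and logger are assumed to exist as in the class overview example; how the extracted bytes are used is application-specific):

Consumer result = pipeline.Process(input.Consumer(), ExecutionMode::Async, logger);
while (!result.EoF()) {
    // Blocks until 4096 bytes are available or the buffer becomes unreadable.
    auto chunk = result.Extract(4096);
    if (chunk && !chunk->empty()) {
        // consume the bytes (write to a file, parse, forward, ...)
    }
}
// EoF(): all stages finished and every buffered byte has been read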
Multiple Invocations
Each call creates new intermediate buffers; Async spawns new threads, while Sync reuses the caller's thread. Executing Process more than once on the same Pipeline object while a previous execution is still running is undefined behavior. Always wait until the prior run has completed (e.g., the returned Consumer reaches EoF) before invoking Process again on the same instance.
Warning
Async: Captured variables must remain valid for the lifetime of the detached threads (use value capture or std::shared_ptr). Sync: Standard lifetimes apply.
See also
AddPipe(), Consumer, Producer, ExecutionMode

◆ SetError()

void StormByte::Buffer::Pipeline::SetError ( ) const noexcept

Mark all internal pipeline stages as errored, causing them to stop accepting writes.

This notifies every internal pipe so they transition to an error state and become unwritable; readers will observe end-of-data once existing buffered data is consumed. The method is const because it performs thread-safe signaling on shared state, not logical mutation of the pipeline object itself.

Note
The operation is thread-safe and may be called concurrently.
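A hedged cancellation sketch (pipeline setup, input and logger as in the class overview example):

Consumer result = pipeline.Process(input.Consumer(), ExecutionMode::Async, logger);

// Later, e.g. on user cancellation: stop every stage from accepting further writes.
pipeline.SetError();

// Readers drain whatever was already buffered, then observe end-of-data.
while (!result.EoF()) {
    auto chunk = result.Extract(0); // non-blocking: returns currently available data
    // discard or log the remaining bytes
}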

The documentation for this class was generated from the following file: pipeline.hxx