
Streams

Nagare (流れ) — flow. Everything that moves through the system rides the same current.

Nagare.Streams is the runtime under every moving part of the framework. Projections, live taps, message subscriptions, Kafka consumers, process-manager event routes — they all ride the same pipeline runtime. No second loop, no parallel hosted service, no bespoke runner per integration. One source produces, one handler consumes, one offset store persists; the framework decides when to acquire the cluster lock, when to flush, when to retry, when to park.

That uniformity is the design. Learn how one pipeline runs and you've learned how all of them run, because at runtime they are the same machine.

What rides the runtime

| Concern | Source | Offset | How |
|---|---|---|---|
| Event-sourced projection | IEventStore<TEvent> | Position (via ICheckpointStore) | AddPipeline<TSub, TEvent>() |
| Live tap (SSE / broadcaster) | IEventStore<TEvent> at head | none (no checkpoint) | AddLiveTap<TSub, TEvent>() |
| Outbound messaging | IEventStore<TSourceEvent> → mapper | Position | AddMessageProducer<…>() |
| Inbound messaging | ITransportConsumer | broker consumer-group commit | AddMessageSubscription<TH, TM, TC>() |
| Kafka topic | IConsumer<TKey, TValue> | Kafka offset | AddKafkaPipeline<TKey, TValue>(…) |
| Process-manager event route | IEventStore<TSourceEvent> | Position | AddProcessEventRoutes<…>() |
| Custom (HTTP poller, ticker, …) | your Source<T> | your IOffsetStore<TOffset> | AddPipeline<T, TOffset>(…) |

Every row above resolves to the same generic registration: AddPipeline<T, TOffset>(pipelineId, sourceFactory, extractOffset, handler, wakeUpChannel). The typed overloads are sugar; the runtime is one.

The narrative

Source, transform, sink. The current carries elements from where they're produced, through whatever shaping the projection needs (filter, batch, throttle, fan out, recover), to a handler that does the work. The framework owns:

  • Lock acquisition — durable pipelines are cluster-singleton via ILockProvider. One node drains; the rest wait. Lose the lock mid-session and the current pass aborts cleanly.
  • Offset advancement — after every successful handle, the framework calls IOffsetStore<TOffset>.SaveAsync. Restart resumes from the persisted cursor; no per-pipeline replay code.
  • Wake-up — ISubscriptionWakeUp (LISTEN/NOTIFY where the store supports it) pushes when there's something to do; the source falls back to a poll tick when no signal arrives.
  • Dead-letter handling — when a handler throws past its retry budget, the element is parked in IStreamDeadLetterStore and the cursor advances.
  • Stage capture — operators are recorded as stage descriptors so tooling (tests, the dashboard) can introspect the pipeline without running it.

Only the source, the offset, and the handler vary across integrations. That's why a Kafka consumer, an event-sourced projection, and an HTTP poller share 90% of their lifecycle code.

When to reach for what

Reach for the streams DSL when a pipeline has shape — filters, batching, fan-out, throttling, the bits that benefit from being named. Plain handlers (one method per event, switch-on-type, write to a read model) need none of that; pass a lambda directly to AddPipeline<TEvent>("id", handler) and you're done.

Reach for AddLiveTap when ephemeral is the point: SSE feeds, in-memory broadcasters, anything where a cold restart shouldn't replay history. Live taps don't checkpoint; they start at head and every node runs its own. Reach for AddPipeline when a restart should resume — read models, integrations, anything that needs at-least-once delivery from where the predecessor left off.

Registration

```csharp
using System.Diagnostics; // Activity

services.AddPipeline<BookEvent>(
    "popular-books",
    pipeline => pipeline
        .Where(env => env.Event is BookEvent.BookBorrowed)
        .Tap(env => Activity.Current?.AddTag("popularity.book_id", env.AggregateId.Value))
        .Handle(PopularBooksProjection.Handle));
```

There are three overloads. The example above uses the DSL builder; the other two are simpler:

```csharp
// Plain handler; resolve dependencies from the IServiceProvider `sp`.
services.AddPipeline<TEvent>("id", async (env, sp, ct) => { ... });

// Plain handler that closes over its own dependencies.
services.AddPipeline<TEvent>("id", async (env, ct) => { ... });
```

Only the DSL overload's Where/Tap/Skip/Take produce stage descriptors; plain handlers register as a single source-to-sink pair.

The Source/Sink model

Outside AddPipeline, the same DSL is available for ad-hoc pipelines via Source<T> and Sink<T>. Useful in tests and scripts:

```csharp
var source = Source.From(1, 2, 3, 4, 5);

var result = await source
    .Where(x => x % 2 == 0)
    .Select(x => x * 10)
    .ToListAsync();
// [20, 40]
```

Source factories live on the Source static class (From, Empty, Single, Repeat, FromAsyncEnumerable) and on AdditionalSourceFactories (FromChannel, Tick, Unfold). Sinks come from Sink.Foreach, Sink.ForeachAsync and Sink.Ignore; runtime.Run(source, sink, ct) connects a source to a sink and runs it.

Operators

The DSL groups operators by stage kind:

| Kind | Operators |
|---|---|
| Linear (1→1) | Where, Select, Collect, Scan, Skip, Take, Tap, TakeWhile, SkipWhile |
| Linear (1→N) | SelectMany, SelectManyAsync |
| Fan-in (N→1) | Concat, Merge, MergeOrderedBy(comparer), Zip/ZipWith, Interleave, Prepend, OrElse, MergeMany, ConcatMany |
| Fan-out (1→N) | Broadcast(n), Balance(n), Partition(n, partitioner), GroupBy(keySelector) |
| Side outputs | Tee, Branch, Inspect |
| Batching | GroupedWithin(maxCount, window), Batch(maxSize, combiner) |
| Backpressure | Buffer(capacity, OverflowStrategy), Prefetch(capacity), Coalesce(combiner), CoalesceWithSeed, Expand |
| Parallel | SelectAsync(parallelism, fn) (ordered), SelectAsyncUnordered(parallelism, fn) |
| Time | Throttle(elements, per), Debounce(quietPeriod), Sample(interval), Delay(span), InitialDelay(span), InitialTimeout(span), IdleTimeout(span), CompletionTimeout(span) |
| Recovery | Recover<TException>(handler), RecoverWith(fallback), Distinct, DistinctBy |
| Observation | WatchTermination() |

The three fan-out operators below look similar but behave very differently:

  • Broadcast(n) sends every element to every output, and backpressures the upstream when any one output gets full.
  • Balance(n) sends each element to whichever output has room first. Order across branches is non-deterministic.
  • Partition(n, fn) routes each element to the output index returned by fn. Out-of-range indices clamp to the last output.

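All three return IReadOnlyList<Source<T>>. The pump starts lazily on the first output that's materialised, so subscribe every output before consumption begins. An output nobody reads fills its buffer, and because fan-out blocks on a full output, that can stall the whole pump: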

```csharp
var outputs = upstream.Broadcast(2);
var t1 = runtime.Run(outputs[0], sinkA, ct);
var t2 = runtime.Run(outputs[1], sinkB, ct);
await Task.WhenAll(t1, t2);
```
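To see why every output needs a reader, here is a minimal sketch of a broadcast pump built directly on System.Threading.Channels. It is not Nagare's implementation. BroadcastSketch, DrainAsync and Numbers are hypothetical names, the outputs are plain ChannelReader<T>s rather than Source<T>s, and this pump starts eagerly instead of on first materialisation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BroadcastSketch
{
    // Copies every element of `source` into n bounded channels.
    // Hypothetical sketch: starts the pump eagerly, unlike Nagare's lazy Broadcast.
    public static IReadOnlyList<ChannelReader<T>> Broadcast<T>(
        IAsyncEnumerable<T> source, int n, int bufferPerOutput, CancellationToken ct = default)
    {
        var outputs = Enumerable.Range(0, n)
            .Select(_ => Channel.CreateBounded<T>(bufferPerOutput)) // FullMode defaults to Wait
            .ToArray();

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(ct))
                    foreach (var output in outputs)
                        await output.Writer.WriteAsync(item, ct); // one full output stalls every output
                foreach (var output in outputs) output.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                foreach (var output in outputs) output.Writer.TryComplete(ex); // propagate failure
            }
        });

        return outputs.Select(o => o.Reader).ToArray();
    }

    // Usage helper: drains one output into a list.
    public static async Task<List<T>> DrainAsync<T>(ChannelReader<T> reader)
    {
        var items = new List<T>();
        await foreach (var item in reader.ReadAllAsync())
            items.Add(item);
        return items;
    }

    // Usage helper: an async source of 0..count-1.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

Draining both outputs concurrently delivers the full sequence to each. If only the first output were drained, the pump would block as soon as the second filled its bufferPerOutput slots, and the first would stop receiving new elements too.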

GroupedWithin(maxCount, window) emits a list when either the count or the window fires. That makes it handy for aggregating writes or batching API calls.

SelectAsync(parallelism, fn) runs fn concurrently up to the bound while preserving input order. SelectAsyncUnordered does the same but emits results as they finish, which avoids head-of-line blocking when output order doesn't matter.

Recovery operators take over at the point where the upstream throws: Recover substitutes a value, RecoverWith switches to a fallback source.
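The difference between ordered and unordered output comes down to which in-flight task you wait on. The following is a minimal sketch over plain IAsyncEnumerable<T>, assuming fn returns a Task<TOut>. ParallelMapSketch and its methods are hypothetical names, and this is not Nagare's operator code.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelMapSketch
{
    // Ordered: results leave in input order; at most `parallelism` calls are in flight.
    public static async IAsyncEnumerable<TOut> SelectAsyncOrdered<TIn, TOut>(
        this IAsyncEnumerable<TIn> source, int parallelism, Func<TIn, Task<TOut>> fn,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var inFlight = new Queue<Task<TOut>>();
        await foreach (var item in source.WithCancellation(ct))
        {
            inFlight.Enqueue(fn(item));
            if (inFlight.Count >= parallelism)
                yield return await inFlight.Dequeue(); // the oldest call gates the rest
        }
        while (inFlight.Count > 0)
            yield return await inFlight.Dequeue();
    }

    // Unordered: emits whichever in-flight call finishes first.
    public static async IAsyncEnumerable<TOut> SelectAsyncUnordered<TIn, TOut>(
        this IAsyncEnumerable<TIn> source, int parallelism, Func<TIn, Task<TOut>> fn,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var inFlight = new List<Task<TOut>>();
        await foreach (var item in source.WithCancellation(ct))
        {
            inFlight.Add(fn(item));
            if (inFlight.Count >= parallelism)
                yield return await TakeFirstFinished(inFlight);
        }
        while (inFlight.Count > 0)
            yield return await TakeFirstFinished(inFlight);
    }

    static async Task<T> TakeFirstFinished<T>(List<Task<T>> inFlight)
    {
        var done = await Task.WhenAny(inFlight);
        inFlight.Remove(done);
        return await done; // rethrows if that call failed
    }

    // Usage helper: an async source of 0..count-1.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Usage helper: collects a stream into a list.
    public static async Task<List<T>> DrainAsync<T>(IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (var item in source)
            items.Add(item);
        return items;
    }
}
```

In the ordered version, a slow first call holds back results that have already finished behind it. The unordered version hands those results over as soon as they complete.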

Picking the right operator

A few of these overlap — pick by intent:

| You want to… | Use |
|---|---|
| Limit the rate at which a downstream sees events | Throttle(n, per) |
| Coalesce bursty events to one trailing emit | Debounce(quiet) |
| Take the latest value at fixed cadence | Sample(interval) |
| Smooth bursts to a slow consumer with arbitrary aggregation | Coalesce(combiner) |
| Same, but bound how many get coalesced | Batch(max, combiner) |
| Decouple producer and consumer pacing | Buffer(capacity, strategy) |
| Process events per-key in parallel without manual partitioning | GroupBy(keyFn) |
| Tee everything to telemetry/audit without affecting main flow | Tee (blocking) or Inspect (drop-on-pressure) |
| Route malformed events to a side channel and skip them downstream | Branch(predicate) |
| One-to-many transform per element | SelectMany / SelectManyAsync |
| Bounded-parallel async transform (no order guarantee) | SelectAsyncUnordered |
| Fail-fast on hung upstream | IdleTimeout / InitialTimeout |
| Pair two streams 1:1 by position | Zip / ZipWith |
| Switch to a fallback source when the primary is empty | OrElse(fallback) |
| Observe stream termination from outside | WatchTermination(), which returns (Source, Task) |

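Buffer's overflow strategies are Backpressure (the default: block upstream), DropOldest, DropNewest, DropBuffer and Fail. They resemble Channel's BoundedChannelFullMode but don't map one-to-one. Nagare's DropNewest discards the incoming element, which is what Channel calls DropWrite, and DropBuffer and Fail have no Channel equivalent. Pick by which data matters more, old or new, when the buffer fills.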

GroupBy returns a Source<KeyedSubstream<TKey, T>> where each substream is the per-key view. maxSubstreams defaults to 64, and GroupBy throws when more distinct keys than that arrive. It's a safety bound, so pre-bucket the source if you need more.

Backpressure

A linear pipeline backpressures itself for free. IAsyncEnumerable<T> is a pull model: a slow handler simply doesn't call MoveNextAsync, so the upstream iterator stays suspended at its last yield return. There's no demand signal and no Request(n); the upstream just doesn't get scheduled until the downstream asks again.
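A small experiment makes this pull behaviour concrete. The sketch below is plain C# with no Nagare types, and PullBackpressureDemo is a hypothetical name. It counts how many elements the iterator has produced against how many the slow handler has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PullBackpressureDemo
{
    static int produced;

    // How far the producer got ahead of the handler, measured as each element arrives.
    public static int MaxLead { get; private set; }

    static async IAsyncEnumerable<int> Produce(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();   // stand-in for an async fetch
            produced++;
            yield return i;       // suspended here until the consumer calls MoveNextAsync again
        }
    }

    public static async Task<int> RunAsync(int count)
    {
        produced = 0;
        MaxLead = 0;
        var handled = 0;
        await foreach (var _ in Produce(count))
        {
            MaxLead = Math.Max(MaxLead, produced - handled);
            await Task.Delay(5);  // a slow handler
            handled++;
        }
        return handled;
    }
}
```

MaxLead stays at 1, which is the element currently being handled. The iterator never runs further ahead, because nothing calls MoveNextAsync while the handler is busy.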

Where the pipeline crosses an asynchronous boundary — buffering, fan-in, fan-out, parallel mapping — that pull semantic isn't enough, and the runtime drops a Channel.CreateBounded<T> into the gap. The channel paces both sides: when full, WriteAsync awaits; when empty, ReadAsync awaits. That's the entire backpressure mechanism.
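You can observe the same pacing with a bare bounded channel. This sketch joins a fast producer to a slow consumer and records the largest number of elements ever waiting in the channel. BoundedBoundaryDemo is a hypothetical name, not a Nagare type.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedBoundaryDemo
{
    // A fast producer and a slow consumer joined by a bounded channel.
    public static async Task<(int Received, int MaxBuffered)> RunAsync(int items, int capacity)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // WriteAsync awaits while the channel is full
            SingleWriter = true,
            SingleReader = true,
        });

        var maxBuffered = 0;
        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < items; i++)
            {
                await channel.Writer.WriteAsync(i);
                maxBuffered = Math.Max(maxBuffered, channel.Reader.Count);
            }
            channel.Writer.Complete();
        });

        var received = 0;
        await foreach (var _ in channel.Reader.ReadAllAsync()) // the read side awaits while empty
        {
            await Task.Delay(2); // slow consumer
            received++;
        }

        await producer;
        return (received, maxBuffered);
    }
}
```

However fast the producer runs, MaxBuffered never exceeds capacity. The producer simply spends its time awaiting WriteAsync.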

Six places where it surfaces, all tunable:

| Where | Default capacity | Tunable via |
|---|---|---|
| Buffer(capacity, OverflowStrategy): explicit decoupling | user-supplied | first arg |
| Prefetch(capacity): overlap upstream fetch with downstream work | user-supplied | first arg |
| Merge: fan-in across N sources | 64 | Merge(others, bufferCapacity:) overload |
| MergeMany: flatten a Source-of-Sources | 256 | MergeMany(parallelism:, bufferCapacity:) |
| Broadcast / Balance / Partition: fan-out | 1024 per output | bufferPerOutput: |
| SelectAsync(parallelism, fn): bounded parallel mapper | parallelism * 2 | implicit (raise parallelism) |

Apart from the built-in dropping in Inspect and LiveBroadcaster, Buffer's OverflowStrategy is the only place where the model lets you swap blocking for dropping:

  • Backpressure (default) — block upstream until room appears.
  • DropOldest — drop the head of the buffer to admit the new element.
  • DropNewest — keep the buffer; drop the new arrival.
  • DropBuffer — discard the entire buffer's contents and admit only the new element. The consumer "snaps forward" to the latest state. Useful when falling behind by a buffer's worth means none of the queued elements are still relevant (live dashboards, stale metrics, position updates).
  • Fail — throw InvalidOperationException on overflow.
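The five strategies differ only in what happens when an element arrives at a full buffer. The single-threaded sketch below spells that out. Overflow and OverflowSketch are hypothetical stand-ins, and Backpressure is reduced to returning false ("wait and retry") because real blocking needs an async writer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Nagare's OverflowStrategy; names mirror the list above.
public enum Overflow { Backpressure, DropOldest, DropNewest, DropBuffer, Fail }

public static class OverflowSketch
{
    // Offers `item` to a bounded buffer. Returns false only when the producer
    // would have to wait (Backpressure on a full buffer).
    public static bool Offer<T>(Queue<T> buffer, int capacity, T item, Overflow strategy)
    {
        if (buffer.Count < capacity)
        {
            buffer.Enqueue(item);
            return true;
        }

        switch (strategy)
        {
            case Overflow.Backpressure:
                return false;                 // caller must wait for room
            case Overflow.DropOldest:
                buffer.Dequeue();             // evict the head...
                buffer.Enqueue(item);         // ...and admit the newcomer
                return true;
            case Overflow.DropNewest:
                return true;                  // keep the buffer, discard the arrival
            case Overflow.DropBuffer:
                buffer.Clear();               // snap forward to the latest element
                buffer.Enqueue(item);
                return true;
            case Overflow.Fail:
                throw new InvalidOperationException("Buffer overflow.");
            default:
                throw new ArgumentOutOfRangeException(nameof(strategy));
        }
    }

    // Offers every item in turn and returns what is left in the buffer.
    // Fill does not retry, so rejected Backpressure offers are simply not admitted here.
    public static int[] Fill(int capacity, Overflow strategy, params int[] items)
    {
        var buffer = new Queue<int>();
        foreach (var item in items)
            Offer(buffer, capacity, item, strategy);
        return buffer.ToArray();
    }
}
```

With capacity 3 and inputs 1 to 5, DropOldest keeps [3, 4, 5] and DropNewest keeps [1, 2, 3]. DropBuffer keeps [4, 5]: the buffer snaps to 4, and then 5 fits.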

The fan-in and fan-out operators always block when full; there's no "drop on overload" knob. If you need that semantic, put a Buffer with a dropping OverflowStrategy in front of them.

Batched demand: the patterns we actually have

People asking for "demand-driven streaming" usually mean two distinct things, and we cover both without leaving the channels model:

1. Overlap fetch and process. When an upstream is expensive per element (paginated APIs, slow databases) and the handler is steady, the cost is two waits in series: one for the fetch, one for the work. Prefetch(n) runs the upstream pump ahead of the consumer through a bounded channel of capacity n. While the handler processes element K, the pump is already fetching the elements after it, up to n ahead. It backpressures the upstream when the buffer fills and never holds it back while there's room to run ahead.

```csharp
services.AddPipeline<OrderEvent, long>(
    "order-projection",
    pipeline => pipeline
        .Prefetch(8)              // up to 8 events fetched ahead
        .SelectAsync(4, EnrichAsync)
        .Handle(ApplyToReadModel));
```
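The prefetch idea fits in one method over plain IAsyncEnumerable<T> and System.Threading.Channels. This is a minimal sketch, not Nagare's Prefetch. PrefetchSketch and its helpers are hypothetical, and the pump runs on the thread pool via Task.Run.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PrefetchSketch
{
    // Pumps `source` ahead of the consumer through a bounded channel of `capacity` elements.
    public static async IAsyncEnumerable<T> Prefetch<T>(
        this IAsyncEnumerable<T> source, int capacity,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // backpressure the pump when the buffer is full
            SingleWriter = true,
            SingleReader = true,
        });
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

        var pump = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(cts.Token))
                    await channel.Writer.WriteAsync(item, cts.Token);
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex); // surface upstream failures to the reader
            }
        });

        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
                yield return item; // while the caller handles this, the pump keeps fetching
        }
        finally
        {
            cts.Cancel(); // stop the pump if the consumer leaves early
            await pump;   // the pump catches its own exceptions, so this does not throw
        }
    }

    // Usage helper: a source with a small per-element fetch delay.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(1); // simulate a per-element fetch
            yield return i;
        }
    }

    // Usage helper: collects a stream into a list.
    public static async Task<List<T>> DrainAsync<T>(IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (var item in source)
            items.Add(item);
        return items;
    }
}
```

Order is preserved because there is a single pump and a single FIFO channel. The only thing that changes is that fetching and handling overlap.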

2. Process in batches when the consumer is ready. When the handler is faster batched than per-element (write 100 rows at a time, hit an API in groups, do a bulk database UPDATE), GroupedWithin(maxCount, window) collapses the upstream into batches sized by either count or time, whichever fires first. The handler receives IReadOnlyList<T> instead of T.

```csharp
services.AddPipeline<MetricEvent>(
    "metric-aggregate",
    pipeline => pipeline
        .GroupedWithin(maxCount: 100, window: TimeSpan.FromSeconds(5))
        .Handle(BulkInsertAsync));
```
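The following minimal GroupedWithin sketch over a ChannelReader<T> spells out the count-or-time rule. It is an illustration under stated assumptions, not Nagare's operator. BatchingSketch is a hypothetical name, and this version starts the window when a batch's first element becomes available rather than on a fixed timer.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Emits a batch when it reaches maxCount or when `window` has elapsed since
    // the batch's first element became available, whichever happens first.
    public static async IAsyncEnumerable<IReadOnlyList<T>> GroupedWithin<T>(
        ChannelReader<T> reader, int maxCount, TimeSpan window,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (await reader.WaitToReadAsync(ct)) // idle until a batch can start
        {
            var batch = new List<T>(maxCount);
            using (var windowCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
            {
                windowCts.CancelAfter(window);
                try
                {
                    while (batch.Count < maxCount)
                    {
                        if (reader.TryRead(out var item)) { batch.Add(item); continue; }
                        if (!await reader.WaitToReadAsync(windowCts.Token)) break; // upstream completed
                    }
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // The window elapsed: flush whatever has accumulated.
                }
            }

            if (batch.Count > 0)
                yield return batch;
        }
    }
}
```

With 250 elements already queued and the upstream completed, the count rule fires twice and the tail flushes on completion, giving batches of 100, 100 and 50. With only three elements queued and the upstream still open, the window fires and emits a batch of three.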

Both patterns are pull-driven through IAsyncEnumerable — there's no explicit Request(n) signal travelling upstream, but the effect (batched work, overlapped fetch) is what callers usually want when they reach for credit-based streams. For the shape that genuinely needs credit propagation across operators or async boundaries, see the note below.

The model is naive on purpose

The runtime trusts IAsyncEnumerable to handle linear pull on its own, and trusts a bounded channel's WriteAsync to handle awaiting at every async boundary. That's it: no credit propagation, no Request(n), no demand signal travelling upstream, no Reactive Streams compliance. In return the implementation stays small (a few hundred lines of operators), allocation-light (channels return ValueTask, the linear path allocates nothing per element), and easy to read end-to-end.

What this costs you:

  • Latency rises under sustained backpressure. When a downstream stalls, in-flight elements queue in the channels until they fill. Capacity caps memory but not staleness — an event that normally takes milliseconds can sit for minutes if the pipeline runs hot for a while. There's no built-in load-shedding; put a Buffer(capacity, DropOldest) in front of the slow stage when you need one.
  • No demand-batching. A Reactive Streams operator can request a hundred elements at a time when the downstream is ready for them. Channels pipelines can't; they pull one element at a time. An upstream that's expensive per call (a paginated database, say) therefore has to do its own batching, for example by yielding each page as a single element via Source.Create. GroupedWithin after the source batches the downstream work, but it doesn't reduce the number of upstream calls.
  • A slow consumer paces every producer in a merge. Every pump writes into the shared merge channel, so when the consumer falls behind, the channel fills and every pump backpressures. That's fair across the merge, but a single degraded downstream throttles the whole fan-in.
  • Broadcast paces to the slowest output. No output ever drops, which is the point — but a slow telemetry sink can throttle production work. Reach for Inspect over Tee when you'd rather drop telemetry on overload than slow the main flow.
  • LiveBroadcaster deliberately breaks the rule. Per-subscriber buffers use DropOldest, so a slow browser tab loses old events instead of backpressuring the shared upstream. SSE feeds need this; nothing else should.

The channels model is the wrong fit for per-element credit signalling, demand-driven (Request(n)-style) pull, and cross-process flows that need Reactive Streams TCK-compliant contracts. For those, reach for Akka.NET Streams (the Akka.Streams package). Inside Nagare's actual scope (event-sourced projections, in-process Kafka consumption, dashboard-rendered DAGs), channels are the simpler, faster choice.

Dead-letter handling

By default, an exception thrown by the handler aborts the session; the outer loop re-acquires the lock and retries the same envelope on the next tick. For a poison event that never succeeds, the projection blocks forever on that single envelope.

Configure a DeadLetterPolicy to retry-then-park:

```csharp
services.AddPipeline<BookEvent>("popular-books", handler, new PipelineHostingOptions
{
    DeadLetterPolicy = new DeadLetterPolicy
    {
        MaxRetries = 5,
        BackoffSchedule = [
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromMinutes(2),
            TimeSpan.FromMinutes(10),
        ],
    },
});
```

When the retry budget is exhausted, the host persists the envelope to IStreamDeadLetterStore (the default in-memory implementation is fine for tests; production wants a durable one), advances the checkpoint past the offender, and keeps going. Parked envelopes can be replayed later through the live handler.

The policy applies per-envelope. A transient failure recovers in the same session if any retry succeeds; only persistent failures trigger parking.
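The retry-then-park flow for a single envelope can be sketched in a few lines. This is not Nagare's host code. ParkedEnvelope and RetryThenParkSketch are hypothetical, and an in-memory collection stands in for IStreamDeadLetterStore. The sketch also assumes that MaxRetries counts retries after the first attempt and that the n-th retry waits on the n-th BackoffSchedule entry, reusing the last entry if the schedule is short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical record of a parked envelope; not Nagare's dead-letter model.
public sealed record ParkedEnvelope<T>(T Envelope, Exception Error, int Attempts);

public static class RetryThenParkSketch
{
    // Returns true if the handler eventually succeeded, false if the envelope was parked.
    // Either way the caller may advance its offset past this envelope.
    public static async Task<bool> HandleAsync<T>(
        T envelope,
        Func<T, CancellationToken, Task> handler,
        int maxRetries,
        IReadOnlyList<TimeSpan> backoffSchedule,
        ICollection<ParkedEnvelope<T>> deadLetters,
        CancellationToken ct = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await handler(envelope, ct);
                return true;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                var retriesUsed = attempt - 1;
                if (retriesUsed >= maxRetries)
                {
                    deadLetters.Add(new ParkedEnvelope<T>(envelope, ex, attempt)); // park it
                    return false;
                }

                // Assumed: the n-th retry waits backoffSchedule[n - 1], clamped to the last entry.
                var delay = backoffSchedule.Count == 0
                    ? TimeSpan.Zero
                    : backoffSchedule[Math.Min(retriesUsed, backoffSchedule.Count - 1)];
                await Task.Delay(delay, ct);
            }
        }
    }
}
```

A handler that fails twice and then succeeds is handled on its third attempt, and nothing is parked. A handler that always fails with maxRetries set to 2 is parked after three attempts.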

Testing custom operators

Nagare.Streams.Testing ships a probe-based TestKit for exercising operator behaviour in isolation — backpressure, completion timing, error propagation — without spinning up a full pipeline.

```csharp
using Xunit; // [Fact]

[Fact]
public async Task Where_filters_evens()
{
    var probe = StreamTesting.CreateSourceProbe<int>();
    var sink = probe.Source.Where(x => x % 2 == 0).Materialise();

    probe.SendNext(1, 2, 3, 4);
    await sink.ExpectNext(2);
    await sink.ExpectNext(4);

    probe.SendComplete();
    await sink.ExpectComplete();
}
```

The source probe drives the operator from outside (SendNext, SendComplete, SendError); the sink probe consumes assertively (ExpectNext(value), ExpectNoElement(quiet), ExpectError<T>, ExpectComplete). Failed expectations throw AssertionException. Reach for the TestKit when await source.ToListAsync() would mask the timing or backpressure detail you care about.

Hosting

PipelineHostedService runs one pipeline per closed PipelineDefinition<T, TOffset>. The outer loop acquires the pipeline's distributed lock from ILockProvider. Once it holds the lock, it pulls from the source factory starting at lastOffset, applies the captured transform, invokes the handler per element, and saves the offset through IOffsetStore<TOffset> after each successful call. If the lock is lost (ILockHandle.Lost) or the host stops, the current pass aborts and the outer loop re-acquires the lock on the next tick. Wake-ups come from ISubscriptionWakeUp (LISTEN/NOTIFY where the store supports it).
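Stripped of locking and wake-ups, the offset discipline of a single pass looks roughly like this. InMemoryOffsetStore and SessionSketch are hypothetical stand-ins, not IOffsetStore<TOffset> or PipelineHostedService, and the source is an in-memory log of (offset, item) pairs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for IOffsetStore<long>.
public sealed class InMemoryOffsetStore
{
    public long? Last { get; private set; }
    public Task<long?> LoadAsync() => Task.FromResult(Last);
    public Task SaveAsync(long offset) { Last = offset; return Task.CompletedTask; }
}

public static class SessionSketch
{
    // One pass of the outer loop, minus lock handling: resume after the saved offset,
    // handle each element, and save its offset only after the handler succeeds.
    // A handler exception aborts the pass; the next pass resumes from the last saved offset.
    public static async Task RunPassAsync<T>(
        IReadOnlyList<(long Offset, T Item)> log,
        Func<T, Task> handler,
        InMemoryOffsetStore store)
    {
        var last = await store.LoadAsync();
        foreach (var (offset, item) in log.Where(e => last is null || e.Offset > last))
        {
            await handler(item);
            await store.SaveAsync(offset);
        }
    }
}
```

Because the offset is saved only after the handler returns, a failure at element 5 leaves the cursor at 4, and the next pass starts at 5. A crash between handling and saving would replay that element, which is where the at-least-once guarantee comes from.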

For per-pass resilience, hand a Polly ResiliencePipeline to PipelineHostingOptions.Resilience — that wraps each session in retry-with-backoff or a circuit breaker. Note: this is session-level recovery; for per-envelope poison-pill handling, use DeadLetterPolicy instead.

Beyond event-sourced sources

The runtime is generic over element type T and offset TOffset. Anything that produces a Source<T> and a matching IOffsetStore<TOffset> becomes a durable pipeline with the same dead-letter handling and cluster-singleton semantics as event-sourced projections. Use the generic AddPipeline<T, TOffset> overload directly, or grab a sugar package.

Kafka

Nagare.Streams.Kafka wraps Confluent.Kafka into Source<KafkaRecord<TKey, TValue>> + IOffsetStore<KafkaOffset> (consumer-group commit). Register a topic as a pipeline:

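```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

services.AddKafkaPipeline<Ignore, OrderEvent>(
    pipelineId: "order-projection",
    topic: "orders",
    consumerFactory: sp => new ConsumerBuilder<Ignore, OrderEvent>(consumerConfig)
        // Confluent.Kafka has no default deserializer for custom types;
        // orderEventDeserializer is a placeholder for your IDeserializer<OrderEvent>.
        .SetValueDeserializer(orderEventDeserializer)
        .Build(),
    handler: async (record, sp, ct) =>
    {
        var view = sp.GetRequiredService<IOrderProjection>();
        await view.Apply(record.Value, ct);
    });
```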

The consumer's group id IS the pipeline id; offsets advance through Kafka's commit protocol.

For the producer side, KafkaSink.ToKafka is a terminal sink for any Source<T> — including the event store. Bridging an event-sourced stream out to a Kafka topic is one short pipeline:

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;

services.AddPipeline<OrderEvent, long>(
    pipelineId: "orders-to-kafka",
    sourceFactory: (sp, fromOffset, ct) =>
        sp.GetRequiredService<IEventStore<OrderEvent>>()
          .Read(from: fromOffset, ct: ct)
          .Select(env => env.Event),
    extractOffset: env => env.Position,
    handler: KafkaSink.ToKafka<long, Null, OrderEvent>(
        topic: "orders.public",
        producerFactory: sp => new ProducerBuilder<Null, OrderEvent>(producerConfig)
            // Placeholder ISerializer<OrderEvent>; custom value types need an explicit serializer.
            .SetValueSerializer(orderEventSerializer)
            .Build(),
        keySelector: _ => null));
```

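The handler is the sink; the offset stays in the event store's Position space. If the process crashes mid-flush, the next pass replays from the last saved position, so delivery is at-least-once. Kafka's idempotent producer (enable.idempotence) only removes duplicates caused by the producer's own retries within a session. Events replayed after a crash are published again, so consumers of orders.public should tolerate duplicates.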

Failover, lock-loss recovery, and redelivery semantics work the same as any other pipeline; Kafka offsets are just a different cursor type. The cluster test suite covers Postgres, MySQL, and SqlServer end-to-end. Sqlite is single-host only, since it has no clustering story.

Tooling

The dashboard renders every registered pipeline as an xyflow diagram with a time-machine that re-runs the captured transform over historical events without touching the live handler. That's covered in the dashboard guide; it's a window onto the runtime, not the runtime itself.

流れ — flow.