Clinker Engine Internals
This book is for engineers working on Clinker — or operators who need to reason about why the engine behaves the way it does under load. It documents the execution model, memory arbitration, correlation-key retraction, operator strategies, storage durability, and CXL compilation in implementation detail.
If you only need to author and run YAML pipelines, you want the Clinker User Guide instead (the separate book under docs/). That book deliberately stays at the level of “what do I type and what happens”; this one explains the machinery beneath it.
What’s in scope here
- The execution model — which stages stream, which block, and how the memory arbitrator decides who pauses and who spills.
- Correlation-key retraction — the shadow-column lineage, per-source rollback narrowing, and the retraction protocol that lets a relaxed aggregate drop only the failing records.
- Operator internals — Combine join-strategy selection, Merge back-pressure, streaming Output writes, and the schema-drift sidecar.
- Storage — the staging cache, crash durability, and the locking protocol.
- CXL compilation — the compiler phases and the type-unification algorithm.
How to read it alongside the User Guide
Most chapters here have a user-facing counterpart in the User Guide. The pattern is consistent: the User Guide tells you the knob and the observable outcome (e.g. “declare sort_order and the aggregate streams”); this book explains the mechanism that makes the outcome true (the streaming-ingest path, the group-table accumulation, the spill trigger). When a chapter has a user-facing sibling, it says so at the top.
Architectural ground rules
Everything here is downstream of three permanent commitments — finite inputs, finite jobs, single process — covered in Overview & Pillars. A mechanism that appears baroque (the single-process memory arbitrator, the in-process Rayon parallelism) usually makes sense only once you hold those three constraints fixed.
Overview & Pillars
Clinker is a bounded-memory batch DAG executor. A pipeline run is a finite job over finite input: Source nodes read until EOF, the DAG drains, the process exits with a status code. It pairs a custom expression language (CXL) with YAML pipeline orchestration.
Within a run, stateless operators (Transform, Route, most Combine probe-side work, Output) evaluate records one at a time without per-record state accumulation. The DAG executor materializes intermediate buffers between non-fused stages, so memory scales with the largest live intermediate stage’s output, not total input size; fused Source → Transform → Output paths skip materialization entirely. Blocking operators (Aggregate, sort, grace-hash Combine) accumulate state inside the configured RSS budget (default 512 MB) and spill to disk when soft/hard thresholds trip rather than OOM the process.
The three pillars
Every design decision cascades from three commitments. They are permanent — an architectural proposal that violates any of them is rejected at design review, not implementation review.
- Finite inputs only. Files (CSV / JSON / XML / fixed-width) and finite-cursor network sources (paginated REST, SQL SELECT cursors) — both reach EOF after exhausting their cursor. Unbounded sources (Kafka, Kinesis, SSE, webhooks, tail -f) are out of scope permanently.
- Finite jobs. No daemon mode, no service surface, no infinite event loop. clinker run invokes, drains, exits.
- Single process forever. One invocation = one OS process. Parallelism happens inside the process via std::thread and Rayon — no worker-process pools, no multi-machine sharding, no network shuffle, no cluster manager. Scale by adding cores / RAM / disk to one host (the DuckDB / Polars / Kettle model). If a host genuinely can’t fit the work, partition the input by file or key and run multiple clinker invocations from a shell script.
These pillars are why the memory arbitrator is a single in-process component rather than a distributed scheduler, why there is no network shuffle in Combine, and why spill-to-local-disk is the universal pressure-relief valve.
Crate dependency layers (bottom → top)
Applications: clinker (CLI) | cxl-cli (REPL)
↓ ↓
Orchestration: clinker-core (DAG planner + executor)
clinker-channel (workspace/channel mgmt)
clinker-schema (source .schema.yaml validation)
↓
Language/IO: cxl (lexer → parser → typecheck → eval)
clinker-format (CSV/JSON/XML/fixed-width readers/writers)
↓
Foundation: clinker-record (Value, Record, Schema, coercion)
Bench plumbing: clinker-bench-support (deterministic RecordFactory + payload generators)
clinker-benchmarks (cross-crate benchmark harness)
The bench crates are siblings, not part of the runtime layer.
The node taxonomy
Pipelines use a single flat nodes: list; each entry’s type: discriminator selects a variant of one homogeneous DAG:
- Source — input reader bound to a .schema.yaml.
- Transform — record-level CXL projection / filter / lookup (1×1).
- Aggregate — grouped or windowed reduction.
- Route — predicate-based fan-out.
- Merge — streamwise concatenation of inputs.
- Combine — N-ary record combining with mixed predicates (equi + range + arbitrary CXL); distinct from Merge and Transform+lookup.
- Reshape — per-group mutate-and-synthesize.
- Output — sink writer.
- Composition — call-site node referencing a .comp.yaml reusable sub-pipeline, lowered at compile time.
The plan itself is a petgraph DAG (ExecutionPlanDag) of topologically-sorted nodes, each carrying a parallelism strategy and NodeProperties (ordering / partitioning provenance). CXL is typechecked at compile time into a TypedProgram, and schema is propagated across the DAG at plan time.
Key engine decisions
- Memory-aware aggregation. Hash aggregation with disk spill; streaming aggregation when sort order permits; RSS tracking with soft/hard limits. The mechanism is documented in Memory Arbitration & Scheduling.
- Compile-time CXL typechecking. Type inference produces a TypedProgram; see Compiler Phases & Type Unification.
- Diagnostics. All user-facing errors use miette for span-annotated reports. Spanned<PipelineNode> covers the YAML side, cxl::Span covers the expression side, and they compose into one report.
- Pure Rust policy. deny.toml bans cmake; no C build dependencies in clinker crates.
Streaming vs. Blocking Stages
User-facing view: the User Guide’s “Streaming vs. Blocking Stages” page.
This page is the engine-internals reference for the runtime classifier that decides whether a node hands its output downstream in bounded batches or accumulates its whole input before emitting. The streaming/blocking split is the mechanism behind Clinker’s bounded-memory guarantee, and the same classifier annotates --explain output and drives the dispatcher at runtime, so the model here is exactly what the executor does — not a simplification of it. Read it alongside Memory Arbitration & Scheduling, which covers how each in-flight batch and materialized slot charges the budget.
Every node in a pipeline plan is one of two kinds at runtime:
- Streaming stages hand their output downstream in bounded batches over a back-pressured channel, never crossing an inter-stage buffer that charges the memory budget. The two fused streaming paths additionally hold at most one batch of in-flight events at a time, so their inter-stage memory does not grow with input size. The other streaming stages still build their own result before handing it off — streaming spares them the second copy into a charged buffer and overlaps the writer with downstream work, but their own working set is as large as a blocking stage’s would be.
- Blocking stages must see their whole input before they can produce any output. They accumulate state inside the memory budget and spill to disk when the soft threshold trips, rather than holding everything in RAM.
This distinction is what makes Clinker a bounded-memory executor: a pipeline’s peak memory is set by its largest live blocking-or-non-fused-streaming stage plus one batch per fused streaming stage, not by the cumulative size of every stage at once. A streaming stage’s output is never separately buffered between dispatch arms, so it is never charged twice: the arbitrator counts each in-flight batch once when the producer flushes it and discharges that charge as the consumer drains it. If RSS still crosses the soft threshold while a single-consumer streaming stage holds batches in flight, the engine spills those batches’ records to disk one batch at a time — the streaming handoff is the per-batch counterpart of a blocking stage’s full-stage spill, not an exemption from spilling.
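The once-per-batch accounting can be sketched with a bounded standard-library channel and a shared atomic counter. This is a minimal illustration under stated assumptions, not Clinker’s arbitrator. BatchBudget, charge, discharge, and stream_batches are hypothetical names, and every record is assumed to cost a fixed number of bytes. The producer charges a batch when it flushes it, and the consumer discharges that same batch as it drains it, so no batch is ever counted twice.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the arbitrator's charge on in-flight streaming batches.
#[derive(Default)]
pub struct BatchBudget {
    in_flight: AtomicUsize,
    peak: AtomicUsize,
}

impl BatchBudget {
    /// Charged once, when the producer flushes a batch.
    pub fn charge(&self, bytes: usize) {
        let now = self.in_flight.fetch_add(bytes, Ordering::SeqCst) + bytes;
        self.peak.fetch_max(now, Ordering::SeqCst);
    }

    /// Discharged once, when the consumer drains that same batch.
    pub fn discharge(&self, bytes: usize) {
        self.in_flight.fetch_sub(bytes, Ordering::SeqCst);
    }

    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::SeqCst)
    }

    pub fn peak(&self) -> usize {
        self.peak.load(Ordering::SeqCst)
    }
}

/// Streams `records` in batches of `batch_size` (must be > 0) over a depth-1
/// channel and returns how many records the consumer drained.
/// Each record is assumed to cost `record_bytes`.
pub fn stream_batches(
    records: Vec<u64>,
    batch_size: usize,
    record_bytes: usize,
    budget: Arc<BatchBudget>,
) -> usize {
    // Bounded channel: send() blocks while the slot is full (back-pressure).
    let (tx, rx) = sync_channel::<Vec<u64>>(1);
    let producer_budget = Arc::clone(&budget);
    let producer = thread::spawn(move || {
        for chunk in records.chunks(batch_size) {
            producer_budget.charge(chunk.len() * record_bytes);
            if tx.send(chunk.to_vec()).is_err() {
                break;
            }
        }
        // Dropping `tx` here closes the channel and ends the consumer loop.
    });
    let mut drained = 0;
    for batch in rx {
        drained += batch.len();
        budget.discharge(batch.len() * record_bytes);
    }
    producer.join().expect("producer thread panicked");
    drained
}
```

With a depth-1 channel, at most three batches are ever charged at once (one being drained, one queued, and one charged while its send waits), whatever the input size. That constant, not the record count, bounds the in-flight footprint.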
Which stages stream
A stage streams when its output is handed straight to a single downstream consumer instead of crossing a charged inter-stage buffer. The downstream consumer is a sink Output writer, an Aggregate’s ingest, or a hash build-probe Combine’s probe (driver) side — see Streaming into an Aggregate and Streaming into a Combine probe below.
Two stages stream and bound their own footprint to one batch, because they pull records off a live upstream channel and forward each batch without ever building a full result:
- Source → Transform → Output fused chains. A non-windowed Transform whose only upstream is a single Source and whose only downstream is a single sink Output consumes that Source’s records directly and hands each batch to the Output’s writer thread over a back-pressured channel; neither the Transform nor the Output materializes the whole record set. A Transform that fans out to multiple consumers, feeds another operator, or roots a window keeps the buffered (materialized) path.
- Merge in interleave mode fed entirely by Sources. The merge reads each Source’s live stream and forwards records as they arrive.
These stages stream their output to a single downstream consumer too — sparing the second copy and overlapping the consumer — but each still builds its full result first, so its own working set is not bounded to one batch:
- Single-branch Route. A Route with exactly one branch feeding one sink Output streams that branch’s records to the writer thread. A multi-branch Route forks records across several successor buffers and stays materialized.
- Merge in concat mode, or interleave fed by non-Source inputs, feeding one sink Output. The merge drains its predecessors’ buffers in order (concat) or round-robin (interleave) into the merged result, then streams it.
- streaming-strategy Aggregate feeding one sink Output. When the planner certifies the aggregate’s input is pre-sorted on the group key, it finalizes the group rows and streams them rather than buffering them for a downstream arm.
- Combine probe side (hash build-probe strategy) feeding one sink Output. The build relation stays fully materialized in the hash table; the matched probe output streams to the writer.
Each of these requires the producer to feed exactly one downstream consumer and to root no window; a producer that roots a window keeps the materialized path because the window arena needs the producer’s full output to build.
- Every Output. A sink writes records to its configured writer and never buffers a whole stage.
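The sink-Output rules above can be condensed into one decision function. This is a sketch under simplifying assumptions. Kind, Topology, Buffer, and classify are hypothetical names, not Clinker’s plan types. The sketch also covers only the case where the single consumer is a sink Output, not the Aggregate-ingest or Combine-probe handoffs. It separates the two fused paths, whose footprint is one batch, from the streaming stages that build their full result before handing it off.

```rust
/// Simplified, hypothetical node kinds (not Clinker's real plan types).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Kind {
    Source,
    Transform { windowed: bool },
    Route { branches: usize },
    Merge { interleave: bool, all_inputs_sources: bool },
    Aggregate { streaming_strategy: bool },
    CombineHashProbe,
    Sort,
    Output,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Buffer {
    /// Pulls from a live upstream and never builds a full result (about one batch).
    FusedStreaming,
    /// Builds its full result, then streams it to a single consumer.
    Streaming,
    /// Crosses a charged node_buffers slot.
    Materialized,
}

/// Facts about a node's position in the DAG (hypothetical shape).
#[derive(Clone, Copy)]
pub struct Topology {
    pub consumers: usize,
    pub consumer_is_sink_output: bool,
    pub roots_window: bool,
    pub single_source_upstream: bool,
}

pub fn classify(kind: Kind, t: Topology) -> Buffer {
    // A sink writes to its writer and never buffers a whole stage.
    if kind == Kind::Output {
        return Buffer::Streaming;
    }
    // Every streaming handoff needs exactly one sink consumer and no rooted window.
    if t.consumers != 1 || !t.consumer_is_sink_output || t.roots_window {
        return Buffer::Materialized;
    }
    match kind {
        Kind::Transform { windowed: false } if t.single_source_upstream => Buffer::FusedStreaming,
        Kind::Merge { interleave: true, all_inputs_sources: true } => Buffer::FusedStreaming,
        Kind::Merge { .. } => Buffer::Streaming,
        Kind::Route { branches: 1 } => Buffer::Streaming,
        Kind::Aggregate { streaming_strategy: true } => Buffer::Streaming,
        Kind::CombineHashProbe => Buffer::Streaming,
        _ => Buffer::Materialized,
    }
}
```

The topology gate runs before any per-kind rule. This mirrors the text: a fan-out or a rooted window forces the materialized path, whatever the operator.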
Document-boundary punctuations (DocumentOpen / DocumentClose, the signals behind the $doc.* context) flow inline with records through streaming stages, preserving their order: a document’s close always trails the document’s last record, even when the document’s records span several batches.
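Because punctuations travel as ordinary events in the same ordered stream, cutting that stream into batches cannot reorder them. The minimal sketch below shows this; the Event enum and the helper functions are hypothetical and are not Clinker’s event types.

```rust
use std::collections::HashSet;

/// Hypothetical event stream mixing records with document punctuations.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    DocumentOpen(u32),
    Record { doc: u32, seq: u32 },
    DocumentClose(u32),
}

/// Splits an ordered stream into batches of at most `batch_size` (> 0) events.
/// Punctuations are ordinary events here, so they keep their position.
pub fn into_batches(events: &[Event], batch_size: usize) -> Vec<Vec<Event>> {
    events.chunks(batch_size).map(|c| c.to_vec()).collect()
}

/// True if, reading batches in order, no Record of a document appears after
/// that document's DocumentClose.
pub fn closes_trail_records(batches: &[Vec<Event>]) -> bool {
    let mut closed = HashSet::new();
    for event in batches.iter().flatten() {
        match event {
            Event::DocumentClose(doc) => {
                closed.insert(*doc);
            }
            Event::Record { doc, .. } if closed.contains(doc) => return false,
            _ => {}
        }
    }
    true
}
```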
Streaming into an Aggregate
The streaming consumer above is usually a sink Output. It can also be an Aggregate’s ingest: when an eligible producer (a fused Source → Transform, a single-branch Route, a non-fused Merge, or a streaming-strategy Aggregate) feeds exactly one downstream Aggregate, the producer streams record-at-a-time into the aggregate’s add_record over a back-pressured channel rather than the aggregate pre-draining the producer’s whole output from a charged buffer. The producer reports buffer: streaming and --explain shows no node_buffer edge between it and the aggregate.
This streams the aggregate’s ingest half only — the producer no longer needs a charged inter-stage slot, and a slow aggregate (one that is spilling, say) paces the producer through the bounded channel. The aggregate’s finalize half stays blocking by nature: a group_by value depends on every member, so the group table accumulates the whole input and emits only after the channel closes (end of input). Spill stays driven by RSS pressure, never by channel depth, exactly as on the materialized path.
Two aggregate shapes keep the materialized ingest, because their finalize is not a single forward pass: a time-windowed aggregate runs a multi-pass per-window algorithm over the whole input, and a relaxed correlation-key aggregate retains its group state for the correlation-commit phase. Both show buffer: materialized on the edge into them.
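The split between a streaming ingest half and a blocking finalize half can be sketched with a bounded standard-library channel. This sketch assumes a simple SUM grouped by a string key. SumByDept and stream_into_aggregate are hypothetical names, and the aggregate runs on the calling thread here for brevity.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical aggregate: SUM(amount) GROUP BY dept.
#[derive(Default)]
pub struct SumByDept {
    groups: BTreeMap<String, i64>,
}

impl SumByDept {
    /// Ingest half: called once per record as it arrives over the channel.
    pub fn add_record(&mut self, dept: String, amount: i64) {
        *self.groups.entry(dept).or_insert(0) += amount;
    }

    /// Finalize half: blocking by nature, so it runs only after end of input.
    pub fn finalize(self) -> Vec<(String, i64)> {
        self.groups.into_iter().collect()
    }
}

pub fn stream_into_aggregate(rows: Vec<(String, i64)>, depth: usize) -> Vec<(String, i64)> {
    let (tx, rx) = sync_channel::<(String, i64)>(depth);
    let producer = thread::spawn(move || {
        for row in rows {
            // send() blocks while the channel is full, so a slow aggregate paces the producer.
            if tx.send(row).is_err() {
                break;
            }
        }
    });
    let mut agg = SumByDept::default();
    // The loop ends when the producer drops its sender (end of input).
    for (dept, amount) in rx {
        agg.add_record(dept, amount);
    }
    producer.join().expect("producer thread panicked");
    agg.finalize()
}
```

No charged buffer holds the producer’s whole output at any point. The group table still grows with the number of distinct keys, which is why spill remains a separate, RSS-driven mechanism.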
Streaming into a Combine probe
A producer can also stream into a hash build-probe Combine’s probe (driver) side. When an eligible producer (a fused Source → Transform, a single-branch Route, a non-fused Merge, a streaming-strategy Aggregate, or another hash build-probe Combine) is the Combine’s driver input, the producer streams record-at-a-time into the probe kernel over a back-pressured channel rather than the Combine pre-draining the driver’s whole output from a charged buffer. The driver producer reports buffer: streaming and --explain shows no node_buffer edge between it and the Combine. Only the HashBuildProbe strategy qualifies — the range, sort-merge, and grace-hash kernels re-sort or re-scan the driver and stay materialized.
This streams the Combine’s probe half only. The build side stays fully materialized: the engine builds the complete hash table on the main thread before the driver producer streams its first record, so the probe never matches against an incomplete index. The probe consumer runs on its own thread, so a slow driver paces the probe through the bounded channel and a slow probe (a large fan-out) back-pressures the driver. The build relation’s footprint is the hash table, exactly as on the materialized path; the streaming handoff spares only the driver’s inter-stage slot. Per-source dead-letter rewind, memory accounting, and output are byte-identical to the materialized path.
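The ordering guarantee can be shown with a small sketch. The index is fully built before a single driver record is sent, and the probe then runs on its own thread behind a bounded channel. The sketch assumes an inner equi-join on an integer key, and hash_build_probe is a hypothetical name, not Clinker’s kernel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical inner equi-join: build side indexed first, driver streamed.
pub fn hash_build_probe(
    build: Vec<(u32, String)>,
    driver: Vec<(u32, i64)>,
    depth: usize,
) -> Vec<(u32, i64, String)> {
    // Build side: fully materialized before the first probe record moves.
    let mut index: HashMap<u32, Vec<String>> = HashMap::new();
    for (key, payload) in build {
        index.entry(key).or_default().push(payload);
    }

    let (tx, rx) = sync_channel::<(u32, i64)>(depth);
    // The probe consumer owns the finished index on its own thread.
    let probe = thread::spawn(move || {
        let mut out = Vec::new();
        for (key, value) in rx {
            if let Some(matches) = index.get(&key) {
                // One driver record can fan out to several build matches.
                for payload in matches {
                    out.push((key, value, payload.clone()));
                }
            }
        }
        out
    });

    // Driver producer: send() blocks whenever the probe falls behind.
    for rec in driver {
        if tx.send(rec).is_err() {
            break;
        }
    }
    drop(tx); // end of driver input closes the channel
    probe.join().expect("probe thread panicked")
}
```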
Which stages block
A stage blocks when its result depends on records it has not seen yet:
- sort — the full input must be present before the first sorted record is known.
- Hash Aggregate — a group’s final value depends on every member, so the group table accumulates the whole input. (A streaming-strategy Aggregate over a pre-sorted input is the exception: the planner certifies it can emit a group as soon as the sort key advances.)
- Combine build side — the build relation is fully indexed before any probe record is matched. The probe side streams against the built index, but the build side materializes.
- IEJoin / sort-merge Combine — both inputs are sorted and buffered before the band/merge step runs.
- CorrelationCommit — a correlation group is held until its commit decision (flush or dead-letter) is known.
A blocking stage keeps its full-stage accumulation inside pipeline.memory.limit and spills to disk past the soft threshold; it does not stream batches.
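The streaming-strategy exception works because, on input sorted by the group key, a key change proves the previous group is complete. The sketch below is hypothetical (streaming_sum_sorted is not a Clinker function) and keeps only one group live at a time. A hash Aggregate, by contrast, must keep every group until end of input.

```rust
/// Hypothetical streaming-strategy SUM over input pre-sorted on the group key:
/// a group is emitted as soon as the key advances, so only one group is live.
pub fn streaming_sum_sorted<K: PartialEq>(
    rows: impl IntoIterator<Item = (K, i64)>,
    mut emit: impl FnMut(K, i64),
) {
    let mut current: Option<(K, i64)> = None;
    for (key, value) in rows {
        let same_group = matches!(&current, Some((k, _)) if *k == key);
        if same_group {
            if let Some((_, acc)) = current.as_mut() {
                *acc += value;
            }
        } else {
            // The sort key advanced: the previous group can never gain members again.
            if let Some((k, acc)) = current.take() {
                emit(k, acc);
            }
            current = Some((key, value));
        }
    }
    // End of input completes the final group.
    if let Some((k, acc)) = current {
        emit(k, acc);
    }
}
```

On unsorted input the same code would emit a key more than once. This is why the planner must certify the sort order before choosing this strategy.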
Seeing the classification
clinker run <pipeline>.yaml --explain annotates every node with its class in the Physical Properties section:
output.report:
buffer: streaming
aggregation.dept_totals:
buffer: materialized
buffer: streaming marks a stage whose output is consumed without an inter-stage buffer — it charges the budget per in-flight batch and, on a single-consumer edge, spills those batches to disk under pressure; buffer: materialized marks a stage whose output crosses a node_buffers slot that charges the memory budget as one full-stage slot and spills the whole stage. Both classes are spill-eligible; they differ in granularity, not in whether they can spill. The explain annotation is derived from the same classifier the executor uses at runtime, so what --explain reports is exactly what the dispatcher does. See Memory Arbitration & Scheduling for the arbitration model that rides alongside the buffer class.
Tuning the batch size
The number of events handed downstream per batch is set by pipeline.batch_size (default 2048), with an optional per-transform override on a Transform’s config.batch_size. For a fused streaming stage — the only kind whose footprint is one batch — smaller batches lower its in-flight footprint at the cost of more per-batch bookkeeping; larger batches do the reverse. For the other streaming stages the batch size sets only the in-flight slice handed across the channel; the producer’s own result is built in full regardless, so batch_size does not cap their footprint. The batch size changes only the memory profile of streaming handoffs — never their output, and never the behavior of blocking stages.
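The trade-off can be made concrete with a little arithmetic. This sketch assumes that a Transform’s config.batch_size override takes precedence over pipeline.batch_size, and that every record has a fixed size, which real records do not. The helper names are hypothetical.

```rust
/// Documented default for `pipeline.batch_size`.
pub const DEFAULT_BATCH_SIZE: usize = 2048;

/// Effective batch size for a Transform: its own `config.batch_size`
/// override wins, then the pipeline-level value, then the default.
pub fn effective_batch_size(pipeline: Option<usize>, transform_override: Option<usize>) -> usize {
    transform_override.or(pipeline).unwrap_or(DEFAULT_BATCH_SIZE)
}

/// Channel handoffs (and so per-batch bookkeeping rounds) needed for `records`.
pub fn handoffs(records: usize, batch: usize) -> usize {
    assert!(batch > 0, "batch size must be positive");
    (records + batch - 1) / batch
}

/// Rough footprint of one in-flight batch of fixed-size records
/// (a simplifying assumption; only meaningful for fused stages).
pub fn fused_batch_bytes(batch: usize, record_bytes: usize) -> usize {
    batch * record_bytes
}
```

For one million 100-byte records, the default of 2048 gives 489 handoffs of about 200 KB each. Dropping to 256 shrinks each batch to 25,600 bytes but needs 3907 handoffs, which is more bookkeeping for a smaller fused footprint.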
Memory Arbitration & Scheduling
User-facing view: the User Guide’s “Memory Tuning” page.
This page is the engine-internals reference for how Clinker tracks, attributes, and reclaims memory at runtime, and how it orders simultaneously-runnable nodes to keep the resident working set bounded. It covers the MemoryConsumer wrapper registry, pull-mode byte attribution, the per-operator arbitration parameters the active policy reads, the bounded-memory contract for materialized stages, the predicted_* values that feed both --explain and the scheduler, and the four ranking rules the scheduler applies (with its fallback to topological order). The user-facing knobs — the memory: block, the --memory-limit flag, the backpressure-policy selection, sizing guidance, and monitoring — live in the User Guide and are intentionally not repeated here. For how each stage’s buffer class (streaming vs materialized) is decided, see Streaming vs. Blocking Stages.
How it works
Clinker tracks memory in two layers. RSS (resident set size) is sampled at chunk boundaries and supplies the primary spill / abort signal. Alongside RSS, every memory-touching operator (Source ingest channels, Aggregate hash maps, sort buffers, grace-hash partitions, sort-merge accumulators, IEJoin arrays, inline-Combine hash tables, the Reshape per-group input buffer, node_buffers slots, and window-runtime arenas) registers a MemoryConsumer wrapper with the pipeline-scoped arbitrator. Each operator owns its live byte counter and updates it on every admit / spill transition; the arbitrator queries current_usage() per consumer at every policy poll. This pull-mode attribution lets the policy distinguish reclaimable bytes (what an operator can give up right now) from currently-held bytes — a grace-hash with on-disk partitions, for instance, reports only its in-memory portion, and the Reshape buffer reports the live bytes of the groups still resident in memory.
Window-runtime arenas (the columnar backing store that analytic-window evaluation reads from) are attributed but not independently spillable: an arena is immutable once built and is freed only indirectly, when the operator that consumes its windows drains to disk. Its wrapper reports the arena’s bytes so the arbitrator’s attribution is complete, but ranks last among spill victims so a policy never elects an arena while any consumer that can actually pause or spill remains.
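Pull-mode attribution means the arbitrator stores no byte totals of its own. At each poll it asks every registered wrapper for its current usage. The sketch below is a minimal illustration: the MemoryConsumer trait shape, CountedConsumer, and Arbitrator are assumptions built around the current_usage() name the text mentions. The usage check shows a grace-hash-style consumer that reports only its in-memory portion after spilling.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shape of the wrapper each memory-touching operator registers.
pub trait MemoryConsumer: Send + Sync {
    fn name(&self) -> &str;
    /// Bytes currently held in memory, read at every policy poll.
    fn current_usage(&self) -> usize;
}

/// An operator-owned live byte counter, updated on admit / spill transitions.
pub struct CountedConsumer {
    name: String,
    live: AtomicUsize,
}

impl CountedConsumer {
    pub fn new(name: &str) -> Self {
        Self { name: name.to_string(), live: AtomicUsize::new(0) }
    }
    pub fn admit(&self, bytes: usize) {
        self.live.fetch_add(bytes, Ordering::Relaxed);
    }
    /// Bytes moved to disk no longer count as in-memory usage.
    pub fn spilled(&self, bytes: usize) {
        self.live.fetch_sub(bytes, Ordering::Relaxed);
    }
}

impl MemoryConsumer for CountedConsumer {
    fn name(&self) -> &str {
        &self.name
    }
    fn current_usage(&self) -> usize {
        self.live.load(Ordering::Relaxed)
    }
}

/// Pipeline-scoped registry; totals are computed on demand, never cached.
#[derive(Default)]
pub struct Arbitrator {
    consumers: Vec<Arc<dyn MemoryConsumer>>,
}

impl Arbitrator {
    pub fn register(&mut self, consumer: Arc<dyn MemoryConsumer>) {
        self.consumers.push(consumer);
    }
    /// Per-consumer snapshot taken at a policy poll.
    pub fn poll(&self) -> Vec<(String, usize)> {
        self.consumers.iter().map(|c| (c.name().to_string(), c.current_usage())).collect()
    }
    pub fn attributed_total(&self) -> usize {
        self.consumers.iter().map(|c| c.current_usage()).sum()
    }
}
```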
Per-operator arbitration parameters
Each registered consumer carries two parameters the active policy reads: a spill priority (lower is spilled first under Priority) and a back-pressure flag (whether its producer can be paused instead). The defaults are:
| Operator class | spill_priority | can_back_pressure |
|---|---|---|
| node_buffers slot (inter-stage buffer) | 0 | false |
| grace-hash Combine | 10 | false |
| Reshape | 15 | false |
| sort buffer / IEJoin build | 20 | false |
| sort-merge Combine | 25 | false |
| hash Aggregate | 30 | false |
| inline-hash Combine | 30 | false |
| Source ingest | N/A | true |
| streaming Aggregate | N/A | false |
| window arena | last | false |
Lower priority is spilled first, so node_buffers slots (priority 0) are the cheapest victim class: spilling an inter-stage buffer to disk costs one LZ4 + postcard round-trip and frees the most reclaimable bytes per call. The blocking operators climb from there: grace-hash Combine (10), then Reshape (15), then sort buffer / IEJoin build (20), then sort-merge Combine (25), and finally hash Aggregate and inline-hash Combine (30). Reshape sits between grace-hash and sort because its spill round-trip re-runs synthesis on reload, which makes it costlier to evict than grace partitions but cheaper than an external-sort merge; it spills the raw per-group input records rather than post-processed output.
A Source and a streaming Aggregate show spill_priority=N/A because neither operator holds spillable accumulated state. A Source’s try_spill always frees zero bytes — its only real lever is the pause its can_back_pressure=true advertises. A streaming Aggregate emits each group as it completes and never accumulates a spillable group table. The N/A here is about the operator’s own state, not its downstream handoff: when a streaming stage’s output rides a per-batch streaming handoff to a single consumer, that handoff registers a priority-0 consumer just like a node_buffers slot does, and its in-flight batches are spilled to disk one batch at a time if RSS crosses the soft threshold while they are in flight. So a streaming Aggregate’s group table is never a spill victim, but the batches it hands downstream can be.
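The table plus the N/A and “last” rules amount to a sort key. The sketch below encodes the defaults from the table. The operator-class strings, Candidate, and spill_order are hypothetical labels. Breaking ties between equal priorities by registration order is an assumption, not documented behavior.

```rust
/// Spill rank from the per-operator table. Derived `Ord` puts every
/// `Priority(_)` before `Last`, and lower priorities before higher ones.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum SpillRank {
    Priority(u8),
    Last,
}

/// Hypothetical operator-class labels mapped to the table's defaults.
/// `None` is the table's N/A: no spillable state of the operator's own.
pub fn default_rank(class: &str) -> Option<SpillRank> {
    match class {
        "node_buffers" | "streaming_handoff" => Some(SpillRank::Priority(0)),
        "grace_hash_combine" => Some(SpillRank::Priority(10)),
        "reshape" => Some(SpillRank::Priority(15)),
        "sort_buffer" | "iejoin_build" => Some(SpillRank::Priority(20)),
        "sort_merge_combine" => Some(SpillRank::Priority(25)),
        "hash_aggregate" | "inline_hash_combine" => Some(SpillRank::Priority(30)),
        "window_arena" => Some(SpillRank::Last),
        _ => None, // Source ingest, streaming Aggregate
    }
}

pub struct Candidate {
    pub name: &'static str,
    pub rank: Option<SpillRank>,
}

/// Victim order under the Priority policy. N/A consumers are never asked to
/// spill; equal ranks keep registration order (stable sort, an assumption).
pub fn spill_order(candidates: &[Candidate]) -> Vec<&'static str> {
    let mut ranked: Vec<(SpillRank, &'static str)> = candidates
        .iter()
        .filter_map(|c| c.rank.map(|r| (r, c.name)))
        .collect();
    ranked.sort_by_key(|(rank, _)| *rank);
    ranked.into_iter().map(|(_, name)| name).collect()
}
```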
When memory pressure crosses the soft threshold (80 % of limit), the arbitrator runs the active policy to pick a victim and invokes the corresponding action: pause() on a back-pressureable consumer (its producer’s hot loop parks on a Condvar until resume), or try_spill(target_bytes) on a spillable consumer (the consumer’s wrapper flips a spill-requested flag the operator reads at its next batch boundary). When RSS crosses the hard limit, the engine fails fast with E310 MemoryBudgetExceeded.
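A single policy poll can be sketched as a pure function of the sampled RSS. This sketch assumes a back-pressure-first policy and treats the hard limit as a separate hypothetical parameter, because the text does not state how it relates to memory.limit. It also assumes that a spill asks the victim to bring RSS back down to the soft threshold. Action and on_poll are illustrative names.

```rust
#[derive(Debug, PartialEq)]
pub enum Action {
    Nothing,
    /// Park this producer's hot loop until resume.
    Pause(&'static str),
    /// Flip the victim's spill-requested flag with this target.
    Spill { victim: &'static str, target_bytes: u64 },
    /// E310 MemoryBudgetExceeded: fail fast.
    Abort,
}

/// One poll under a back-pressure-first policy. The soft threshold is 80 % of
/// `limit`; `hard_limit` is a hypothetical, separately supplied ceiling.
pub fn on_poll(
    rss: u64,
    limit: u64,
    hard_limit: u64,
    pausable: Option<&'static str>,
    spillable: Option<&'static str>,
) -> Action {
    let soft = limit * 4 / 5;
    if rss >= hard_limit {
        return Action::Abort;
    }
    if rss < soft {
        return Action::Nothing;
    }
    match (pausable, spillable) {
        // Prefer pausing a producer over spilling accumulated state.
        (Some(producer), _) => Action::Pause(producer),
        // Nothing can pause: ask the highest-ranked spill victim to free the excess.
        (None, Some(victim)) => Action::Spill { victim, target_bytes: rss - soft },
        (None, None) => Action::Nothing,
    }
}
```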
This means:
- Pipelines complete regardless of input size, as long as disk space is available and spilling or pausing keeps RSS below the hard limit.
- Performance degrades gracefully under memory pressure: you will see slower execution (and possibly disk I/O) rather than failures. Only a breach of the hard limit fails the run, with E310.
- The soft threshold is a trigger, not a wall. Because RSS is sampled at chunk boundaries, momentary spikes may briefly exceed the limit before the policy fires.
Bounded-memory contract for non-fused stages
A stage runs streaming — no charged per-stage node_buffers slot — when it hands its output to a single downstream sink Output and roots no window: fused Source → Transform → Output and Merge.interleave-of-Sources chains, plus single-branch Route, non-fused Merge, streaming-strategy Aggregate, and hash-build-probe Combine probe-side feeding one Output (see Streaming vs. Blocking Stages). The remaining boundaries — multi-branch Route fan-out, a Merge or other operator whose output forks to several consumers, Composition bodies, diamond DAGs, and every blocking strategy — materialize records into per-stage node_buffers. Each slot registers a NodeBufferConsumer with the arbitrator (priority 0 — the cheapest-to-spill victim class), so the active policy’s victim selection is fully attributed.
When a buffer crosses the soft threshold (80 % of the limit) the arbitrator runs the active policy. Under the default pause, the producer feeding the buffer is paused at its inbound channel; under spill or when no consumer can be paused, the slot spills to disk using the same LZ4 + postcard frame format as grace-hash sort partitions. When RSS crosses the hard limit, the engine fails fast with E310 MemoryBudgetExceeded { node } naming the operator whose hot loop polled the abort gate. The explain --code E310 diagnostic covers the full diagnostic model, including the composition-involved two-shape error model.
Spill fires at the producer side of the first slot whose downstream topology permits it — single-consumer, port-less. For a Source feeding a Route, that’s the Source’s own slot, not the Route’s per-branch slots, because the Source has the one outgoing edge that satisfies the topology rule. Per-branch slots can still spill independently when their own row-distribution drives them past the soft threshold, but the canonical case lands at the producer.
Use clinker run --explain to predict which stages will dominate the budget before runtime — each node carries a buffer: streaming | materialized annotation. Materialized nodes charge pipeline.memory.limit as one full-stage slot and spill the whole stage; streaming nodes charge per in-flight batch and, on a single-consumer edge, spill those batches one at a time. Both classes count against the limit and can spill — the annotation tells you the granularity (whole-stage vs. per-batch), not whether a stage is exempt from the budget.
Reading --explain arbitration output
Alongside the buffer: class, every node in the Physical Properties stanza of --explain carries an arbitration: line giving the per-operator parameters the arbitrator would apply at runtime. The numbers are derived at plan time — --explain does no I/O, so there are no live consumers to query — but they mirror the runtime values exactly, so an author can read the spill/pause model before running the pipeline.
For a fast Source feeding a slow Aggregate (the canonical bounded-memory shape), the relevant lines read:
=== Physical Properties ===
source.orders:
buffer: materialized
arbitration: spill_priority=N/A, can_back_pressure=true, predicted_peak=1K, predicted_freed=0B, predicted_subtree_reclaim=1K
aggregation.dept_totals:
buffer: materialized
arbitration: spill_priority=30, can_back_pressure=false, predicted_peak=1K, predicted_freed=1K, predicted_subtree_reclaim=1K
The Source advertises can_back_pressure=true and spill_priority=N/A: when memory pressure rises, the arbitrator pauses the Source rather than asking it to spill (it has nothing to free). The hash Aggregate advertises the opposite — spill_priority=30, can_back_pressure=false — so it is a spill victim, ranked behind any cheaper consumer.
The three predicted_* values are the scheduler’s inputs (see Scheduling below). predicted_peak is the live volume a node is expected to hold at its peak — seeded at a file-backed Source from its path: file’s on-disk size and propagated forward. predicted_freed is what the node returns to the budget the instant it finishes draining: a blocking Aggregate holds its whole accumulated input (predicted_peak=1K) and frees it on drain (predicted_freed=1K), while a streaming Source carries the volume through but frees nothing the instant it drains (predicted_freed=0B). predicted_subtree_reclaim is the largest reclaim the node’s downstream chain eventually unlocks: the Source frees nothing itself, but launching it is the only way to reach the point where its downstream Aggregate can drain, so it inherits that Aggregate’s reclaim (predicted_subtree_reclaim=1K). Propagation of the subtree value stops at a convergence node — the Combine two independent chains feed — so each feeding chain keeps the distinct reclaim it owns up to the join rather than the shared post-join total. All three render 0B when no file-size seed reached the node — a multi-file (glob/regex/paths) or absent/unreadable Source, or any node downstream of one. The bytes are formatted in the same binary-prefix units as memory.limit (1K, 64M, 2G), and the same three values appear in --explain --format json under node_properties.<name>.predicted_peak_bytes, predicted_freed_bytes_on_complete, and predicted_subtree_reclaim_bytes.
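The subtree-reclaim rule can be sketched as a backward pass over a topologically indexed plan. Each node keeps the larger of its own predicted_freed and the reclaim of its successors, but it does not inherit through a convergence node (one with more than one input). PlanNode and subtree_reclaim are hypothetical names, and the sketch assumes node indices already follow topological order.

```rust
/// A hypothetical plan node carrying only what this computation needs.
pub struct PlanNode {
    /// predicted_freed: bytes returned to the budget the instant it drains.
    pub freed: u64,
    pub successors: Vec<usize>,
}

/// predicted_subtree_reclaim for every node. Nodes must be indexed in
/// topological order (every edge goes from a lower to a higher index).
pub fn subtree_reclaim(nodes: &[PlanNode]) -> Vec<u64> {
    let n = nodes.len();
    let mut indegree = vec![0usize; n];
    for node in nodes {
        for &s in &node.successors {
            indegree[s] += 1;
        }
    }
    let mut reclaim = vec![0u64; n];
    // Walk backwards so every successor is finished before its predecessor.
    for i in (0..n).rev() {
        let mut best = nodes[i].freed;
        for &s in &nodes[i].successors {
            debug_assert!(s > i, "nodes must be in topological order");
            // Stop at a convergence node: each feeding chain keeps its own reclaim.
            if indegree[s] <= 1 {
                best = best.max(reclaim[s]);
            }
        }
        reclaim[i] = best;
    }
    reclaim
}
```

For two Source → Aggregate chains (freeing 64 and 8) that meet at a Combine (freeing 100), each Source inherits its own chain’s Aggregate value (64 or 8), not the Combine’s shared 100.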
A === Buffer Edges === section follows, listing the node_buffers slot between each pair of non-fused stages. Every slot is a priority-0, non-back-pressureable NodeBufferConsumer — the cheapest victim class — and the slot= number is the stable index the executor admits into. The slot carries the producer’s predicted volume (it holds the producer’s materialized output and frees that whole buffer once the consumer drains it):
=== Buffer Edges ===
edge source.orders -> aggregation.dept_totals:
buffer: node_buffer (slot=0)
arbitration: spill_priority=0, can_back_pressure=false, predicted_peak=1K, predicted_freed=1K (producer: source)
Reading top to bottom: under memory pressure the default pause policy (BackPressurePreferred -> Priority) first pauses the Source, the only back-pressureable consumer here. If the soft threshold is still tripped, it falls back to priority order and spills the inter-stage buffer (priority 0) before it ever forces the Aggregate (priority 30) to spill. Cross-reference the per-operator table to see where any operator in your own pipeline lands.
Scheduling
When a pipeline has several nodes that are simultaneously runnable — every one of their inputs is ready, so the executor could legally run any of them next — the engine picks one deterministically rather than walking topological position blindly. The common case is a single linear chain where only one node is ever runnable at a time, and there is nothing to choose. The choice matters only for a pipeline whose DAG has multiple independent subgraphs (for example, two unrelated Source → Aggregate branches that a later Combine or Merge joins): both branches’ lead nodes become runnable together.
The engine runs one node to completion before dispatching the next. When two independent chains converge (two Source → Aggregate branches a later Combine joins), both branches’ outputs must be materialized and held until the Combine consumes them, so the chain that runs second builds its working set while the first chain’s output already sits in a buffer. Running the memory-heaviest chain first therefore drains and releases its large state before the lighter chain’s output has to coexist with it, lowering the peak resident working set; running it last makes its large state coexist with the already-materialized output of every chain that finished before it. The ranking also pays off when the frontier offers a mix of node kinds. With a blocking operator ready to drain (and reclaim its accumulated state) alongside a fresh Source about to charge a new buffer, draining first reclaims headroom before the new charge lands. Under a tight budget, the engine prefers the runnable node that fits the remaining headroom over one that would overflow it.
The engine ranks the simultaneously-runnable nodes by these rules, in order:
1. Headroom fit. A node whose predicted_peak fits within the budget’s remaining headroom is preferred over one that does not. Running a node that fits avoids tipping the live working set over the soft threshold and forcing a spill that a different ordering would have avoided. A node with an unknown peak (predicted_peak=0B, meaning no file-size seed reached it) counts as fitting, because 0 is always within any headroom; this keeps an unestimated pipeline on its topological order rather than deprioritizing every node.
2. Immediate-freed tiebreak. Among nodes that fit equally, the one with the larger predicted_freed runs first. Finishing a node that returns more bytes to the budget the instant it completes maximizes the headroom available to everything still waiting, the same intuition as shortest-remaining-state-first. A ready blocking operator (which reclaims its accumulated state now) therefore wins over a fresh Source (which frees nothing the instant it drains), because reclaiming immediately is the peak-minimizing choice.
3. Subtree-reclaim tiebreak. Among nodes that also tie on immediate freed (most importantly the fresh Sources of independent chains, which all free 0 the instant they drain), the one with the larger predicted_subtree_reclaim runs first. This front-loads the chain whose completion eventually frees the most: a Source’s value is the reclaim its downstream Aggregate will release, so the heavier chain’s Source is dispatched ahead of the lighter one even when it sorts later in topological order. Because this rule ranks below immediate freed, it never elects a fresh heavy Source over a ready light Aggregate (which would raise the peak). It only decides between candidates whose immediate reclaim is equal.
4. Stable-index tiebreak. If two nodes still tie (equal fit, equal immediate freed, equal subtree reclaim, including the all-unknown case where all are 0), the one with the lower stable node index wins. The index is each node’s position in the plan’s topological order, which is the exact sequence the executor walks the DAG. This tiebreak is therefore fully deterministic and independent of the machine, the thread schedule, and the order in which the runnable set happened to be assembled.
Fallback to topological order. When no node carries a volume estimate (every predicted_peak is 0B), rules 1–3 are no-ops — every node fits and every node frees the same 0 — so rule 4 alone decides, and the engine runs nodes in exactly the lowest-index / topological order it used before any volume estimates existed. This is the load-bearing guarantee: scheduling never changes record output or branching order. A pipeline’s data output is byte-identical regardless of the predictions; the estimates only steer which runnable node goes first to reclaim headroom sooner, front-load the heaviest chain, and prefer fitting nodes under pressure, never what each node computes.
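The four rules compose into a single lexicographic sort key. This is a minimal sketch, assuming each runnable node exposes its predicted_peak, predicted_freed, and predicted_subtree_reclaim in bytes plus its stable index, and that headroom is non-negative. The `Candidate` type and `pick_next` function are hypothetical illustrations, not the engine's scheduler API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    index: int                          # position in the plan's topological order
    predicted_peak: int = 0             # bytes; 0 means no estimate reached this node
    predicted_freed: int = 0            # bytes returned the instant the node completes
    predicted_subtree_reclaim: int = 0  # bytes its downstream chain eventually frees


def rank_key(c, headroom):
    fits = c.predicted_peak <= headroom  # an unknown peak of 0 always fits
    return (
        0 if fits else 1,               # rule 1: fitting nodes first
        -c.predicted_freed,             # rule 2: larger immediate reclaim first
        -c.predicted_subtree_reclaim,   # rule 3: larger eventual reclaim first
        c.index,                        # rule 4: lower stable index wins
    )


def pick_next(runnable, headroom):
    return min(runnable, key=lambda c: rank_key(c, headroom))


# With no estimates at all, rules 1-3 tie and the topological order decides.
print(pick_next([Candidate(3), Candidate(1), Candidate(2)], headroom=0).index)  # 1
```

Because every element of the key is derived from the plan and the stable index, two runs over the same plan and input sizes always produce the same dispatch order.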
Because the predictions are a pure function of the plan shape and the input files’ on-disk sizes (resolved against the pipeline file’s directory, never the process working directory), the scheduling decision is identical on every machine for an identical plan over identically-sized inputs.
Correlation Key Lifecycle & Rollback Narrowing
This page is the engineer’s reference for how a correlation key is born, carried, and unwound inside the executor. A correlation key declares a set of records from a single source as an atomic group: if any record in the group fails validation or processing, the whole group is sent to the DLQ. The mechanics here are the lineage substrate — $ck.<field> shadow columns, (row_id, source_name) lineage pairs, the per_source_rollback_cursors map, and Combine input snapshots — that the Retraction Protocol builds on. Where this page describes how identity is tracked and how a failure narrows the rollback, the retraction protocol describes how an aggregate refinalizes without DLQ’ing whole groups.
User-facing view: the User Guide’s “Correlation Keys” page.
Lifecycle
The engine adds a shadow column named $ck.<field> (one per correlation-key field) to every declaring source’s schema and copies the field’s value into it at ingest. From that point on, the shadow column is the authoritative group identity — if a downstream transform rewrites the user-declared correlation field, the shadow column is untouched and the group identity is preserved.
Shadow columns are an internal engine namespace. You never write $ck.<field> in YAML or CXL — the engine manages them. They are stripped from default writer output. To surface them for debugging, set include_correlation_keys: true on an output node:
```yaml
- type: output
  name: debug_out
  input: validate
  config:
    name: debug_out
    type: csv
    path: "./debug.csv"
    include_correlation_keys: true
```
A correlation key is declared per source: each source’s config: block carries an optional correlation_key: field naming the column (or list of columns) whose value identifies a record’s correlation group within that source. The engine widens each declaring source’s schema with one $ck.<field> shadow column per field and stamps the user-declared value into it at ingest. A record’s correlation group is identified by the tuple of values for that source’s listed fields; records sharing the same tuple within the same source belong to the same group. There is no pipeline-level correlation key. A source whose declared correlation_key: field names a column not present in its own schema: block is rejected at compile time with diagnostic E153.
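The stamping and identity rules can be sketched in a few lines. This assumes records are plain dicts keyed by column name; `check_correlation_key`, `stamp_ck`, and `group_identity` are hypothetical helpers, not engine functions, and the error text only mimics the E153 diagnostic.

```python
def check_correlation_key(schema_columns, ck_fields):
    """Compile-time check: every declared field must exist in the source's own schema."""
    missing = [f for f in ck_fields if f not in schema_columns]
    if missing:
        raise ValueError(f"E153: correlation_key names columns not in schema: {missing}")


def stamp_ck(record, ck_fields):
    """At ingest, copy each correlation-key field into its $ck.<field> shadow column."""
    stamped = dict(record)
    for field in ck_fields:
        stamped[f"$ck.{field}"] = record[field]
    return stamped


def group_identity(record, ck_fields):
    """Group identity is the tuple of shadow-column values, never the user-visible fields."""
    return tuple(record[f"$ck.{field}"] for field in ck_fields)


check_correlation_key(["customer_id", "amount"], ["customer_id"])
row = stamp_ck({"customer_id": 42, "amount": 10}, ["customer_id"])
row["customer_id"] = 99  # a downstream transform rewrites the user-declared field
print(group_identity(row, ["customer_id"]))  # (42,)
```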
Multi-source pipelines
Different sources can declare different correlation-key fields. The engine treats each source’s CK identity as locally consistent: a record from customers is a member of the customer-id group named in its row, and a record from orders is a member of the order-id group named in its row, regardless of whether customer_id appears in orders or vice versa. Combine and Merge nodes that join across sources negotiate which CK columns survive into the joined output via the Combine node’s propagate_ck: field (see Combine Join Strategies).
A source that declares no correlation_key: carries no $ck.* widening. Records from such a source flow through the pipeline without group identity; per-record errors DLQ on a per-record basis with no group fan-out. The orchestrator’s relaxed-aggregate retraction protocol still activates if any other source on the same DAG carries a CK field that an aggregate’s group_by omits — the retraction protocol scope is the DAG’s lattice of $ck.* columns, not any single source’s declaration.
DLQ semantics
When a record fails inside a correlation group:
- The failing record produces a trigger DLQ entry. Its category reflects the actual failure (e.g. type_error, validation_failed).
- Every other record from a source that contributed a trigger to the same group produces a collateral DLQ entry. Collaterals carry the category correlated.
- Records belonging to other (clean) groups proceed normally.
A record with a null value for the correlation-key field is treated as its own per-record group: it has no peers and DLQ atomicity does not span multiple records.
A Combine output-row eval failure that the engine recovers from (under continue / best_effort, in the inline hash build-probe arm) produces entries under the combine_output_row category. This is distinct from the upstream-Transform type_coercion_failure category because the entry carries the contributing-build lineage and rewinds the rollback cursors of both the driver and the matched build source. See Per-source rollback narrowing below for the cursor-rewind detail.
The dlq_count counter sums triggers and collaterals.
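The trigger/collateral split can be modeled as a pass over a group's records. This is a minimal sketch, assuming each record is a hypothetical (row_id, source, group_key) tuple with group_key set to None for a null correlation key; it ignores CorrelationFanoutPolicy and the unattributed synthetic rows described under per-source narrowing.

```python
def classify_dlq(records, failures):
    """Split records into DLQ entries and survivors.

    records:  list of (row_id, source, group_key); group_key is None for a null CK.
    failures: {row_id: category} for the records that failed.
    Returns (dlq_entries, survivors); each entry is (row_id, kind, category).
    """
    # (group, source) pairs holding at least one trigger. A null-CK record is its
    # own per-record group, so it never marks peers dirty.
    dirty = {(g, src) for row, src, g in records if row in failures and g is not None}
    dlq, survivors = [], []
    for row, src, g in records:
        if row in failures:
            dlq.append((row, "trigger", failures[row]))
        elif g is not None and (g, src) in dirty:
            dlq.append((row, "collateral", "correlated"))
        else:
            survivors.append(row)
    return dlq, survivors


records = [(1, "src_a", (7,)), (2, "src_b", (7,)), (3, "src_b", (7,)),
           (4, "src_a", (8,)), (5, "src_a", None)]
dlq, survivors = classify_dlq(records, {2: "type_error", 5: "validation_failed"})
dlq_count = len(dlq)  # triggers plus collaterals
```

Row 3 is collateral because it shares group (7,) and source src_b with the trigger; row 1 shares the group but comes from src_a, so it survives.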
Per-source rollback narrowing
When two sources contribute records to the same correlation group, a failure originating from one source does NOT collaterally DLQ records from the OTHER source. The collateral fan-out is scoped to the failing source’s records only.
Concretely, consider [src_a, src_b] → merge → tfm → out with both sources declaring correlation_key: id. A mid-stream Transform error fires on every src_b record but leaves src_a records untouched:
```yaml
- type: transform
  name: tfm
  input: m
  config:
    cxl: |
      emit id = id
      emit ratio = if($source.name == "src_b") then (1 / 0) else amt
```
Under per-source rollback, the dirty correlation group for each id value contains:
- One trigger DLQ entry: the src_b row that hit 1 / 0.
- The src_a row sharing the same id is spared and reaches the output.
The engine identifies origin per record via the engine-stamped $source.name column. Within the failing source’s records, the existing CorrelationFanoutPolicy (Any / All / Primary) determines which records DLQ — the policy semantics are unchanged. Single-source pipelines see bit-identical behavior to the pre-narrowing engine because every co-grouped record shares the failing source by construction.
Records that carry no single-source attribution — synthetic aggregate emits and Combine output rows — are NOT spared by per-source narrowing. They flow through the existing collateral path because their stamp falls back to the merged-source identity which is ambiguous about origin.
The engine also surfaces a per_source_rollback_cursors map on the ExecutionReport, keyed by source name and carrying the highest source row number that cleanly exited a forward operator. The map advances per record at the clean exit of Transform / Route / Aggregate, and rewinds per contributing source on max_group_buffer overflow to the lowest row_num any group member of that source contributed. Sources whose records all DLQ never land in the map. The map is the replay anchor for per-source resume: a downstream rerun reads each source’s cursor as the floor for what must be reprocessed.
On max_group_buffer overflow, every record in the overflowing group still lands in DLQ (one GroupSizeExceeded trigger plus per-row collaterals), but the per-source rollback cursor rewinds independently per contributing source. Attributing the overflow failure itself to one source would be a fiction — every contributing source shared blame proportionally — so the DLQ shape stays group-wide while the rewind narrows per source.
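The cursor map's two movements (advance on clean exit, per-source rewind on overflow) can be sketched as a small class. `RollbackCursors` is hypothetical, and treating a rewind as never moving a cursor forward (hence `min`) is an assumption of this sketch rather than something stated above.

```python
class RollbackCursors:
    """Per-source replay anchors: the highest row number that cleanly exited."""

    def __init__(self):
        # source name -> row number; sources with no clean exit never appear
        self.cursors = {}

    def clean_exit(self, source, row_num):
        # Advances at the clean exit of a Transform / Route / Aggregate.
        self.cursors[source] = max(self.cursors.get(source, row_num), row_num)

    def rewind_on_overflow(self, group_members):
        """group_members: (source, row_num) pairs of one overflowing group.

        Each contributing source rewinds independently to the lowest row_num it
        contributed to that group; other sources keep their forward progress.
        """
        lowest = {}
        for source, row_num in group_members:
            lowest[source] = min(lowest.get(source, row_num), row_num)
        for source, row_num in lowest.items():
            if source in self.cursors:  # all-DLQ sources stay out of the map
                self.cursors[source] = min(self.cursors[source], row_num)


c = RollbackCursors()
c.clean_exit("src_a", 10)
c.clean_exit("src_b", 20)
c.clean_exit("src_a", 12)
c.rewind_on_overflow([("src_a", 7), ("src_a", 4), ("src_c", 3)])
print(c.cursors)  # {'src_a': 4, 'src_b': 20}
```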
The relaxed-CK aggregator’s per-row lineage carries (row_id, source_name) pairs so a finalize-time retract scoped to one source rewinds only that source’s contributions to each affected group. The source half of the pair is load-bearing under multi-source ingest: each source numbers its rows from its own monotonic counter, so two sources that both feed the same aggregate group can contribute records at identical row_id values. Pairing the row id with its source keeps src_a’s row 1 distinct from src_b’s row 1 when both land in one group, so a retract that must remove both reaches each one instead of collapsing the colliding ids and stranding the second source’s contribution.
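The collision is easy to reproduce with a toy lineage list. This is an illustrative sketch with a hypothetical `retract` helper: one aggregate group receives rows from two sources that each number from 1, and keying by row id alone loses one of them.

```python
# Contributions to one aggregate group; both sources number their rows from 1.
contributions = [(1, "src_a"), (2, "src_a"), (1, "src_b")]


def retract(lineage, source, row_ids):
    """Remove only `source`'s rows from a group's (row_id, source_name) lineage."""
    targets = {(row_id, source) for row_id in row_ids}
    return [pair for pair in lineage if pair not in targets]


# Keying by row_id alone collapses the colliding ids: src_a's row 1 is overwritten
# by src_b's row 1, so a later retract of src_a row 1 has nothing to find.
by_row_id = {row_id: source for row_id, source in contributions}
print(by_row_id)  # {1: 'src_b', 2: 'src_a'}

print(retract(contributions, "src_b", [1]))  # [(1, 'src_a'), (2, 'src_a')]
```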
Combine input snapshots are captured at fold start and cleared at every Combine arm’s exit (inline, IEJoin, GraceHash, SortMerge). When a Combine output-row eval fails recoverably under continue / best_effort in the hash build-probe (inline) arm — a probe-key, residual-filter, or matched / on_miss: null_fields body failure on one driver row — the snapshot restores each contributing source’s rollback cursor to the value it held at the start of the fold (its pre-fold floor), lowering the cursor only if it had since advanced, then routes the row to the DLQ under the combine_output_row category. Only the sources that fed the failing row rewind; co-folded sources that did not contribute keep their forward progress. The IEJoin, grace-hash, and sort-merge arms propagate an output-eval failure as fail-fast regardless of strategy. See Combine Join Strategies for the per-arm execution detail.
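The snapshot-and-restore step for the inline arm can be sketched with a plain dict of cursors. The helper names are hypothetical, and leaving sources absent from the snapshot untouched is a simplification this sketch makes; the page does not say what happens to them.

```python
def capture_snapshot(cursors):
    """Fold start: remember every source's rollback cursor (its pre-fold floor)."""
    return dict(cursors)


def restore_contributors(cursors, snapshot, contributing_sources):
    """Recoverable output-row eval failure: rewind only the sources that fed the row.

    A cursor returns to its pre-fold floor, and only if it advanced since the fold
    started. Sources missing from the snapshot are left alone (a simplification).
    """
    for source in contributing_sources:
        if source in snapshot and source in cursors and cursors[source] > snapshot[source]:
            cursors[source] = snapshot[source]


def clear_snapshot(snapshot):
    """Every Combine arm's exit clears the snapshot."""
    snapshot.clear()


cursors = {"driver": 5, "build_a": 3, "build_b": 9}
snapshot = capture_snapshot(cursors)
cursors.update(driver=8, build_a=4, build_b=12)  # forward progress during the fold
restore_contributors(cursors, snapshot, {"driver", "build_a"})
print(cursors)  # {'driver': 5, 'build_a': 3, 'build_b': 12}
clear_snapshot(snapshot)
```

build_b co-folded but did not feed the failing row, so it keeps its forward progress.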
Group buffering
The engine buffers records per correlation group until either the group completes (all source records observed) or a failure triggers a flush. The max_group_buffer: field on the pipeline-level error_handling: block caps per-group buffering across every source’s groups:
```yaml
error_handling:
  max_group_buffer: 100000  # Default: 100,000
```
Groups that exceed the cap are DLQ’d entirely with a group_size_exceeded trigger plus a collateral entry per buffered record. This is a backpressure boundary, not a hard error. The interaction with the per_source_rollback_cursors map on overflow is described under Per-source rollback narrowing above: the DLQ shape stays group-wide while the cursor rewind narrows per contributing source.
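The overflow DLQ shape for a single group can be sketched as follows. `buffer_group` is hypothetical, and modeling the row that pushes the group past the cap as the group_size_exceeded trigger (with every other member of the group, earlier or later, as a collateral) is an assumption of this sketch.

```python
def buffer_group(rows, max_group_buffer=100_000):
    """Buffer one correlation group's rows; on overflow, DLQ the whole group.

    Returns (buffered_rows, dlq_entries); each entry is (row, kind, category).
    """
    buffered, dlq, overflowed = [], [], False
    for row in rows:
        if overflowed:
            # Later members of an overflowed group are DLQ'd as well.
            dlq.append((row, "collateral", "correlated"))
        elif len(buffered) >= max_group_buffer:
            overflowed = True
            dlq.append((row, "trigger", "group_size_exceeded"))
            dlq.extend((r, "collateral", "correlated") for r in buffered)
            buffered = []
        else:
            buffered.append(row)
    return buffered, dlq


print(buffer_group([1, 2, 3], max_group_buffer=5))  # ([1, 2, 3], [])
```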
See also
- The Retraction Protocol: how relaxed-CK aggregates refinalize affected groups instead of DLQ’ing them wholesale, and how the synthetic $ck.aggregate.<name> column lifts the post-aggregate retract path.
- Operator Retraction Cost Reference: the per-operator memory/CPU footprint under retraction, plus the === Retraction === explain block and the correlation.retract.* metrics counters.
- Combine Join Strategies: the propagate_ck: semantics, match modes, and the per-arm snapshot/rewind behavior.
- Memory Arbitration & Scheduling: how the RSS budget and spill thresholds interact with group buffers.
The Retraction Protocol
When an aggregate’s group_by omits a correlation-key field that is visible upstream, a single correlation group no longer maps cleanly onto a single aggregate group — one CK group can span many aggregate groups. The strict-collateral DLQ shape (roll back the whole group, including the aggregate output row) would then over-reject: a single bad source row would void an entire department’s total. The retraction protocol is the engine’s answer. It retracts only the failing records’ contributions and refinalizes the affected aggregate groups, so surviving contributions still produce a correct output row.
This page is the protocol itself, woven from three operator surfaces: the aggregate’s strict-vs-retraction path selection, the synthetic $ck.aggregate.<name> lineage column that lifts post-aggregate failures, and the buffer-mode window behavior. It builds directly on the lineage substrate in Correlation Key Lifecycle & Rollback Narrowing. For the per-operator memory/CPU footprint and the explain/metrics surfaces, see Operator Retraction Cost Reference.
User-facing view: the User Guide’s “Correlation Keys” / “Aggregate Nodes” pages.
Path selection: strict vs. retraction
The engine inspects each aggregate’s group_by against the upstream CK lattice (the union of $ck.* shadow columns visible at the aggregate’s input). Authors do not configure this — the engine inspects the configuration and picks the correct path:
- group_by covers every upstream CK field: strict-collateral path. Each emitted row inherits the correlation identity of its inputs, the aggregate emits one row per group, and a DLQ trigger anywhere in the group rolls back the whole group, including the aggregate output row. This is the zero-overhead default; strict aggregates short-circuit to the two-phase commit body and pay no retraction overhead.

```yaml
- type: aggregate
  name: order_totals
  input: orders
  config:
    group_by: [order_id]  # strict: covers the upstream CK
    cxl: |
      emit total = sum(amount)
```

- group_by omits any upstream CK field: retraction protocol path. A single correlation group may span multiple aggregate groups, and CK fields omitted from group_by stop being visible as user-named columns to downstream consumers of this aggregate’s output. The engine retracts only the failing records and refinalizes affected groups, so the aggregate output row reflects the surviving contributions.

```yaml
- type: aggregate
  name: dept_totals
  input: orders
  config:
    group_by: [department]  # retraction protocol is active
    cxl: |
      emit total = sum(amount)
```
On the strict path, aggregate output rows inherit the correlation meta of the records that fed them. If any input record in a correlation group fails, the surviving records in that group still flow through the aggregator and produce one aggregate row — but that aggregate row is itself DLQ’d as a collateral and never reaches the writer.
On the retraction path, the engine retracts only the failing records and refinalizes affected groups, so the aggregate output row reflects the surviving contributions. Operators downstream of a retraction-mode aggregate run only at commit time on the post-recompute aggregate emits, so non-deterministic CXL builtins (e.g. now) evaluate exactly once per output row and need no special-casing.
E15Y: streaming incompatibility
The retraction protocol places one constraint on how an aggregate may execute, and the engine enforces it automatically once it has classified the aggregate. A retraction-mode aggregate is incompatible with strategy: streaming and is rejected with E15Y:
```bash
clinker explain --code E15Y   # retraction-mode aggregate incompatible with strategy: streaming
```
The reason is structural: streaming aggregates emit at group-boundary close, before the terminal correlation commit, and that early emit defeats the rollback window the retraction protocol depends on. There is nothing left to retract from once a streaming group has already emitted and been handed downstream. The engine selects the path from group_by content, so an author who writes strategy: streaming on what turns out to be a relaxed-CK aggregate gets the compile-time E15Y rather than silent incorrect behavior.
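Path selection and the E15Y check reduce to a set-containment test followed by one compatibility rule. This is a minimal sketch, assuming the upstream CK lattice is given as the user field names behind the visible $ck.* columns; `select_path` and `check_strategy` are hypothetical, and the message only mimics the E15Y diagnostic.

```python
def select_path(group_by, upstream_ck_fields):
    """Strict when group_by covers every upstream CK field, otherwise retraction."""
    return "strict" if set(upstream_ck_fields) <= set(group_by) else "retraction"


def check_strategy(group_by, upstream_ck_fields, strategy=None):
    """Compile-time guard: a retraction-mode aggregate may not stream."""
    path = select_path(group_by, upstream_ck_fields)
    if path == "retraction" and strategy == "streaming":
        raise ValueError("E15Y: retraction-mode aggregate incompatible with strategy: streaming")
    return path


print(select_path(["order_id"], ["order_id"]))    # strict
print(select_path(["department"], ["order_id"]))  # retraction
```

A pipeline with no upstream CK fields always selects the strict path, because the empty set is contained in any group_by.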
Reversible vs. BufferRequired accumulators
The cost of refinalizing a group depends on whether the accumulator can be run in reverse:
- Reversible accumulators (sum, count, collect, any) carry a per-row lineage map (input_row_id → group_index) alongside accumulator state. A retract is O(retracted_rows) reverse-op calls plus one finalize_in_place. The lineage map costs ~8 bytes/row plus the per-group input_rows Vec inline cost.
- BufferRequired accumulators (min, max, avg, weighted_avg) cannot be unwound by a reverse op: removing the current max, for instance, requires knowing the second-largest value, which the running accumulator never retained. They hold per-group raw contributions until commit and recompute affected groups from contributions − retracted_rows.
The full per-accumulator memory formulas live in Operator Retraction Cost Reference.
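The contrast between the two accumulator families shows up clearly with sum and max. This is a minimal sketch, assuming the retracted record's value is available at retract time for the reverse op; `ReversibleSum` and `BufferedMax` are hypothetical classes, not the engine's accumulator types.

```python
from collections import defaultdict


class ReversibleSum:
    """sum with a per-row lineage map; each retract is one reverse op."""

    def __init__(self):
        self.totals = defaultdict(int)  # group_index -> running sum
        self.lineage = {}               # input_row_id -> group_index

    def add(self, row_id, group, value):
        self.totals[group] += value
        self.lineage[row_id] = group

    def retract(self, rows):
        """rows: (input_row_id, value) pairs taken from the retracted records."""
        for row_id, value in rows:
            group = self.lineage.pop(row_id)
            self.totals[group] -= value  # subtract exactly what was added


class BufferedMax:
    """max holds raw contributions and recomputes after a retract."""

    def __init__(self):
        self.contributions = defaultdict(dict)  # group_index -> {input_row_id: value}

    def add(self, row_id, group, value):
        self.contributions[group][row_id] = value

    def retract(self, row_ids):
        for rows in self.contributions.values():
            for row_id in row_ids:
                rows.pop(row_id, None)

    def finalize(self, group):
        """Recompute from contributions - retracted_rows; None for an emptied group."""
        rows = self.contributions[group]
        return max(rows.values()) if rows else None
```

The max case needs the buffer: after retracting the row that held 50, only the retained contributions can say the new maximum is 10.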
Synthetic correlation column
A retraction-mode aggregate emits one engine-managed $ck.aggregate.<name> column on its output schema, alongside the user-emitted bindings ([group_by_columns] ++ [emitted_binding_columns]). The column carries the aggregator’s per-group index at finalize and costs ~16 bytes per emitted row (the Value::Integer payload plus its slot overhead). It is hidden from default writer output, mirroring the source-CK shadow column posture, and lives outside any user-visible CXL surface — authors never write or read it.
The synthetic column is the lineage hook that lifts the post-aggregate retract path. Without it, a failure on an aggregate output row would have no way back to the source rows that produced it: the aggregate has already collapsed many source rows into one. The column lets the orchestrator’s detect phase decode the per-group index back to the contributing source row ids via the retained aggregator’s input_rows table, and the recompute phase then retracts those source rows just as it would retract a directly-failing source record — matching the upstream-failure DLQ fan-out semantic.
Where retraction triggers are sourced
Retraction handles failures on both sides of the aggregate, via two different lineage hooks:
-
Upstream of a retraction-mode aggregate (Source ingest, Transform evaluation, Combine probe, Validation): retraction is fine-grained. The failing record carries
$ck.<field>shadow columns, the engine identifies its correlation group from those columns, andretract_rowremoves that record’s specific contribution from every affected aggregate group while leaving every other contributing record intact. -
Downstream of a retraction-mode aggregate (a Transform that fails on an aggregate output row, an Output writer that rejects an aggregate row): the failing record carries the synthetic
$ck.aggregate.<name>lineage column described above. The detect phase resolves that column to the contributing source row ids and feeds them into the same recompute pipeline as upstream failures.
Both surfaces converge on one recompute pipeline. The end-to-end demo at examples/pipelines/retract-demo/ runs both surfaces in one pipeline (a Transform failing on an aggregate output row alongside an upstream Transform error).
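How the two trigger surfaces converge on one retract set can be sketched with a dict standing in for the retained aggregator's input_rows table. `collect_retractions` is hypothetical; it only shows that an upstream failure retracts one record while a downstream failure fans out through the synthetic column's per-group index.

```python
def collect_retractions(upstream_failures, downstream_failures, input_rows):
    """Resolve both trigger surfaces into one set of (row_id, source) pairs.

    upstream_failures:   (row_id, source) pairs of failing records upstream.
    downstream_failures: per-group indexes read from the synthetic
                         $ck.aggregate.<name> column of failing output rows.
    input_rows:          group_index -> contributing (row_id, source) pairs.
    """
    to_retract = set(upstream_failures)  # fine-grained: exactly the failing records
    for group_index in downstream_failures:
        to_retract.update(input_rows[group_index])  # fan out to every contributor
    return to_retract


input_rows = {0: [(1, "orders"), (2, "orders")], 1: [(3, "orders"), (4, "orders")]}
print(sorted(collect_retractions([(3, "orders")], [0], input_rows)))
# [(1, 'orders'), (2, 'orders'), (3, 'orders')]
```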
Window interaction: buffer-mode and wholesale recompute
When a window sits downstream of a relaxed-CK aggregate whose dropped correlation-key fields overlap the window’s partition_by, the planner switches the window from streaming-emit to buffer-mode. Streaming windows are structurally incompatible with retraction for the same reason streaming aggregates are: a streaming window emits per-partition as the partition closes, leaving nothing to retract. The plan-time derivation detects the overlap between the aggregate’s dropped CK fields and the window’s partition_by axis and forces buffer mode.
A buffer-mode window stores per-partition raw row buffers until commit. On retraction, it reruns the configured $window.* evaluation over partition − retracted_rows and emits per-output deltas through the replay phase. All 13 $window.* builtins are covered uniformly by this wholesale recompute — there is no per-function reverse op the way Reversible aggregate accumulators have; the window simply re-evaluates the surviving partition end to end. This keeps ranking functions (row_number, rank, dense_rank), positional functions (lag, lead, first_value, last_value), and iterable predicates (any, every, exists, not_exists) all correct after a retract without bespoke per-builtin unwind logic.
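Wholesale recompute means rerunning the window over the surviving rows rather than unwinding each builtin. This is a minimal sketch that uses row_number and lag as stand-ins for the $window.* builtins, with partitions as ordered lists of hypothetical (row_id, value) pairs; `window_mode` and `recompute_partition` are illustrative names, not planner or runtime APIs.

```python
def window_mode(dropped_ck_fields, partition_by):
    """Plan-time switch: buffer mode when dropped CK fields overlap partition_by."""
    return "buffer" if set(dropped_ck_fields) & set(partition_by) else "streaming"


def recompute_partition(partition, retracted):
    """Re-evaluate the window over partition - retracted_rows, end to end."""
    survivors = [(rid, v) for rid, v in partition if rid not in retracted]
    out, prev = [], None
    for n, (rid, v) in enumerate(survivors, start=1):
        out.append({"row_id": rid, "row_number": n, "lag": prev})
        prev = v
    return out


print(recompute_partition([(1, 10), (2, 20), (3, 30)], {2}))
# [{'row_id': 1, 'row_number': 1, 'lag': None}, {'row_id': 3, 'row_number': 2, 'lag': 10}]
```

Row 3's row_number drops from 3 to 2 and its lag changes from 20 to 10; neither change requires a per-function reverse op, only a second pass over the survivors.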
Degrade fallback
When retraction’s preconditions break at runtime — an aggregate spilled before retract reached it, or a window partition exceeded the memory budget — the orchestrator degrades to “DLQ entire affected group/partition”, the same strict-collateral DLQ shape every aggregate uses on the strict path. Each degrade increments correlation.retract.degrade_fallback_count; persistent non-zero values point at a tighter memory budget or a smaller correlation-key cardinality. The degrade path and its metrics are detailed in Operator Retraction Cost Reference.
See also
- Correlation Key Lifecycle & Rollback Narrowing: the $ck.<field> shadow columns, (row_id, source_name) lineage pairs, and per_source_rollback_cursors map this protocol consumes.
- Operator Retraction Cost Reference: the per-operator cost table, the === Retraction === explain block, and the correlation.retract.* counters.
- Memory Arbitration & Scheduling: the spill thresholds whose breach triggers the degrade fallback.
Operator Retraction Cost Reference
This is the capacity-planning reference for pipelines running the retraction protocol. An aggregate whose group_by omits any upstream CK field activates retraction automatically (see The Retraction Protocol for the path-selection rules and Correlation Key Lifecycle & Rollback Narrowing for the underlying lineage substrate). Each operator on the post-source DAG carries a different cost profile under retraction; the table below is the centerpiece — it summarizes the per-operator footprint so you can size memory and pick propagate_ck settings before pipelines hit production.
User-facing view: the User Guide’s “Correlation Keys” / “Aggregate Nodes” pages.
Per-operator cost table
| Operator | Retraction cost |
|---|---|
| Source | None at retraction time. The CK shadow columns are stamped at ingest; replay never re-reads the source file. |
| Transform | Runs only at commit time on post-recompute aggregate emits when sitting inside a deferred region. Cost = O(rows_emitted_post_recompute) per region member, no extra state held. Non-deterministic CXL builtins (e.g. now) evaluate exactly once per output row, same as on a non-retraction pipeline. |
| Aggregate (strict, group_by covers upstream CK lattice) | None. Strict aggregates short-circuit to the two-phase commit body and pay zero retraction overhead. |
| Aggregate (retraction-mode, Reversible bindings) | Per-row lineage map (input_row_id → group_index) carried alongside accumulator state — ~8 bytes/row plus the per-group input_rows Vec inline cost — plus one synthetic $ck.aggregate.<name> shadow column on every output row at ~16 bytes/row. Retract is O(retracted_rows) reverse-op calls plus one finalize_in_place. Reversible accumulators: sum, count, collect, any. |
| Aggregate (retraction-mode, BufferRequired bindings) | Per-group raw contributions held until commit, plus one synthetic $ck.aggregate.<name> shadow column on every output row at ~16 bytes/row. Memory cost = O(input_rows × Σ binding_value_size) plus the synthetic-column tail. Retract recomputes affected groups from contributions − retracted_rows. BufferRequired accumulators: min, max, avg, weighted_avg. |
| Combine (driver propagation) | One propagated $ck.<field> slot from the driver record. No retraction state held by the combine itself; replay carries upstream deltas through. |
| Combine (propagate_ck: all / named: [...]) | Same per-row cost as driver propagation, plus the widened output schema’s $ck.<field> columns must be re-populated on replay. Cost scales with the output schema width, not retraction frequency. |
| Window (streaming) | None — streaming windows are incompatible with a retraction-mode aggregate whose dropped CK fields overlap partition_by. The plan-time derivation switches such windows into buffer mode. |
| Window (buffer-mode) | Per-partition raw row buffers held until commit. Memory cost = O(largest partition × per-row-size). Retract reruns the configured $window.* evaluation over partition − retracted_rows. Covers all 13 $window.* builtins uniformly via wholesale recompute. |
| Output | Holds retracted rows in correlation_buffers until commit. Replay substitutes the post-retract row in place; clean records flush to the writer, dirty records DLQ per the resolved correlation_fanout_policy. |
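The aggregate rows of the table turn into simple sizing arithmetic. This is a back-of-the-envelope sketch using the ~8 and ~16 byte figures above; it leaves out the per-group input_rows Vec inline cost, and the 10M-row, 50,000-group workload and 16-byte binding values are hypothetical inputs.

```python
LINEAGE_BYTES_PER_INPUT_ROW = 8         # ~8 bytes/row lineage map (Reversible only)
SYNTHETIC_CK_BYTES_PER_OUTPUT_ROW = 16  # ~16 bytes/row for $ck.aggregate.<name>


def reversible_overhead_bytes(input_rows, output_rows):
    """Reversible retraction overhead, excluding the per-group input_rows Vec cost."""
    return (input_rows * LINEAGE_BYTES_PER_INPUT_ROW
            + output_rows * SYNTHETIC_CK_BYTES_PER_OUTPUT_ROW)


def buffer_required_overhead_bytes(input_rows, binding_value_sizes, output_rows):
    """O(input_rows x sum of binding value sizes) plus the synthetic-column tail."""
    return (input_rows * sum(binding_value_sizes)
            + output_rows * SYNTHETIC_CK_BYTES_PER_OUTPUT_ROW)


# Hypothetical workload: 10M input rows folding into 50,000 groups.
print(reversible_overhead_bytes(10_000_000, 50_000))                 # 80800000
print(buffer_required_overhead_bytes(10_000_000, [16, 16], 50_000))  # 320800000
```

For this shape the BufferRequired path costs roughly four times the Reversible one, because its term scales with the raw value bytes rather than a fixed 8-byte lineage entry.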
Degrade fallback and metrics counters
When retraction’s preconditions break at runtime (an aggregate spilled before retract reached it, or a window partition exceeded the memory budget), the orchestrator degrades to “DLQ entire affected group/partition” — the same strict-collateral DLQ shape every aggregate uses on the strict path. Each degrade increments correlation.retract.degrade_fallback_count; persistent non-zero values point at a tighter memory budget or a smaller correlation-key cardinality. The spill thresholds whose breach triggers this fallback are described in Memory Arbitration & Scheduling.
The clinker metrics collect spool reports the runtime counterpart to the plan-time table above:
- correlation.retract.groups_recomputed
- correlation.retract.partitions_recomputed
- correlation.retract.subdag_replay_rows
- correlation.retract.output_rows_retracted_total
- correlation.retract.degrade_fallback_count
- correlation.retract.synthetic_ck_columns_emitted_total
- correlation.retract.synthetic_ck_fanout_lookups_total
- correlation.retract.synthetic_ck_fanout_rows_expanded_total
Use the explain block (below) for plan-time capacity sizing, the metrics spool for post-run confirmation.
The === Retraction === explain block
Pipelines in which at least one Aggregate has a group_by that omits a correlation-key field get a === Retraction === block in the clinker run --explain text output. The engine selects the retraction-mode path automatically based on group_by content; the block is absent on every other pipeline, so strict-correlation and non-correlated --explain output is unchanged. (For the rest of the explain surface, including buffer classes, arbitration parameters, and the === Statistics === section, see the broader explain documentation.)
The block opens with a one-line summary:

```text
retraction enabled — N relaxed aggregates, M buffer-mode windows, fanout policy: <policy>.
```

It is followed by one block per retraction-mode Aggregate and one per buffer-mode window index.
Per retraction-mode Aggregate the block reports:
- the resolved accumulator path (Reversible or BufferRequired),
- the per-row lineage memory cost (~8 bytes/row for Reversible; n/a for BufferRequired, which holds raw contributions instead),
- the per-aggregate synthetic-CK column and its ~16-byte/output-row cost,
- the worst-case degrade fallback when retraction’s preconditions break at runtime.
Per buffer-mode window index the block reports:
- the source name and partition_by fields,
- the per-row buffer cost in Value slots over the index’s arena fields,
- the worst-case partition memory ceiling under degrade.
Group cardinality is honestly surfaced as “unknown at plan time” — the planner has no group-cardinality side-table to consult before the run. Use this per-operator cost table and the per-row figures the explain block prints for capacity planning, then confirm the live shape via clinker metrics collect after the first production run.
See also
- The Retraction Protocol: the path-selection rules, E15Y, synthetic column, and buffer-mode window mechanics the costs above quantify.
- Correlation Key Lifecycle & Rollback Narrowing: the $ck.* shadow columns and per_source_rollback_cursors map the cost model accounts for.
- Combine Join Strategies: the propagate_ck: modes and their replay-time output-schema-width cost.
- Memory Arbitration & Scheduling: the RSS budget and spill thresholds that govern when retraction degrades to whole-group DLQ.
Combine Join Strategies
Combine is the N-ary record-combining operator: every input is declared up front, the where: predicate matches records across inputs, and the cxl: body shapes the output row. This page covers the parts an engine engineer reaches for when reasoning about how a Combine executes — the strategy selection the planner performs from the predicate shape, the heap cost of materializing build sides, how reconciled document boundaries flow through every join path, the runtime mechanics of correlation-key propagation across all four execution paths, and the join-planner statistics catalog that drives build-side selection and grace-hash partitioning.
User-facing view: the User Guide’s “Combine Nodes” page.
Predicate classification
The where: expression is a CXL boolean evaluated for every candidate record pair across inputs. The planner splits a compound and-predicate into three conjunct classes, and the classification is what selects the execution strategy:
- Equi conjunct: a cross-input equality (a.x == b.y). Drives the hash lookup or the sort-merge join.
- Range conjunct: a cross-input ordered comparison (a.start <= b.ts and b.ts <= a.end). Handled by the IEJoin algorithm when no equi conjunct constrains the same input pair.
- Residual conjunct: any other CXL predicate (intra-input filter, function call, and so on). Applied as a post-filter after the equi/range match succeeds.
At least one cross-input equality is required for every Combine, except for pure-range predicates, which IEJoin handles without an equi conjunct.
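The three-way split can be sketched over a toy conjunct representation. This assumes each conjunct is a hypothetical (op, left, right) tuple in which column operands are (input, column) pairs and anything else is an opaque string; it does not model the per-input-pair interaction between equi and range conjuncts, and neither function is the planner's API.

```python
ORDERED_OPS = {"<", "<=", ">", ">="}


def classify_conjunct(conjunct):
    """Return 'equi', 'range', or 'residual' for one and-conjunct."""
    op, left, right = conjunct
    cross_input = (isinstance(left, tuple) and isinstance(right, tuple)
                   and left[0] != right[0])
    if cross_input and op == "==":
        return "equi"
    if cross_input and op in ORDERED_OPS:
        return "range"
    return "residual"  # intra-input comparison, literal, function call, ...


def check_combine(conjuncts):
    """Require a cross-input equality, unless the predicate is pure-range."""
    kinds = {classify_conjunct(c) for c in conjuncts}
    if "equi" not in kinds and "range" not in kinds:
        raise ValueError("Combine needs a cross-input equality or a pure-range predicate")
    return kinds
```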
Strategy hint
The strategy config field carries a hint; the planner has final say.
| Value | Behavior |
|---|---|
| auto (default) | Planner picks a strategy from the predicate shape: hash join for equi predicates, IEJoin for pure-range predicates. |
| grace_hash | Force grace hash join (disk-spilling partitioned hash). Applies only to pure-equi predicates; ignored on predicates carrying range conjuncts. |
grace_hash is the right hint when build-side inputs are larger than the memory budget but fit on disk after partitioning. It is mostly an explicit performance assertion rather than a behavioral switch: the planner falls back automatically to grace-hash spill when an in-memory hash table approaches the RSS soft limit. So strategy: grace_hash on a build side that would have spilled anyway changes nothing operationally — it documents the author’s intent and pins the strategy regardless of the plan-time size estimate.
The choice of in-memory hash versus grace-hash for a pure-equality Combine is driven by the build-side row-count estimate (see Join-planner statistics below): a build side large enough to risk overrunning the memory limit is what tips the planner from the in-memory hash strategy to the disk-spilling grace-hash strategy.
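The hint and the estimate combine into a short decision procedure. This is a simplified sketch: it compares a hypothetical build-side heap estimate against a single soft limit as a stand-in for the planner's real row-count threshold (which this page does not specify), and it assumes mixed equi-plus-range predicates stay on the in-memory hash path. `choose_strategy` is illustrative only.

```python
def choose_strategy(has_equi, has_range, hint="auto", build_heap_estimate=0, soft_limit=0):
    """Pick a Combine strategy from predicate shape, hint, and a size estimate."""
    if not has_equi:
        return "iejoin"      # pure-range predicate
    if has_range:
        return "hash"        # grace_hash hint is ignored when range conjuncts appear
    if hint == "grace_hash":
        return "grace_hash"  # explicit pin, regardless of the plan-time estimate
    # auto: tip to the disk-spilling strategy when the build side risks the limit
    return "grace_hash" if build_heap_estimate > soft_limit else "hash"
```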
Memory considerations
Build-side inputs are materialized in memory as hash tables keyed by the equi columns. For each non-driving input, plan for roughly 1.5–2× the raw CSV size in heap. A 50 MB product catalog typically occupies 75–100 MB of hash-table memory — the multiplier covers the per-key Value boxing, the bucket array overhead, and the per-entry chaining structure on top of the raw payload bytes.
This heap cost is the quantity the memory arbitrator charges against pipeline.memory.limit, and it is what the soft/hard threshold machinery watches when deciding whether to flip a pure-equi Combine to grace-hash spill. See Memory Arbitration & Scheduling for the spill thresholds, the back-pressure knob, and strategy overrides.
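The 1.5–2× rule of thumb is easy to apply across several build inputs. This sketch only encodes the multiplier range stated above; the helper names are hypothetical, and actual heap use depends on key types and row shape.

```python
MB = 1_000_000


def build_side_heap_range(raw_csv_bytes):
    """Rule-of-thumb heap for one materialized build side: 1.5x to 2x raw CSV size."""
    return raw_csv_bytes * 3 // 2, raw_csv_bytes * 2


def combine_heap_range(build_inputs_raw_bytes):
    """Sum the range over every non-driving input of one Combine."""
    lows, highs = zip(*(build_side_heap_range(b) for b in build_inputs_raw_bytes))
    return sum(lows), sum(highs)


low, high = build_side_heap_range(50 * MB)
print(low // MB, high // MB)  # 75 100
```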
Document boundaries
A Combine forwards reconciled document boundaries to its output on every strategy — the inline hash build-probe, IEJoin, grace-hash, sort-merge, and the streaming-probe path. The boundary semantics are uniform across the strategy matrix, so a downstream operator never has to know which join algorithm ran.
Concretely:
- A per-document Aggregate downstream of a join flushes per document. A driver source that carries several documents (a glob: over monthly files, say) produces one roll-up per driver document after the join, not one fold spanning all of them.
- A document that spans both join inputs (the same document carried on the driver and on the build side) opens and closes exactly once downstream. The boundary is reconciled, never double-fired: the join does not emit a separate open/close for the driver-side and build-side appearances of the same document.
This reconciliation is what lets the per-document aggregation model compose with joins without special-casing the operator order.
Correlation-key propagation
Combine declares which correlation-key columns its output rows carry via the required propagate_ck field. The choice shapes both the compile-time output schema and the runtime record builder — those are the two internal surfaces an engine engineer touches when changing CK behavior.
| propagate_ck value | Compile-time output schema | Runtime record builder |
|---|---|---|
| driver | Carries only the driver input’s $ck.<field> columns. | Build-side records contribute body fields only; their CK identity is consumed by the match and not copied onto the output row. |
| all | Carries every input’s $ck.<field> columns. | Copies build-side CK values onto each output row alongside the body’s emit columns. Use when the build side carries CK fields downstream operators must read. |
| { named: [<field>, ...] } | Carries the explicit subset, intersected with what is actually present upstream. | Copies exactly the named subset. Use to project a multi-field CK down to a single field after a join. |
Driver wins on a name collision. If both the driver and a build input declare $ck.<field>, the column appears once on the output schema and the runtime keeps the driver’s value.
propagate_ck is required on every Combine; a pipeline without an explicit value fails to compile.
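The compile-time column of the table above can be sketched as a pure function over the CK field names each input carries. This is a minimal sketch under assumptions: `PropagateCk` and `output_ck_columns` are hypothetical names, and CK fields are plain strings rather than the engine's schema types.

```rust
/// Hypothetical mirror of the three `propagate_ck` forms.
enum PropagateCk {
    Driver,
    All,
    Named(Vec<String>),
}

/// `$ck.<field>` columns on a Combine's output schema. `driver` and each
/// entry of `builds` list the CK field names that input carries.
fn output_ck_columns(driver: &[&str], builds: &[Vec<&str>], mode: &PropagateCk) -> Vec<String> {
    // Every CK field present upstream, driver first. A name declared by both
    // the driver and a build input is kept once (driver wins on collision).
    let mut present: Vec<&str> = driver.to_vec();
    for build in builds {
        for &field in build {
            if !present.contains(&field) {
                present.push(field);
            }
        }
    }
    let selected: Vec<&str> = match mode {
        PropagateCk::Driver => driver.to_vec(),
        PropagateCk::All => present,
        // Explicit subset, intersected with what is actually present upstream.
        PropagateCk::Named(names) => present
            .into_iter()
            .filter(|f| names.iter().any(|n| n.as_str() == *f))
            .collect(),
    };
    selected.into_iter().map(|f| format!("$ck.{f}")).collect()
}
```

Only the schema shape is modeled here; which value lands in a collided column is a runtime concern of the record builder.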
Match-mode interaction across the strategy paths
The propagation contract holds identically across the hash build-probe, IEJoin, grace-hash, and sort-merge paths — the record builder is shared, so a build-side CK value lands on the output row the same way regardless of which algorithm produced the match. The interaction that does vary is by match mode rather than by strategy:
- With match: first or match: all, each emitted row is one driver × one build pairing, so the propagated $ck.<field> slot holds a single value (the driver’s, or the build’s, per the table above).
- With match: collect, the propagated CK slot is single-valued (it tracks the driver’s correlation-group identity), while the collected array column preserves the full lineage of every build match. The single-valued slot and the array column are distinct: the slot answers “which correlation group does this output row belong to,” the array answers “which build records were gathered.”
See the User Guide’s correlation-keys reference for the per-mode lifecycle narrative; the lifecycle and rollback-narrowing mechanics on the engine side are in Correlation Keys: Lifecycle & Rollback Narrowing.
Join-planner statistics
When the plan carries column statistics, --explain ends with a === Statistics === section listing the planner-wide statistics catalog. These are the figures that drive build-side selection and grace-hash partition-bit choice, so they belong to the join planner. Every figure is tagged with its provenance, so a metadata-derived estimate is distinguishable from a record-exact measurement.
Row counts — [file metadata] vs [exec sketch]
One line per source node, for example:
orders: ≈90 rows [file metadata] (informs combine build/probe + partition bits)
- A [file metadata] figure is derived at plan time by dividing the input file’s on-disk byte length by an average-record-bytes constant, before any record is read. This is the same row count that drives a Combine’s build-side selection and its grace-hash partition-bit choice. A build side large enough to risk overrunning the memory limit is what tips a pure-equality Combine from the in-memory hash strategy to the disk-spilling grace-hash strategy.
- An [exec sketch] figure is the exact count a source measured during a run, superseding the plan-time estimate.
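A minimal sketch of the provenance tagging, assuming a placeholder `AVG_RECORD_BYTES` of 128 (the real constant is not documented here) and a hypothetical `RowCount` type. The plan-time figure is a byte-length division, and a measured count replaces it outright. The `[exec sketch]` rendering format is likewise an assumption.

```rust
use std::fmt;

/// Hypothetical provenance-tagged row count.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RowCount {
    /// Plan-time estimate: file bytes / average record bytes.
    FileMetadata(u64),
    /// Exact count measured by a source during a run.
    ExecSketch(u64),
}

/// Placeholder; the engine's real average-record-bytes constant is not shown here.
const AVG_RECORD_BYTES: u64 = 128;

fn plan_time_estimate(file_len_bytes: u64) -> RowCount {
    RowCount::FileMetadata(file_len_bytes / AVG_RECORD_BYTES)
}

/// A measured count always supersedes the plan-time estimate.
fn supersede(current: RowCount, measured: Option<u64>) -> RowCount {
    match measured {
        Some(n) => RowCount::ExecSketch(n),
        None => current,
    }
}

impl fmt::Display for RowCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RowCount::FileMetadata(n) => write!(f, "≈{n} rows [file metadata]"),
            RowCount::ExecSketch(n) => write!(f, "{n} rows [exec sketch]"),
        }
    }
}
```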
Row counts also appear inline on each Combine’s driving and build inputs (est. 90 [file metadata] rows).
Column sketches — distinct counts, heavy hitters, membership filters
Three sketch kinds are populated by operators while records flow. All three are maintained by the grace-hash Combine over its build-side join keys, recorded under the build input’s (node, column):
- Distinct-count estimate, rendered as product_id: 12,431 distinct [exec sketch].
- Top-k heavy-hitter list with lower-bound counts, rendered as product_id: heavy hitters [exec sketch, lower bound]: widget=9,000, gadget=3,200, .... The list is a lower bound on frequency: a value absent from it may still be frequent, so it is only ever used to promote a key, never to exclude one.
- Membership filter, rendered as product_id: membership filter, 119048 bits / 7 probes [exec sketch, sized from estimate]. The filter is sized up front from the build node’s plan-time row-count estimate, built in the single build pass with no per-row buffer, and skipped entirely when no plan-time estimate is available.
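The lower-bound property is easiest to see with the classic Misra–Gries summary. It is used here as an assumed stand-in because this page does not name the algorithm the engine uses. Each stored counter never exceeds the key's true frequency, which is why a heavy-hitter list can promote a key but never exclude one.

```rust
use std::collections::HashMap;

/// Misra-Gries heavy-hitter summary with at most `k` counters.
/// Every stored count is a lower bound on the key's true frequency.
struct HeavyHitters {
    k: usize,
    counters: HashMap<String, u64>,
}

impl HeavyHitters {
    fn new(k: usize) -> Self {
        Self { k, counters: HashMap::new() }
    }

    fn observe(&mut self, key: &str) {
        if let Some(c) = self.counters.get_mut(key) {
            *c += 1;
        } else if self.counters.len() < self.k {
            self.counters.insert(key.to_string(), 1);
        } else {
            // Table full: decrement every counter and evict zeros. This is
            // where undercounting (and therefore the lower bound) comes from.
            for c in self.counters.values_mut() {
                *c -= 1;
            }
            self.counters.retain(|_, c| *c > 0);
        }
    }

    /// Lower-bound count for `key`; 0 means "unknown", not "rare".
    fn lower_bound(&self, key: &str) -> u64 {
        self.counters.get(key).copied().unwrap_or(0)
    }
}
```

In the usage below, "gadget" appears 3 times out of 12 yet ends with a lower bound of 0, which is exactly why absence from the list must not exclude a key.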
Honest nulls and missing sections
A statistic that was never gathered renders as null rather than a fabricated zero. A plan over sources whose sizes cannot be read — a glob/regex multi-file source, a network source, or a missing/unreadable file — adds no Statistics section at all, and (per the membership-filter rule above) skips the membership filter that the plan-time estimate would have sized. Confirm the live shape via clinker metrics collect after the first production run, since the planner has no group-cardinality side-table to consult before the run.
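The sizing rule and the skip-when-unknown rule fit in one function. This sketch assumes the membership filter is a standard Bloom filter sized with the textbook formulas m = -n ln p / (ln 2)^2 and k = (m / n) ln 2, with a hypothetical false-positive target supplied by the caller. Neither the formulas nor a target rate is stated on this page.

```rust
use std::f64::consts::LN_2;

/// Bits and probe count for a Bloom-style membership filter.
#[derive(Debug, PartialEq)]
struct FilterSize {
    bits: u64,
    probes: u32,
}

/// Sizes a filter from the plan-time row-count estimate. Returns `None`
/// when no estimate exists, so the filter is skipped rather than sized
/// from a fabricated zero.
fn size_filter(estimated_rows: Option<u64>, false_positive_rate: f64) -> Option<FilterSize> {
    let n = estimated_rows? as f64;
    if n == 0.0 {
        return None; // guard against a zero estimate (an assumption of this sketch)
    }
    let bits = (-n * false_positive_rate.ln() / (LN_2 * LN_2)).ceil();
    let probes = ((bits / n) * LN_2).round().max(1.0);
    Some(FilterSize { bits: bits as u64, probes: probes as u32 })
}
```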
Merge & Back-pressure
Merge concatenates upstream branches that share a schema into a single stream. For the engine, the interesting surface is not the YAML — it is where Merge fuses into the Source ingest loop, where the seeded-interleave path deliberately opts out of that fused channel topology, and how back-pressure propagates (or fails to) through the bounded mpsc channels behind each mode. This page covers those mechanics.
User-facing view: the User Guide’s “Merge Nodes” page.
Fusion of interleave over Sources
When every direct predecessor of an unseeded interleave Merge is a Source node, the executor fuses the Merge into the source ingest loop. The predecessor channels are polled directly and Merge consumption proceeds at live ingest rate, with no intermediate buffering tier between the Source readers and the Merge arm.
This fused live-channel path is what makes end-to-end back-pressure possible across the Merge boundary (see Back-pressure semantics below). It is also the same predicate the streaming-output path checks before it engages — see Streaming Output Writes.
When the predecessors are not all Sources (e.g. Transform → Merge), fusion does not apply: the Merge consumes pre-buffered predecessor outputs in round-robin order, and live back-pressure across the Merge boundary itself is unavailable in that shape (though the upstream operator’s own bounded buffer still throttles its predecessors).
Seeded interleave
Snapshot tests and benchmarks that need reproducible cross-input ordering opt into a deterministic schedule via interleave_seed:
- type: merge
name: combined
inputs: [east, west]
config:
mode: interleave
interleave_seed: 42
A seeded interleave bypasses the fused live-channel path entirely. Instead of polling predecessor channels at ingest rate, the Merge:
- Pre-buffers each predecessor’s full output into a Vec.
- Emits records in fastrand-driven order, seeded by interleave_seed.
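The two steps can be sketched as follows. This is a minimal sketch under assumptions: it replaces fastrand with a small SplitMix64 generator so it has no dependencies, and it assumes each input keeps its own record order while only the choice of the next input is randomized. `seeded_interleave` is a hypothetical name, and the exact order it produces differs from the engine's.

```rust
/// SplitMix64: a tiny deterministic PRNG, a dependency-free stand-in for fastrand.
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Seeded interleave over fully pre-buffered inputs (step 1 already done:
/// each predecessor's output is a complete Vec).
fn seeded_interleave<T: Clone>(buffers: &[Vec<T>], seed: u64) -> Vec<T> {
    let mut rng = SplitMix64(seed);
    let mut cursors = vec![0usize; buffers.len()];
    let total: usize = buffers.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    while out.len() < total {
        // Inputs that still have records left to emit.
        let live: Vec<usize> = (0..buffers.len())
            .filter(|&i| cursors[i] < buffers[i].len())
            .collect();
        let pick = live[(rng.next_u64() % live.len() as u64) as usize];
        out.push(buffers[pick][cursors[pick]].clone());
        cursors[pick] += 1;
    }
    out
}
```

Because emission starts only after every buffer is complete, the output depends on the seed alone, never on upstream timing.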
Output is reproducible regardless of upstream timing. The cost is that the seeded path opts out of live back-pressure across this Merge — the buffers fill to completion before emission begins, so a slow downstream consumer cannot throttle the Source readers while those Vecs are still filling.
Back-pressure semantics
How a slow consumer or a slow upstream reader propagates back through the DAG depends entirely on the Merge mode.
concat
Each Source ingest task pushes into its own bounded mpsc channel, with a capacity of 1024 records per Source. Peer sources produce concurrently up to that capacity, but the dispatch arm drains inputs[0]’s channel to completion before turning to inputs[1]’s.
Consequences:
- Memory. A non-leading input can hold up to one channel’s worth of buffered records (1024) before its producer blocks. A multi-input concat over N Sources may carry up to (N − 1) × 1024 records in flight even while only one input is being drained.
- Latency. A record produced by inputs[1] while inputs[0] is still draining will not reach output until inputs[0] finishes, regardless of how fast it was produced.
- Producer-side back-pressure. When a non-leading input’s channel fills, its reader blocks at blocking_send, propagating pressure back to the upstream file/network reader. The upstream is throttled even though it is not the currently-consumed input.
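The declaration-ordered drain and the blocking producers can be sketched with std threads. Assumptions: `std::sync::mpsc::sync_channel` stands in for the engine's bounded tokio mpsc channel, records are plain `u32` values, and `concat` and `max_parked` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Concat over N producers, each behind its own bounded channel. A
/// producer's `send` blocks once `capacity` records are queued.
fn concat(inputs: Vec<Vec<u32>>, capacity: usize) -> Vec<u32> {
    let mut receivers: Vec<Receiver<u32>> = Vec::new();
    let mut handles = Vec::new();
    for records in inputs {
        let (tx, rx) = sync_channel(capacity);
        receivers.push(rx);
        handles.push(thread::spawn(move || {
            for r in records {
                // Blocks while this input's channel is full, even when it is
                // not the input currently being drained.
                tx.send(r).expect("consumer hung up");
            }
            // `tx` drops here, which ends the consumer's iteration below.
        }));
    }
    let mut out = Vec::new();
    // Declaration order: drain inputs[0] completely, then inputs[1], and so on.
    for rx in receivers {
        out.extend(rx);
    }
    for h in handles {
        h.join().unwrap();
    }
    out
}

/// Upper bound on records parked in the non-leading channels.
fn max_parked(n_sources: usize, capacity: usize) -> usize {
    n_sources.saturating_sub(1) * capacity
}
```

With three Sources at the documented capacity, `max_parked(3, 1024)` is 2048 records.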
concat is the right choice when downstream consumers depend on declaration-ordered records (snapshot tests asserting byte-identical output) or when inputs represent ordered time partitions that must remain contiguous.
interleave (unseeded)
Fused with Source predecessors, the Merge arm polls every predecessor’s channel concurrently. Live back-pressure flows end-to-end:
- A slow downstream operator delays Merge consumption → the predecessor channels fill → the Source reader tasks block.
- A fast input does not wait on a slow peer — the Merge schedules whichever channel has a ready record.
When predecessors are not all Sources, fusion does not apply: the Merge consumes pre-buffered predecessor outputs in round-robin order, and live back-pressure across the Merge boundary itself is unavailable, though each upstream operator’s own bounded buffer still throttles its predecessors.
Unseeded interleave is the right choice when end-to-end latency matters and the downstream consumer is order-insensitive (an aggregator grouping on a key, or a writer that does not assert on row sequencing).
interleave (seeded)
The seeded path does not preserve live back-pressure across the Merge. It pre-buffers each predecessor’s full output into a Vec before emitting in fastrand-driven order, so a slow consumer downstream of a seeded Merge will not throttle the Source readers while the buffers are still filling.
If you need both run-to-run determinism and live back-pressure, prefer asserting on the multiset of records rather than their sequence and use unseeded interleave, or fall back to concat over deterministically-declared inputs.
Streaming Output Writes
Output nodes are the terminal sinks of a pipeline. For most topologies the Output arm runs buffered: the upstream stage accumulates every record before the writer fires. But when a single Output sits directly downstream of a fused Merge.interleave of Sources, the executor takes a streaming path that wires the Merge arm to the writer task through a bounded tokio::sync::mpsc channel and fires Writer::write_record per record, concurrent with Merge production. This page covers the topology that selects that path, the exact eligibility predicate, the end-to-end back-pressure chain, the counter semantics that must match the buffered arm, and the writer contract that rejects Value::Map payloads.
User-facing view: the User Guide’s “Output Nodes” page.
Streaming vs. buffered
When a single Output sits directly downstream of a Merge whose mode is interleave and whose every direct predecessor is a Source, the executor takes the streaming path: a bounded tokio::sync::mpsc::channel connects the Merge arm to the writer task, and Writer::write_record fires per record as Merge emits, concurrent with Merge production.
The buffered alternative, which still runs for every other Output topology, waits until the Merge arm has accumulated every record before invoking the writer. With a slow upstream Source, this defeats the live back-pressure that the Merge.interleave fusion provides at the Source-channel layer: each record sits in node_buffers[merge] until the slow Source finishes. The streaming path exists precisely to preserve that fused back-pressure all the way to the sink.
The streaming path is selected automatically — there is no opt-in setting. Pipelines that don’t match the topology keep the buffered path.
Topology
- type: source
name: src_a
config: { type: csv, path: a.csv, schema: ... }
- type: source
name: src_b
config: { type: csv, path: b.csv, schema: ... }
- type: merge
name: merged
inputs: [src_a, src_b]
config:
mode: interleave # required
- type: output
name: out
input: merged
config:
name: out
type: csv
path: out.csv
Eligibility
Every condition must hold for the streaming path to engage; if any fails, the buffered path runs:
- The Output has exactly one incoming edge, and that predecessor is a Merge with mode: interleave.
- Every direct predecessor of that Merge is a Source, which is the same predicate the fused Merge.interleave arm uses for its live tokio::select! (see Merge & Back-pressure).
- The Merge has no other downstream consumer besides this one Output (no fan-out).
- The Output is not in the init-phase ancestor closure.
- The OutputConfig has no split: block; splitting writers manage their own file rotation lifecycle.
- The writer is registered in the single-file writer registry (not fan_out_per_source_file).
- No Source in the pipeline declares a correlation key. The correlation-buffered output path defers writes to CorrelationCommit and is incompatible with per-record write.
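The seven conditions reduce to a single conjunction. This is a minimal sketch that assumes a hypothetical `OutputTopology` struct that has already flattened the DAG facts into booleans and counts. The executor's real check walks its own graph types.

```rust
/// Hypothetical, pre-flattened view of the facts the eligibility check needs.
struct OutputTopology {
    incoming_edges: usize,
    predecessor_is_interleave_merge: bool,
    merge_inputs_all_sources: bool,
    merge_downstream_consumers: usize,
    in_init_phase_closure: bool,
    has_split_block: bool,
    single_file_writer: bool,
    any_source_declares_correlation_key: bool,
}

/// Every condition must hold; any failure falls back to the buffered path.
fn streaming_eligible(t: &OutputTopology) -> bool {
    t.incoming_edges == 1
        && t.predecessor_is_interleave_merge
        && t.merge_inputs_all_sources
        && t.merge_downstream_consumers == 1 // no fan-out
        && !t.in_init_phase_closure
        && !t.has_split_block
        && t.single_file_writer // not fan_out_per_source_file
        && !t.any_source_declares_correlation_key
}
```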
Back-pressure flow
Under the streaming path, back-pressure flows end-to-end:
writer slow → mpsc::Sender::send().await yields
→ Merge arm yields
→ Source mpsc::Receiver fills
→ Source ingest task blocks on send
The bounded handoff channel between Merge and Output (256 slots) and the existing per-Source ingest channels form a single pace-bound chain from the underlying Write sink back to the source reader. A slow file system, a saturated network sink, or a deliberately-paced writer no longer accumulates records in pipeline-internal Vecs; the upstream readers slow down to match.
Counter semantics
Counter behavior under the streaming path matches the buffered Output arm exactly:
- records_written increments once per Writer::write_record call.
- ok_count counts distinct source row_nums reaching the Output.
- dlq_count is unaffected; DLQ entries originate upstream.
Stage metrics (SchemaScan, Write, Projection) accumulate into the same fields the buffered path uses. The dispatcher folds the streaming task’s per-task accounting back into the run-wide totals at end of DAG, so a streaming run and a buffered run over the same input produce identical counter output.
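A minimal sketch of the two write-side counters and the end-of-DAG fold. It assumes a hypothetical `OutputCounters` type whose distinct-row set is keyed by (source index, row_num); that key shape is an assumption. dlq_count is omitted because it is maintained upstream.

```rust
use std::collections::HashSet;

#[derive(Default)]
struct OutputCounters {
    records_written: u64,
    /// Keyed by (source index, row_num): an assumption of this sketch.
    seen_rows: HashSet<(u32, u64)>,
}

impl OutputCounters {
    /// Called once per `Writer::write_record` call.
    fn on_write(&mut self, row: (u32, u64)) {
        self.records_written += 1;
        self.seen_rows.insert(row);
    }

    /// Distinct source rows that reached the Output.
    fn ok_count(&self) -> u64 {
        self.seen_rows.len() as u64
    }

    /// Folds a streaming task's accounting into the run-wide totals.
    fn fold(&mut self, task: OutputCounters) {
        self.records_written += task.records_written;
        self.seen_rows.extend(task.seen_rows);
    }
}
```

If one source row reaches the writer twice (for example after a one-to-many Combine upstream), records_written counts both writes while ok_count counts the row once.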
Writer rejection of Value::Map payloads
CSV, XML, fixed-width, EDIFACT, X12, and HL7 writers refuse records carrying a Value::Map payload at any column slot, raising:
FormatError::UnserializableMapValue { format, column }
JSON is the exception — it serializes Value::Map natively as a nested object.
The typical cause is a $widened sidecar reaching a non-JSON writer because the Output node set include_unmapped: false, which strips the sidecar’s expansion and leaves the raw Value::Map slot to hit the writer. The contract is the same on the streaming and buffered paths: the writer rejects the map-valued record rather than emitting a malformed row. See Schema Drift & the $widened Sidecar for the sidecar lifecycle, the include_unmapped interaction, and the remediation routes for this rejection.
Schema Drift & the $widened Sidecar
User-facing view: the User Guide’s “Auto-Widen & Schema Drift” page.
This page is the engine-internals reference for how Clinker absorbs input columns that the source’s declared schema: block does not name, carries them through the DAG, and either expands them back at the sink or refuses them at a writer that cannot serialize them. The mechanism is an on-schema sidecar column named $widened, stamped by the engine and propagated by the same machinery that carries every user-declared column. The depth here is the sidecar’s data model (FieldMetadata::WidenedSidecar, Value::Map), the per-node-type propagation rules, the writer/DLQ rejection paths, and the structural reasons the design is on-schema rather than off-schema. The user-facing page documents the three policy modes and the YAML knobs; this page documents why the absorber is shaped the way it is.
The three modes (context)
The per-source on_unmapped policy selects one of three behaviors for input fields absent from the declared schema. The engine-wide default is auto_widen.
- auto_widen (default): per-record undeclared fields are absorbed into a Value::Map payload carried by an engine-stamped $widened sidecar column appended to the source’s schema. The payload propagates downstream, and the sink expands it back to top-level columns when include_unmapped: true on the Output node (the default). Pattern precedent: Databricks Auto Loader’s _rescued_data sidecar and ClickHouse’s JSON column type.
- drop: undeclared input fields are silently stripped at read time. No sidecar; the source’s plan-time schema equals the declared schema: block.
- reject: any input record carrying a key not in the declared schema fails the source with a FormatError::UndeclaredField diagnostic naming the offending field.
Everything below concerns auto_widen, the only mode that materializes a sidecar.
The $widened sidecar absorber
auto_widen is implemented as an on-schema sidecar: the engine appends a single column named $widened to the source’s schema, marked with FieldMetadata::WidenedSidecar. Each record’s undeclared input fields are stored as the sidecar’s Value::Map payload — keyed by input field name, valued by the read scalar.
The on-schema design is deliberate, and the reason is a silent-loss bug class. An off-schema sidecar — a parallel data structure living outside Schema — would be dropped by any code path that reconstructs a Record from schema.columns() and a value vector. The DAG has many such reconstruction points (projection, sort, spill round-trips, combine collect-array assembly), and each one would carry a standing obligation to “remember to copy the side-channel.” The on-schema slot instead inherits the exact same serialization, span propagation, sort/spill, and projection machinery as a user-declared column: there is no separate copy obligation on any consumer, because to every consumer $widened is a column.
The trade-off the on-schema design accepts is that the sidecar occupies a real schema slot the typechecker can see by name. CXL expressions cannot read or write the sidecar — the typechecker is blind to its contents (the Value::Map interior is never type-resolved into addressable fields), and the parser rejects a literal $widened reference at the system-variable layer. The net effect is that the sidecar rides through every structural transform automatically while remaining unaddressable from user CXL.
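A minimal sketch of how the three policies split one input record. It assumes string-valued fields and a hypothetical `absorb` function. A `BTreeMap` stands in for `Value::Map`, and a `String` error stands in for `FormatError::UndeclaredField`.

```rust
use std::collections::BTreeMap;

/// The per-source `on_unmapped` policy.
enum OnUnmapped {
    AutoWiden,
    Drop,
    Reject,
}

#[derive(Debug, PartialEq)]
struct ReadRecord {
    declared: BTreeMap<String, String>,
    /// `$widened` payload: `Some` only under auto_widen (no sidecar column
    /// otherwise). An empty map here may be `Value::Null` in the engine.
    widened: Option<BTreeMap<String, String>>,
}

fn absorb(input: &[(&str, &str)], schema: &[&str], policy: OnUnmapped) -> Result<ReadRecord, String> {
    let mut declared = BTreeMap::new();
    let mut undeclared = BTreeMap::new();
    for &(name, value) in input {
        // Keyed by input field name, valued by the read scalar.
        let target = if schema.contains(&name) { &mut declared } else { &mut undeclared };
        target.insert(name.to_string(), value.to_string());
    }
    match policy {
        OnUnmapped::AutoWiden => Ok(ReadRecord { declared, widened: Some(undeclared) }),
        OnUnmapped::Drop => Ok(ReadRecord { declared, widened: None }),
        OnUnmapped::Reject => match undeclared.keys().next() {
            // Stands in for FormatError::UndeclaredField naming the field.
            Some(field) => Err(format!("undeclared field `{field}`")),
            None => Ok(ReadRecord { declared, widened: None }),
        },
    }
}
```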
Propagation through the DAG
The $widened sidecar follows these rules through downstream nodes. The table is the propagation contract each node type’s executor implements.
| Node type | Sidecar behavior |
|---|---|
| Transform | Inherits unchanged from input (transforms are row-preserving). |
| Aggregate | Output’s $widened slot is Value::Null — per-row payloads have no canonical aggregation. Users who need an unmapped field at aggregate output must add it to group_by or emit it explicitly via an aggregate function. |
| Combine | Driver’s sidecar rides through; build-side sidecars are dropped (mirrors propagate_ck: Driver). Build-side iter_user_fields() filters every engine-stamped column from match: collect array payloads, so build $widened cannot leak into the collect array. Users can lift a build-side unmapped field via <build_qualifier>.<field> in the combine body’s CXL. |
| Route / Merge | Row-preserving — sidecar passes through. Merge requires every input source to share the same on_unmapped policy; mixing fails compile with E315 (see below). |
| Composition | Body inherits the parent’s sidecar via the synthetic input port; whatever the body’s terminal node carries flows back to the parent. The body’s terminal-node propagation rule applies (e.g. an Aggregate terminal yields Value::Null at the parent boundary, a match: first Combine terminal carries the driver’s payload). |
| Output | Sidecar expands to top-level columns when include_unmapped: true (the default). Set include_unmapped: false to strip the sidecar (and every other unmapped input field) so only explicitly-emitted columns reach the writer. |
Two of these rows encode load-bearing internal mechanics worth restating:
- Combine propagate_ck: Driver mirroring. The sidecar follows the same provenance rule as correlation keys: only the driver (probe) side’s payload survives the join, and build-side payloads are dropped. The build-side iter_user_fields() iterator is the single filter that excludes every engine-stamped column ($widened and the $ck.* lattice alike) from the match: collect array, so a build record’s sidecar can never appear as an element of a collect array even though build user fields can. The escape hatch for a genuinely needed build-side unmapped field is to lift it explicitly through the combine body CXL via the build qualifier.
- Aggregate null-out. There is no canonical way to fold N per-row Value::Map payloads into one, so the Aggregate output slot is deliberately Value::Null rather than (say) the first row’s payload or a merged map. The explicit-emit path (group_by membership or an aggregate function) is the supported way to carry a specific unmapped field across an aggregation boundary.
Output controls
When include_unmapped: true (the default), fields the source absorbed into $widened are expanded back to top-level columns at the sink. The expansion happens at the projection layer, before the writer sees the record, so the literal $widened slot is stripped during expansion and the writer never sees a Value::Map for a well-formed pass-through. Setting include_unmapped: false strips the sidecar (and every other input field not explicitly emitted upstream) so the writer sees only user-declared columns.
include_unmapped composes independently with include_correlation_keys; the two flags are orthogonal — include_correlation_keys does not surface $widened. Because expansion is a projection-layer operation, a CSV source under auto_widen feeding a JSON output under include_unmapped: true produces JSON objects whose top-level keys include both declared columns and absorbed input columns, with no sidecar key remaining.
Writer rejection of Value::Map payloads
CSV, XML, and fixed-width writers refuse any record carrying a Value::Map payload at any column slot, raising FormatError::UnserializableMapValue { format, column }. The rejection lives in each writer’s value-to-string helper — a single point of truth, with no defensive prechecks scattered ahead of it. JSON serializes Value::Map natively as a nested object and never raises.
The common trigger is the $widened sidecar reaching one of those three writers because the Output node set include_unmapped: false (which suppresses the projection-layer expansion that would otherwise have flattened the map to top-level scalars). Two remediations exist, and the error message names both: leave include_unmapped at its default true so projection expands the map before write, or coerce the map to a scalar in CXL before the emit.
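A minimal sketch of that single point of truth. It assumes a trimmed-down, hypothetical `Value` enum and a free function `value_to_field`; the real writers' helper names and signatures are not shown on this page. Rendering `Null` as an empty field is also an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical, trimmed-down value model.
#[allow(dead_code)]
enum Value {
    Null,
    Str(String),
    Int(i64),
    Map(BTreeMap<String, Value>),
}

#[derive(Debug, PartialEq)]
enum FormatError {
    UnserializableMapValue { format: &'static str, column: String },
}

/// A non-JSON writer's value-to-string helper: the one place a map-valued
/// slot is rejected, instead of emitting a malformed row.
fn value_to_field(format: &'static str, column: &str, v: &Value) -> Result<String, FormatError> {
    match v {
        Value::Null => Ok(String::new()),
        Value::Str(s) => Ok(s.clone()),
        Value::Int(i) => Ok(i.to_string()),
        Value::Map(_) => Err(FormatError::UnserializableMapValue {
            format,
            column: column.to_string(),
        }),
    }
}
```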
DLQ filtering
The dead-letter-queue writer applies the same exclusion at its own layer rather than relying on the main-path projection. dlq::dlq_user_columns strips every column tagged FieldMetadata::WidenedSidecar, so the DLQ CSV header never contains a $widened column even when a DLQ entry’s original_record still carries the full auto_widen schema shape. Correlation-lattice columns ($ck.*) are deliberately retained in DLQ output for collateral debugging — the DLQ filter excludes only the unserializable sidecar, not the engine-stamped provenance columns.
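The DLQ filter is a single metadata test. This is a minimal sketch that assumes hypothetical `Column` and `FieldMetadata` types. Only the `dlq_user_columns` name comes from the engine, and its real signature may differ.

```rust
#[derive(Clone, Debug, PartialEq)]
enum FieldMetadata {
    User,
    CorrelationKey,
    WidenedSidecar,
}

#[derive(Clone, Debug, PartialEq)]
struct Column {
    name: String,
    meta: FieldMetadata,
}

/// DLQ header columns: drop only the widened sidecar; `$ck.*` columns stay
/// for collateral debugging.
fn dlq_user_columns(schema: &[Column]) -> Vec<&str> {
    schema
        .iter()
        .filter(|c| c.meta != FieldMetadata::WidenedSidecar)
        .map(|c| c.name.as_str())
        .collect()
}
```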
E315 — Merge inputs must agree on policy
Merge concatenates streams positionally against the merge node’s output_schema (taken from the first input). Every input must agree on column shape — same column names, same on_unmapped policy, same correlation_key set. The $widened agreement is a special case of that rule: if one upstream source uses auto_widen (and therefore carries the sidecar column) while another uses drop or reject (and does not), the two input schemas disagree on the presence of the $widened slot, and the positional concatenation has no coherent column to align. Compile fails:
E315: merge "merged": input schemas disagree on the `$widened` auto_widen sidecar column.
The remediation is to set every merge upstream source to the same on_unmapped policy; for sources that should explicitly omit the sidecar, declare on_unmapped: { mode: drop } (or reject) on each so the absent-sidecar shape is uniform across inputs.
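The sidecar half of the agreement rule can be sketched as a presence check across inputs. This minimal sketch assumes each input has already been reduced to a boolean "carries `$widened`" flag. `check_merge_sidecar` is a hypothetical name, and the full E315 rule also compares column names and correlation keys, which are not modeled here.

```rust
/// Merge aligns columns positionally, so the `$widened` slot must be
/// present on every input or on none of them.
fn check_merge_sidecar(merge: &str, inputs_have_sidecar: &[bool]) -> Result<(), String> {
    let all_agree = inputs_have_sidecar.windows(2).all(|w| w[0] == w[1]);
    if all_agree {
        Ok(())
    } else {
        Err(format!(
            "E315: merge \"{merge}\": input schemas disagree on the `$widened` auto_widen sidecar column."
        ))
    }
}
```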
Fixed-width sources are structurally inert
Fixed-width sources are positional: the schema is constructed from width / start..end byte ranges, and bytes outside the declared ranges are invisible to the reader. There is no notion of an “undeclared field” to absorb — a byte either falls inside a declared range (and becomes a declared column) or is never read. auto_widen therefore can never populate the sidecar for a fixed-width source; the $widened slot stays Value::Null for every record.
Because the policy is silently inert rather than wrong, the executor emits a tracing::info diagnostic at source-reader construction time when auto_widen is the policy on a fixed-width source, naming the source. The diagnostic fires once per reader instance — a source used as a combine build-side input across multiple combines may produce one log line per combine. To avoid the noise, switch to on_unmapped: drop (or reject) for explicit scalar semantics, or accept the empty sidecar.
Staging, Crash Durability & Locks
User-facing view: the User Guide’s “Storage & Spill Location” page.
This page is the engine-internals reference for the durability and concurrency mechanics behind Clinker’s storage subsystem: how a matched source file is copied to a local staging volume without ever leaving a corrupt or half-trusted artifact behind, how concurrent clinker invocations sharing one staging or spill volume coordinate through advisory locks, and how a startup crash purge reclaims the artifacts a SIGKILL-ed run could not clean up. The depth here is the staging copy protocol (single-pass copy + hash, atomic publish via rename, parent-directory fsync, verify, manifest commit), the per-source reader-writer lock semantics, the orphan-detection liveness gates, the file-permission model, and the filesystem-journal reasoning behind the directory fsync. The user-facing page documents the [storage] config block, the spill dir, disk cap, compression, and observability surfaces; those are out of scope here.
How a file is staged
When storage.staging is enabled and a source path matches a configured pattern, the source is copied to a local volume before the pipeline reads it. Each matched source maps to a stable, content-addressed set of files directly under the staging dir, deterministic across runs of the same source:
- <source-id>.staged: the local copy the reader opens.
- <source-id>.manifest.json: a sidecar recording the source’s identity, namely its path, modification time, size, the BLAKE3 content hash, and the stage time.
- <source-id>.lock: a small advisory-lock file that serializes concurrent invocations staging the same source. It carries no data and persists between runs as the per-source coordination point, alongside the cached copy it guards.
<source-id> is derived from the source’s canonical path, so the same source always resolves to the same staged file. That stability is what makes the reuse cache work — a later run can find the prior copy — and it is why the layout is stable rather than per-run UUIDs.
The copy is built to survive a crash at any point without leaving a corrupt or partial file a later run might trust. The five steps are ordered so that the only trustworthy state is one a crash cannot fabricate:
- Single-pass copy + hash. The source is read once in ~1 MiB chunks; each chunk is fed to both the BLAKE3 hasher and the destination file in the same pass. The copy never holds the whole file in memory, so it stays a memory-budget no-op regardless of file size.
- Atomic publish. Bytes are written to a <source-id>.<run>.partial temp file, flushed and fsync’d, then renamed to <source-id>.staged. A rename is an atomic replace on Linux, macOS, and Windows (Windows 10 1607+), so a reader scanning for .staged files sees either nothing or the complete file, never a half-written one. The <run> segment in the partial name keeps any two in-flight copies of one source on distinct temp files, and the per-source lock (see the staging cache) ensures only one of them ever runs at a time.
- Durable rename. On Linux/macOS the parent directory is fsync’d after the rename, because on ext4/xfs a rename is only crash-durable once the directory entry itself is flushed. On Windows the NTFS journal makes the rename durable, so there is no separate directory flush to do. (See Crash durability and the parent-directory fsync below for the full filesystem-journal reasoning.)
- Verify. With verify = blake3 (the default), the source is independently re-read and hashed, and the two digests must match. A size check cannot catch a soft-mount that silently truncated the read; two content digests can. A mismatch removes the published copy and fails the run with a distinct “staged copy is corrupt” diagnostic (E335), not a generic I/O error.
- Commit the manifest. The identity manifest is written with the same atomic temp-file + rename discipline. The manifest is the commit marker: a .staged file is only trustworthy once its manifest exists. A crash between the copy and the manifest leaves a .staged with no manifest, which the next run’s crash purge reaps as an orphan rather than half-trusting.
If the copy fails partway, the .partial is removed before the error propagates. The invariant that closes the protocol: a complete .staged paired with a committed .manifest.json is the only shape any later run will trust, and that pairing cannot exist unless every step above completed.
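The five steps map onto plain `std::fs` calls. This is a minimal sketch under stated assumptions. FNV-1a stands in for BLAKE3 so the example needs no crates. The manifest is trimmed to the hash alone, and its temp-file name is hypothetical. No per-source lock is taken, because that belongs to the staging cache. The directory fsync is compiled only on Unix, matching the Linux/macOS-only step above.

```rust
use std::fs::{self, File};
use std::io::{self, Read, Write};
use std::path::Path;

/// FNV-1a 64-bit: a dependency-free stand-in for BLAKE3 in this sketch only.
struct Fnv1a(u64);

impl Fnv1a {
    fn new() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325)
    }
    fn update(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= b as u64;
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3);
        }
    }
}

const CHUNK: usize = 1 << 20; // ~1 MiB

/// Reads `src` once in chunks, feeding each chunk to the hasher and `out`.
fn copy_and_hash<W: Write>(src: &Path, out: &mut W) -> io::Result<u64> {
    let mut input = File::open(src)?;
    let mut buf = vec![0u8; CHUNK];
    let mut h = Fnv1a::new();
    loop {
        let n = input.read(&mut buf)?;
        if n == 0 {
            return Ok(h.0);
        }
        h.update(&buf[..n]);
        out.write_all(&buf[..n])?;
    }
}

/// fsync a directory so a rename inside it is crash-durable (Linux/macOS).
#[cfg(unix)]
fn sync_dir(dir: &Path) -> io::Result<()> {
    File::open(dir)?.sync_all()
}

/// On Windows the NTFS journal makes the rename durable; nothing to do.
#[cfg(not(unix))]
fn sync_dir(_dir: &Path) -> io::Result<()> {
    Ok(())
}

fn stage(src: &Path, dir: &Path, id: &str, run: &str) -> io::Result<()> {
    let partial = dir.join(format!("{id}.{run}.partial"));
    let staged = dir.join(format!("{id}.staged"));
    // 1. Single-pass copy + hash into the run-specific temp file, then fsync.
    let copied = File::create(&partial).and_then(|mut out| -> io::Result<u64> {
        let digest = copy_and_hash(src, &mut out)?;
        out.sync_all()?;
        Ok(digest)
    });
    let digest = match copied {
        Ok(d) => d,
        Err(e) => {
            let _ = fs::remove_file(&partial); // never leave a partial behind
            return Err(e);
        }
    };
    // 2. Atomic publish, then 3. make the rename durable.
    fs::rename(&partial, &staged)?;
    sync_dir(dir)?;
    // 4. Verify: independently re-read and hash the source.
    if copy_and_hash(src, &mut io::sink())? != digest {
        fs::remove_file(&staged)?;
        return Err(io::Error::new(io::ErrorKind::InvalidData, "staged copy is corrupt"));
    }
    // 5. Commit the (trimmed) manifest with the same temp-file + rename discipline.
    let tmp = dir.join(format!("{id}.manifest.{run}.partial")); // hypothetical name
    let mut m = File::create(&tmp)?;
    m.write_all(format!("{{\"hash\":\"{digest:016x}\"}}").as_bytes())?;
    m.sync_all()?;
    fs::rename(&tmp, dir.join(format!("{id}.manifest.json")))?;
    sync_dir(dir)
}
```

The manifest is written last, so a crash at any earlier point leaves at most a .partial or an unmanifested .staged, and both are shapes the crash purge reaps.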
The staging cache
Because staged copies live at stable paths, a copy from a prior run is still on disk when the next run starts (unless cleanup removed it). The on_existing policy decides what happens when that prior copy is found — overwrite always re-stages, error refuses, and reuse reuses the prior copy only when it is still fresh (the source’s current modification time and size both match what the manifest recorded). A fresh reuse match skips the copy entirely: no bytes read off the share, nothing charged against the disk cap. The freshness check is mtime + size, not a re-hash, so it is a cheap stat rather than a full read of the source.
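A minimal sketch of the on_existing decision. It assumes a hypothetical trimmed manifest (`ManifestIdentity`) and assumes that a stale copy under reuse falls back to a fresh copy. The lock that makes this safe under concurrency is covered below and omitted here.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::SystemTime;

/// The identity fields the freshness check compares.
struct ManifestIdentity {
    mtime: SystemTime,
    size: u64,
}

enum OnExisting {
    Overwrite,
    Error,
    Reuse,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Copy,
    Refuse,
    ReuseExisting,
}

fn decide(policy: &OnExisting, source: &Path, prior: Option<&ManifestIdentity>) -> io::Result<Decision> {
    // No prior copy on disk: there is nothing to reuse or refuse.
    let Some(prior) = prior else { return Ok(Decision::Copy) };
    Ok(match policy {
        OnExisting::Overwrite => Decision::Copy,
        OnExisting::Error => Decision::Refuse,
        OnExisting::Reuse => {
            // A cheap stat, not a re-hash: the source is never read here.
            let meta = fs::metadata(source)?;
            if meta.modified()? == prior.mtime && meta.len() == prior.size {
                Decision::ReuseExisting
            } else {
                Decision::Copy
            }
        }
    })
}
```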
The internals that matter here are how this stays correct under concurrent invocations. Under the partition-and-run model — several clinker processes over a partitioned input sharing one staging volume — independent runs may stage, reuse, or clean up the same shared source at the same time. The per-source <source-id>.lock file is a reader-writer (shared/exclusive) advisory lock that keeps every such overlap safe on Linux, macOS, and Windows:
- Exactly one copy. A run that needs to copy takes the lock exclusively for its copy-and-publish. The first run to take it copies and publishes; every other run blocks, then acquires the lock, finds the now-fresh .staged, and reuses it. So a source is copied exactly once no matter how many invocations race for it.
- A reader is never yanked. A run reading a staged copy holds the lock in shared mode for as long as it has the file open, and keeps it held from the moment it decides to reuse a copy until the moment it opens that copy, so the file it chose cannot be deleted or replaced in between. Any number of readers share the lock at once, so concurrent runs all read the same copy in parallel.
- Cleanup and overwrite wait for readers. Removing or re-copying a staged pair takes the lock exclusively, which a live reader’s shared lock blocks. Cleanup probes the lock without waiting (a try-lock): if a concurrent run is still reading the copy, cleanup leaves it in place, and the last run to release it, or a later crash purge, reaps it. An overwrite re-stage instead waits for in-flight readers to finish, then publishes atomically.
The reader-vs-writer distinction in lock-acquisition discipline is the whole safety argument: a copy/cleanup/overwrite mutates the staged pair and must be exclusive; a reuse-and-read only observes it and can be shared; and because the shared lock is held across the choose-then-open gap, no exclusive holder can slip a delete or replace into that window.
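The lock discipline can be modelled in-process to see why it yields exactly one copy and never yanks a reader. In this sketch a std::sync::RwLock stands in for the cross-process advisory lock on <source-id>.lock (an assumption made for illustration: a real implementation locks a file so that separate processes coordinate), and SourceSlot, Staged, open_fresh, and try_cleanup are hypothetical names. The shared guard returned by open_fresh is held from the freshness decision through the read, and a writer re-checks freshness after it acquires the exclusive lock, so every racing run after the first finds the copy already published.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockReadGuard, TryLockError};

/// Hypothetical stand-in for a staged pair: the copied bytes plus the
/// (mtime, size) its manifest recorded.
#[derive(Clone, Debug, PartialEq)]
pub struct Staged {
    pub mtime: u64,
    pub size: u64,
    pub bytes: Vec<u8>,
}

/// In-process model of one source: the RwLock plays the role of the
/// shared/exclusive advisory lock on <source-id>.lock.
pub struct SourceSlot {
    lock: RwLock<Option<Staged>>,
    copies: AtomicUsize, // counts copies actually performed
}

fn is_fresh(staged: &Option<Staged>, mtime: u64, size: u64) -> bool {
    matches!(staged, Some(s) if s.mtime == mtime && s.size == size)
}

impl SourceSlot {
    pub fn new() -> Self {
        SourceSlot { lock: RwLock::new(None), copies: AtomicUsize::new(0) }
    }

    pub fn copies(&self) -> usize {
        self.copies.load(Ordering::SeqCst)
    }

    /// Return a shared guard on a fresh copy, copying first if needed. The
    /// guard spans choose-then-open, so no writer can slip in between.
    pub fn open_fresh(&self, mtime: u64, size: u64, source: &[u8]) -> RwLockReadGuard<'_, Option<Staged>> {
        loop {
            let shared = self.lock.read().unwrap();
            if is_fresh(&shared, mtime, size) {
                return shared; // reuse: the caller reads while still holding shared
            }
            drop(shared);

            // Copy-and-publish under the exclusive lock.
            let mut exclusive = self.lock.write().unwrap();
            // Re-check: a racing run may have published while we blocked.
            if !is_fresh(&exclusive, mtime, size) {
                self.copies.fetch_add(1, Ordering::SeqCst);
                *exclusive = Some(Staged { mtime, size, bytes: source.to_vec() });
            }
            // The exclusive guard drops here; loop to re-take shared and re-verify.
        }
    }

    /// Cleanup probes with a try-lock; a live reader makes it back off.
    pub fn try_cleanup(&self) -> bool {
        match self.lock.try_write() {
            Ok(mut exclusive) => {
                *exclusive = None;
                true
            }
            Err(TryLockError::WouldBlock) | Err(TryLockError::Poisoned(_)) => false,
        }
    }
}
```

The sketch releases the exclusive guard and loops back to take the shared one rather than downgrading in place, so freshness is always re-verified under the lock the reader will actually hold while it reads.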
Windows share-mode interoperation
On POSIX an unlinked-but-open file stays readable, so a concurrent delete or atomic-rename replace coexists naturally with an open reader. Windows has no such default. To match the POSIX behavior, the staged copy is opened on Windows with a share mode that permits a concurrent delete or atomic-rename replace (FILE_SHARE_DELETE), so an open reader and a concurrent publish/cleanup interoperate there exactly as they do on POSIX. The net guarantee across any mix of concurrent runs sharing a staged source: a reader always sees a complete, coherent .staged file and no run fails spuriously.
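A sketch of opening a staged copy with that share mode in Rust, assuming the standard library's Windows-only std::os::windows::fs::OpenOptionsExt::share_mode; open_staged_for_read is a hypothetical name, and the flag values are the Win32 constants. The sketch requests FILE_SHARE_READ (so other readers can open the copy too) plus FILE_SHARE_DELETE; on POSIX no flag is needed.

```rust
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

/// Open a staged copy for reading so that a concurrent delete or
/// atomic-rename replace can proceed while the handle is open.
#[cfg(windows)]
pub fn open_staged_for_read(path: &Path) -> io::Result<File> {
    use std::os::windows::fs::OpenOptionsExt;
    // Win32 share flags, as defined in the Windows SDK.
    const FILE_SHARE_READ: u32 = 0x0000_0001;
    const FILE_SHARE_DELETE: u32 = 0x0000_0004;
    OpenOptions::new()
        .read(true)
        .share_mode(FILE_SHARE_READ | FILE_SHARE_DELETE)
        .open(path)
}

/// POSIX: an open descriptor survives unlink and rename, so no extra flags.
#[cfg(not(windows))]
pub fn open_staged_for_read(path: &Path) -> io::Result<File> {
    OpenOptions::new().read(true).open(path)
}
```

Rust's standard library documents its default Windows share mode as already including FILE_SHARE_DELETE, so in Rust the explicit call mainly documents the requirement; code that opens files through raw Win32 calls must request the flag itself.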
Crash purge of orphaned artifacts
A clean (or panicking) run runs its configured cleanup. But a SIGKILL, the Linux OOM-killer, or a power loss kills the process before any cleanup runs, leaking its staged artifacts under the staging root. To stop that from accumulating across crashes, every run performs an idempotent crash purge at startup, before it stages anything. It reaps four orphan shapes left under the staging root:
- a *.partial: an interrupted copy. Reaped only when its owning run is dead (see the liveness gate below), so a concurrent sibling’s in-flight copy is never reaped;
- a *.staged with no matching manifest: a copy that crashed before it could commit its manifest;
- a *.manifest.json with no matching staged file;
- a *.lock whose source has no surviving cache entry: a coordination lock left by a source that is no longer staged (not necessarily from a crash), reclaimed under the liveness and age gates below.
A clean pair (a .staged with its committed .manifest.json) is the reuse cache and is kept — the purge never removes a complete, trustworthy copy — and the source’s .lock is kept alongside it so a later reuse run has a lock to take.
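The four shapes reduce to a classification over directory entry names. A minimal sketch, assuming a hypothetical <id>.partial, <id>.staged, <id>.manifest.json, <id>.lock naming (the text gives only the suffixes); Orphan and classify are illustrative names. Classification only nominates candidates: the liveness and age gates still decide whether a *.partial or *.lock is actually removed.

```rust
use std::collections::HashSet;

/// The four orphan shapes the crash purge looks for. Partial and LockNoEntry
/// are only candidates: they still have to pass the liveness and age gates.
#[derive(Debug, PartialEq, Eq)]
pub enum Orphan<'a> {
    Partial(&'a str),
    StagedNoManifest(&'a str),
    ManifestNoStaged(&'a str),
    LockNoEntry(&'a str),
}

/// Classify directory entries named <id>.<suffix> (hypothetical layout).
/// A complete pair (.staged + .manifest.json) and its .lock are kept.
pub fn classify<'a>(names: &[&'a str]) -> Vec<Orphan<'a>> {
    let present: HashSet<&str> = names.iter().copied().collect();
    let has = |id: &str, suffix: &str| present.contains(format!("{id}{suffix}").as_str());
    let mut out = Vec::new();
    for &name in names {
        if let Some(id) = name.strip_suffix(".partial") {
            out.push(Orphan::Partial(id));
        } else if let Some(id) = name.strip_suffix(".manifest.json") {
            if !has(id, ".staged") {
                out.push(Orphan::ManifestNoStaged(id));
            }
        } else if let Some(id) = name.strip_suffix(".staged") {
            if !has(id, ".manifest.json") {
                out.push(Orphan::StagedNoManifest(id));
            }
        } else if let Some(id) = name.strip_suffix(".lock") {
            // A lock is kept while either half of its cache entry survives.
            if !has(id, ".staged") && !has(id, ".manifest.json") {
                out.push(Orphan::LockNoEntry(id));
            }
        }
    }
    out
}
```

For the entries a.staged, a.manifest.json, a.lock, b.partial, b.lock, c.staged, d.manifest.json, e.lock, the sketch keeps everything belonging to a and nominates b (partial and lock), c, d, and e. Note that b.lock is nominated while b's copy may still be in flight; it is the liveness gate, not the classifier, that protects it.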
Liveness gate: try-lock vs exclusive, and the creation grace window
The purge must distinguish a crash corpse from a live sibling’s in-flight work, because several invocations can share one staging volume. It tells them apart the same way the spill-directory purge does, by asking the operating system “is anyone still staging this?” rather than guessing, and a reap proceeds only when both of the following gates pass:
- Acquirable under a try-lock. A .partial is reaped only when the source’s .lock is acquirable under a non-blocking try-lock. If the try-lock succeeds, no live process holds the lock, so the owning run is gone. If the lock is still held, a concurrent live run owns the work and the artifact is kept.
- Aged past a short creation grace window. Even with an acquirable lock, the artifact must have aged past a short creation grace window. This covers the race where a sibling has just started a copy (created the .partial) but has not yet taken the lock. A partial too young to have been locked yet is kept regardless of the try-lock result.
The actual removal is then performed while the purge holds the lock exclusively, so the reap itself cannot race a sibling mid-acquire.
A .lock whose source has no surviving cache entry (no .staged and no .manifest.json) is itself reclaimed under the same two-gate discipline: removed only when it is acquirable under a try-lock and has aged past the creation grace window, with the removal performed under the exclusive lock. A held lock, a lock still guarding a cached copy, and a freshly created lock are all kept. This bounds what would otherwise be unbounded growth of one zero-byte lock file per distinct source ever staged — relevant for a long-lived persistent cache (on_existing = reuse, cleanup = never) — while never removing a coordination point a live or cached source still needs. The net effect: a concurrent purge can never delete a running sibling’s work, and a persistent staging root does not accumulate one orphan lock per source that has ever passed through it.
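The two gates and the exclusive removal can be sketched as one function that applies equally to a .partial and to an orphan .lock. A std::sync::RwLock<()> stands in for the source's .lock file (an assumption for illustration; the real lock is a file lock shared across processes), and try_reap, Verdict, and the remove callback are hypothetical. The grace duration is a parameter because the text does not give its value.

```rust
use std::sync::{RwLock, TryLockError};
use std::time::{Duration, SystemTime};

#[derive(Debug, PartialEq, Eq)]
pub enum Verdict {
    Reaped,
    KeptYoung, // inside the creation grace window
    KeptLive,  // a live run holds the lock
}

/// Two-gate reap. lock models the source's .lock; remove is whatever
/// deletes the artifact (hypothetical callback).
pub fn try_reap<F: FnOnce()>(
    lock: &RwLock<()>,
    created: SystemTime,
    now: SystemTime,
    grace: Duration,
    remove: F,
) -> Verdict {
    // Age gate: a too-young artifact is kept whatever the lock says.
    // A creation time in the future (clock skew) counts as age zero.
    let age = now.duration_since(created).unwrap_or(Duration::ZERO);
    if age < grace {
        return Verdict::KeptYoung;
    }
    // Liveness gate: non-blocking exclusive try-lock.
    let _exclusive = match lock.try_write() {
        Ok(guard) => guard,
        Err(TryLockError::WouldBlock) => return Verdict::KeptLive,
        Err(TryLockError::Poisoned(p)) => p.into_inner(),
    };
    // Both gates passed: remove while still holding the exclusive lock,
    // so the reap cannot race a sibling that is mid-acquire.
    remove();
    Verdict::Reaped
}
```

Checking age first is a cheap ordering choice: it avoids touching the lock at all for artifacts that are too young, and the result is the same because both gates must pass.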
File permissions
Staged copies hold verbatim source records — potentially PII, credentials, or financial data — and on a shared staging volume they must not be readable by other users. On Unix each staged file and its manifest are created with mode 0o600 (owner-only). On Windows there is no portable mode bit; staged files inherit the staging directory’s ACL, so the directory’s ACL must be restricted if the volume is shared. The asymmetry is deliberate: the Unix mode bit is enforced per-file at creation, whereas the Windows ACL model pushes the responsibility up to the directory the operator provisions.
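On Unix the mode can be set at creation through the standard library's std::os::unix::fs::OpenOptionsExt::mode, so the file never exists with wider permissions. A minimal sketch; create_owner_only, restrict, and mode_bits are hypothetical helpers, and create_new is an added assumption so that an existing file is never silently reused with its old permissions. On non-Unix targets restrict sets nothing, leaving the file to inherit the directory ACL.

```rust
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

/// Create a staged file that only its owner can read (Unix: 0o600 at creation).
pub fn create_owner_only(path: &Path) -> io::Result<File> {
    let mut opts = OpenOptions::new();
    opts.write(true).create_new(true);
    restrict(&mut opts);
    opts.open(path)
}

#[cfg(unix)]
fn restrict(opts: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;
    // Owner read/write only; the process umask can clear bits but never add them.
    opts.mode(0o600);
}

#[cfg(not(unix))]
fn restrict(_opts: &mut OpenOptions) {
    // Windows: no mode bit; the file inherits the staging directory's ACL.
}

/// Permission bits on Unix, None elsewhere.
#[cfg(unix)]
pub fn mode_bits(path: &Path) -> io::Result<Option<u32>> {
    use std::os::unix::fs::PermissionsExt;
    Ok(Some(std::fs::metadata(path)?.permissions().mode() & 0o777))
}

#[cfg(not(unix))]
pub fn mode_bits(_path: &Path) -> io::Result<Option<u32>> {
    Ok(None)
}
```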
Crash durability and the parent-directory fsync
The atomic-rename guarantee that underpins both staging publish (step 2) and the manifest commit (step 5) only holds across a crash if the rename is durable. On POSIX filesystems (ext4, xfs) a rename’s directory entry can still be in the page cache after rename returns: the inode’s data is durable, but the directory entry that names it may not be, so a power loss between the rename and the next implicit flush could lose the entry and leave the file unreachable. To close that gap, Clinker fsyncs the parent directory after the rename, forcing the directory entry to stable storage before the protocol proceeds.
Windows is intentionally exempt. The NTFS metadata journal already makes the rename crash-durable — the same semantics a MOVEFILE_WRITE_THROUGH rename requests — so the directory entry is journaled atomically with the rename and survives a crash without a separate flush. Windows also offers no directory-fsync equivalent to call, so there is nothing to do there. The single cross-platform rule is therefore: durable rename means rename + parent-dir fsync on ext4/xfs, and rename alone on NTFS, with the journal standing in for the directory flush.
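The durable-rename rule can be sketched with standard-library calls only: write and fsync a temporary file, rename it over the destination, then on Unix open the parent directory and fsync it. publish_durably and sync_parent_dir are hypothetical names, and the fsync of the temporary file before the rename is an assumption consistent with the text's statement that the inode's data is already durable.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Publish bytes at dest via write-to-temp + atomic rename, then make the
/// rename itself durable. tmp must be on the same filesystem as dest.
pub fn publish_durably(tmp: &Path, dest: &Path, bytes: &[u8]) -> io::Result<()> {
    let mut f = File::create(tmp)?;
    f.write_all(bytes)?;
    f.sync_all()?; // assumption: file data is flushed before the rename
    drop(f);
    fs::rename(tmp, dest)?; // atomic replace of the directory entry
    sync_parent_dir(dest) // persist the directory entry itself
}

#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> io::Result<()> {
    let parent = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    // Opening a directory read-only and fsyncing it flushes its entries.
    File::open(parent)?.sync_all()
}

#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> io::Result<()> {
    // NTFS journals the rename; there is no directory fsync to call.
    Ok(())
}
```

Keeping the platform split inside sync_parent_dir means callers see one publish function, matching the single cross-platform rule stated above.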
Compiler Phases & Type Unification
User-facing view: the User Guide’s “CXL Overview” and “Types & Literals” pages.
This page is the engine-internals reference for how a CXL program is compiled before any record flows: the four-phase pipeline that turns source text into a typed, evaluable program, and the formal type-unification algorithm the typechecker runs when two types meet in an expression. CXL is a per-record ETL expression language — not SQL — so every program operates on one record at a time, and every type error is caught at compile time rather than surfacing mid-run. The depth here is the phase boundaries (Parse → Resolve → Typecheck → Eval), what each phase consumes and produces, the miette diagnostic surface, and the multi-point unification rules over CXL’s 9 value types. The user-facing pages document the surface syntax and the literal grammar; this page documents the compiler that stands behind them.
The four-phase compilation pipeline
CXL catches type errors before data processing begins. The compilation pipeline runs four ordered phases, each consuming the previous phase’s output and producing the input to the next. A failure at any phase produces a rich diagnostic with source locations and fix suggestions via miette, and short-circuits the remaining phases — a parse error never reaches the typechecker, a type error never reaches eval.
- Parse: tokenize and build an AST from CXL source text. The lexer turns raw source into a token stream; the parser assembles those tokens into an abstract syntax tree of statements (emit, let, filter, distinct) and the expressions inside them. This is the phase that rejects the symbolic boolean operators: &&, ||, and ! are syntax errors in CXL (the language uses the and, or, and not keywords), and that rejection happens here, at parse time, before any name or type is known.
- Resolve: bind field references, validate method names, and check arity. With the AST in hand, the resolver binds each field reference to a column in the input schema, confirms every method call names a real method, and checks that each call site supplies the right number of arguments. Name and arity errors are structural (they do not depend on types), so they are settled here, ahead of type inference, which lets the typechecker assume every reference resolves and every call is well-formed.
- Typecheck: infer types, validate operator compatibility, and check method receiver types. The typechecker walks the resolved tree, infers a type for every expression, and applies the unification rules below at each point two types meet (a binary operator, a method receiver, a conditional’s branches). It rejects incompatible combinations (applying + to a String and an Int, for instance) and emits a span-annotated diagnostic that names both operand types and suggests a coercion. The output of this phase is a TypedProgram: the AST annotated with the inferred type of every node, ready to evaluate without further inference.
- Eval: execute the typed program against each record. With types resolved, evaluation is a straightforward per-record walk of the TypedProgram. Statements execute top to bottom against the current record; later statements can reference fields produced by earlier emit or let statements, and a filter whose condition is false short-circuits the remaining statements and excludes the record from output. No type inference happens at this phase (every dispatch decision was settled in Typecheck), so eval is the hot per-record path and carries none of the compile-time machinery.
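The per-record walk in Eval can be sketched as a loop over already-typed statements. This is a toy, assuming hypothetical Value, Record, Stmt, and eval types, with each expression pre-compiled to a plain function; it also assumes a let binding is visible to later statements but not written to the output, which the text implies but does not state.

```rust
use std::collections::HashMap;

/// Toy runtime values (a subset of CXL's nine types, for illustration only).
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    Int(i64),
    Bool(bool),
}

pub type Record = HashMap<String, Value>;

/// Hypothetical already-typechecked statements; expressions are plain
/// functions over the fields visible so far.
pub enum Stmt {
    Let(&'static str, fn(&Record) -> Value),
    Emit(&'static str, fn(&Record) -> Value),
    Filter(fn(&Record) -> bool),
}

/// Walk the statements top to bottom for one record. Returns the emitted
/// fields, or None when a filter excludes the record.
pub fn eval(program: &[Stmt], input: &Record) -> Option<Record> {
    let mut scope = input.clone(); // input fields plus earlier let/emit results
    let mut out = Record::new(); // emitted fields only
    for stmt in program {
        match stmt {
            Stmt::Let(name, f) => {
                let v = f(&scope);
                scope.insert(name.to_string(), v);
            }
            Stmt::Emit(name, f) => {
                let v = f(&scope);
                scope.insert(name.to_string(), v.clone());
                out.insert(name.to_string(), v);
            }
            Stmt::Filter(keep) => {
                if !keep(&scope) {
                    return None; // short-circuit: skip the remaining statements
                }
            }
        }
    }
    Some(out)
}
```

With a program that binds double = qty * 2, filters on double != 0, and emits total = double + 1, a record with qty = 3 yields total = 7 and no double field, while qty = 0 fails the filter and returns None without running the emit.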
The phase split is what makes CXL’s compile-time guarantee meaningful: running cxl check transform.cxl executes Parse → Resolve → Typecheck and reports any error with a span before a single record is read, e.g.
error[typecheck]: cannot apply '+' to String and Int (at transform.cxl:12)
help: convert one operand — use .to_int() or .to_string()
Because the typecheck phase produces a fully-typed program, a program that passes it cannot hit the corresponding type mismatch at runtime: the class of error is eliminated before Eval, not merely detected earlier.
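The short-circuiting between phases maps naturally onto Rust's ? operator. The sketch is a toy, not CXL: it compiles a single field op field expression, and Ty, Phase, Diagnostic, TypedProgram, and its operator table are hypothetical (in particular, it says nothing about whether CXL accepts + between two Strings). What it does show is the phase boundaries: each phase consumes the previous phase's output, and a failure returns a Diagnostic tagged with the phase that raised it, so later phases never run.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Ty {
    Int,
    Float,
    Bool,
    Str,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Phase {
    Parse,
    Resolve,
    Typecheck,
}

#[derive(Debug, PartialEq, Eq)]
pub struct Diagnostic {
    pub phase: Phase,
    pub message: String,
}

/// Output of the typecheck phase: here just the inferred result type.
#[derive(Debug, PartialEq, Eq)]
pub struct TypedProgram {
    pub result: Ty,
}

struct Ast<'a> { lhs: &'a str, op: &'a str, rhs: &'a str }
struct Resolved<'a> { lhs: Ty, op: &'a str, rhs: Ty }

fn diag(phase: Phase, message: String) -> Diagnostic {
    Diagnostic { phase, message }
}

/// Parse: accept exactly `lhs op rhs`; symbolic boolean operators are syntax errors.
fn parse(src: &str) -> Result<Ast<'_>, Diagnostic> {
    let toks: Vec<&str> = src.split_whitespace().collect();
    if toks.iter().any(|t| matches!(*t, "&&" | "||" | "!")) {
        return Err(diag(Phase::Parse, "use and/or/not, not &&/||/!".to_string()));
    }
    match toks.as_slice() {
        [lhs, op @ ("+" | "and" | "or"), rhs] => Ok(Ast { lhs: *lhs, op: *op, rhs: *rhs }),
        _ => Err(diag(Phase::Parse, format!("expected `field op field`, got {src:?}"))),
    }
}

/// Resolve: bind each field name to its schema type.
fn resolve<'a>(ast: Ast<'a>, schema: &HashMap<&str, Ty>) -> Result<Resolved<'a>, Diagnostic> {
    let bind = |name: &str| {
        schema
            .get(name)
            .copied()
            .ok_or_else(|| diag(Phase::Resolve, format!("unknown field `{name}`")))
    };
    Ok(Resolved { lhs: bind(ast.lhs)?, op: ast.op, rhs: bind(ast.rhs)? })
}

/// Typecheck: toy operator table (not CXL's full rules).
fn typecheck(r: Resolved<'_>) -> Result<TypedProgram, Diagnostic> {
    let result = match (r.op, r.lhs, r.rhs) {
        ("+", Ty::Int, Ty::Int) => Ty::Int,
        ("+", Ty::Int | Ty::Float, Ty::Int | Ty::Float) => Ty::Float,
        ("and" | "or", Ty::Bool, Ty::Bool) => Ty::Bool,
        (op, l, rt) => {
            let msg = format!("cannot apply '{op}' to {l:?} and {rt:?}");
            return Err(diag(Phase::Typecheck, msg));
        }
    };
    Ok(TypedProgram { result })
}

/// Each ? short-circuits: a parse error never reaches resolve, and a
/// resolve error never reaches typecheck.
pub fn compile(src: &str, schema: &HashMap<&str, Ty>) -> Result<TypedProgram, Diagnostic> {
    let ast = parse(src)?;
    let resolved = resolve(ast, schema)?;
    typecheck(resolved)
}
```

With a schema of qty: Int, price: Float, name: Str, ok: Bool, the input ok && ok fails in Parse, qty + missing fails in Resolve, and name + qty fails in Typecheck, mirroring the String + Int diagnostic above.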
The type lattice
CXL has 9 value types, and unification operates over them plus two compile-time-only constructs (Numeric and Any) and the Nullable(T) wrapper. The concrete value types and their Rust backings:
| Type | Rust backing | Description |
|---|---|---|
| Null | Value::Null | Missing or absent value |
| Bool | bool | true or false |
| Integer | i64 | 64-bit signed integer |
| Float | f64 | 64-bit double-precision float |
| String | Box<str> | UTF-8 text |
| Date | NaiveDate | Calendar date without timezone |
| DateTime | NaiveDateTime | Date and time without timezone |
| Array | Vec<Value> | Ordered collection of values |
| Map | IndexMap<Box<str>, Value> | Key-value pairs |
Two further type-level constructs appear only at compile time, never as a runtime Value:
- Numeric: a union accepting either Int or Float. It is the declared schema type for a column that may carry either; unification resolves it to a concrete numeric type when it meets one.
- Any: an unconstrained type, the declared type for a column whose type is unknown. It unifies away to whatever it meets.
And the Nullable(T) wrapper marks a type whose value may be null. Nullability is tracked through unification rather than discarded, so a nullable operand propagates its nullability into the result.
Type unification rules
When two types meet in an expression — the two operands of a binary operator, the receiver and a method’s expected type, the branches of a conditional — the typechecker unifies them to a single result type. The algorithm is a small, ordered set of rules; each is tried against the pair of types until one applies:
1. Identity. Same types unify to themselves: Int + Int produces Int. This is the base case: when both sides already agree, the result is that shared type.
2. Any absorbs. Any unifies with anything: Any + T produces T. An Any operand imposes no constraint, so the result takes the other operand’s type. (When both are Any, identity covers it.)
3. Numeric resolves to the concrete type. Numeric + Int produces Int; Numeric + Float produces Float. The Numeric union collapses to whichever concrete numeric type it meets, rather than staying an unresolved union in the result.
4. Int promotes to Float. Int + Float produces Float. When the two concrete numeric types differ, the result is the wider one: integer arithmetic against a float yields a float, matching the runtime promotion the evaluator performs.
5. Null wraps. Null + T produces Nullable(T). Any operation involving the Null type produces a nullable result: meeting Null cannot guarantee a non-null outcome, so the result type carries the Nullable marker. (Runtime behavior matches, e.g. null + 5 evaluates to null, and the type reflects that the result may be absent.)
6. Nullable propagates. Nullable(A) + B produces Nullable(unified(A, B)). When a nullable type meets any other type, unification recurses on the inner type A against B, then re-wraps the result in Nullable. Nullability is sticky: it survives the unification and re-wraps whatever the inner types unify to, so a nullable operand anywhere in an expression makes the whole result nullable.
7. Incompatible types fail. When no rule above applies (String + Int, for instance), unification fails and the typecheck phase emits a span-annotated type error naming both operand types and suggesting a coercion.
The ordering matters: Any and Numeric are resolved before the promotion and nullability rules, so by the time rules 4–6 run, both sides are concrete (or nullable-wrapped concrete) types. Rule 6’s recursion is the only point where the algorithm re-enters itself, and each recursion peels one Nullable wrapper off the pair, so the pair strictly shrinks and unification terminates.
These rules are what let the typechecker hand Eval a fully-resolved TypedProgram: every binary operator, method receiver, and conditional has a single inferred result type, computed once at compile time, so the per-record evaluator never re-derives a type or discovers a mismatch mid-run.
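The seven rules translate almost line for line into a recursive function. A sketch under stated assumptions: the Type encoding and unify are hypothetical (Array and Map element types are omitted), each rule is tried in both operand orders (the text writes them one way round), and Nullable(Nullable(T)) is collapsed to Nullable(T). The match arms appear in rule order, so the Any and Numeric arms fire before the promotion and nullability arms, as the ordering argument requires.

```rust
/// CXL's nine value types plus the compile-time-only Numeric and Any and the
/// Nullable wrapper (a hypothetical encoding).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Type {
    Null,
    Bool,
    Int,
    Float,
    String,
    Date,
    DateTime,
    Array,
    Map,
    Numeric,
    Any,
    Nullable(Box<Type>),
}

/// Wrap in Nullable, assuming Nullable(Nullable(T)) collapses to Nullable(T).
fn nullable(t: Type) -> Type {
    match t {
        Type::Nullable(_) => t,
        other => Type::Nullable(Box::new(other)),
    }
}

/// Apply the seven rules in order; each rule is tried in both operand orders.
pub fn unify(a: &Type, b: &Type) -> Result<Type, std::string::String> {
    // Rule 1: identity.
    if a == b {
        return Ok(a.clone());
    }
    match (a, b) {
        // Rule 2: Any absorbs.
        (Type::Any, t) | (t, Type::Any) => Ok(t.clone()),
        // Rule 3: Numeric resolves to the concrete numeric type it meets.
        (Type::Numeric, Type::Int) | (Type::Int, Type::Numeric) => Ok(Type::Int),
        (Type::Numeric, Type::Float) | (Type::Float, Type::Numeric) => Ok(Type::Float),
        // Rule 4: Int promotes to Float.
        (Type::Int, Type::Float) | (Type::Float, Type::Int) => Ok(Type::Float),
        // Rule 5: Null wraps the other side.
        (Type::Null, t) | (t, Type::Null) => Ok(nullable(t.clone())),
        // Rule 6: Nullable propagates; recurse on the inner type, then re-wrap.
        (Type::Nullable(inner), t) | (t, Type::Nullable(inner)) => Ok(nullable(unify(inner, t)?)),
        // Rule 7: incompatible types fail.
        _ => Err(format!("cannot unify {a:?} and {b:?}")),
    }
}
```

Nullable(Int) against Nullable(Float) goes through rule 6 twice (once per wrapper) and yields Nullable(Float), illustrating that each recursion peels exactly one wrapper before the inner types meet under rule 4.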