Add an inline code block stage to a parade flow
code_stage.Rdcode_stage() captures a bare code block via non-standard evaluation and
wraps it in a function whose formals are automatically inferred from the
grid columns and upstream stage outputs.
This fills the gap between tiny lambdas passed to stage() and
external scripts used by script_stage(): the code lives inline in the
pipeline definition but can span many lines without cluttering the pipe.
Usage
code_stage(
fl,
id,
code,
needs = character(),
schema,
prefix = TRUE,
sink = NULL,
skip_when = NULL,
hoist_struct = FALSE,
inputs = NULL,
input_artifacts = NULL,
outputs = NULL,
io_mode = c("off", "warn", "error"),
retries = NULL,
retry_backoff = NULL,
retry_base = NULL,
retry_on = NULL,
resources = NULL,
cpus = NULL,
memory = NULL,
time = NULL,
...
)Arguments
- fl
A
parade_flowobject- id
Unique stage identifier (character)
- code
A bare R expression (code block) to execute for each row. Captured via
base::substitute()— never evaluated in the caller's frame. Grid columns and upstream outputs are injected as local variables.- needs
Character vector of stage IDs this stage depends on
- schema
Schema defining expected output structure (from
returns())- prefix
Whether to prefix output columns with stage ID (logical)
- sink
Optional sink specification for artifact persistence
- skip_when
Optional function to determine when to skip this stage
- hoist_struct
Whether to hoist nested data structures (logical)
- inputs
Optional character vector of required input names for this stage.
- input_artifacts
Optional character vector of
inputsthat must be artifact/file-reference values.- outputs
Optional character vector of declared output names. Defaults to schema column names.
- io_mode
I/O contract behavior:
"off"(default),"warn", or"error".- retries
Optional stage-level retry override (non-negative integer).
- retry_backoff
Optional stage-level retry backoff override:
"none","fixed","linear","exponential".- retry_base
Optional stage-level retry base delay (seconds).
- retry_on
Optional stage-level retry classifier override.
- resources
Optional stage-level resource hints as a named list with keys
cpus,memory,time(aliases:cpus_per_task,ncpus,mem).- cpus
Optional stage-level CPUs per task hint.
- memory
Optional stage-level memory hint.
- time
Optional stage-level walltime hint.
- ...
Additional constant arguments passed to the stage function
Details
The code block is placed right after id so that it reads naturally in
pipeline style when needs and schema are named.
Inside the block, grid columns and upstream outputs (prefixed with
<stage>.<field>) are available as plain variables.
The block should return a named list matching the schema.
Examples
grid <- data.frame(x = 1:3)
fl <- flow(grid) |>
code_stage("sq", {
list(result = x^2)
}, schema = returns(result = dbl()))
#> Error in .stage_available_cols(fl, needs): could not find function ".stage_available_cols"
# With upstream dependencies — code block at end with named args
fl2 <- flow(grid) |>
stage("dbl", function(x) list(y = x * 2),
schema = returns(y = dbl())) |>
code_stage("add", needs = "dbl",
schema = returns(total = dbl()), code = {
total <- x + dbl.y
list(total = total)
})
#> Error in .stage_available_cols(fl, needs): could not find function ".stage_available_cols"