Parade core: declarative flows, types, and execution
Source: parade-core.Rmd
Note: Code evaluation is disabled in this vignette to keep builds fast and cluster-agnostic. Copy code into an interactive R session to run locally or on your cluster.
Requires parade ≥ 0.6.0 (flow/stage/distribute/collect).
Quick Start (5 minutes)
Want to see parade in action? Here’s a complete working example:
library(parade)
# 1. Define what varies (3 subjects × 2 conditions = 6 analyses)
grid <- param_grid(
subject = c("s01", "s02", "s03"),
condition = c("A", "B")
)
# 2. Create workflow with one analysis step
workflow <- flow(grid) |>
stage(
id = "analyze",
f = function(subject, condition) {
# Your analysis code here
score <- rnorm(1, mean = 100, sd = 15)
list(score = score)
},
schema = returns(score = dbl())
)
# 3. Run it!
results <- collect(workflow)
print(results)
# A tibble: 6 × 4
# subject condition score .ok
# <chr> <chr> <dbl> <lgl>
# 1 s01 A 98.2 TRUE
# 2 s01 B 102.4 TRUE
# ... (4 more rows)
That’s it! You’ve just run 6 analyses automatically. Read on to understand how parade can scale from this simple example to complex, multi-stage pipelines on HPC clusters.
Why parade? A simple research scenario
Imagine you’re analyzing experimental data from multiple participants. You need to:
- Load each participant’s data file
- Clean and preprocess the data
- Run your analysis
- Generate summary statistics
With 20 participants and 3 experimental conditions, that’s 60 separate analyses. You could write nested loops, manage file paths manually, and hope nothing crashes halfway through. Or you could use parade to declare your workflow once and let it handle the complexity.
What is parade?
parade helps you build reproducible analysis pipelines that can run on everything from your laptop to a computing cluster. Instead of writing loops and managing files manually, you:
- Declare what you want to compute
- Define the steps and their relationships
- Execute locally or in parallel
- Collect validated, organized results
Let’s see how this works with a concrete example.
Your first parade workflow
Reproducibility tip: When generating synthetic data for examples, always use set.seed() to ensure outputs remain consistent across runs. In production code, you typically wouldn’t set seeds unless you need reproducible randomization.
Step 1: Define your parameter space
Every parade workflow starts with a parameter grid - a table that defines all the analyses you want to run:
library(parade)
library(progressr) # For progress bars
# Initialize parade's path system for managing artifacts and registries
# This creates directories for storing intermediate results and job metadata
paths_init()
# Define what varies in your analysis
grid <- param_grid(
participant = c("p01", "p02", "p03"), # 3 participants
condition = c("control", "treatment") # 2 conditions
)
print(grid)
# A tibble: 6 × 2
# participant condition
# <chr> <chr>
# 1 p01 control
# 2 p01 treatment
# 3 p02 control
# 4 p02 treatment
# 5 p03 control
# 6 p03 treatment
Each row in this grid represents one analysis we need to run. Parade will handle all 6 automatically.
Step 2: Define your analysis pipeline
Now we create a flow - a series of computational steps that will be applied to each row:
# Create a flow from our parameter grid
results_flow <- flow(grid) |>
# First stage: Load the data
stage(
id = "load", # Give this stage a name for reference
f = function(participant, condition) {
# This function receives parameters for one row of the grid
# In production, you would typically:
# data <- read.csv(sprintf("data/%s_%s.csv", participant, condition))
# For this example, we simulate data
# Use participant/condition as seed components for reproducible but varied data
seed_val <- as.integer(sub("p", "", participant)) * 100 +
  match(condition, c("control", "treatment"))
set.seed(seed_val)
n_trials <- 100
# Simulate reaction time experiment data
data <- data.frame(
participant = participant,
condition = condition,
trial = 1:n_trials,
response_time = rnorm(n_trials,
mean = ifelse(condition == "control", 500, 450),
sd = 50),
accuracy = rbinom(n_trials, 1,
prob = ifelse(condition == "control", 0.8, 0.9))
)
# Stages must return a list
list(data = data)
},
# Tell parade what type of output to expect
schema = returns(data = lst())
)
Let’s break this down:
- flow(grid) creates a workflow that will process each row of our grid
- stage() adds a computational step to the workflow
- id gives the stage a name so other stages can reference it
- f is the function that does the actual work
- schema tells parade what kind of data this stage produces (see the schematic sketch below)
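Putting those pieces together, here is the general shape of a stage. This is a schematic sketch that reuses the participant/condition grid from Step 1; the stage id ("describe") and output field ("n_chars") are arbitrary names chosen for illustration:
# Schematic anatomy of a stage
sketch_flow <- flow(grid) |>
  stage(
    id = "describe",                          # name other stages can reference via `needs`
    f = function(participant, condition) {    # receives one grid row's parameters by name
      list(n_chars = nchar(participant))      # must return a named list
    },
    schema = returns(n_chars = int())         # declares the type of each output field
  )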
Step 3: Add analysis stages
Let’s add a stage that analyzes the loaded data:
results_flow <- results_flow |>
# Second stage: Analyze the data
stage(
id = "analyze",
needs = "load", # This stage needs the output from "load"
f = function(load.data) {
# Notice: load.data refers to the 'data' output from the 'load' stage
# Calculate summary statistics
mean_rt <- mean(load.data$response_time)
sd_rt <- sd(load.data$response_time)
accuracy <- mean(load.data$accuracy)
n_trials <- nrow(load.data)
list(
mean_rt = mean_rt,
sd_rt = sd_rt,
accuracy = accuracy,
n_trials = n_trials
)
},
# Specify the types of our outputs
schema = returns(
mean_rt = dbl(), # Double/numeric
sd_rt = dbl(),
accuracy = dbl(),
n_trials = int() # Integer
)
)
Key points:
- needs = "load" creates a dependency: this stage waits for "load" to complete
- load.data is how we access the output from the previous stage
- The naming convention is [stage_name].[output_field], as illustrated in the sketch below
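For example, a quality-check stage added to the flow above can request the load stage’s data output through an argument named load.data. This is an illustrative sketch; the "qc" stage and its prop_complete output are not part of the vignette’s pipeline:
# The <stage id>.<output field> convention in action
results_flow_qc <- results_flow |>
  stage(
    id = "qc",
    needs = "load",
    f = function(load.data) {   # the "load" stage's `data` field
      list(prop_complete = mean(complete.cases(load.data)))
    },
    schema = returns(prop_complete = dbl())
  )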
Step 4: Execute and collect results
Now we run the entire workflow:
# Execute the workflow
results <- collect(results_flow)
print(results)
# A tibble: 6 × 7
# participant condition mean_rt sd_rt accuracy n_trials .ok
# <chr> <chr> <dbl> <dbl> <dbl> <int> <lgl>
# 1 p01 control 498. 48.2 0.79 100 TRUE
# 2 p01 treatment 451. 51.3 0.91 100 TRUE
# 3 p02 control 502. 49.8 0.81 100 TRUE
# 4 p02 treatment 449. 50.1 0.88 100 TRUE
# 5 p03 control 499. 52.1 0.80 100 TRUE
# 6 p03 treatment 448. 47.9 0.92 100 TRUE
The results include:
- Our original parameters (participant, condition)
- The outputs from our analysis stage (mean_rt, sd_rt, accuracy, n_trials)
- A .ok column indicating whether each row succeeded
Understanding schemas and types
You might wonder why we specify schemas. They serve several important purposes; two matter most for this vignette.
Type safety
Type checking validates each stage’s output against its declared schema, so mismatched results fail early instead of silently (see the Summary below).
Memory optimization
Parade pre-allocates memory based on your schemas, making execution more efficient.
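The type helpers used in this vignette can be combined in a single returns() call. A minimal sketch, restricted to the helpers that appear here (dbl(), int(), lst()):
# A schema mixing the three type helpers used in this vignette
my_schema <- returns(
  mean_rt = dbl(),   # double / numeric column
  n_trials = int(),  # integer column
  data = lst()       # list column for arbitrary R objects (data frames, models, ...)
)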
Parallel execution
So far, our workflow runs sequentially. Let’s make it parallel:
# Run with parallel workers
results_parallel <- collect(
results_flow,
engine = "future", # Use parallel execution
workers = 3 # Use 3 parallel workers
)
Parade automatically distributes the 6 analyses across the 3 workers, handling all the complexity of parallel execution for you.
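We loaded progressr at the start for progress bars. Assuming parade reports progress through progressr’s standard signalling mechanism (an assumption; check the package documentation), you can wrap the call to see a progress bar while the workers run:
# Assumption: parade signals progress via progressr, so with_progress() shows a bar
library(progressr)
results_parallel <- with_progress(
  collect(results_flow, engine = "future", workers = 3)
)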
Handling failures gracefully
Real analyses sometimes fail. Parade provides several strategies:
# Option 1: Keep failed rows in results (default)
flow_robust <- flow(grid, error = "keep")
# Option 2: Stop on first error (for debugging)
flow_strict <- flow(grid, error = "stop")
# Option 3: Silently remove failed rows
flow_filter <- flow(grid, error = "omit")
# Check which rows failed
failed_rows <- failed(results)
# Get detailed error information
diagnostics(results, stage = "analyze")
Building complex pipelines
Stages can depend on multiple previous stages, creating a directed acyclic graph (DAG):
complex_flow <- flow(grid) |>
stage(id = "load",
f = function(participant, condition) {
# Load raw data from CSV files; in production you might resolve
# this path via parade's path system (see the Paths vignette)
file_path <- sprintf("data/%s_%s.csv", participant, condition)
# For this example, we simulate loading data instead of reading the file
set.seed(as.integer(sub("p", "", participant)) * 100 +
  match(condition, c("control", "treatment")))
raw_data <- data.frame(
x = rnorm(100),
y = rnorm(100),
participant = participant,
condition = condition
)
list(raw_data = raw_data)
},
schema = returns(raw_data = lst())) |>
stage(id = "clean",
needs = "load",
f = function(load.raw_data) {
# Remove outliers: values > 3 SD from mean
clean_data <- load.raw_data
# Remove outliers from x variable
x_mean <- mean(clean_data$x, na.rm = TRUE)
x_sd <- sd(clean_data$x, na.rm = TRUE)
clean_data <- clean_data[abs(clean_data$x - x_mean) <= 3 * x_sd, ]
# Remove outliers from y variable
y_mean <- mean(clean_data$y, na.rm = TRUE)
y_sd <- sd(clean_data$y, na.rm = TRUE)
clean_data <- clean_data[abs(clean_data$y - y_mean) <= 3 * y_sd, ]
list(clean_data = clean_data)
},
schema = returns(clean_data = lst())) |>
stage(id = "model",
needs = "clean",
f = function(clean.clean_data) {
# Fit a linear model
model <- lm(y ~ x, data = clean.clean_data)
list(model = model)
},
schema = returns(model = lst())) |>
stage(id = "validate",
needs = "model",
f = function(model.model) {
# Simple cross-validation: calculate R-squared
# In production, use caret::train() or mlr3 for proper CV
# Extract R-squared as a simple validation metric
cv_score <- summary(model.model)$r.squared
list(cv_score = cv_score)
},
schema = returns(cv_score = dbl())) |>
stage(id = "report",
needs = c("model", "validate"), # Depends on TWO stages
f = function(model.model, validate.cv_score) {
# Generate summary report combining model and validation
summary_report <- list(
coefficients = coef(model.model),
r_squared = validate.cv_score,
n_obs = length(model.model$fitted.values),
rmse = sqrt(mean(model.model$residuals^2))
)
list(summary = summary_report)
},
schema = returns(summary = lst()))
Parade automatically figures out the execution order based on dependencies.
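Before running a larger DAG like this, you can confirm the execution order parade inferred from the needs declarations; explain() is demonstrated again in the Debugging section below:
# Inspect the inferred DAG, then execute the whole pipeline
explain(complex_flow)
complex_results <- collect(complex_flow)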
Debugging workflows
Start small and build up:
# 1. Test with just 2 rows first
test_results <- collect(results_flow, limit = 2)
# Check if everything worked
if (all(test_results$.ok)) {
message("Test successful! Now try full dataset.")
} else {
# Find which rows failed
failed_rows <- test_results[!test_results$.ok, ]
print(failed_rows)
}
# 2. Run sequentially for easier debugging
debug_results <- collect(results_flow, engine = "sequential")
# 3. Check intermediate outputs from specific stages
stage_outputs <- diagnostics(test_results)
# Examine what the load stage produced for the first row
print(stage_outputs$load[[1]])
# $data
# A data frame with 100 rows and 5 columns
# Check analyze stage output
print(stage_outputs$analyze[[1]])
# $mean_rt
# [1] 498.2
# $sd_rt
# [1] 48.2
# ... etc
# 4. Inspect the flow structure
explain(results_flow)
# Shows DAG with stages and dependencies:
# Stage 'load' (no dependencies)
# Stage 'analyze' (depends on: load)
# 5. Debug a specific failure
if (!test_results$.ok[1]) {
# Get detailed error for first row
error_info <- diagnostics(test_results, row = 1)
print(error_info$.error) # See exact error message
}
Common patterns and tips
Pattern 1: Conditional execution
Skip stages based on conditions:
flow_with_skip <- flow(grid) |>
stage(id = "load", ...) |>
stage(
id = "optional_analysis",
needs = "load",
f = function(load.data) {
# Complex analysis
list(result = complex_calculation(load.data))
},
skip_when = function(load.data) {
nrow(load.data) < 10 # Skip if too little data
},
schema = returns(result = dbl())
)
Pattern 2: Reproducible randomness
Use a seed column for reproducible random operations:
# Create grid with unique seeds for each analysis
grid_with_seeds <- param_grid(
participant = c("p01", "p02"),
replicate = 1:10
) |>
dplyr::mutate(seed = 1000 + dplyr::row_number()) # Add unique seed column (requires dplyr)
# Tell flow to use the seed column
flow_reproducible <- flow(grid_with_seeds, seed_col = "seed") |>
stage(
id = "bootstrap",
f = function(participant, replicate) {
# No need to call set.seed() - parade handles it!
# Each row gets its designated seed automatically
# Simulate some data for this participant
original_data <- rnorm(100, mean = 100, sd = 15)
# Bootstrap sampling (will be reproducible)
sample_data <- sample(original_data, replace = TRUE)
list(bootstrap_mean = mean(sample_data))
},
schema = returns(bootstrap_mean = dbl())
)
# Results will be identical every time you run this
results1 <- collect(flow_reproducible)
results2 <- collect(flow_reproducible)
identical(results1$bootstrap_mean, results2$bootstrap_mean) # TRUE
Pattern 3: Working with complex objects
For complex objects (models, custom classes, neuroimaging data), use lst():
model_flow <- flow(grid) |>
stage(
id = "fit_model",
f = function(participant) {
# Simulate loading participant data
# In production: data <- read.csv(sprintf("data/%s.csv", participant))
set.seed(as.integer(sub("p", "", participant)))
n <- 100
data <- data.frame(
x1 = rnorm(n),
x2 = rnorm(n),
x3 = rnorm(n),
y = rnorm(n)
)
# Fit a complex model
model <- lm(y ~ x1 + x2 + x3, data = data)
# Return both the model object and extracted metrics
list(
model = model, # Complex object stored as list
r_squared = summary(model)$r.squared, # Simple numeric value
n_params = length(coef(model)) # Simple integer value
)
},
schema = returns(
model = lst(), # List column for complex object
r_squared = dbl(), # Regular column for numeric
n_params = int() # Regular column for integer
)
)
Troubleshooting common issues
Can’t find stage output
# Error: Can't find prep.df
# Check:
# 1. Stage name: needs = "prep" (must match stage id)
# 2. Output name: function(prep.df) (format: stage.field)
# 3. The prep stage must output a field called "df"
Debugging failed rows
# Get detailed information about failures
results <- collect(flow, error = "keep")
# Which rows failed?
failed_rows <- results[!results$.ok, ]
# Why did they fail?
error_details <- diagnostics(results)
print(error_details$.error) # See error messages
Next steps
Now that you understand the basics:
- Paths System: Learn about parade’s URI-based path management system
- Sinks and Artifacts: Save intermediate results to disk automatically
- Artifacts Guide: Practical guide to portable scratch-first storage
- Parallel Execution: Scale to hundreds of workers with the mirai backend
- SLURM Distribution: Run on HPC clusters
Remember: start simple, test with small parameter grids, and gradually scale up. Parade grows with your needs - the same code that runs on your laptop can scale to a supercomputer with just a configuration change.
Summary
parade helps you: - Define once, run many: Write your analysis once, run it across all parameter combinations - Handle complexity: Automatically manage dependencies between analysis steps - Scale effortlessly: From sequential laptop execution to parallel cluster computing - Catch errors early: Type checking prevents silent failures - Stay organized: All results in a clean, typed tibble
Start with simple sequential workflows, add types for safety, then scale to parallel execution when needed. Your analysis code stays the same - only the execution strategy changes.