Artifacts: Managing Large Analysis Outputs
parade-artifacts.Rmd

Note: Code evaluation is disabled in this vignette to keep builds fast and cluster-agnostic. Copy code into an interactive R session to run locally or on your cluster.
Prerequisites
This vignette assumes you understand:

- Smart Path Management - how portable paths work
- Core Concepts - basic flow and stage structure
The Problem: Large Outputs Overwhelm Your System
When your analysis produces large outputs, several problems arise:
# Typical neuroimaging analysis - each model is ~500MB
results <- lapply(subjects, function(subj) {
brain_data <- load_brain(subj)
model <- fit_complex_model(brain_data) # 500MB object
return(model)
})
# Problems:
# 1. Memory explosion - 100 subjects = 50GB in RAM
# 2. Can't save intermediate results - all or nothing
# 3. Can't resume if job fails after 80 subjects
# 4. Can't share specific models with collaborators
# 5. Results object becomes unwieldy to work with

Even worse on HPC clusters:
# This WILL get your account suspended on most clusters:
saveRDS(huge_results, "~/my_home_dir/results.rds")  # NO! Home has quota!

The Solution: Artifacts - Smart File Management
Artifacts automatically save large outputs to disk during processing, returning just file references:
library(parade)
# Local dev:
paths_init()
# HPC (recommended):
# parade_init_hpc(persist = TRUE)
# Define what should go to disk
results <- flow(subjects) |>
stage("fit_model",
f = function(subject) {
brain_data <- load_brain(subject)
model <- fit_complex_model(brain_data) # 500MB object
list(model = model, subject = subject)
},
schema = returns(
model = artifact(), # This becomes a file on disk
subject = chr() # This stays in memory
),
sink = sink_spec(
fields = "model",
dir = "artifacts://brain_models" # Smart, portable path
)
) |>
collect()
# Now results$model contains file paths, not huge objects!
print(results$model[[1]])
#> $path: "$PARADE_SCRATCH/parade-artifacts/brain_models/model_1.rds"
#> $bytes: 524288000
#> $sha256: "abc123..."
# Load a specific model when needed
model_50 <- readRDS(results$model[[50]]$path)

Quick Start: Your First Artifact Workflow
Reproducibility tip: When examples use random data
(rnorm(), sampling, etc.), set a seed (e.g.,
set.seed(42)) so printed outputs are deterministic.
Step 1: Initialize parade’s path system
library(parade)
paths_init()  # Sets up smart paths for your environment

Step 2: Create an analysis with artifacts
# Small example with built-in data
data <- data.frame(
sample = paste0("sample_", 1:10),
size = sample(100:1000, 10)
)
results <- flow(data) |>
stage("analyze",
f = function(sample, size) {
set.seed(42) # reproducible example output
# Simulate some analysis
x <- seq_len(size)  # predictor: 1..size
model <- lm(rnorm(size) ~ x)
metrics <- summary(model)$r.squared
list(
model = model, # Large object
metrics = metrics, # Small value
sample = sample # Identifier
)
},
schema = returns(
model = artifact(), # Save to disk
metrics = dbl(), # Keep in memory
sample = chr() # Keep in memory
),
sink = sink_spec(
fields = "model",
dir = "artifacts://analysis/models"
)
) |>
collect()
# Results contain paths for models, values for metrics
print(results)
#> # A tibble: 10 × 3
#> sample model metrics
#> <chr> <list> <dbl>
#> 1 sample_1 <tibble [1 × 4]> 0.023
#> 2 sample_2 <tibble [1 × 4]> 0.045
#> ...

Understanding Artifact Storage
Where Do Artifacts Go?
The artifacts:// prefix is a smart alias that adapts to
your environment:
| Environment | Artifacts Path | Why |
|---|---|---|
| Your laptop | /tmp/parade-artifacts/ | Uses system temp |
| HPC cluster | $PARADE_SCRATCH/parade-artifacts/ (preferred) or $SCRATCH/parade-artifacts/ | Uses shared scratch storage |
| Custom setup | Whatever you configure | Full control |
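To see exactly where artifacts:// points on your machine, resolve the alias directly. A quick check using the path helpers shown in this vignette (the printed paths below are only examples; yours will differ):

library(parade)
paths_init()

# Show the configured roots; the artifacts entry is what artifacts:// expands to
paths_get()

# Resolve an artifacts:// URI to an absolute path on this machine
resolve_path("artifacts://brain_models")
#> e.g. "/scratch/alice/parade-artifacts/brain_models" on a cluster,
#>      "/tmp/parade-artifacts/brain_models" on a laptop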
Discoverability: catalog and search
Artifacts are just files, but Parade can help you
discover them by scanning the sink sidecars
(*.json) for provenance metadata.
# List artifacts under your artifacts root
artifact_catalog()
# Search by stage/field/row_key/path
artifact_catalog_search(query = "fit_model")

Directory Organization
Artifacts are organized hierarchically based on your sink specification:
sink_spec(
fields = "model",
dir = "artifacts://project_x/models",
template = "{subject}/{session}_{task}.rds"
)
# Creates structure like:
# /scratch/alice/parade-artifacts/
# └── project_x/
# └── models/
# ├── subj01/
# │ ├── session1_rest.rds
# │ └── session2_task.rds
# └── subj02/
# ├── session1_rest.rds
# └── session2_task.rds

Real-World Scenarios
Scenario 1: Neuroimaging Pipeline with Mixed Output Sizes
Different stages produce different sized outputs - keep small stuff in memory, large stuff on disk:
neuro_pipeline <- flow(subjects) |>
# Stage 1: Preprocessing (large outputs)
stage("preprocess",
f = function(subject) {
# Simulate loading brain data (in production, use actual neuroimaging tools)
# raw <- oro.nifti::readNIfTI(sprintf("data/%s_T1.nii.gz", subject))
# For this example, simulate with a large matrix
set.seed(as.numeric(factor(subject)))
raw <- matrix(rnorm(256 * 256 * 176), nrow = 256) # ~46MB
# Simulate preprocessing
cleaned <- raw / max(abs(raw)) # Normalize to [-1, 1]
# Calculate quality control metrics
qc_metrics <- list(
snr = 20 * runif(1, 0.5, 1.5), # Signal-to-noise ratio
motion = runif(1, 0, 2), # Motion estimate in mm
outliers = rpois(1, 3) # Number of outlier voxels
)
list(
cleaned_brain = cleaned,
qc = qc_metrics
)
},
schema = returns(
cleaned_brain = artifact(), # Too big for memory
qc = struct() # Small, keep in memory
),
sink = sink_spec(
fields = "cleaned_brain",
dir = "artifacts://preprocessing",
format = "rds" # Use RDS here to match readRDS() below
)
) |>
# Stage 2: Analysis (moderate outputs)
stage("analyze",
needs = "preprocess", # Declare dependency
f = function(preprocess.cleaned_brain, preprocess.qc) {
# Load the artifact (matches format = "rds" above)
brain <- readRDS(preprocess.cleaned_brain$path)
# Only process if QC passed
if (preprocess.qc$snr > 10) {
# Simulate GLM fitting on brain data
# In production: model <- fsl.glm(brain, design_matrix)
# For this example, fit simple model on flattened data
y <- as.vector(brain[1:100, 1:100]) # Use subset
x <- seq_along(y)
model <- lm(y ~ x)
# Extract statistics
stats <- list(
beta = coef(model)[2],
t_stat = summary(model)$coefficients[2, "t value"],
p_value = summary(model)$coefficients[2, "Pr(>|t|)"]
)
list(model = model, stats = stats)
} else {
list(model = NULL, stats = NULL)
}
},
schema = returns(
model = maybe(artifact()), # Might be NULL
stats = maybe(struct())
),
sink = sink_spec(
fields = "model",
dir = "artifacts://models",
format = "rds"
)
)
results <- collect(neuro_pipeline)
# Memory usage stays manageable even with 1000 subjects!
# Only paths and small summaries in memory

Registering custom formats (e.g., NIfTI)
If you prefer domain formats (like NIfTI), register a reader/writer
pair once and reference it by name in sink_spec():
# Requires the RNifti package
if (requireNamespace("RNifti", quietly = TRUE)) {
register_sink_format(
"nii", # format name used in sink_spec(format = "nii")
writer = function(x, path, ...) { RNifti::writeNifti(x, path, ...); invisible(path) },
reader = function(path, ...) RNifti::readNifti(path, ...),
ext = ".nii.gz" # ensures files end with .nii.gz
)
}
# Then in your sink, use the registered format name:
sink_spec(
fields = "cleaned_brain",
dir = "artifacts://preprocessing",
format = "nii"
)
# Reading later (if autoload = FALSE):
# brain <- RNifti::readNifti(cleaned_brain$path)

Scenario 2: Machine Learning with Checkpointing
Save models at each epoch for resume capability:
ml_training <- flow(param_grid(
epoch = 1:100,
fold = 1:5
)) |>
stage("train",
f = function(epoch, fold) {
# Check if already completed
checkpoint_path <- sprintf("artifacts://ml/checkpoints/fold%d_epoch%03d.rds",
fold, epoch)
if (file.exists(resolve_path(checkpoint_path))) {
# Resume from checkpoint
model <- readRDS(resolve_path(checkpoint_path))
message("Resumed from checkpoint")
} else {
# Train from scratch or previous epoch
if (epoch > 1) {
prev_path <- sprintf("artifacts://ml/checkpoints/fold%d_epoch%03d.rds",
fold, epoch - 1)
model <- readRDS(resolve_path(prev_path))
} else {
# Initialize a new model (simple linear model for demo)
model <- list(
weights = rnorm(10),
bias = 0,
learning_rate = 0.01
)
}
# Simulate one epoch of training
set.seed(fold * 1000 + epoch)
# Get mock training data for this fold
n_samples <- 100
X <- matrix(rnorm(n_samples * 10), ncol = 10)
y <- X %*% rnorm(10) + rnorm(n_samples)
# Simple gradient descent update (mock training)
predictions <- X %*% model$weights + model$bias
errors <- predictions - y
gradient_w <- t(X) %*% errors / n_samples
gradient_b <- mean(errors)
# Update model
model$weights <- model$weights - model$learning_rate * gradient_w
model$bias <- model$bias - model$learning_rate * gradient_b
}
# Calculate loss on validation data
val_X <- matrix(rnorm(50 * 10), ncol = 10)
val_y <- val_X %*% rnorm(10) + rnorm(50)
val_pred <- val_X %*% model$weights + model$bias
loss <- mean((val_pred - val_y)^2) # MSE loss
list(
model = model,
fold = fold,
epoch = epoch,
loss = loss
)
},
schema = returns(
model = artifact(),
fold = int(),
epoch = int(),
loss = dbl()
),
sink = sink_spec(
fields = "model",
dir = "artifacts://ml/checkpoints",
template = "fold{fold}_epoch{epoch:03d}",
overwrite = "skip" # Don't recompute existing
)
)
# Can interrupt and resume training!
results <- collect(ml_training)
# Plot training curves without loading models
library(ggplot2)
ggplot(results, aes(x = epoch, y = loss, color = factor(fold))) +
geom_line() +
labs(title = "Training Progress")

Scenario 3: Collaborative Analysis with Selective Sharing
Different team members work on different parts:
# Setup: Create subject list
all_subjects <- data.frame(
subject_id = sprintf("sub_%03d", 1:100),
group = rep(c("control", "treatment"), 50)
)
# Alice runs preprocessing
preprocessing <- flow(all_subjects[1:50, ]) |>
stage("clean",
f = function(subject_id, group) {
# Simulate data cleaning
set.seed(as.numeric(factor(subject_id)))
raw_data <- data.frame(
value = rnorm(1000, mean = ifelse(group == "control", 100, 110)),
time = 1:1000
)
# Remove outliers and normalize
cleaned <- raw_data[abs(scale(raw_data$value)) < 3, ]
cleaned$value <- scale(cleaned$value)
list(cleaned = cleaned)
},
schema = returns(cleaned = artifact()),
sink = sink_spec(
fields = "cleaned",
dir = "artifacts://shared/preprocessing"
))
alice_results <- collect(preprocessing)
# Bob runs models on Alice's outputs
modeling <- flow(alice_results) |>
stage("model",
needs = "clean", # Reference Alice's stage
f = function(clean.cleaned) {
# Load preprocessed data
data <- readRDS(clean.cleaned$path)
# Fit a simple time series model
model <- lm(value ~ poly(time, 3), data = data)
list(model = model)
},
schema = returns(model = artifact()),
sink = sink_spec(
fields = "model",
dir = "artifacts://shared/models"
))
bob_results <- collect(modeling)
# Carol visualizes without needing full models
visualization <- flow(bob_results) |>
stage("plot",
needs = "model", # Reference Bob's stage
f = function(model.model) {
# Load just the coefficients, not full model
m <- readRDS(model.model$path)
coefs <- coef(m)
# Create a simple coefficient plot
# In production, use ggplot2 for better visualizations
pdf(NULL) # Start null device to capture plot
barplot(coefs[-1],
names.arg = paste0("Term", 1:(length(coefs)-1)),
main = "Model Coefficients",
col = "steelblue")
plot_obj <- recordPlot() # Capture the plot
dev.off()
list(plot = plot_obj)
},
schema = returns(plot = artifact()),
sink = sink_spec(
fields = "plot",
dir = "artifacts://shared/figures",
format = "rds" # Save R plot objects as RDS
))

Advanced Patterns
Pattern 1: Mixed Storage Strategies
Different data types need different formats and locations:
# Configure different sinks for different data types
model_sink <- sink_spec(
fields = "model",
dir = "artifacts://models",
format = "rds",
compress = "gzip" # Compress large R objects
)
image_sink <- sink_spec(
fields = "brain_image",
dir = "artifacts://images",
format = "nii.gz", # Standard neuroimaging format
compress = FALSE # Already compressed
)
summary_sink <- sink_spec(
fields = "summary",
dir = "project://summaries", # Small files stay with project
format = "json" # Human-readable
)

Pattern 2: Conditional Artifacts
Save artifacts only when they meet criteria:
stage("process",
f = function(data) {
model <- fit_model(data)
size <- object.size(model)
if (size > 10*1024*1024) { # > 10MB
list(model = model, stored = TRUE)
} else {
list(model = model, stored = FALSE)
}
},
schema = returns(
model = maybe(artifact()), # Conditional artifact
stored = lgl()
),
sink = sink_spec(
fields = "model",
dir = "artifacts://conditional",
when = ~ stored # Only sink when stored = TRUE
)
)

Pattern 3: Artifact Versioning
Track different versions of analyses:
version <- format(Sys.Date(), "%Y%m%d")
sink_spec(
fields = "model",
dir = sprintf("artifacts://models/v_%s", version),
template = "{subject}_{session}_{.row_key}"
)
# Creates versioned paths:
# artifacts://models/v_20240315/subj01_ses1_abc123.rds
# artifacts://models/v_20240316/subj01_ses1_def456.rds

Troubleshooting
Issue: “Permission denied”
# Ensure write permissions
dir <- resolve_path("artifacts://test")
dir.create(dir, recursive = TRUE, mode = "0755")

Issue: Can’t find artifact files
# Check if paths match between systems
artifact_path <- results$model[[1]]$path
file.exists(artifact_path)
# Use relative paths in portable code
relative_path <- sub(paths_get()$artifacts, "artifacts://", artifact_path, fixed = TRUE)

Best Practices
- Use artifacts for outputs > 10MB

- Organize artifacts hierarchically

  # Good: Clear organization
  "artifacts://preprocessing/cleaned/"
  "artifacts://models/fitted/"
  "artifacts://results/figures/"

  # Bad: Flat structure
  "artifacts://everything/"

- Include metadata with artifacts

  sink_spec(
    fields = "model",
    dir = "artifacts://models",
    sidecar = "json"  # Creates a .json sidecar with metadata
  )

- Clean up old artifacts periodically (see the disk-usage sketch after this list)

  # Remove artifacts older than 30 days
  old_files <- list.files(
    resolve_path("artifacts://temp"),
    full.names = TRUE,
    recursive = TRUE
  )
  file.remove(old_files[file.mtime(old_files) < Sys.time() - 30 * 24 * 3600])
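Before deleting anything, it helps to know how much space your artifacts actually use. A minimal sketch in base R; only resolve_path() comes from parade, and it assumes the bare artifacts:// alias resolves to your artifacts root:

# Total artifact disk usage, in GB
all_files <- list.files(
  resolve_path("artifacts://"),
  full.names = TRUE,
  recursive = TRUE
)
sum(file.size(all_files), na.rm = TRUE) / 1024^3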
Next Steps
Now that you understand artifacts, learn about:
- Advanced Sinks - Format-specific writers and multi-file patterns
- SLURM Distribution - Running artifact pipelines on clusters
- Script Monitoring - Tracking artifact generation progress
Quick Reference
| Function | Purpose | Example |
|---|---|---|
| artifact() | Declare field as artifact | returns(model = artifact()) |
| sink_spec() | Configure artifact storage | sink_spec(fields = "model", dir = "artifacts://") |
| resolve_path() | Get absolute path | resolve_path("artifacts://models") |
| readRDS() | Load artifact | readRDS(results$model[[1]]$path) |
| Sink Option | Purpose | Example |
|---|---|---|
| fields | What to save | fields = c("model", "data") |
| dir | Where to save | dir = "artifacts://outputs" |
| format | File format | format = "rds" or format = list(model = "rds", plot = "pdf") |
| template | Naming pattern | template = "{subject}/{session}.rds" |
| overwrite | Resume behavior | overwrite = "skip" |
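To illustrate how these options combine, here is a hypothetical sink that writes two fields in different formats, names files by subject and session, and skips files that already exist when a flow is re-run (the field names and placeholders are made up for the example):

sink_spec(
  fields = c("model", "plot"),
  dir = "artifacts://outputs",
  format = list(model = "rds", plot = "pdf"),  # per-field formats
  template = "{subject}/{session}",            # naming pattern; extension follows the format
  overwrite = "skip"                           # resume-friendly: keep existing files
)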
Summary
Artifacts solve the large output problem by:
- Automatically saving large objects to disk during processing
- Returning references instead of loading everything into memory
- Using portable paths that work across different systems
- Enabling selective loading of only the data you need (see the sketch below)
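For instance, a small sketch of selective loading using the results tibble from the Quick Start example: filter on the in-memory metrics column first, then read only the matching artifacts from disk (the 0.5 cutoff is arbitrary):

# Decide which rows you care about using the in-memory columns
good <- results[results$metrics > 0.5, ]
# Load only those artifacts from disk
good_models <- lapply(good$model, function(ref) readRDS(ref$path))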
Start using artifacts when your outputs exceed ~10MB per item or when total memory usage becomes a concern. Your RAM (and your cluster admin) will thank you!