
Note: Code evaluation is disabled in this vignette to keep builds fast and cluster-agnostic. Copy code into an interactive R session to run locally or on your cluster.

Prerequisites

This vignette assumes you understand:

  • Smart Path Management - how portable paths work
  • Core Concepts - basic flow and stage structure

The Problem: Large Outputs Overwhelm Your System

When your analysis produces large outputs, several problems arise:

# Typical neuroimaging analysis - each model is ~500MB
results <- lapply(subjects, function(subj) {
  brain_data <- load_brain(subj)
  model <- fit_complex_model(brain_data)  # 500MB object
  return(model)
})

# Problems:
# 1. Memory explosion - 100 subjects = 50GB in RAM
# 2. Can't save intermediate results - all or nothing
# 3. Can't resume if job fails after 80 subjects
# 4. Can't share specific models with collaborators
# 5. Results object becomes unwieldy to work with

Even worse on HPC clusters:

# This WILL get your account suspended on most clusters:
saveRDS(huge_results, "~/my_home_dir/results.rds")  # NO! Home has quota!

The Solution: Artifacts - Smart File Management

Artifacts automatically save large outputs to disk during processing, returning just file references:

library(parade)

# Local dev:
paths_init()

# HPC (recommended):
# parade_init_hpc(persist = TRUE)

# Define what should go to disk
results <- flow(subjects) |>
  stage("fit_model",
    f = function(subject) {
      brain_data <- load_brain(subject)
      model <- fit_complex_model(brain_data)  # 500MB object
      list(model = model, subject = subject)
    },
    schema = returns(
      model = artifact(),    # This becomes a file on disk
      subject = chr()        # This stays in memory
    ),
    sink = sink_spec(
      fields = "model",
      dir = "artifacts://brain_models"  # Smart, portable path
    )
  ) |>
  collect()

# Now results$model contains file paths, not huge objects!
print(results$model[[1]])
#> $path: "$PARADE_SCRATCH/parade-artifacts/brain_models/model_1.rds"
#> $bytes: 524288000
#> $sha256: "abc123..."

# Load a specific model when needed
model_50 <- readRDS(results$model[[50]]$path)
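
The stored sha256 can be used to spot a corrupted or truncated file after a transfer. A minimal sketch using the digest package, assuming the checksum is stored as a plain hex string as shown above:

# Verify an artifact against its recorded checksum (requires the digest package)
ref <- results$model[[1]]
if (requireNamespace("digest", quietly = TRUE)) {
  stopifnot(identical(digest::digest(ref$path, algo = "sha256", file = TRUE), ref$sha256))
}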

Quick Start: Your First Artifact Workflow

Reproducibility tip: When examples use random data (rnorm(), sampling, etc.), set a seed (e.g., set.seed(42)) so printed outputs are deterministic.

Step 1: Initialize parade’s path system

library(parade)
paths_init()  # Sets up smart paths for your environment

Step 2: Create an analysis with artifacts

# Small example with built-in data (seed set so the sampled sizes are reproducible)
set.seed(42)
data <- data.frame(
  sample = paste0("sample_", 1:10),
  size = sample(100:1000, 10)
)

results <- flow(data) |>
  stage("analyze",
    f = function(sample, size) {
      set.seed(42)  # reproducible example output
      # Simulate some analysis
      x <- seq_len(size)
      y <- rnorm(size)
      model <- lm(y ~ x)
      metrics <- summary(model)$r.squared
      
      list(
        model = model,       # Large object
        metrics = metrics,   # Small value
        sample = sample      # Identifier
      )
    },
    schema = returns(
      model = artifact(),    # Save to disk
      metrics = dbl(),       # Keep in memory
      sample = chr()         # Keep in memory
    ),
    sink = sink_spec(
      fields = "model",
      dir = "artifacts://analysis/models"
    )
  ) |>
  collect()

# Results contain paths for models, values for metrics
print(results)
#> # A tibble: 10 × 3
#>    sample    model                    metrics
#>    <chr>     <list>                     <dbl>
#>  1 sample_1  <tibble [1 × 4]>          0.023
#>  2 sample_2  <tibble [1 × 4]>          0.045
#>  ...

Step 3: Work with artifacts efficiently

# Get just the metrics without loading models
high_r2 <- results[results$metrics > 0.05, ]

# Load specific models as needed
best_model <- readRDS(high_r2$model[[1]]$path)

# Share a specific result
cat("Model for sample_5 is at:", results$model[[5]]$path)

Understanding Artifact Storage

Where Do Artifacts Go?

The artifacts:// prefix is a smart alias that adapts to your environment:

Environment  | Artifacts Path                                                               | Why
Your laptop  | /tmp/parade-artifacts/                                                       | Uses system temp
HPC cluster  | $PARADE_SCRATCH/parade-artifacts/ (preferred) or $SCRATCH/parade-artifacts/  | Uses shared scratch storage
Custom setup | Whatever you configure                                                       | Full control

# Check where artifacts will go
paths_get()$artifacts
#> "$PARADE_SCRATCH/parade-artifacts"  # on HPC
#> "/var/folders/xy/temp/parade-artifacts"  # on Mac

# Configure custom location if needed
paths_set(artifacts = "/fast/storage/my_outputs")

Artifacts are just files, but parade can help you discover them by scanning the sink sidecars (*.json) for provenance metadata.

# List artifacts under your artifacts root
artifact_catalog()

# Search by stage/field/row_key/path
artifact_catalog_search(query = "fit_model")
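
Sidecars are plain JSON files, so you can also inspect one directly. A minimal sketch using jsonlite, assuming the sidecar sits next to its artifact with a .json extension:

# Read one sidecar's metadata directly (requires the jsonlite package)
if (requireNamespace("jsonlite", quietly = TRUE)) {
  sidecars <- list.files(resolve_path("artifacts://brain_models"),
                         pattern = "\\.json$", recursive = TRUE, full.names = TRUE)
  str(jsonlite::read_json(sidecars[[1]]))
}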

Directory Organization

Artifacts are organized hierarchically based on your sink specification:

sink_spec(
  fields = "model",
  dir = "artifacts://project_x/models",
  template = "{subject}/{session}_{task}.rds"
)

# Creates structure like:
# /scratch/alice/parade-artifacts/
#   └── project_x/
#       └── models/
#           ├── subj01/
#           │   ├── session1_rest.rds
#           │   └── session2_task.rds
#           └── subj02/
#               ├── session1_rest.rds
#               └── session2_task.rds

Real-World Scenarios

Scenario 1: Neuroimaging Pipeline with Mixed Output Sizes

Different stages produce different sized outputs - keep small stuff in memory, large stuff on disk:

neuro_pipeline <- flow(subjects) |>
  
  # Stage 1: Preprocessing (large outputs)
  stage("preprocess",
    f = function(subject) {
      # Simulate loading brain data (in production, use actual neuroimaging tools)
      # raw <- oro.nifti::readNIfTI(sprintf("data/%s_T1.nii.gz", subject))
      
      # For this example, simulate with a large matrix
      set.seed(sum(utf8ToInt(subject)))  # deterministic per-subject seed
      raw <- matrix(rnorm(256 * 256 * 176), nrow = 256)  # ~46MB
      
      # Simulate preprocessing
      cleaned <- raw / max(abs(raw))  # Normalize to [-1, 1]
      
      # Calculate quality control metrics
      qc_metrics <- list(
        snr = 20 * runif(1, 0.5, 1.5),  # Signal-to-noise ratio
        motion = runif(1, 0, 2),         # Motion estimate in mm
        outliers = rpois(1, 3)           # Number of outlier voxels
      )
      
      list(
        cleaned_brain = cleaned,
        qc = qc_metrics
      )
    },
    schema = returns(
      cleaned_brain = artifact(),  # Too big for memory
      qc = struct()                 # Small, keep in memory
    ),
    sink = sink_spec(
      fields = "cleaned_brain",
      dir = "artifacts://preprocessing",
      format = "rds"  # Use RDS here to match readRDS() below
    )
  ) |>
  
  # Stage 2: Analysis (moderate outputs)  
  stage("analyze",
    needs = "preprocess",  # Declare dependency
    f = function(preprocess.cleaned_brain, preprocess.qc) {
      # Load the artifact (matches format = "rds" above)
      brain <- readRDS(preprocess.cleaned_brain$path)
      
      # Only process if QC passed
      if (preprocess.qc$snr > 10) {
        # Simulate GLM fitting on brain data
        # In production: model <- fsl.glm(brain, design_matrix)
        
        # For this example, fit simple model on flattened data
        y <- as.vector(brain[1:100, 1:100])  # Use subset
        x <- seq_along(y)
        model <- lm(y ~ x)
        
        # Extract statistics
        stats <- list(
          beta = coef(model)[2],
          t_stat = summary(model)$coefficients[2, "t value"],
          p_value = summary(model)$coefficients[2, "Pr(>|t|)"]
        )
        
        list(model = model, stats = stats)
      } else {
        list(model = NULL, stats = NULL)
      }
    },
    schema = returns(
      model = maybe(artifact()),  # Might be NULL
      stats = maybe(struct())
    ),
    sink = sink_spec(
      fields = "model",
      dir = "artifacts://models",
      format = "rds"
    )
  )

results <- collect(neuro_pipeline)

# Memory usage stays manageable even with 1000 subjects!
# Only paths and small summaries in memory

Registering custom formats (e.g., NIfTI)

If you prefer domain formats (like NIfTI), register a reader/writer pair once and reference it by name in sink_spec():

# Requires the RNifti package
if (requireNamespace("RNifti", quietly = TRUE)) {
  register_sink_format(
    "nii",  # format name used in sink_spec(format = "nii")
    writer = function(x, path, ...) { RNifti::writeNifti(x, path, ...); invisible(path) },
    reader = function(path, ...) RNifti::readNifti(path, ...),
    ext = ".nii.gz"  # ensures files end with .nii.gz
  )
}

# Then in your sink, use the registered format name:
sink_spec(
  fields = "cleaned_brain",
  dir = "artifacts://preprocessing",
  format = "nii"
)

# Reading later (if autoload = FALSE):
# brain <- RNifti::readNifti(cleaned_brain$path)

Scenario 2: Machine Learning with Checkpointing

Save models at each epoch for resume capability:

ml_training <- flow(param_grid(
  epoch = 1:100,
  fold = 1:5
)) |>
  stage("train",
    f = function(epoch, fold) {
      # Check if already completed
      checkpoint_path <- sprintf("artifacts://ml/checkpoints/fold%d_epoch%03d.rds", 
                                fold, epoch)
      
      if (file.exists(resolve_path(checkpoint_path))) {
        # Resume from checkpoint
        model <- readRDS(resolve_path(checkpoint_path))
        message("Resumed from checkpoint")
      } else {
        # Train from scratch or previous epoch
        if (epoch > 1) {
          prev_path <- sprintf("artifacts://ml/checkpoints/fold%d_epoch%03d.rds",
                              fold, epoch - 1)
          model <- readRDS(resolve_path(prev_path))
        } else {
          # Initialize a new model (simple linear model for demo)
          model <- list(
            weights = rnorm(10),
            bias = 0,
            learning_rate = 0.01
          )
        }
        
        # Simulate one epoch of training
        set.seed(fold * 1000 + epoch)
        
        # Get mock training data for this fold
        n_samples <- 100
        X <- matrix(rnorm(n_samples * 10), ncol = 10)
        y <- X %*% rnorm(10) + rnorm(n_samples)
        
        # Simple gradient descent update (mock training)
        predictions <- X %*% model$weights + model$bias
        errors <- predictions - y
        gradient_w <- t(X) %*% errors / n_samples
        gradient_b <- mean(errors)
        
        # Update model
        model$weights <- model$weights - model$learning_rate * gradient_w
        model$bias <- model$bias - model$learning_rate * gradient_b
      }
      
      # Calculate loss on validation data
      val_X <- matrix(rnorm(50 * 10), ncol = 10)
      val_y <- val_X %*% rnorm(10) + rnorm(50)
      val_pred <- val_X %*% model$weights + model$bias
      loss <- mean((val_pred - val_y)^2)  # MSE loss
      
      list(
        model = model,
        fold = fold,
        epoch = epoch,
        loss = loss
      )
    },
    schema = returns(
      model = artifact(),
      fold = int(),
      epoch = int(),
      loss = dbl()
    ),
    sink = sink_spec(
      fields = "model",
      dir = "artifacts://ml/checkpoints",
      template = "fold{fold}_epoch{epoch:03d}",
      overwrite = "skip"  # Don't recompute existing
    )
  )

# Can interrupt and resume training!
results <- collect(ml_training)

# Plot training curves without loading models
library(ggplot2)
ggplot(results, aes(x = epoch, y = loss, color = factor(fold))) +
  geom_line() +
  labs(title = "Training Progress")

Scenario 3: Collaborative Analysis with Selective Sharing

Different team members work on different parts:

# Setup: Create subject list
all_subjects <- data.frame(
  subject_id = sprintf("sub_%03d", 1:100),
  group = rep(c("control", "treatment"), 50)
)

# Alice runs preprocessing
preprocessing <- flow(all_subjects[1:50, ]) |>
  stage("clean",
    f = function(subject_id, group) {
      # Simulate data cleaning
      set.seed(sum(utf8ToInt(subject_id)))  # deterministic per-subject seed
      raw_data <- data.frame(
        value = rnorm(1000, mean = ifelse(group == "control", 100, 110)),
        time = 1:1000
      )
      
      # Remove outliers and normalize
      cleaned <- raw_data[abs(scale(raw_data$value)) < 3, ]
      cleaned$value <- scale(cleaned$value)
      
      list(cleaned = cleaned)
    },
    schema = returns(cleaned = artifact()),
    sink = sink_spec(
      fields = "cleaned",
      dir = "artifacts://shared/preprocessing"
    ))

alice_results <- collect(preprocessing)

# Bob runs models on Alice's outputs
modeling <- flow(alice_results) |>
  stage("model",
    needs = "clean",  # Reference Alice's stage
    f = function(clean.cleaned) {
      # Load preprocessed data
      data <- readRDS(clean.cleaned$path)
      
      # Fit a simple time series model
      model <- lm(value ~ poly(time, 3), data = data)
      
      list(model = model)
    },
    schema = returns(model = artifact()),
    sink = sink_spec(
      fields = "model",
      dir = "artifacts://shared/models"
    ))

bob_results <- collect(modeling)

# Carol visualizes without needing full models
visualization <- flow(bob_results) |>
  stage("plot",
    needs = "model",  # Reference Bob's stage
    f = function(model.model) {
      # Load just the coefficients, not full model
      m <- readRDS(model.model$path)
      coefs <- coef(m)
      
      # Create a simple coefficient plot
      # In production, use ggplot2 for better visualizations
      pdf(NULL)                              # Open a null device to draw on
      dev.control(displaylist = "enable")    # Required so recordPlot() can capture the plot
      barplot(coefs[-1], 
              names.arg = paste0("Term", 1:(length(coefs)-1)),
              main = "Model Coefficients",
              col = "steelblue")
      plot_obj <- recordPlot()  # Capture the plot
      dev.off()
      
      list(plot = plot_obj)
    },
    schema = returns(plot = artifact()),
    sink = sink_spec(
      fields = "plot",
      dir = "artifacts://shared/figures",
      format = "rds"  # Save R plot objects as RDS
    ))
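
To view one of Carol's saved figures later, collect her flow and replay the recorded plot on the current graphics device (recorded plots are sensitive to the R version, so replay on a similar setup):

carol_results <- collect(visualization)
saved_plot <- readRDS(carol_results$plot[[1]]$path)
replayPlot(saved_plot)  # Redraws the captured barplot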

Advanced Patterns

Pattern 1: Mixed Storage Strategies

Different data types need different formats and locations:

# Configure different sinks for different data types
model_sink <- sink_spec(
  fields = "model",
  dir = "artifacts://models",
  format = "rds",
  compress = "gzip"  # Compress large R objects
)

image_sink <- sink_spec(
  fields = "brain_image",
  dir = "artifacts://images",
  format = "nii.gz",  # Standard neuroimaging format
  compress = FALSE    # Already compressed
)

summary_sink <- sink_spec(
  fields = "summary",
  dir = "project://summaries",  # Small files stay with project
  format = "json"     # Human-readable
)
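
Each of these sink objects can then be passed to the stage that produces the matching field. A minimal sketch reusing model_sink from above (fit_model() is a placeholder, as in the pattern below):

stage("fit",
  f = function(data) list(model = fit_model(data)),
  schema = returns(model = artifact()),
  sink = model_sink
)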

Pattern 2: Conditional Artifacts

Save artifacts only when they meet criteria:

stage("process",
  f = function(data) {
    model <- fit_model(data)
    size <- object.size(model)
    
    if (size > 10*1024*1024) {  # > 10MB
      list(model = model, stored = TRUE)
    } else {
      list(model = model, stored = FALSE)
    }
  },
  schema = returns(
    model = maybe(artifact()),  # Conditional artifact
    stored = lgl()
  ),
  sink = sink_spec(
    fields = "model",
    dir = "artifacts://conditional",
    when = ~ stored  # Only sink when stored = TRUE
  )
)

Pattern 3: Artifact Versioning

Track different versions of analyses:

version <- format(Sys.Date(), "%Y%m%d")

sink_spec(
  fields = "model",
  dir = sprintf("artifacts://models/v_%s", version),
  template = "{subject}_{session}_{.row_key}"
)

# Creates versioned paths:
# artifacts://models/v_20240315/subj01_ses1_abc123.rds
# artifacts://models/v_20240316/subj01_ses1_def456.rds
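
Later, you can see which versions exist by listing the directories under the models root — a small sketch using resolve_path():

# List versioned model directories (e.g., "v_20240315", "v_20240316")
versions <- basename(list.dirs(resolve_path("artifacts://models"), recursive = FALSE))
print(versions)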

Troubleshooting

Issue: “No space left on device”

# Check where artifacts are going
paths_get()$artifacts

# Switch to larger storage
paths_set(artifacts = "/large/storage/partition")

Issue: “Permission denied”

# Make sure the directory exists and is writable
art_dir <- resolve_path("artifacts://test")
dir.create(art_dir, recursive = TRUE, mode = "0755", showWarnings = FALSE)
file.access(art_dir, mode = 2) == 0  # TRUE if you can write here

Issue: Can’t find artifact files

# Check if paths match between systems
artifact_path <- results$model[[1]]$path
file.exists(artifact_path)

# Convert the absolute path back to a portable artifacts:// alias
relative_path <- sub(paste0(paths_get()$artifacts, "/"), "artifacts://",
                     artifact_path, fixed = TRUE)
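
On the other system, the same alias expands back to that machine's local artifacts root (assuming paths_init() has been run there too):

# Expand the portable alias into a local absolute path on the new system
local_path <- resolve_path(relative_path)
file.exists(local_path)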

Best Practices

  1. Use artifacts for outputs > 10MB

    # Good: Large objects as artifacts
    schema = returns(big_model = artifact(), summary = dbl())
    
    # Bad: Everything as artifact (unnecessary I/O)
    schema = returns(tiny_value = artifact())
  2. Organize artifacts hierarchically

    # Good: Clear organization
    "artifacts://preprocessing/cleaned/"
    "artifacts://models/fitted/"
    "artifacts://results/figures/"
    
    # Bad: Flat structure
    "artifacts://everything/"
  3. Include metadata with artifacts

    sink_spec(
      fields = "model",
      dir = "artifacts://models",
      sidecar = "json"  # Creates .json with metadata
    )
  4. Clean up old artifacts periodically

    # Remove artifacts older than 30 days
    old_files <- list.files(
      resolve_path("artifacts://temp"),
      full.names = TRUE,
      recursive = TRUE
    )
    file.remove(old_files[file.mtime(old_files) < Sys.time() - 30 * 24 * 60 * 60])

Next Steps

Now that you understand artifacts, explore the other parade vignettes to see how they combine with flows, stages, and path management in larger pipelines.

Quick Reference

Function       | Purpose                    | Example
artifact()     | Declare field as artifact  | returns(model = artifact())
sink_spec()    | Configure artifact storage | sink_spec(fields = "model", dir = "artifacts://")
resolve_path() | Get absolute path          | resolve_path("artifacts://models")
readRDS()      | Load artifact              | readRDS(results$model[[1]]$path)

Sink Option | Purpose         | Example
fields      | What to save    | fields = c("model", "data")
dir         | Where to save   | dir = "artifacts://outputs"
format      | File format     | format = "rds" or format = list(model = "rds", plot = "pdf")
template    | Naming pattern  | template = "{subject}/{session}.rds"
overwrite   | Resume behavior | overwrite = "skip"

Summary

Artifacts solve the large output problem by:

  • Automatically saving large objects to disk during processing
  • Returning references instead of loading everything into memory
  • Using portable paths that work across different systems
  • Enabling selective loading of only the data you need

Start using artifacts when your outputs exceed ~10MB per item or when total memory usage becomes a concern. Your RAM (and your cluster admin) will thank you!