
Requires parade ≥ 0.6.0.

Note: Code evaluation is disabled to keep builds fast and avoid submitting jobs during checks. Copy code into an interactive session to run on your local machine or SLURM cluster.

Introduction to distributed computing with parade

parade separates the workflow definition (DAG) from the execution strategy (distribution plan), allowing you to run the same computational pipeline locally for development or on SLURM clusters for production. This separation enables:

  • Local development: Test workflows on your laptop with dist_local()
  • Cluster scaling: Execute the same workflow on SLURM with dist_slurm()
  • Resource optimization: Fine-tune parallelization and resource allocation
  • Flexible chunking: Balance job granularity and scheduling efficiency

The key insight is that parade workflows are portable: you define your computation once and choose how to distribute it based on your current needs.

“One big machine” (best-effort) on SLURM

On a cluster, it’s tempting to think in terms of a single machine with nodes * cpus_per_node cores. SLURM doesn’t natively provide that abstraction for arbitrary batch jobs; it schedules jobs that request resources.

parade doesn’t yet implement a true “SLURM pool” (single allocation + dispatcher / work-stealing queue), but you can get close today using:

  • Packed jobs for slurm_map(): run many tasks per SLURM job and use within‑node parallelism to fully utilize each node.
  • Fixed job count for flows (submit()): choose target_jobs so you keep an approximately constant number of jobs in flight, independent of the number of groups in your grid.
  • Dispatcher backends (dynamic scheduling): use a backend that can hand out work to persistent workers to reduce tail latency when task durations vary.

Example: 10 nodes × 196 cores

For task mapping, the most ergonomic option is slurm_map_cluster():

tasks <- 1:10000

jobs <- slurm_map_cluster(
  tasks,
  ~ .x^2,
  nodes = 10,
  cpus_per_node = 196,
  oversubscribe = 2,                # queue 2× concurrency to reduce stragglers
  .resources = list(time = "2h", mem = "64G")
)

results <- jobs |> progress() |> collect()

For flows, use dist_slurm_allocation() to express the allocation shape once:

fl <- fl |>
  distribute(dist_slurm_allocation(
    nodes = 10,
    cores_per_node = 196,
    within = "multicore",
    resources = list(time = "2h", mem = "64G"),
    target_jobs = 20               # ~10 nodes * oversubscribe 2
  ))
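The numbers in the two calls above follow from simple arithmetic. The sketch below is plain base R and does not call parade; the variable names are illustrative, and the even split of tasks across jobs is an assumption rather than a description of how parade partitions work.

```r
# Allocation shape from the example: 10 nodes x 196 cores
nodes <- 10
cores_per_node <- 196
oversubscribe <- 2        # queue 2x as many jobs as nodes
n_tasks <- 10000

total_cores   <- nodes * cores_per_node           # size of the "one big machine"
target_jobs   <- nodes * oversubscribe            # jobs to keep in flight
tasks_per_job <- ceiling(n_tasks / target_jobs)   # assumes an even split

cat("total cores:  ", total_cores, "\n")
cat("target jobs:  ", target_jobs, "\n")
cat("tasks per job:", tasks_per_job, "\n")
```

With these inputs the allocation has 1960 cores, 20 jobs are queued, and each job carries 500 tasks.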

Important limits: dist_slurm_*() uses static partitioning (chunks are fixed at submit time). If task durations are highly variable (e.g., 10s–1h) and you have many tasks (50–10,000+), a dispatcher backend can reduce tail latency further.

When you want true dynamic scheduling

Dynamic scheduling means: “workers pull the next unit of work when they finish the last one.” On SLURM, that usually implies a persistent pool of workers behind a queue/dispatcher. In parade, the closest options today are:

  • dist_mirai(..., dispatcher = TRUE): low-latency, load-balanced scheduling with persistent daemons (which can be launched via SLURM).
  • dist_crew(controller = ...): schedule chunks on a crew controller (local or cluster).
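To see why pull-based scheduling helps when task durations vary, the following base R sketch compares two toy schedulers. It is a simulation under stated assumptions (contiguous fixed chunks for the static case, "next free worker takes the next task" for the dynamic case) and does not reflect how parade, mirai, or crew are implemented. The function names are hypothetical.

```r
# Static partitioning: tasks are cut into contiguous chunks up front,
# one chunk per worker. The makespan is the slowest chunk.
static_makespan <- function(durations, n_workers) {
  chunk_size <- ceiling(length(durations) / n_workers)
  chunk_id <- ceiling(seq_along(durations) / chunk_size)
  max(tapply(durations, chunk_id, sum))
}

# Dynamic scheduling: each task goes to whichever worker frees up first.
dynamic_makespan <- function(durations, n_workers) {
  free_at <- numeric(n_workers)   # time at which each worker becomes idle
  for (d in durations) {
    w <- which.min(free_at)       # worker that pulls the next task
    free_at[w] <- free_at[w] + d
  }
  max(free_at)
}

# Four slow tasks (10 s) followed by twelve fast ones (1 s), 4 workers
durations <- c(rep(10, 4), rep(1, 12))

static_time  <- static_makespan(durations, 4)   # 40: one worker gets all slow tasks
dynamic_time <- dynamic_makespan(durations, 4)  # 13: slow tasks spread across workers

cat("static:", static_time, " dynamic:", dynamic_time, "\n")
```

The example is deliberately adversarial for static chunking. With uniform task durations the two approaches finish at about the same time, which is why static partitioning is often good enough.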

A dedicated “SLURM pool” backend (one allocation + dispatcher + task queue) is intentionally not in parade yet, but the submit backend system is designed to support it in a follow-on package.

Local vs. SLURM distribution strategies

Local distribution for development

Before we switch to SLURM, it helps to understand what the distribution API does:

  • distribute() attaches an execution strategy to a flow. The strategy does not change your DAG — it only controls how rows of the parameter grid are scheduled.
  • dist_local() builds a local strategy (great for development) and lets you specify grouping (with by), the within‑job execution mode (within), and the amount of parallelism inside each job (workers_within).
  • dist_slurm() does the same for a SLURM cluster (we cover it in the next section).

The by argument refers to column names in your parameter grid. In this vignette we group by subject, so all rows for the same subject run in the same job.

If you want to “test on a subset,” simply filter your grid before building the flow (for example, grid_dev <- subset(grid, subject == "sub01")). The code below keeps the full grid and illustrates local parallelism.

library(parade)
library(progressr)

# Initialize project structure
paths_init()

# Create a parameter grid
grid <- param_grid(
  subject = c("sub01", "sub02", "sub03"),
  session = 1:2,
  condition = c("A", "B")
) # Results in 12 rows (3×2×2)

# Define a simple workflow
fl <- flow(grid) |>
  stage("process", function(subject, session, condition) {
    # Simulate computational work
    Sys.sleep(runif(1, 0.1, 0.5))
    list(result = paste(subject, session, condition, sep = "_"))
  })

# Local development: run locally (optionally on a subset)
fl_local <- fl |>
  distribute(dist_local(
    by = "subject",           # Group by subject
    within = "multisession",  # Use parallel workers within each group  
    workers_within = 2        # Use 2 cores per group
  ))

SLURM distribution for production

The same workflow can be scaled to a SLURM cluster:

# Row-wise: one job per parameter combination
fl_rowwise <- fl |> distribute(dist_slurm(
  by = NULL,                                      # No grouping - each row is independent
  within = "multisession",                        # Parallel execution within each job
  workers_within = 4,                             # 4 workers per job
  resources = batch_resources(                    # SLURM resource specification
    nodes = 1, 
    cpus_per_task = 4, 
    time = "30min",
    mem = "8GB"
  ),
  chunks_per_job = 1                             # One parameter row per job
))

# Subject-wise: group related work into fewer jobs
fl_subject <- fl |> distribute(dist_slurm(
  by = "subject",                                 # Group by subject
  within = "multisession",                        # Parallel within each job
  workers_within = 8,                             # More workers since we're grouping
  resources = batch_resources(
    nodes = 1, 
    cpus_per_task = 8, 
    time = "2h",                                  # Longer time for grouped work
    mem = "16GB"
  ),
  chunks_per_job = 1                             # One subject per job
))

Understanding grouping strategies

The by parameter controls how your parameter grid is partitioned:

  • by = NULL (row-wise): Each parameter combination becomes a separate group
    • Pro: Maximum parallelization, fine-grained control
    • Con: Many small jobs, scheduling overhead
    • Best for: Independent tasks that benefit from dedicated resources
  • by = "subject" (barrier groups): Group related parameter combinations
    • Pro: Fewer jobs, shared setup costs, better resource utilization
    • Con: Less parallelization, potential load imbalance
    • Best for: Tasks that share expensive initialization (data loading, model setup)
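The effect of by on the number of groups can be checked without a cluster. This sketch uses base R's expand.grid() as a stand-in for param_grid() (an assumption made so the code runs without parade installed), and count_groups() is a hypothetical helper, not part of the package.

```r
# Stand-in for param_grid(): every combination of the three columns
grid <- expand.grid(
  subject = c("sub01", "sub02", "sub03"),
  session = 1:2,
  condition = c("A", "B"),
  stringsAsFactors = FALSE
)

# Number of groups produced by a given `by` specification
count_groups <- function(grid, by = NULL) {
  if (is.null(by)) return(nrow(grid))      # row-wise: every row is its own group
  nrow(unique(grid[, by, drop = FALSE]))   # one group per distinct key
}

n_rowwise   <- count_groups(grid)                             # 12
n_subject   <- count_groups(grid, "subject")                  # 3
n_subj_cond <- count_groups(grid, c("subject", "condition"))  # 6

cat(n_rowwise, n_subject, n_subj_cond, "\n")
```

Fewer groups means fewer units to schedule, but each unit holds more rows, which is the trade-off listed above.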

Resource allocation and optimization

Understanding the parallelization hierarchy

parade provides two levels of parallelization control:

  1. Job-level parallelization: How many SLURM jobs to submit
  2. Within-job parallelization: How many tasks run concurrently within each job
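The two levels multiply. As a rough sketch, assuming one core per worker and all jobs running at the same time, the peak core count and the number of sequential "waves" inside each job can be estimated as follows. plan_summary() is a hypothetical helper written for this illustration.

```r
# Estimate the footprint of a two-level plan.
# n_jobs:         number of SLURM jobs submitted
# rows_per_job:   parameter rows handled by each job
# workers_within: concurrent tasks inside each job
plan_summary <- function(n_jobs, rows_per_job, workers_within) {
  list(
    peak_cores    = n_jobs * workers_within,
    waves_per_job = ceiling(rows_per_job / workers_within)
  )
}

wide   <- plan_summary(n_jobs = 3, rows_per_job = 8, workers_within = 8)
narrow <- plan_summary(n_jobs = 3, rows_per_job = 8, workers_within = 4)

str(wide)    # 24 cores, 1 wave
str(narrow)  # 12 cores, 2 waves
```

Halving workers_within halves the core request but doubles the number of waves each job must run through.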

Tip: If you maintain named resource profiles (via slurm_defaults_set() or profile_register()), you can avoid boilerplate by using the convenience helper dist_slurm_profile():

fl |> distribute(dist_slurm_profile("standard", by = "subject", workers_within = 8))

This resolves the profile through slurm_resources() and fills the resources argument of dist_slurm() for you.

# Example: 24 parameter combinations, grouped by subject (3 subjects × 8 combinations each)
grid <- param_grid(
  subject = c("sub01", "sub02", "sub03"),
  session = 1:4, 
  run = 1:2
) # 24 total combinations

fl <- flow(grid) |> 
  stage("analyze", 
    f = function(subject, session, run) {
      # Simulate expensive computation
      # factor() on a single string is always 1, so use the digits in the ID instead
      set.seed(as.integer(gsub("\\D", "", subject)) * 100 + session * 10 + run)
      
      # Mock expensive matrix operations
      n <- 1000
      X <- matrix(rnorm(n * n), nrow = n)
      result <- svd(X, nu = 10, nv = 10)  # Partial SVD
      
      list(
        singular_values = result$d[1:10],
        subject = subject,
        session = session, 
        run = run
      )
    },
    schema = returns(
      singular_values = lst(),
      subject = chr(),
      session = int(),
      run = int()
    )
  )

# Strategy 1: One job per subject, parallel within
fl |> distribute(dist_slurm(
  by = "subject",                    # 3 jobs (one per subject)
  workers_within = 8,                # 8 parallel tasks per job
  chunks_per_job = 1,                # Each subject in one job
  resources = batch_resources(
    cpus_per_task = 8,               # Match workers_within
    time = "4h",
    mem = "32GB"
  )
))

# Strategy 2: Split large groups across multiple jobs
fl |> distribute(dist_slurm(
  by = "subject",                    # Group by subject first
  workers_within = 4,                # 4 parallel tasks per job  
  chunks_per_job = 2,                # Split each subject into 2 jobs
  resources = batch_resources(       # Results in 6 total jobs (3 subjects × 2)
    cpus_per_task = 4,               # Match workers_within
    time = "2h",                     # Shorter time per chunk
    mem = "16GB"
  )
))

Throttling concurrent execution

The workers_within parameter acts like xargs -P, limiting concurrent tasks within each SLURM job:

# Without throttling: all tasks run simultaneously (may overwhelm resources)
dist_slurm(by = "subject", workers_within = NULL)

# With throttling: limit to 4 concurrent tasks per job
dist_slurm(by = "subject", workers_within = 4)

Guidelines for workers_within:

  • Match or slightly exceed cpus_per_task
  • Consider memory usage: total_memory / workers_within per task
  • Account for I/O bottlenecks in shared filesystems
  • Start conservatively and increase based on monitoring
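The memory guideline is a one-line division. The sketch below assumes the job's memory request is shared evenly by its workers and that the request is written as a whole number of gigabytes such as "32GB". mem_per_worker_gb() is a hypothetical helper, not a parade function.

```r
# Memory available to each concurrent task, in GB.
# mem: job memory request as a string like "32GB"
mem_per_worker_gb <- function(mem, workers_within) {
  gb <- as.numeric(sub("GB$", "", mem))   # strip the unit suffix
  gb / workers_within
}

roomy <- mem_per_worker_gb("32GB", 8)    # 4 GB per task
tight <- mem_per_worker_gb("8GB", 16)    # 0.5 GB per task

cat("roomy:", roomy, "GB  tight:", tight, "GB\n")
```

If the per-task figure is below what a single task needs at peak, either raise mem or lower workers_within.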

Advanced chunking strategies

Optimizing job granularity

The chunks_per_job parameter controls how groups are split across SLURM jobs:

# Large groups: split for better parallelization
grid <- param_grid(
  subject = "subject01",  # One subject
  trial = 1:1000          # But 1000 trials
)

fl <- flow(grid) |> 
  stage("process_trial", 
    f = function(subject, trial) {
      # Simulate trial-level analysis
      set.seed(trial)
      
      # Mock signal processing
      signal <- rnorm(1000)  # Simulated time series
      filtered <- stats::filter(signal, rep(1/5, 5), sides = 2)  # Moving average (namespaced so dplyr::filter cannot mask it)
      
      # Extract features
      features <- list(
        mean_amplitude = mean(filtered, na.rm = TRUE),
        peak_value = max(filtered, na.rm = TRUE),
        variance = var(filtered, na.rm = TRUE)
      )
      
      list(
        subject = subject,
        trial = trial,
        features = features
      )
    },
    schema = returns(
      subject = chr(),
      trial = int(),
      features = lst()
    )
  )

# Split into smaller chunks for better scheduling
fl |> distribute(dist_slurm(
  by = "subject",                    # Group by subject
  chunks_per_job = 100,              # 100 trials per job → 10 jobs total
  workers_within = 4,                # 4 parallel trials per job
  resources = batch_resources(
    cpus_per_task = 4,
    time = "1h",                     # Shorter jobs are easier to schedule
    mem = "8GB"
  )
))

Multi-dimensional grouping

# Complex parameter space
grid <- param_grid(
  subject = c("s01", "s02", "s03"),
  condition = c("rest", "task"),
  session = 1:3
)

# Define workflow with actual computation
fl <- flow(grid) |>
  stage("analyze",
    f = function(subject, condition, session) {
      # Simulate fMRI analysis
      # factor() on a single string is always 1, so derive indices explicitly
      set.seed(as.integer(gsub("\\D", "", subject)) * 100 +
               match(condition, c("rest", "task")) * 10 + session)
      
      # Mock connectivity analysis
      n_regions <- 50
      timeseries <- matrix(rnorm(n_regions * 200), nrow = n_regions)
      correlation_matrix <- cor(t(timeseries))
      
      list(
        mean_connectivity = mean(correlation_matrix[upper.tri(correlation_matrix)]),
        subject = subject,
        condition = condition,
        session = session
      )
    },
    schema = returns(
      mean_connectivity = dbl(),
      subject = chr(),
      condition = chr(),
      session = int()
    )
  )

# Group by multiple columns
fl |> distribute(dist_slurm(
  by = c("subject", "condition"),    # One job per subject-condition pair
  workers_within = 3,                # Parallel across sessions  
  chunks_per_job = 1,
  resources = batch_resources(
    cpus_per_task = 3,
    time = "30min",
    mem = "4GB"
  )
))

SLURM job submission and monitoring

Deferred execution workflow

# 1. Validate workflow locally (recommended)
preflight(fl_subject)

# 2. Submit to SLURM (returns immediately)
d <- submit(fl_subject, mode = "index")

The workflow is now running on the cluster. Use these commands to monitor progress:

# Check job status
deferred_status(d, show_progress = TRUE)

# Monitor logs in real-time 
# Use scripts/parade_tail.R for continuous monitoring

# Collect completed results
results <- deferred_collect(d, how = "index")
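If you prefer to wait in a script rather than re-running the status call by hand, a generic polling loop is enough. The helper below is a sketch in base R: poll_until() and mock_done() are hypothetical names, and the mock stands in for a real status check because the return structure of deferred_status() is not shown in this document.

```r
# Call `is_done()` repeatedly until it returns TRUE.
# Returns the number of polls that were needed.
poll_until <- function(is_done, interval = 30, max_polls = 120) {
  for (i in seq_len(max_polls)) {
    if (isTRUE(is_done())) return(i)
    Sys.sleep(interval)                 # wait before asking again
  }
  stop("gave up after ", max_polls, " polls")
}

# Mock status check: reports completion on the third call.
calls <- 0
mock_done <- function() {
  calls <<- calls + 1
  calls >= 3
}

n_polls <- poll_until(mock_done, interval = 0, max_polls = 10)
cat("finished after", n_polls, "polls\n")
```

In a real session, is_done would wrap whatever completion test you derive from the status output for d, with an interval long enough to avoid hammering the scheduler.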

Resource specification examples

# CPU-intensive tasks
resources <- batch_resources(
  partition = "standard",
  nodes = 1,
  cpus_per_task = 16,
  time = "4h",
  mem = "32GB"
)

# Memory-intensive tasks  
resources <- batch_resources(
  partition = "himem", 
  nodes = 1,
  cpus_per_task = 4,
  time = "8h", 
  mem = "128GB"
)

# GPU computing
resources <- batch_resources(
  partition = "gpu",
  nodes = 1,
  ntasks_per_node = 1,
  cpus_per_task = 8,
  gres = "gpu:1",
  time = "2h",
  mem = "24GB"
)

# Multi-node MPI (if supported by your workflow)
resources <- batch_resources(
  partition = "standard",
  nodes = 4,
  ntasks_per_node = 16, 
  time = "12h",
  mem = "64GB"
)

Common patterns and best practices

Pattern 1: Development → Production scaling

# Define the analysis workflow once
analyze_workflow <- function(grid) {
  flow(grid) |>
    stage("preprocess",
      f = function(subject, session) {
        # Simulate preprocessing
        # factor() on a single string is always 1, so use the digits in the ID instead
        set.seed(as.integer(gsub("\\D", "", subject)) * 10 + session)
        data <- matrix(rnorm(1000 * 100), nrow = 1000)
        
        # Normalize and filter
        normalized <- scale(data)
        list(processed_data = normalized)
      },
      schema = returns(processed_data = lst())
    ) |>
    stage("analyze",
      needs = "preprocess",
      f = function(preprocess.processed_data) {
        # Run statistical analysis
        pca_result <- prcomp(preprocess.processed_data, rank. = 10)
        list(variance_explained = summary(pca_result)$importance[2, 1:10])
      },
      schema = returns(variance_explained = lst())
    )
}

# Development: quick test on subset
grid_dev <- param_grid(subject = "sub01", session = 1:2)
fl_dev <- analyze_workflow(grid_dev) |> 
  distribute(dist_local(workers_within = 2))

# Test locally
dev_results <- collect(fl_dev)
print(dev_results)  # Verify everything works

# Production: full dataset on cluster  
grid_prod <- param_grid(subject = sprintf("sub%02d", 1:100), session = 1:4)
fl_prod <- analyze_workflow(grid_prod) |> 
  distribute(dist_slurm(
    by = "subject",
    workers_within = 4,
    chunks_per_job = 1,
    resources = batch_resources(cpus_per_task = 4, time = "2h", mem = "16GB")
  ))

Pattern 2: Adaptive chunking based on data size

# Small datasets: group more aggressively
small_fl |> distribute(dist_slurm(
  by = "subject", 
  chunks_per_job = 1,        # One job per subject
  workers_within = 8         # High parallelism within job
))

# Large datasets: split into smaller chunks  
large_fl |> distribute(dist_slurm(
  by = "subject",
  chunks_per_job = 4,        # Split each subject across 4 jobs
  workers_within = 2         # Lower parallelism per job
))

Common pitfalls and solutions

Problem: Jobs fail with memory errors

Solution: Increase memory allocation or reduce workers_within:

# Before: likely to exceed memory
dist_slurm(workers_within = 16, resources = batch_resources(mem = "8GB"))

# After: more conservative memory usage
dist_slurm(workers_within = 4, resources = batch_resources(mem = "8GB"))
# Now each task gets ~2GB instead of ~512MB

Problem: Many short jobs overwhelming the scheduler

Solution: Increase chunks_per_job to create fewer, longer-running jobs:

# Before: 1000 small jobs
dist_slurm(by = NULL, chunks_per_job = 1)  # One job per parameter row

# After: 100 medium jobs
dist_slurm(by = NULL, chunks_per_job = 10)  # 10 parameter rows per job
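The job counts in the comments above come from cutting the row indices into fixed-size blocks. This base R sketch reproduces that arithmetic. chunk_rows() is a hypothetical helper and only illustrates the bookkeeping, not parade's internal chunking.

```r
# Assign row indices 1..n_rows to jobs of at most `rows_per_job` rows each.
chunk_rows <- function(n_rows, rows_per_job) {
  idx <- seq_len(n_rows)
  split(idx, ceiling(idx / rows_per_job))
}

jobs_before <- chunk_rows(1000, 1)    # one row per job
jobs_after  <- chunk_rows(1000, 10)   # ten rows per job

cat("before:", length(jobs_before), "jobs\n")   # 1000
cat("after: ", length(jobs_after), "jobs\n")    # 100
print(jobs_after[[1]])                          # rows 1 to 10
```

The total work is unchanged. Only the number of submissions the scheduler has to handle drops by a factor of ten.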

Problem: Load imbalance in grouped jobs

Solution: Use row-wise distribution or smaller chunks:

# Before: uneven subject sizes cause load imbalance
dist_slurm(by = "subject", chunks_per_job = 1)

# After: more even distribution
dist_slurm(by = NULL, chunks_per_job = 5)  # 5 rows per job regardless of subject
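A quick way to quantify imbalance is the ratio of the largest job to the average job. The subject sizes below are made up for illustration, and the sketch assumes every row costs the same amount of time.

```r
# Hypothetical, deliberately uneven number of rows per subject
rows_per_subject <- c(sub01 = 100, sub02 = 10, sub03 = 10)

# Grouped by subject: one job per subject, load equals the subject's row count
load_by_subject <- rows_per_subject

# Row-wise with 5 rows per job: every job carries the same load
n_rows <- sum(rows_per_subject)
load_rowwise <- rep(5, n_rows / 5)

# 1 means perfectly balanced; larger values mean one job dominates the runtime
imbalance <- function(load) max(load) / mean(load)

ratio_subject <- imbalance(load_by_subject)   # 2.5
ratio_rowwise <- imbalance(load_rowwise)      # 1

cat("by subject:", ratio_subject, " row-wise:", ratio_rowwise, "\n")
```

A ratio of 2.5 means the slowest job runs two and a half times longer than the average, so most of the allocation sits idle while it finishes.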

Problem: I/O bottlenecks on shared filesystems

Solution: Reduce concurrent tasks and consider I/O patterns:

# Before: high I/O contention  
dist_slurm(workers_within = 32)

# After: reduced contention
dist_slurm(workers_within = 4)  # Fewer concurrent file operations