Distribution: local and SLURM (barriers, throttling, chunking)
parade-slurm-distribution.Rmd
Requires parade ≥ 0.6.0.
Note: Code evaluation is disabled to keep builds fast and avoid submitting jobs during checks. Copy code into an interactive session to run on your local machine or SLURM cluster.
Introduction to distributed computing with parade
parade separates the workflow definition (DAG) from the execution strategy (distribution plan), allowing you to run the same computational pipeline locally for development or on SLURM clusters for production. This separation enables:
- Local development: test workflows on your laptop with dist_local()
- Cluster scaling: execute the same workflow on SLURM with dist_slurm()
- Resource optimization: fine-tune parallelization and resource allocation
- Flexible chunking: balance job granularity and scheduling efficiency
The key insight is that parade workflows are portable - you define your computation once and choose how to distribute it based on your current needs.
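To make the portability concrete, here is a minimal sketch in which the same flow is pointed at either backend by swapping only the distribution strategy (resource values are illustrative, not recommendations):

```r
library(parade)

# One DAG, defined once
fl <- flow(param_grid(subject = c("sub01", "sub02"))) |>
  stage("fit", function(subject) list(model = subject))

# Development: run on your laptop
fl_dev <- fl |> distribute(dist_local(workers_within = 2))

# Production: same DAG, now scheduled on SLURM
fl_prod <- fl |> distribute(dist_slurm(
  by = "subject",
  resources = batch_resources(cpus_per_task = 4, time = "1h", mem = "8GB")
))
```

Only the distribute() call changes; the grid and stages are untouched.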
“One big machine” (best-effort) on SLURM
On a cluster, it’s tempting to think in terms of a single machine
with nodes * cpus_per_node cores. SLURM doesn’t natively
provide that abstraction for arbitrary batch jobs; it schedules jobs
that request resources.
parade doesn’t yet implement a true “SLURM pool” (single allocation + dispatcher / work-stealing queue), but you can get close today using:
- Packed jobs for slurm_map(): run many tasks per SLURM job and use within-node parallelism to fully utilize each node.
- Fixed job count for flows (submit()): choose target_jobs so you keep an approximately constant number of jobs in flight, independent of the number of groups in your grid.
- Dispatcher backends (dynamic scheduling): use a backend that can hand out work to persistent workers to reduce tail latency when task durations vary.
Example: 10 nodes × 196 cores
For task mapping, the most ergonomic option is
slurm_map_cluster():
tasks <- 1:10000
jobs <- slurm_map_cluster(
tasks,
~ .x^2,
nodes = 10,
cpus_per_node = 196,
oversubscribe = 2, # queue 2× concurrency to reduce stragglers
.resources = list(time = "2h", mem = "64G")
)
results <- jobs |> progress() |> collect()

For flows, dist_slurm_allocation() is a convenience
wrapper around dist_slurm() for this common scenario:
“I have N nodes with M cores each — fill them up.”
Instead of manually computing target_jobs,
cpus_per_task, and workers_within, you
describe the allocation shape and parade derives the rest:
fl <- fl |>
distribute(dist_slurm_allocation(
nodes = 10,
cores_per_node = 196,
within = "multicore",
resources = list(time = "2h", mem = "64G"),
target_jobs = 20 # ~10 nodes * oversubscribe 2
))

This is equivalent to writing:
fl <- fl |>
distribute(dist_slurm(
within = "multicore",
workers_within = 196, # all cores on the node
target_jobs = 20, # 20 SLURM jobs
resources = list(
nodes = 1, ntasks = 1, # one full node per job
cpus_per_task = 196,
time = "2h", mem = "64G"
)
))

By default target_jobs = nodes, so parade creates one
SLURM job per node and packs groups evenly across them. Setting
target_jobs higher (e.g., nodes * 2)
oversubscribes: parade creates more jobs than nodes, so when fast jobs
finish early, waiting jobs backfill immediately — reducing tail latency
from uneven task durations.
When to use which helper:
| You know… | Use |
|---|---|
| Exact per-job resources (CPUs, memory, walltime) | dist_slurm() directly |
| “I have N nodes × M cores, fill them” | dist_slurm_allocation() |
| A named site profile (“standard”, “highmem”) | dist_slurm_profile() |
Important limits: dist_slurm_*() uses
static partitioning (chunks are fixed at submit time). If task
durations are highly variable (e.g., 10s–1h) and you have many tasks
(50–10,000+), a dispatcher backend can reduce tail latency further.
When you want true dynamic scheduling
Dynamic scheduling means: “workers pull the next unit of work when they finish the last one.” On SLURM, that usually implies a persistent pool of workers behind a queue/dispatcher. In parade, the closest options today are:
- dist_mirai(..., dispatcher = TRUE): low-latency, load-balanced scheduling with persistent daemons (which can be launched via SLURM).
- dist_crew(controller = ...): schedule chunks on a crew controller (local or cluster).
A dedicated “SLURM pool” backend (one allocation + dispatcher + task queue) is intentionally not in parade yet, but the submit backend system is designed to support it in a follow-on package.
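As a rough sketch of the mirai route (hedged: everything beyond the documented `dispatcher = TRUE` argument is an assumption — the `workers` argument name in particular may differ in your parade version):

```r
# Dynamic scheduling: persistent daemons pull the next chunk
# as soon as they finish the previous one
fl <- fl |>
  distribute(dist_mirai(
    dispatcher = TRUE,  # load-balanced queue in front of the daemons
    workers = 20        # assumed argument: number of persistent daemons
  ))
```

Because idle workers pull work immediately, a few slow chunks no longer dictate when the whole batch finishes.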
Local vs. SLURM distribution strategies
Local distribution for development
Before we switch to SLURM, it helps to understand what the distribution API does:
- distribute() attaches an execution strategy to a flow. The strategy does not change your DAG — it only controls how rows of the parameter grid are scheduled.
- dist_local() builds a local strategy (great for development) and lets you specify grouping (with by), the within-job execution mode (within), and the amount of parallelism inside each job (workers_within).
- dist_slurm() does the same for a SLURM cluster (we cover it in the next section).
The by argument refers to column names in your parameter
grid. In this vignette we group by subject, so all rows for
the same subject run in the same job.
If you want to “test on a subset,” simply filter your grid before
building the flow (for example,
grid_dev <- subset(grid, subject == "sub01")). The code
below keeps the full grid and illustrates local parallelism.
library(parade)
library(progressr)
# Initialize project structure
paths_init()
# Create a parameter grid
grid <- param_grid(
subject = c("sub01", "sub02", "sub03"),
session = 1:2,
condition = c("A", "B")
) # Results in 12 rows (3×2×2)
# Define a simple workflow
fl <- flow(grid) |>
stage("process", function(subject, session, condition) {
# Simulate computational work
Sys.sleep(runif(1, 0.1, 0.5))
list(result = paste(subject, session, condition, sep = "_"))
})
# Local development: run locally (optionally on a subset)
fl_local <- fl |>
distribute(dist_local(
by = "subject", # Group by subject
within = "multisession", # Use parallel workers within each group
workers_within = 2 # Use 2 cores per group
))

SLURM distribution for production
The same workflow can be scaled to a SLURM cluster:
# Row-wise: one job per parameter combination
fl_rowwise <- fl |> distribute(dist_slurm(
by = NULL, # No grouping - each row is independent
within = "multisession", # Parallel execution within each job
workers_within = 4, # 4 workers per job
resources = batch_resources( # SLURM resource specification
nodes = 1,
cpus_per_task = 4,
time = "30min",
mem = "8GB"
),
chunks_per_job = 1 # One parameter row per job
))
# Subject-wise: group related work into fewer jobs
fl_subject <- fl |> distribute(dist_slurm(
by = "subject", # Group by subject
within = "multisession", # Parallel within each job
workers_within = 8, # More workers since we're grouping
resources = batch_resources(
nodes = 1,
cpus_per_task = 8,
time = "2h", # Longer time for grouped work
mem = "16GB"
),
chunks_per_job = 1 # One subject per job
))

Understanding grouping strategies
The by parameter controls how your parameter grid is
partitioned:
- by = NULL (row-wise): each parameter combination becomes a separate group
  - Pro: maximum parallelization, fine-grained control
  - Con: many small jobs, scheduling overhead
  - Best for: independent tasks that benefit from dedicated resources
- by = "subject" (barrier groups): group related parameter combinations
  - Pro: fewer jobs, shared setup costs, better resource utilization
  - Con: less parallelization, potential load imbalance
  - Best for: tasks that share expensive initialization (data loading, model setup)
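You can preview how a choice of by will partition the grid before submitting anything (a sketch, assuming param_grid() returns an ordinary data frame so plain base R applies):

```r
grid <- param_grid(
  subject = c("sub01", "sub02", "sub03"),
  session = 1:2,
  condition = c("A", "B")
)  # 12 rows (3 × 2 × 2)

nrow(grid)                    # by = NULL      -> 12 groups (one per row)
length(unique(grid$subject))  # by = "subject" -> 3 groups of 4 rows each
table(grid$subject)           # group sizes: check for load imbalance
```

Running table() on the grouping column is a cheap way to spot the load-imbalance risk noted above before any jobs hit the queue.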
Resource allocation and optimization
Understanding the parallelization hierarchy
parade provides two levels of parallelization control:
- Job-level parallelization: How many SLURM jobs to submit
- Within-job parallelization: How many tasks run concurrently within each job
> Tip: If you maintain named resource profiles (via `slurm_defaults_set()` or
> `profile_register()`), you can avoid boilerplate by using the convenience helper
> `dist_slurm_profile()`:
>
> ```r
> fl |> distribute(dist_slurm_profile("standard", by = "subject", workers_within = 8))
> ```
>
> This resolves the profile through `slurm_resources()` and fills the `resources`
> argument of `dist_slurm()` for you.
# Example: 24 parameter combinations, grouped by subject (3 subjects × 8 combinations each)
grid <- param_grid(
subject = c("sub01", "sub02", "sub03"),
session = 1:4,
run = 1:2
) # 24 total combinations
fl <- flow(grid) |>
stage("analyze",
f = function(subject, session, run) {
# Simulate expensive computation
set.seed(as.numeric(factor(subject)) * 100 + session * 10 + run)
# Mock expensive matrix operations
n <- 1000
X <- matrix(rnorm(n * n), nrow = n)
result <- svd(X, nu = 10, nv = 10) # Partial SVD
list(
singular_values = result$d[1:10],
subject = subject,
session = session,
run = run
)
},
schema = returns(
singular_values = lst(),
subject = chr(),
session = int(),
run = int()
)
)
# Strategy 1: One job per subject, parallel within
fl |> distribute(dist_slurm(
by = "subject", # 3 jobs (one per subject)
workers_within = 8, # 8 parallel tasks per job
chunks_per_job = 1, # Each subject in one job
resources = batch_resources(
cpus_per_task = 8, # Match workers_within
time = "4h",
mem = "32GB"
)
))
# Strategy 2: Split large groups across multiple jobs
fl |> distribute(dist_slurm(
by = "subject", # Group by subject first
workers_within = 4, # 4 parallel tasks per job
chunks_per_job = 2, # Split each subject into 2 jobs
resources = batch_resources( # Results in 6 total jobs (3 subjects × 2)
cpus_per_task = 4, # Match workers_within
time = "2h", # Shorter time per chunk
mem = "16GB"
)
))
Throttling concurrent execution
The workers_within parameter acts like
xargs -P, limiting concurrent tasks within
each SLURM job:
# Without throttling: all tasks run simultaneously (may overwhelm resources)
dist_slurm(by = "subject", workers_within = NULL)
# With throttling: limit to 4 concurrent tasks per job
dist_slurm(by = "subject", workers_within = 4)

Guidelines for workers_within:
- Match or slightly exceed cpus_per_task
- Consider memory usage: total_memory / workers_within per task
- Account for I/O bottlenecks in shared filesystems
- Start conservatively and increase based on monitoring
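These guidelines reduce to arithmetic you can run before submitting (a sketch; the 32 GB job and 6 GB per-task peak are made-up numbers — substitute your own measurements):

```r
mem_per_job_gb   <- 32  # memory requested per job (assumption)
peak_per_task_gb <- 6   # estimated peak memory per task (assumption)
cpus_per_task    <- 8   # cores requested per job

# Memory-safe concurrency: don't schedule more tasks than fit in RAM
mem_bound <- floor(mem_per_job_gb / peak_per_task_gb)  # 5

# Final setting: the tighter of the CPU and memory bounds
workers_within <- min(cpus_per_task, mem_bound)        # 5
```

Start at or below this bound and raise it only after monitoring actual usage.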
Packed nodes with callr process pool
When your scripts or stage functions use internal
parallelism (e.g., they call furrr,
mclapply, or rely on threaded BLAS/OpenMP), the
"multicore" and "multisession" within-modes
are a poor fit — those distribute individual rows across furrr
workers, which conflicts with the script’s own parallelism.
Instead, use within = "callr" to run each
group as a fully independent R process. Parade
maintains a pool of up to workers_within concurrent
processes; when one finishes, the next group is launched automatically
(work-queue pattern). Each process is free to use as many cores as it
needs internally — parade doesn’t interfere.
Example: packing a 196-core node with 20 concurrent analyses
Suppose each subject analysis uses 4 cores internally (via
furrr with 4 workers). You want to run 20 subjects at a
time on a single node, filling ~80 of the 196 cores, and cycle through
all 81 subjects:
fl |> distribute(dist_slurm(
by = "subject",
within = "callr", # Each group = independent R process
workers_within = 20L, # Up to 20 concurrent processes
target_jobs = 1L, # Pack everything onto one SLURM job
resources = list(
account = "rrg-mylab",
time = "12:00:00",
mem = "0", # All node memory
cpus_per_task = 196L,
nodes = 1L
)
))

This produces:
SLURM job (1 node, 196 cores)
└─ parade parent process (orchestrator)
└─ callr pool, max 20 concurrent:
├─ r_bg(): subject_01 (4 cores via internal furrr)
├─ r_bg(): subject_02 (4 cores via internal furrr)
├─ ...
├─ r_bg(): subject_20
│ subject_03 finishes → launch subject_21
│ subject_07 finishes → launch subject_22
└─ ... until all 81 subjects done
When to use which within mode:
| Mode | Parallelism unit | Best for |
|---|---|---|
| "sequential" | None | Script already saturates the node |
| "multicore" | Rows (via furrr forks) | Many small, single-threaded rows |
| "multisession" | Rows (via furrr sessions) | Same as multicore, works in RStudio |
| "callr" | Groups (independent R processes) | Scripts with internal parallelism; packing a full node |
The callr mode also works with dist_local() for
development:
# Test locally: 3 groups, 2 concurrent R processes
fl |> distribute(dist_local(
by = "subject",
within = "callr",
workers_within = 2L
))

Advanced chunking strategies
Optimizing job granularity
The chunks_per_job parameter controls how groups are
split across SLURM jobs:
# Large groups: split for better parallelization
grid <- param_grid(
subject = "subject01", # One subject
trial = 1:1000 # But 1000 trials
)
fl <- flow(grid) |>
stage("process_trial",
f = function(subject, trial) {
# Simulate trial-level analysis
set.seed(trial)
# Mock signal processing
signal <- rnorm(1000) # Simulated time series
filtered <- stats::filter(signal, rep(1/5, 5), sides = 2) # Moving average (avoid dplyr::filter masking)
# Extract features
features <- list(
mean_amplitude = mean(filtered, na.rm = TRUE),
peak_value = max(filtered, na.rm = TRUE),
variance = var(filtered, na.rm = TRUE)
)
list(
subject = subject,
trial = trial,
features = features
)
},
schema = returns(
subject = chr(),
trial = int(),
features = lst()
)
)
# Split into smaller chunks for better scheduling
fl |> distribute(dist_slurm(
by = "subject", # Group by subject
chunks_per_job = 100, # 100 trials per job → 10 jobs total
workers_within = 4, # 4 parallel trials per job
resources = batch_resources(
cpus_per_task = 4,
time = "1h", # Shorter jobs are easier to schedule
mem = "8GB"
)
))

Multi-dimensional grouping
# Complex parameter space
grid <- param_grid(
subject = c("s01", "s02", "s03"),
condition = c("rest", "task"),
session = 1:3
)
# Define workflow with actual computation
fl <- flow(grid) |>
stage("analyze",
f = function(subject, condition, session) {
# Simulate fMRI analysis
set.seed(as.numeric(factor(subject)) * 100 +
as.numeric(factor(condition)) * 10 + session)
# Mock connectivity analysis
n_regions <- 50
timeseries <- matrix(rnorm(n_regions * 200), nrow = n_regions)
correlation_matrix <- cor(t(timeseries))
list(
mean_connectivity = mean(correlation_matrix[upper.tri(correlation_matrix)]),
subject = subject,
condition = condition,
session = session
)
},
schema = returns(
mean_connectivity = dbl(),
subject = chr(),
condition = chr(),
session = int()
)
)
# Group by multiple columns
fl |> distribute(dist_slurm(
by = c("subject", "condition"), # One job per subject-condition pair
workers_within = 3, # Parallel across sessions
chunks_per_job = 1,
resources = batch_resources(
cpus_per_task = 3,
time = "30min",
mem = "4GB"
)
))

SLURM job submission and monitoring
Deferred execution workflow
# 1. Validate workflow locally (recommended)
preflight(fl_subject)
# 2. Submit to SLURM (returns immediately)
d <- submit(fl_subject, mode = "index")

The workflow is now running on the cluster. Use these commands to monitor progress:
# Check job status
deferred_status(d, show_progress = TRUE)
# Monitor logs in real-time
# Use scripts/parade_tail.R for continuous monitoring
# Collect completed results
results <- deferred_collect(d, how = "index")

Resource specification examples
# CPU-intensive tasks
resources <- batch_resources(
partition = "standard",
nodes = 1,
cpus_per_task = 16,
time = "4h",
mem = "32GB"
)
# Memory-intensive tasks
resources <- batch_resources(
partition = "himem",
nodes = 1,
cpus_per_task = 4,
time = "8h",
mem = "128GB"
)
# GPU computing
resources <- batch_resources(
partition = "gpu",
nodes = 1,
ntasks_per_node = 1,
cpus_per_task = 8,
gres = "gpu:1",
time = "2h",
mem = "24GB"
)
# Multi-node MPI (if supported by your workflow)
resources <- batch_resources(
partition = "standard",
nodes = 4,
ntasks_per_node = 16,
time = "12h",
mem = "64GB"
)

Common patterns and best practices
Pattern 1: Development → Production scaling
# Define the analysis workflow once
analyze_workflow <- function(grid) {
flow(grid) |>
stage("preprocess",
f = function(subject, session) {
# Simulate preprocessing
set.seed(as.numeric(factor(subject)) * 10 + session)
data <- matrix(rnorm(1000 * 100), nrow = 1000)
# Normalize and filter
normalized <- scale(data)
list(processed_data = normalized)
},
schema = returns(processed_data = lst())
) |>
stage("analyze",
needs = "preprocess",
f = function(preprocess.processed_data) {
# Run statistical analysis
pca_result <- prcomp(preprocess.processed_data, rank. = 10)
list(variance_explained = summary(pca_result)$importance[2, 1:10])
},
schema = returns(variance_explained = lst())
)
}
# Development: quick test on subset
grid_dev <- param_grid(subject = "sub01", session = 1:2)
fl_dev <- analyze_workflow(grid_dev) |>
distribute(dist_local(workers_within = 2))
# Test locally
dev_results <- collect(fl_dev)
print(dev_results) # Verify everything works
# Production: full dataset on cluster
grid_prod <- param_grid(subject = sprintf("sub%02d", 1:100), session = 1:4)
fl_prod <- analyze_workflow(grid_prod) |>
distribute(dist_slurm(
by = "subject",
workers_within = 4,
chunks_per_job = 1,
resources = batch_resources(cpus_per_task = 4, time = "2h", mem = "16GB")
))

Pattern 2: Adaptive chunking based on data size
# Small datasets: group more aggressively
small_fl |> distribute(dist_slurm(
by = "subject",
chunks_per_job = 1, # One job per subject
workers_within = 8 # High parallelism within job
))
# Large datasets: split into smaller chunks
large_fl |> distribute(dist_slurm(
by = "subject",
chunks_per_job = 4, # Split each subject across 4 jobs
workers_within = 2 # Lower parallelism per job
))

Common pitfalls and solutions
Problem: Jobs fail with memory errors
Solution: Increase memory allocation or reduce
workers_within:
# Before: likely to exceed memory
dist_slurm(workers_within = 16, resources = batch_resources(mem = "8GB"))
# After: more conservative memory usage
dist_slurm(workers_within = 4, resources = batch_resources(mem = "8GB"))
# Now each task gets ~2GB instead of ~512MB

Problem: Many short jobs overwhelming the scheduler
Solution: Increase chunks_per_job to
create fewer, longer-running jobs:
# Before: 1000 small jobs
dist_slurm(by = NULL, chunks_per_job = 1) # One job per parameter row
# After: 100 medium jobs
dist_slurm(by = NULL, chunks_per_job = 10) # 10 parameter rows per job

Problem: Load imbalance in grouped jobs
Solution: Use row-wise distribution or smaller chunks:
# Before: uneven subject sizes cause load imbalance
dist_slurm(by = "subject", chunks_per_job = 1)
# After: more even distribution
dist_slurm(by = NULL, chunks_per_job = 5) # 5 rows per job regardless of subject

Problem: I/O bottlenecks on shared filesystems
Solution: Reduce concurrent tasks and consider I/O patterns:
# Before: high I/O contention
dist_slurm(workers_within = 32)
# After: reduced contention
dist_slurm(workers_within = 4) # Fewer concurrent file operations