Distribution: local and SLURM (barriers, throttling, chunking)
parade-slurm-distribution.Rmd
Requires parade ≥ 0.6.0.
Note: Code evaluation is disabled to keep builds fast and avoid submitting jobs during checks. Copy code into an interactive session to run on your local machine or SLURM cluster.
Introduction to distributed computing with parade
parade separates the workflow definition (DAG) from the execution strategy (distribution plan), allowing you to run the same computational pipeline locally for development or on SLURM clusters for production. This separation enables:
- Local development: Test workflows on your laptop with dist_local()
- Cluster scaling: Execute the same workflow on SLURM with dist_slurm()
- Resource optimization: Fine-tune parallelization and resource allocation
- Flexible chunking: Balance job granularity and scheduling efficiency
The key insight is that parade workflows are portable: you define your computation once and choose how to distribute it based on your current needs.
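A minimal sketch of that portability, assuming fl is a flow built with flow() and stage() as in the examples below (the argument values here are placeholders):
# Same flow, two execution strategies
fl_dev  <- fl |> distribute(dist_local(by = "subject", workers_within = 2))
fl_prod <- fl |> distribute(dist_slurm(
  by = "subject",
  workers_within = 8,
  resources = batch_resources(time = "2h", mem = "16GB")
))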
“One big machine” (best-effort) on SLURM
On a cluster, it’s tempting to think in terms of a single machine
with nodes * cpus_per_node cores. SLURM doesn’t natively
provide that abstraction for arbitrary batch jobs; it schedules jobs
that request resources.
parade doesn’t yet implement a true “SLURM pool” (single allocation + dispatcher / work-stealing queue), but you can get close today using:
- Packed jobs for slurm_map(): run many tasks per SLURM job and use within-node parallelism to fully utilize each node.
- Fixed job count for flows (submit()): choose target_jobs so you keep an approximately constant number of jobs in flight, independent of the number of groups in your grid.
- Dispatcher backends (dynamic scheduling): use a backend that can hand out work to persistent workers to reduce tail latency when task durations vary.
Example: 10 nodes × 196 cores
For task mapping, the most ergonomic option is
slurm_map_cluster():
tasks <- 1:10000
jobs <- slurm_map_cluster(
tasks,
~ .x^2,
nodes = 10,
cpus_per_node = 196,
oversubscribe = 2, # queue 2× concurrency to reduce stragglers
.resources = list(time = "2h", mem = "64G")
)
results <- jobs |> progress() |> collect()

For flows, use dist_slurm_allocation() to express the allocation shape once:
fl <- fl |>
distribute(dist_slurm_allocation(
nodes = 10,
cores_per_node = 196,
within = "multicore",
resources = list(time = "2h", mem = "64G"),
target_jobs = 20 # ~10 nodes * oversubscribe 2
))

Important limits: dist_slurm_*() uses
static partitioning (chunks are fixed at submit time). If task
durations are highly variable (e.g., 10s–1h) and you have many tasks
(50–10,000+), a dispatcher backend can reduce tail latency further.
When you want true dynamic scheduling
Dynamic scheduling means: “workers pull the next unit of work when they finish the last one.” On SLURM, that usually implies a persistent pool of workers behind a queue/dispatcher. In parade, the closest options today are:
- dist_mirai(..., dispatcher = TRUE): low-latency, load-balanced scheduling with persistent daemons (which can be launched via SLURM); see the sketch below.
- dist_crew(controller = ...): schedule chunks on a crew controller (local or cluster).
A dedicated “SLURM pool” backend (one allocation + dispatcher + task queue) is intentionally not in parade yet, but the submit backend system is designed to support it in a follow-on package.
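For example, a minimal sketch of attaching one of these backends to a flow; it assumes mirai daemons are already provisioned (e.g. launched via SLURM), and other dist_mirai()/dist_crew() arguments are omitted:
# Dynamic scheduling: persistent workers pull the next chunk as they finish
fl |> distribute(dist_mirai(dispatcher = TRUE))
# Or hand chunks to an existing crew controller
# (my_controller is a placeholder for a controller you have already created)
fl |> distribute(dist_crew(controller = my_controller))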
Local vs. SLURM distribution strategies
Local distribution for development
Before we switch to SLURM, it helps to understand what the distribution API does:
- distribute() attaches an execution strategy to a flow. The strategy does not change your DAG; it only controls how rows of the parameter grid are scheduled.
- dist_local() builds a local strategy (great for development) and lets you specify grouping (with by), the within-job execution mode (within), and the amount of parallelism inside each job (workers_within).
- dist_slurm() does the same for a SLURM cluster (we cover it in the next section).
The by argument refers to column names in your parameter
grid. In this vignette we group by subject, so all rows for
the same subject run in the same job.
If you want to “test on a subset,” simply filter your grid before
building the flow (for example,
grid_dev <- subset(grid, subject == "sub01")). The code
below keeps the full grid and illustrates local parallelism.
library(parade)
library(progressr)
# Initialize project structure
paths_init()
# Create a parameter grid
grid <- param_grid(
subject = c("sub01", "sub02", "sub03"),
session = 1:2,
condition = c("A", "B")
) # Results in 12 rows (3×2×2)
# Define a simple workflow
fl <- flow(grid) |>
stage("process", function(subject, session, condition) {
# Simulate computational work
Sys.sleep(runif(1, 0.1, 0.5))
list(result = paste(subject, session, condition, sep = "_"))
})
# Local development: run locally (optionally on a subset)
fl_local <- fl |>
distribute(dist_local(
by = "subject", # Group by subject
within = "multisession", # Use parallel workers within each group
workers_within = 2 # Use 2 cores per group
))

SLURM distribution for production
The same workflow can be scaled to a SLURM cluster:
# Row-wise: one job per parameter combination
fl_rowwise <- fl |> distribute(dist_slurm(
by = NULL, # No grouping - each row is independent
within = "multisession", # Parallel execution within each job
workers_within = 4, # 4 workers per job
resources = batch_resources( # SLURM resource specification
nodes = 1,
cpus_per_task = 4,
time = "30min",
mem = "8GB"
),
chunks_per_job = 1 # One parameter row per job
))
# Subject-wise: group related work into fewer jobs
fl_subject <- fl |> distribute(dist_slurm(
by = "subject", # Group by subject
within = "multisession", # Parallel within each job
workers_within = 8, # More workers since we're grouping
resources = batch_resources(
nodes = 1,
cpus_per_task = 8,
time = "2h", # Longer time for grouped work
mem = "16GB"
),
chunks_per_job = 1 # One subject per job
))

Understanding grouping strategies
The by parameter controls how your parameter grid is
partitioned:
- by = NULL (row-wise): Each parameter combination becomes a separate group
  - Pro: Maximum parallelization, fine-grained control
  - Con: Many small jobs, scheduling overhead
  - Best for: Independent tasks that benefit from dedicated resources
- by = "subject" (barrier groups): Group related parameter combinations
  - Pro: Fewer jobs, shared setup costs, better resource utilization
  - Con: Less parallelization, potential load imbalance
  - Best for: Tasks that share expensive initialization (data loading, model setup)
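To make the trade-off concrete, here is how the 12-row grid defined above partitions under each strategy (a sketch; resources are omitted and job counts assume chunks_per_job = 1):
# 3 subjects x 2 sessions x 2 conditions = 12 rows
fl |> distribute(dist_slurm(by = NULL))       # 12 groups -> 12 jobs
fl |> distribute(dist_slurm(by = "subject"))  #  3 groups ->  3 jobs, 4 rows each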
Resource allocation and optimization
Understanding the parallelization hierarchy
parade provides two levels of parallelization control:
- Job-level parallelization: How many SLURM jobs to submit
- Within-job parallelization: How many tasks run concurrently within each job
> Tip: If you maintain named resource profiles (via `slurm_defaults_set()` or
> `profile_register()`), you can avoid boilerplate by using the convenience helper
> `dist_slurm_profile()`:
>
> ```r
> fl |> distribute(dist_slurm_profile("standard", by = "subject", workers_within = 8))
> ```
>
> This resolves the profile through `slurm_resources()` and fills the `resources`
> argument of `dist_slurm()` for you.
# Example: 24 parameter combinations, grouped by subject (3 subjects × 8 combinations each)
grid <- param_grid(
subject = c("sub01", "sub02", "sub03"),
session = 1:4,
run = 1:2
) # 24 total combinations
fl <- flow(grid) |>
stage("analyze",
f = function(subject, session, run) {
# Simulate expensive computation
set.seed(as.numeric(factor(subject)) * 100 + session * 10 + run)
# Mock expensive matrix operations
n <- 1000
X <- matrix(rnorm(n * n), nrow = n)
result <- svd(X, nu = 10, nv = 10) # Partial SVD
list(
singular_values = result$d[1:10],
subject = subject,
session = session,
run = run
)
},
schema = returns(
singular_values = lst(),
subject = chr(),
session = int(),
run = int()
)
)
# Strategy 1: One job per subject, parallel within
fl |> distribute(dist_slurm(
by = "subject", # 3 jobs (one per subject)
workers_within = 8, # 8 parallel tasks per job
chunks_per_job = 1, # Each subject in one job
resources = batch_resources(
cpus_per_task = 8, # Match workers_within
time = "4h",
mem = "32GB"
)
))
# Strategy 2: Split large groups across multiple jobs
fl |> distribute(dist_slurm(
by = "subject", # Group by subject first
workers_within = 4, # 4 parallel tasks per job
chunks_per_job = 2, # Split each subject into 2 jobs
resources = batch_resources( # Results in 6 total jobs (3 subjects × 2)
cpus_per_task = 4, # Match workers_within
time = "2h", # Shorter time per chunk
mem = "16GB"
)
))
Throttling concurrent execution
The workers_within parameter acts like
xargs -P, limiting concurrent tasks within
each SLURM job:
# Without throttling: all tasks run simultaneously (may overwhelm resources)
dist_slurm(by = "subject", workers_within = NULL)
# With throttling: limit to 4 concurrent tasks per job
dist_slurm(by = "subject", workers_within = 4)

Guidelines for workers_within:
- Match or slightly exceed cpus_per_task
- Consider memory usage: total_memory / workers_within per task (worked example below)
- Account for I/O bottlenecks in shared filesystems
- Start conservatively and increase based on monitoring
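As a worked example of the memory guideline (the numbers are illustrative, not recommendations):
# With mem = "32GB" shared by workers_within = 8 concurrent tasks,
# each task has roughly 32 GB / 8 = 4 GB of headroom
dist_slurm(
  by = "subject",
  workers_within = 8,
  resources = batch_resources(cpus_per_task = 8, time = "2h", mem = "32GB")
)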
Advanced chunking strategies
Optimizing job granularity
The chunks_per_job parameter controls how groups are
split across SLURM jobs:
# Large groups: split for better parallelization
grid <- param_grid(
subject = "subject01", # One subject
trial = 1:1000 # But 1000 trials
)
fl <- flow(grid) |>
stage("process_trial",
f = function(subject, trial) {
# Simulate trial-level analysis
set.seed(trial)
# Mock signal processing
signal <- rnorm(1000) # Simulated time series
filtered <- filter(signal, rep(1/5, 5), sides = 2) # Moving average
# Extract features
features <- list(
mean_amplitude = mean(filtered, na.rm = TRUE),
peak_value = max(filtered, na.rm = TRUE),
variance = var(filtered, na.rm = TRUE)
)
list(
subject = subject,
trial = trial,
features = features
)
},
schema = returns(
subject = chr(),
trial = int(),
features = lst()
)
)
# Split into smaller chunks for better scheduling
fl |> distribute(dist_slurm(
by = "subject", # Group by subject
chunks_per_job = 100, # 100 trials per job → 10 jobs total
workers_within = 4, # 4 parallel trials per job
resources = batch_resources(
cpus_per_task = 4,
time = "1h", # Shorter jobs are easier to schedule
mem = "8GB"
)
))

Multi-dimensional grouping
# Complex parameter space
grid <- param_grid(
subject = c("s01", "s02", "s03"),
condition = c("rest", "task"),
session = 1:3
)
# Define workflow with actual computation
fl <- flow(grid) |>
stage("analyze",
f = function(subject, condition, session) {
# Simulate fMRI analysis
set.seed(as.numeric(factor(subject)) * 100 +
as.numeric(factor(condition)) * 10 + session)
# Mock connectivity analysis
n_regions <- 50
timeseries <- matrix(rnorm(n_regions * 200), nrow = n_regions)
correlation_matrix <- cor(t(timeseries))
list(
mean_connectivity = mean(correlation_matrix[upper.tri(correlation_matrix)]),
subject = subject,
condition = condition,
session = session
)
},
schema = returns(
mean_connectivity = dbl(),
subject = chr(),
condition = chr(),
session = int()
)
)
# Group by multiple columns
fl |> distribute(dist_slurm(
by = c("subject", "condition"), # One job per subject-condition pair
workers_within = 3, # Parallel across sessions
chunks_per_job = 1,
resources = batch_resources(
cpus_per_task = 3,
time = "30min",
mem = "4GB"
)
))

SLURM job submission and monitoring
Deferred execution workflow
# 1. Validate workflow locally (recommended)
preflight(fl_subject)
# 2. Submit to SLURM (returns immediately)
d <- submit(fl_subject, mode = "index")

The workflow is now running on the cluster. Use these commands to monitor progress:
# Check job status
deferred_status(d, show_progress = TRUE)
# Monitor logs in real-time
# Use scripts/parade_tail.R for continuous monitoring
# Collect completed results
results <- deferred_collect(d, how = "index")

Resource specification examples
# CPU-intensive tasks
resources <- batch_resources(
partition = "standard",
nodes = 1,
cpus_per_task = 16,
time = "4h",
mem = "32GB"
)
# Memory-intensive tasks
resources <- batch_resources(
partition = "himem",
nodes = 1,
cpus_per_task = 4,
time = "8h",
mem = "128GB"
)
# GPU computing
resources <- batch_resources(
partition = "gpu",
nodes = 1,
ntasks_per_node = 1,
cpus_per_task = 8,
gres = "gpu:1",
time = "2h",
mem = "24GB"
)
# Multi-node MPI (if supported by your workflow)
resources <- batch_resources(
partition = "standard",
nodes = 4,
ntasks_per_node = 16,
time = "12h",
mem = "64GB"
)

Common patterns and best practices
Pattern 1: Development → Production scaling
# Define the analysis workflow once
analyze_workflow <- function(grid) {
flow(grid) |>
stage("preprocess",
f = function(subject, session) {
# Simulate preprocessing
set.seed(as.numeric(factor(subject)) * 10 + session)
data <- matrix(rnorm(1000 * 100), nrow = 1000)
# Normalize and filter
normalized <- scale(data)
list(processed_data = normalized)
},
schema = returns(processed_data = lst())
) |>
stage("analyze",
needs = "preprocess",
f = function(preprocess.processed_data) {
# Run statistical analysis
pca_result <- prcomp(preprocess.processed_data, rank. = 10)
list(variance_explained = summary(pca_result)$importance[2, 1:10])
},
schema = returns(variance_explained = lst())
)
}
# Development: quick test on subset
grid_dev <- param_grid(subject = "sub01", session = 1:2)
fl_dev <- analyze_workflow(grid_dev) |>
distribute(dist_local(workers_within = 2))
# Test locally
dev_results <- collect(fl_dev)
print(dev_results) # Verify everything works
# Production: full dataset on cluster
grid_prod <- param_grid(subject = sprintf("sub%02d", 1:100), session = 1:4)
fl_prod <- analyze_workflow(grid_prod) |>
distribute(dist_slurm(
by = "subject",
workers_within = 4,
chunks_per_job = 1,
resources = batch_resources(cpus_per_task = 4, time = "2h", mem = "16GB")
))

Pattern 2: Adaptive chunking based on data size
# Small datasets: group more aggressively
small_fl |> distribute(dist_slurm(
by = "subject",
chunks_per_job = 1, # One job per subject
workers_within = 8 # High parallelism within job
))
# Large datasets: split into smaller chunks
large_fl |> distribute(dist_slurm(
by = "subject",
chunks_per_job = 4, # Split each subject across 4 jobs
workers_within = 2 # Lower parallelism per job
))

Common pitfalls and solutions
Problem: Jobs fail with memory errors
Solution: Increase memory allocation or reduce
workers_within:
# Before: likely to exceed memory
dist_slurm(workers_within = 16, resources = batch_resources(mem = "8GB"))
# After: more conservative memory usage
dist_slurm(workers_within = 4, resources = batch_resources(mem = "8GB"))
# Now each task gets ~2GB instead of ~512MB

Problem: Many short jobs overwhelming the scheduler
Solution: Increase chunks_per_job to
create fewer, longer-running jobs:
# Before: 1000 small jobs
dist_slurm(by = NULL, chunks_per_job = 1) # One job per parameter row
# After: 100 medium jobs
dist_slurm(by = NULL, chunks_per_job = 10) # 10 parameter rows per job

Problem: Load imbalance in grouped jobs
Solution: Use row-wise distribution or smaller chunks:
# Before: uneven subject sizes cause load imbalance
dist_slurm(by = "subject", chunks_per_job = 1)
# After: more even distribution
dist_slurm(by = NULL, chunks_per_job = 5) # 5 rows per job regardless of subject

Problem: I/O bottlenecks on shared filesystems
Solution: Reduce concurrent tasks and consider I/O patterns:
# Before: high I/O contention
dist_slurm(workers_within = 32)
# After: reduced contention
dist_slurm(workers_within = 4) # Fewer concurrent file operations