Enhanced pipeline monitor with event feed
pipeline_top.RdWraps deferred_top() with an event feed from the event store and
classified error summaries. Provides a comprehensive live view of
pipeline execution.
Usage
pipeline_top(
run_id = NULL,
d = NULL,
refresh = 3,
max_events = 10L,
max_errors = 5L,
clear = TRUE
)Arguments
- run_id
Optional run ID. If NULL, uses the most recent run from the registry.
- d
Optional
parade_deferredobject. If provided,run_idis taken from it.- refresh
Refresh interval in seconds (default 3)
- max_events
Maximum recent events to show (default 10)
- max_errors
Maximum classified errors to show (default 5)
- clear
Whether to clear screen between updates
Examples
# \donttest{
grid <- data.frame(x = 1:6, g = rep(1:3, 2))
fl <- flow(grid) |>
stage("s", function(x) list(y = x^2), schema = returns(y = dbl())) |>
distribute(dist_local(by = "g", within = "sequential"))
d <- submit(fl)
#> [parade] submit prune: scanning 3 groups for cached outputs
#> [parade] submit prune complete in 0.0s (0 pruned, 3 pending)
pipeline_top(d = d, refresh = 1)
#> parade::pipeline_top -
#>
#> Run: 5fb00461 Backend: local Submitted: 2026-05-05 14:26:56.520417
#> Elapsed: 0:00:00 By: g
#> Stages: s
#>
#> Progress [########################] 100% (3/3 chunks)
#>
#> total=3 resolved=3 unresolved=0
#>
#> -- Recent Events --------------------------------------------------------------
#> 00:00:00 chunk 2 / s started (attempt 1)
#> 00:00:00 chunk 2 / s completed (0.0s)
#> 00:00:00 chunk 2 completed
#> 00:00:00 chunk 3 started
#> 00:00:00 chunk 3 / s started (attempt 1)
#> 00:00:00 chunk 3 / s completed (0.0s)
#> 00:00:00 chunk 3 / s started (attempt 1)
#> 00:00:00 chunk 3 / s completed (0.0s)
#> 00:00:00 chunk 3 completed
#> 00:00:00 run started
#>
#>
#> (All chunks completed)
#>
unlink(c(paths_get()$registry, paths_get()$artifacts), recursive = TRUE)
unlink("parade.log")
# }