---
title: "Running a Sharded Pipeline"
vignette: >
  %\VignetteIndexEntry{Running a Sharded Pipeline}
  %\VignetteEngine{quarto::html}
  %\VignetteEncoding{UTF-8}
knitr:
  opts_chunk:
    collapse: true
    comment: "#>"
---

```{r}
#| label: setup
#| include: false
# The live chunks materialise Parquet shards, so they need the fitting deps and
# duckplyr. Skip evaluation gracefully on a minimal runner rather than failing.
evaluate <- requireNamespace("ssddata", quietly = TRUE) &&
  requireNamespace("ssdtools", quietly = TRUE) &&
  requireNamespace("duckplyr", quietly = TRUE)
knitr::opts_chunk$set(eval = evaluate)
```

```{r}
#| label: library
library(ssdsims)
```

The ["Defining a Scenario"](defining-a-scenario.html) vignette ends with the
in-memory baseline runner, which threads each step's results forward in memory.
This vignette covers the next layer: materialising each step as
**Hive-partitioned Parquet shards** and linking the steps by reading parent
shards back from disk --- the storage hand-off the cluster
[targets](https://docs.ropensci.org/targets/) pipeline is built on
(`TARGETS-DESIGN.md` §5/§6).

The key idea is **two drivers over one execution core**. The per-task work
(draw, fit, hazard concentration) and its reproducible per-task RNG are shared;
only the *I/O and granularity* differ:

- `ssd_run_scenario_baseline()` --- single core, results threaded in memory.
- `ssd_run_scenario_shards()` --- single core, results written as Parquet shards
  and read back by name (**this vignette**).
- a `targets` pipeline --- the same shards, one named target each, runnable in
  parallel / on a cluster (**shown but not run below**).

Because each task installs its own `(seed, primer)`, all three produce
**byte-identical** per-task results. Changing the storage layout changes where
results are stored, not what they are.
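Here is a minimal base-R sketch of why per-task seeding makes the layout irrelevant. The worker `run_task()` is hypothetical, and seeding with `seed + task` is an assumption that stands in for ssdsims' actual `(seed, primer)` scheme. The point is only that each result depends on the task alone, so regrouping and reordering tasks changes nothing.

```{r}
#| label: sketch-per-task-rng
# Hypothetical per-task worker: it installs its own seed before drawing, so the
# result depends only on (seed, task), never on what ran before it.
run_task <- function(task, seed = 42L) {
  set.seed(seed + task) # assumption: a simple offset, not ssdsims' scheme
  mean(rlnorm(10))
}

tasks <- 1:6
in_order <- vapply(tasks, run_task, numeric(1))

# Re-layout: run the same tasks in two "shards", each in reverse order.
shard_a <- vapply(rev(tasks[1:3]), run_task, numeric(1))
shard_b <- vapply(rev(tasks[4:6]), run_task, numeric(1))
relaid <- c(rev(shard_a), rev(shard_b))

# Same per-task results, whatever the grouping or order.
identical(in_order, relaid)
```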

## `partition_by` and `bundle`: how tasks group into shards

A **shard** is one Parquet file: one cell of the `partition_by` *path* axes for
a step. The remaining axes --- the `bundle` (inner) axes --- become ordinary
columns *within* a shard, so several tasks share one file. The two are exact
complements (`path ⊎ inner = task_axes(step)`). You can specify either one per
step, but a given step may be named in at most one of `partition_by` and
`bundle`.
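The complement rule is plain set arithmetic. In this sketch the four task axes are illustrative (ssdsims derives the real ones from the scenario), and `path_axes` and `inner_axes` are local names, not package API.

```{r}
#| label: sketch-axis-complement
# Illustrative task axes for one step; the real set comes from the scenario.
task_axes <- c("dataset", "sim", "nrow", "rescale")

# Given the path (partition_by) axes, the inner (bundle) axes are the rest...
path_axes <- c("dataset", "sim")
inner_axes <- setdiff(task_axes, path_axes)
inner_axes

# ...and given the inner axes, the path axes are recovered the same way.
identical(setdiff(task_axes, inner_axes), path_axes)

# Disjoint union: no axis is in both, and together they cover every task axis.
length(intersect(path_axes, inner_axes)) == 0 &&
  setequal(union(path_axes, inner_axes), task_axes)
```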

```{r}
#| label: scenario
scenario <- ssd_define_scenario(
  ssddata::ccme_boron,
  nsim = 2L,
  nrow = c(5L, 10L),
  seed = 42L,
  rescale = c(FALSE, TRUE),
  dists = c("lnorm", "gamma")
)
scenario
```

The `print()` shows, per step, the `partition_by` (across-shards) path axes and
the `bundle` (within-shard) inner axes. The default coarsens downstream:
`fit` shards on `(dataset, sim, nrow, rescale)`, while `hc` shards only on
`(dataset, sim)`. Steps partition **independently** --- there is no cross-step
constraint --- so an `hc` shard reads *several* `fit` shards and a `fit` shard
can read *several* `sample` shards. That **m:n** relationship is resolved when
the parent shards are read back, not by restricting `partition_by`.

The stored `partition_by` is the per-step path list (the `bundle`/inner axes
shown in `print()` are its complement):

```{r}
#| label: axes
scenario$partition_by$fit
```
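Resolving the m:n link amounts to matching shards on the axes the two layouts share. The sketch below builds the default `fit` and `hc` path grids for this scenario's single dataset by hand (an illustration, not how ssdsims computes them) and finds each `hc` shard's parent `fit` shards with a join.

```{r}
#| label: sketch-m-to-n
# Illustrative path grids: fit shards on (dataset, sim, nrow, rescale),
# hc shards on (dataset, sim), matching the defaults described above.
fit_shards <- expand.grid(
  dataset = "ccme_boron", sim = 1:2, nrow = c(5L, 10L),
  rescale = c(FALSE, TRUE), stringsAsFactors = FALSE
)
hc_shards <- unique(fit_shards[c("dataset", "sim")])

# An hc shard's parents are the fit shards that agree with it on every axis
# the two layouts have in common.
shared <- intersect(names(hc_shards), names(fit_shards))
parents <- merge(hc_shards, fit_shards, by = shared)

# Each of the two hc shards reads four fit shards.
table(parents$sim)
```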

## Run it single core, over shards

`ssd_run_scenario_shards()` runs the three steps and writes one Parquet file per
shard under a Hive-partitioned tree, with **no** `targets` dependency. It
returns the shard paths for each step.

```{r}
#| label: run
run <- ssd_run_scenario_shards(scenario)
run
```

The written tree is Hive-partitioned (`<step>/<axis=value>/.../part.parquet`),
so the path itself identifies the shard:

```{r}
#| label: layout
fit_files <- list.files(
  file.path(run$dir, "fit"),
  pattern = "^part\\.parquet$", # regex: match the file name exactly
  recursive = TRUE
)
fit_files
```

One file per `partition_by` path cell, not one per task: here 1 (dataset) × 2
(sim) × 2 (nrow) × 2 (rescale) = 8 fit shards. Each file carries the step's
inner axes and the per-task results as columns.
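To make the path-as-identity idea concrete, this sketch builds Hive-style paths for the eight fit cells and parses one back. The axis order and the `as.character()` value encoding are assumptions, and the `hive_path()` and `parse_hive()` helpers are hypothetical, not part of ssdsims.

```{r}
#| label: sketch-hive-paths
# One row per fit path cell (illustrative grid for this scenario).
cells <- expand.grid(
  dataset = "ccme_boron", sim = 1:2, nrow = c(5L, 10L),
  rescale = c(FALSE, TRUE), stringsAsFactors = FALSE
)

# Hypothetical helper: <step>/<axis=value>/.../part.parquet for one cell.
hive_path <- function(step, row) {
  parts <- paste0(names(row), "=", vapply(row, as.character, character(1)))
  paste(c(step, parts, "part.parquet"), collapse = "/")
}
paths <- vapply(
  seq_len(nrow(cells)),
  function(i) hive_path("fit", cells[i, ]),
  character(1)
)
length(paths)
paths[1]

# Hypothetical helper: recover the shard's identity from its path alone.
parse_hive <- function(path) {
  segs <- grep("=", strsplit(path, "/", fixed = TRUE)[[1]], value = TRUE)
  kv <- strsplit(segs, "=", fixed = TRUE)
  stats::setNames(vapply(kv, `[`, character(1), 2), vapply(kv, `[`, character(1), 1))
}
parse_hive(paths[1])
```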

### Results match the in-memory runner

`partition_by` is a free re-layout, so the sharded run's per-task results equal
the in-memory baseline's. Below, the `hc` estimates are read back from the
shards via `ssd_summarize()` and compared, after sorting, with those from
`ssd_run_scenario_baseline()`:

```{r}
#| label: oracle
base <- ssd_run_scenario_baseline(scenario)
base_est <- sort(do.call(rbind, base$hc$hc)$est)

summary_path <- ssd_summarize(
  file.path(run$dir, "sample"),
  file.path(run$dir, "fit"),
  file.path(run$dir, "hc"),
  file.path(run$dir, "summary.parquet")
)
summary_tbl <- dplyr::collect(duckplyr::read_parquet_duckdb(summary_path))
shard_est <- sort(summary_tbl$est)

all.equal(base_est, shard_est)
```

`ssd_summarize()` fans the `hc` layer in across shards (via duckplyr) into a
single `summary.parquet` --- the analysis-ready table --- without recomputing
anything.
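`ssd_summarize()` does this over Parquet with duckplyr. The base-R sketch below only illustrates the shape of a fan-in: it writes two toy `hc` shards as CSV files (standing in for Parquet) under a Hive tree, then unions them and restores the path axes as columns. The file names, columns, and values are made up for illustration.

```{r}
#| label: sketch-fan-in
# Two toy hc shards under a Hive tree; CSV stands in for Parquet here.
root <- file.path(tempfile("hc-"), "hc")
for (sim in 1:2) {
  shard_dir <- file.path(root, "dataset=ccme_boron", paste0("sim=", sim))
  dir.create(shard_dir, recursive = TRUE)
  # Inner axes (here nrow) are ordinary columns within the shard.
  write.csv(
    data.frame(nrow = c(5L, 10L), est = sim + c(0.1, 0.2)),
    file.path(shard_dir, "part.csv"),
    row.names = FALSE
  )
}

# Read one shard and add its path axes back as columns.
read_shard <- function(rel) {
  segs <- grep("=", strsplit(rel, "/", fixed = TRUE)[[1]], value = TRUE)
  shard <- read.csv(file.path(root, rel))
  for (kv in strsplit(segs, "=", fixed = TRUE)) shard[[kv[1]]] <- kv[2]
  shard
}

# Fan in: union every shard into one table, recomputing nothing.
rel_files <- list.files(root, pattern = "^part\\.csv$", recursive = TRUE)
summary_toy <- do.call(rbind, lapply(rel_files, read_shard))
summary_toy
```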

## Scaling up: the `targets` pipeline

The sharded runner is single core. To run shards **in parallel** (or on a
cluster), the same building blocks plug into a static-branching `targets`
pipeline: `tarchetypes::tar_map()` mints one named target per shard, so
`targets` caches, invalidates, and reruns each shard independently. Two
ready-to-run example projects ship with the package: a minimal `small` one and a fuller
`large` one (adapted from `scripts/example.R`, sweeping `nrow`, `proportion`,
and the estimation / CI methods). The `large` project parallelises its shards
across local workers with a mirai-backed `crew` controller
(`tar_option_set(controller = crew::crew_controller_local())`); swap in a
`crew.cluster` controller for SLURM/PBS:

```{r}
#| label: template-path
list.files(system.file("targets-templates", "small", package = "ssdsims"))
```

Each directory holds `scenario.R` (the study, shared by both drivers),
`_targets.R` (the pipeline), `run.R` (the **targets** driver), and
`run-serial.R` (the **single-core** driver). The whole `_targets.R` amounts to
building a scenario and calling the `ssd_scenario_targets()` factory:

```{r}
#| label: factory
#| eval: false
library(targets)
library(tarchetypes)
library(ssdsims)
source("scenario.R") # builds `scenario`
ssd_scenario_targets(scenario) # the entire target list
```

Editing the study therefore means editing `scenario.R` only. Copy a directory's
files to your project root, edit `scenario.R`, then run either driver (the
targets driver needs the suggested packages `targets` and `tarchetypes`):

```{r}
#| label: tar-make
#| eval: false
# install.packages(c("targets", "tarchetypes"))
dir <- system.file("targets-templates", "small", package = "ssdsims")
file.copy(list.files(dir, full.names = TRUE), ".")

source("run.R") # targets: tar_make() -> one target per shard -> results/<step>/...
# or, from a shell:  Rscript run.R

source("run-serial.R") # single core via ssd_run_scenario_shards() -> results-serial/
# ...and, if results/ exists, it asserts the two drivers' estimates are identical
```

`ssd_scenario_targets()` mints one target per shard. Three choices shape the
pipeline:

- `format = "file"`: targets passes the shard *path*, not its value, between
  steps.
- `error = "null"`: a whole shard that fails records its error and becomes
  `NULL` without aborting the run; the rest still build, and the summary unions
  the survivors.
- `tar_combine()` barriers wire the step ordering, because reading a directory
  carries no automatic dependency edge.
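The following base-R sketch mimics the `error = "null"` behaviour, using `tryCatch()` in place of targets' own error handling. `build_shard()` is hypothetical and fails on purpose for shard 2; the surviving shards are still unioned.

```{r}
#| label: sketch-error-null
# Hypothetical shard builder: shard 2 fails, the others return a small table.
build_shard <- function(id) {
  if (id == 2L) stop("fit failed for shard ", id)
  data.frame(shard = id, est = id / 10)
}

# "null"-style semantics: record the error, keep going, yield NULL.
errors <- character(0)
results <- lapply(1:3, function(id) {
  tryCatch(build_shard(id), error = function(e) {
    errors[[as.character(id)]] <<- conditionMessage(e)
    NULL
  })
})

# The summary unions only the shards that built.
survivors <- do.call(rbind, Filter(Negate(is.null), results))
survivors$shard
errors
```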

Because a shard's Hive-path depth depends on `partition_by`/`bundle`, replaying
with a *changed* split must not leave stale-granularity shards beside the new
ones (a `**` glob would union both). The two drivers handle this differently:
`run.R` (targets) writes each layout under its own root,
`scenario_results_dir(scenario)` = `results/layout=<hash>/`, so a split change is
a fresh tree; `run-serial.R` (single core) instead **owns** its `dir` and clears
each step subtree on every run.
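The per-layout root can be sketched as a hash of the split. `layout_dir()` is a hypothetical stand-in for `scenario_results_dir()`: it hashes a deparsed `partition_by` list with `tools::md5sum()` and keeps eight hex digits. The package's actual hash input and length may differ.

```{r}
#| label: sketch-layout-hash
# Hypothetical stand-in for scenario_results_dir(): hash the per-step split.
layout_dir <- function(partition_by, root = "results") {
  spec <- tempfile(fileext = ".txt")
  on.exit(unlink(spec))
  writeLines(deparse(partition_by), spec) # deterministic text form of the split
  hash <- substr(unname(tools::md5sum(spec)), 1, 8)
  file.path(root, paste0("layout=", hash))
}

split_a <- list(fit = c("dataset", "sim", "nrow", "rescale"), hc = c("dataset", "sim"))
split_b <- list(fit = c("dataset", "sim"), hc = c("dataset", "sim"))

layout_dir(split_a)
# The same split maps to the same tree; a changed split gets a fresh tree.
identical(layout_dir(split_a), layout_dir(split_a))
layout_dir(split_a) == layout_dir(split_b)
```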

Because both drivers `source("scenario.R")`, they run the same study, and
because `partition_by` is a free re-layout, `run-serial.R` finishes by reading
back its own `summary.parquet` and the summary under the targets root
`scenario_results_dir(scenario)`, then asserting that the per-task estimates
are byte-identical. Whichever driver you use (baseline, single-core shards, or
`targets`), the results are the same; choose the one that fits the run.
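The oracle chunk earlier compares *sorted* estimates, which confirms the same multiset of values but would not catch two tasks swapping results. A check keyed on task identity is stricter. This sketch uses toy data frames with hypothetical `sim` and `nrow` key columns, and `identical()` for a byte-level comparison.

```{r}
#| label: sketch-keyed-compare
# Toy per-task estimates from two drivers (hypothetical key columns).
a <- data.frame(
  sim = c(1L, 1L, 2L, 2L), nrow = c(5L, 10L, 5L, 10L),
  est = c(0.11, 0.12, 0.21, 0.22)
)
# Same values, but the first two tasks' estimates are swapped.
b <- transform(a, est = c(0.12, 0.11, 0.21, 0.22))

# Sorting the estimates alone cannot see the swap...
identical(sort(a$est), sort(b$est))

# ...joining on the task identity first can.
joined <- merge(a, b, by = c("sim", "nrow"), suffixes = c("_a", "_b"))
identical(joined$est_a, joined$est_b)
```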

## See also

- ["Defining a Scenario"](defining-a-scenario.html) --- the scenario object and
  the baseline runner.
- `TARGETS-DESIGN.md` §5 (tasks into shards), §6 (inter-shard linking, the Hive
  layout, the `targets` sketch).
- `?ssd_run_scenario_shards`, `?ssd_scenario_fit_shards`, `?ssd_run_fit_step`,
  `?ssd_summarize`.
