---
title: "Advanced DAGs - Dynamic Fan-out and Collect"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Advanced DAGs - Dynamic Fan-out and Collect}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
```{r echo=FALSE}
knitr::opts_chunk$set(
  collapse = FALSE,
  comment = "",
  out.width = "100%",
  cache = FALSE,
  asciicast_knitr_output = "html"
)

asciicast::init_knitr_engine(
  echo = TRUE,
  echo_input = FALSE,
  same_process = TRUE,
  startup = quote({
    library(maestro)
    set.seed(1)
  })
)
options(asciicast_theme = "pkgdown")
```

More complex DAGs can dynamically spawn new pipeline runs from a list or vector (dynamic fan-out), collect the outputs of multiple pipelines into a single pipeline (fan-in), or combine both.

## Dynamic Fan-out

Sometimes an upstream pipeline returns a collection of values and you want to run a downstream pipeline **once per element** — a pattern called fan-out or scatter. Add `@maestroMap` to the downstream pipeline to enable this. An empty `@maestroMap` tag iterates over each element of the upstream return value directly.

```
get_letters
  |-shout[1]
  |-shout[2]
  |-shout[3]
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(dir.create("pipelines"))
writeLines(
  "
  #' @maestroFrequency daily
  get_letters <- function() {
    c('a', 'b', 'c')
  }

  #' @maestroInputs get_letters
  #' @maestroMap
  shout <- function(.input) {
    toupper(.input)
  }",
  con = "pipelines/fanout.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
get_letters <- function() {
  c("a", "b", "c")
}

#' @maestroInputs get_letters
#' @maestroMap
shout <- function(.input) {
  toupper(.input)
}
```

`shout` will execute three times — once for `"a"`, once for `"b"`, once for `"c"` — and the CLI output labels each branch with its iteration index in square brackets.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)
```

Note that dynamic fan-out has no `@maestroOutputs` equivalent: it cannot be declared from the upstream side. It must be declared on the downstream pipeline using `@maestroInputs` combined with `@maestroMap`.
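
The fan-out behaviour described in this section can be approximated in plain base R. The sketch below is only a conceptual stand-in: it does not call maestro's scheduler, and the `branches` list and its `shout[i]` names are illustrative labels chosen to mirror the CLI output, not objects maestro exposes.

```{r eval=FALSE}
# Conceptual sketch in base R; this is NOT how maestro is implemented.
get_letters <- function() c("a", "b", "c")
shout <- function(.input) toupper(.input)

upstream <- get_letters()

# One call of `shout` per element, which is what an empty `@maestroMap` requests.
branches <- lapply(upstream, shout)
names(branches) <- sprintf("shout[%d]", seq_along(branches))

# For comparison: a single call that receives the whole vector at once.
single_call <- shout(upstream)

length(branches)     # three separate results, one per element
length(single_call)  # one result holding all three letters
```

The difference matters most when each element should succeed or fail on its own, since separate branches keep one bad element from affecting the others.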

### Iterating over a field of a list

When the upstream pipeline returns a **named list**, pass an expression such as `.input$letter` to `@maestroMap` to select which field to scatter over. Each branch still receives the list as `.input`, with the mapped field replaced by the current element, so the other fields remain accessible.

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanout.R"))
writeLines(
  "
  #' @maestroFrequency daily
  get_letters <- function() {
    list(
      letter = letters[1:3],
      greeting = 'hello'
    )
  }

  #' @maestroInputs get_letters
  #' @maestroMap .input$letter
  make_message <- function(.input) {
    paste(.input$greeting, toupper(.input$letter))
  }",
  con = "pipelines/fanout.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
get_letters <- function() {
  list(
    letter = letters[1:3],
    greeting = "hello"
  )
}

#' @maestroInputs get_letters
#' @maestroMap .input$letter
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}
```

`make_message` runs once per element of `letter`, producing `"hello A"`, `"hello B"`, and `"hello C"`. The `greeting` field is available in every branch because each branch's `.input` keeps the unmapped fields of the list unchanged.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)
```
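
As a rough mental model of mapping over a single field, the base R sketch below builds one `.input` per element of `letter` by copying the upstream list and overwriting only the mapped field. This is an assumption-laden illustration of the behaviour described above, not maestro's internals; `branch_input` is a hypothetical name.

```{r eval=FALSE}
# Conceptual sketch in base R; this is NOT how maestro is implemented.
upstream <- list(letter = letters[1:3], greeting = "hello")

make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}

branches <- lapply(seq_along(upstream$letter), function(i) {
  branch_input <- upstream                      # start from the full list
  branch_input$letter <- upstream$letter[[i]]   # replace only the mapped field
  make_message(branch_input)
})

unlist(branches)
# "hello A" "hello B" "hello C"
```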

If the field name in `@maestroMap` does not exist in the upstream return value, maestro records an informative error on the downstream pipeline rather than silently producing zero branches.
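
To see why an explicit error is preferable, consider what plain R does with a missing list element. The sketch below uses only base R; `select_field` is a hypothetical guard written for illustration and is not a maestro function, and its error message is not the message maestro emits.

```{r eval=FALSE}
upstream <- list(letter = letters[1:3], greeting = "hello")

# Base R returns NULL for a missing list element instead of raising an error,
# so iterating over it silently does nothing.
missing_field <- upstream[["symbol"]]
silent_branches <- lapply(missing_field, toupper)
length(silent_branches)
# 0

# Hypothetical guard that fails loudly when the field is absent.
select_field <- function(x, field) {
  if (!field %in% names(x)) {
    stop(sprintf("field '%s' not found in upstream output", field), call. = FALSE)
  }
  x[[field]]
}

message_text <- tryCatch(
  select_field(upstream, "symbol"),
  error = function(e) conditionMessage(e)
)
message_text
# "field 'symbol' not found in upstream output"
```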

### Iterating over multiple fields simultaneously

You can supply multiple space-separated expressions to `@maestroMap` to zip across several fields at once — similar to `purrr::pmap()`. Each iteration receives `.input` with all specified fields replaced by their i-th element.

```{r eval=FALSE}
#' @maestroFrequency daily
get_data <- function() {
  list(
    letter = letters[1:3],
    greeting = c("hello", "cheers", "hi")
  )
}

#' @maestroInputs get_data
#' @maestroMap .input$letter .input$greeting
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}
```

This produces `"hello A"`, `"cheers B"`, `"hi C"` — each branch receives a distinct `(letter, greeting)` pair.

Every mapped field must have the same length, or length 1 (in which case the single value is recycled across all iterations). Mismatched lengths produce a pipeline error.
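
The zipping and recycling rules above can be sketched in base R as follows. `zip_branches` is a hypothetical helper written for this illustration; it is not part of maestro, and its error message is invented.

```{r eval=FALSE}
# Conceptual sketch in base R; this is NOT how maestro is implemented.
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}

zip_branches <- function(upstream, mapped, fn) {
  lens <- lengths(upstream[mapped])
  n <- max(lens)
  # Each mapped field must have length n or length 1.
  if (!all(lens %in% c(1L, n))) {
    stop("mapped fields must share a length or have length 1", call. = FALSE)
  }
  lapply(seq_len(n), function(i) {
    branch_input <- upstream
    for (field in mapped) {
      values <- upstream[[field]]
      # Length-1 fields are recycled; others contribute their i-th element.
      branch_input[[field]] <- if (length(values) == 1L) values[[1]] else values[[i]]
    }
    fn(branch_input)
  })
}

zipped <- zip_branches(
  list(letter = letters[1:3], greeting = c("hello", "cheers", "hi")),
  mapped = c("letter", "greeting"),
  fn = make_message
)
unlist(zipped)
# "hello A" "cheers B" "hi C"

recycled <- zip_branches(
  list(letter = letters[1:3], greeting = "hello"),
  mapped = c("letter", "greeting"),
  fn = make_message
)
unlist(recycled)
# "hello A" "hello B" "hello C"
```

Calling `zip_branches()` with `greeting = c("hi", "yo")` and three letters stops with an error, which mirrors the mismatched-length rule.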

## Fan-in (Collect)

Fan-in is the complement of fan-out: multiple upstream pipelines are gathered into a single downstream pipeline. Wrap one or more upstream names with `collect()` in `@maestroInputs` to enable this.

```
letter_a ─┐
           |-+combine
letter_b ─┘
```

The downstream pipeline receives a named list as `.input`, where each name corresponds to an upstream pipeline and each value is that pipeline's return value.

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanout.R"))
writeLines(
  "
  #' @maestroFrequency daily
  letter_a <- function() 'a'

  #' @maestroFrequency daily
  letter_b <- function() 'b'

  #' @maestroInputs collect(letter_a, letter_b)
  combine <- function(.input) {
    paste0(.input$letter_a, .input$letter_b)
  }",
  con = "pipelines/fanin.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
letter_a <- function() "a"

#' @maestroFrequency daily
letter_b <- function() "b"

#' @maestroInputs collect(letter_a, letter_b)
combine <- function(.input) {
  paste0(.input$letter_a, .input$letter_b)
}
```

`combine` fires only after both `letter_a` and `letter_b` have succeeded. Inside `combine`, `.input$letter_a` is `"a"` and `.input$letter_b` is `"b"`. Collect pipelines are shown with a `|-+` prefix in the CLI to distinguish them from regular downstream pipelines.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_status(schedule)[, c("pipe_name", "invoked", "success", "input_run_id", "lineage")]
```

If any upstream pipeline fails, the collect pipeline will not fire. The failed pipeline's run ID is also excluded from `input_run_id` in `get_status()`.
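
The all-or-nothing gating of a collect pipeline can be pictured with the base R sketch below. `run_safely` and `collect_all` are hypothetical helpers for illustration only; maestro tracks success and failure through its own scheduler rather than through functions like these.

```{r eval=FALSE}
# Conceptual sketch in base R; this is NOT how maestro is implemented.
combine <- function(.input) {
  paste0(.input$letter_a, .input$letter_b)
}

# Run one upstream function and record whether it succeeded.
run_safely <- function(fn) {
  tryCatch(
    list(ok = TRUE, value = fn()),
    error = function(e) list(ok = FALSE, value = NULL)
  )
}

collect_all <- function(upstream, downstream) {
  results <- lapply(upstream, run_safely)   # names are preserved
  all_ok <- all(vapply(results, function(r) r$ok, logical(1)))
  if (!all_ok) {
    return(NULL)                            # downstream is skipped entirely
  }
  # Downstream receives a named list keyed by upstream pipeline name.
  downstream(lapply(results, function(r) r$value))
}

ok_result <- collect_all(
  list(letter_a = function() "a", letter_b = function() "b"),
  combine
)
ok_result
# "ab"

skipped <- collect_all(
  list(letter_a = function() "a", letter_b = function() stop("boom")),
  combine
)
is.null(skipped)
# TRUE
```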

### Fan-out into Fan-in

`@maestroMap` and `collect()` compose naturally. An upstream pipeline can fan out with `@maestroMap`, and a downstream pipeline can gather all successful iterations back together with `collect()`. Unlike a collect over separate pipelines, which requires every upstream pipeline to succeed, a collect over a dynamic fan-out runs as long as at least one upstream iteration has succeeded.

```
numbers
  |-multiply[1] ─┐
  |-multiply[2] ──|-+add
  |-multiply[3] ─┘
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanin.R"))
writeLines(
  "
  #' @maestroFrequency daily
  numbers <- function() 1:3

  #' @maestroInputs numbers
  #' @maestroMap
  multiply <- function(.input) .input * 3

  #' @maestroInputs collect(multiply)
  add <- function(.input) {
    sum(unlist(.input))
  }",
  con = "pipelines/fanin.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
numbers <- function() 1:3

#' @maestroInputs numbers
#' @maestroMap
multiply <- function(.input) .input * 3

#' @maestroInputs collect(multiply)
add <- function(.input) {
  sum(unlist(.input))
}
```

Here `multiply` executes three times (once per element of `1:3`), returning 3, 6, and 9. `add` then collects the three results and sums them to 18. The `.input` received by `add` is a list of the successful iteration return values.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)$add
```
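
The "at least one successful iteration" rule for fan-out into fan-in can be sketched in base R as below. The failure on the second element is simulated purely for illustration, and `attempts` and `succeeded` are hypothetical names; this is not maestro's implementation.

```{r eval=FALSE}
# Conceptual sketch in base R; this is NOT how maestro is implemented.
numbers <- function() 1:3

# Same as `multiply` above, but with a simulated failure on the second element.
multiply <- function(.input) {
  if (.input == 2L) stop("simulated failure")
  .input * 3
}

add <- function(.input) {
  sum(unlist(.input))
}

# Run every iteration independently, keeping NULL for the ones that fail.
attempts <- lapply(numbers(), function(x) {
  tryCatch(multiply(x), error = function(e) NULL)
})

# Only successful iterations are collected; the downstream step runs if any remain.
succeeded <- Filter(Negate(is.null), attempts)
total <- if (length(succeeded) > 0) add(succeeded) else NULL
total
# 12
```

With no simulated failure, all three iterations would be collected and the sum would be 18, matching the example above.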

```{r cleanup2, echo=FALSE, message=FALSE, warning=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
```
