Join
A common use of buffers is to join together the results of two or more parallel branches. Crossflow provides a built-in join operation that can have two or more buffers connected to it. As soon as every buffer connected to the join operation holds at least one message, the oldest message is pulled from each buffer, and these messages are combined into a single message that is sent as the output of the join operation.
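To make the activation rule concrete, here is a minimal sketch in plain Rust using only the standard library. It is a toy model, not Crossflow's API: a `VecDeque` stands in for a buffer (oldest message at the front), and `try_join` is a hypothetical name. Nothing happens until both buffers hold a message; then the oldest message is pulled from each and the two are combined.

```rust
use std::collections::VecDeque;

/// Toy model of a two-buffer join (hypothetical, not Crossflow's API).
/// Each `VecDeque` stands in for a buffer, with the oldest message at the front.
/// Returns `None` and leaves both buffers untouched unless each holds at least
/// one message; otherwise pulls the oldest message from each and combines them.
pub fn try_join<A, B>(first: &mut VecDeque<A>, second: &mut VecDeque<B>) -> Option<(A, B)> {
    // The join can only activate when every connected buffer is non-empty.
    if first.is_empty() || second.is_empty() {
        return None;
    }
    // Pull (remove) the oldest message from each buffer.
    let a = first.pop_front()?;
    let b = second.pop_front()?;
    Some((a, b))
}
```

Messages that arrive early simply wait in their buffer: if one buffer holds two messages and the other is empty, no output is produced until the empty one receives something, and then only the oldest of the two waiting messages is consumed.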
Join into struct
There are two ways to join buffers, depending on whether you want to produce a struct or a sequence. To join into a struct, you need to specify a key (name) for each buffer connection. Each key determines which field of the struct receives that buffer’s value.
The type of struct that the join operation creates is inferred automatically from the node that the join operation’s output is connected to.
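As a rough illustration of how keys map onto fields, the following toy model in plain Rust (not Crossflow's API) joins two buffers into a struct. The struct `PieFilling` and the function `join_into_filling` are hypothetical; in this sketch the parameter names play the role of the connection keys, and each buffer's element type must match the type of the field it fills.

```rust
use std::collections::VecDeque;

/// Hypothetical output struct. In this toy, each field name plays the role
/// of the key given to one buffer connection.
#[derive(Debug, PartialEq)]
pub struct PieFilling {
    pub apples: u32,
    pub spice: String,
}

/// Toy keyed join (hypothetical, not Crossflow's API): the buffer connected
/// under the key "apples" fills `apples`, and the one under "spice" fills
/// `spice`. Each buffer's value type must match its field's type.
pub fn join_into_filling(
    apples: &mut VecDeque<u32>,
    spice: &mut VecDeque<String>,
) -> Option<PieFilling> {
    // Wait until every keyed buffer holds at least one message.
    if apples.is_empty() || spice.is_empty() {
        return None;
    }
    // Pull the oldest value from each buffer into the field with the same key.
    Some(PieFilling {
        apples: apples.pop_front()?,
        spice: spice.pop_front()?,
    })
}
```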
Note
A keyed join can produce more than just structs. The values can also be gathered into a map, such as HashMap, BTreeMap, or JSON, where the key name of each connection is used as the buffer value’s key in the map.
Each key can only be used once per join operation. If a buffer is used in multiple join operations, it can have a different key for each join.
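The map variant can be sketched the same way. This is a toy model in plain Rust, not Crossflow's API: a `BTreeMap` from key name to a `VecDeque` stands in for the keyed buffer connections, and `keyed_join_into_map` is a hypothetical name. Storing the connections in a map is one way to guarantee that each key is used only once per join; a `HashMap` would behave the same apart from iteration order.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy keyed join into a map (hypothetical, not Crossflow's API).
/// Each entry of `buffers` is one connection: its key name and its buffer.
/// Because the connections live in a map, each key can appear only once.
/// Returns `None` until every buffer holds at least one message.
pub fn keyed_join_into_map<T>(
    buffers: &mut BTreeMap<String, VecDeque<T>>,
) -> Option<BTreeMap<String, T>> {
    // The join activates only when no connected buffer is empty.
    if buffers.values().any(|queue| queue.is_empty()) {
        return None;
    }
    let mut output = BTreeMap::new();
    for (key, queue) in buffers.iter_mut() {
        // Pull the oldest message and store it under its connection's key.
        let value = queue.pop_front()?;
        output.insert(key.clone(), value);
    }
    Some(output)
}
```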
No matter what struct you intend to create through the join, the workflow builder
will ensure that every necessary field has a corresponding buffer connected with
a matching key name and matching data type. When building workflows with the native
Rust API, any mismatch will produce a compilation error. When building a workflow
from a JSON diagram, you will get an IncompatibleLayout error.
Join into sequence
While the keyed join is generally the recommended way to do a join (explicit key
names have more semantic value), it is also possible to join into a sequence,
such as an array, Vec, tuple, or JSON.
For each connected buffer, specify a sequencing index that determines where in
the sequence that buffer’s value belongs.
When joining into an array or Vec, all the buffer values must have the same
data type. Joining into a tuple allows their data types to be mixed. Joining
into JSON requires every buffer value to be serializable. As with the keyed
join, any incompatibility produces a compilation error in the native Rust API
and an IncompatibleLayout error when building from a JSON diagram.
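A toy model of the sequence join, in plain Rust rather than Crossflow's API, shows how the sequencing index decides placement. Each hypothetical connection is an `(index, VecDeque<T>)` pair, and the connections may be listed in any order; `join_into_vec` is an illustrative name. Because the output is a `Vec<T>`, every buffer must carry the same type `T`. A tuple join would instead take one generic type parameter per position, which is what allows its element types to differ.

```rust
use std::collections::VecDeque;

/// Toy sequence join into a `Vec` (hypothetical, not Crossflow's API).
/// Each connection pairs a sequencing index with a buffer. Assumes the indices
/// are exactly 0..n with each used once, as a workflow builder would check;
/// an out-of-range or repeated index panics here.
/// Every buffer carries the same type `T`, as joining into a `Vec` requires.
pub fn join_into_vec<T>(connections: &mut [(usize, VecDeque<T>)]) -> Option<Vec<T>> {
    // Wait until every connected buffer holds at least one message.
    if connections.iter().any(|(_, queue)| queue.is_empty()) {
        return None;
    }
    // One empty slot per connection, filled according to the sequencing index.
    let mut slots: Vec<Option<T>> = (0..connections.len()).map(|_| None).collect();
    for (index, queue) in connections.iter_mut() {
        assert!(slots[*index].is_none(), "sequencing index {index} used twice");
        // Pull the oldest message into the slot named by its index.
        slots[*index] = queue.pop_front();
    }
    // Every slot is filled, so this collects into `Some(vec)`.
    slots.into_iter().collect()
}
```

In this sketch the order in which connections are listed does not matter; only the index attached to each one decides where its value lands.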
Fetch by clone
When the conditions are met for a join operation to activate (at least one message is present in every connected buffer), the join operation will construct its output message by “fetching” the oldest message from each of its connected buffers. By default, “fetching” means to “pull” the message, removing it from the buffer and moving it into the output message.
Sometimes there may be a reason to clone the message out of the buffer instead of pulling it. For example, if the buffer represents a checkpoint in your process that only needs to be reached once, you can clone from the buffer so that the checkpoint is retained after each activation of the join.
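The difference between the two fetch modes can be sketched on a single toy buffer. This is plain Rust, not Crossflow's API: `Fetch` and `fetch` are hypothetical names, and a `VecDeque` stands in for the buffer. Pulling removes the oldest message; cloning copies it and leaves it in place, which requires the message type to implement `Clone`.

```rust
use std::collections::VecDeque;

/// How a toy join retrieves the oldest message from a buffer (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Fetch {
    /// Remove the oldest message from the buffer (the default behavior).
    Pull,
    /// Copy the oldest message and leave it in the buffer.
    Clone,
}

/// Fetch the oldest message from `queue` according to `mode`.
pub fn fetch<T: Clone>(queue: &mut VecDeque<T>, mode: Fetch) -> Option<T> {
    match mode {
        Fetch::Pull => queue.pop_front(),
        Fetch::Clone => queue.front().cloned(),
    }
}
```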
Suppose we want to make many apple pies with the same oven.
We can’t bake any pies until the oven is preheated, but once the oven has reached the right temperature, we do not need to preheat it again. We can have our ingredient preparation branch repeatedly prepare pie pans for baking, and all of those pans will be put in the oven as soon as the oven has finished preheating once. Any new pans that are prepared after preheating has finished can go into the oven right away.
To express this behavior, we use fetch-by-clone for the buffer that tracks whether the oven is preheated, while we use the regular fetch-by-pull for the buffer of prepared pans. The “finished preheating” buffer retains its knowledge that preheating has finished, while the prepared pan buffer has its pans consumed (moved into the oven) each time the join activates.
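The pie scenario can be simulated with a small toy model in plain Rust. It is not Crossflow's API: `Preheated`, `Pan`, and `load_oven` are hypothetical names, and each `VecDeque` stands in for a buffer. The join keeps activating while both buffers hold a message, cloning the preheated signal and pulling one pan each time, so the signal survives for pans that are prepared later.

```rust
use std::collections::VecDeque;

/// Hypothetical signal meaning "the oven has finished preheating".
#[derive(Clone, Debug, PartialEq)]
pub struct Preheated;

/// Hypothetical message for a prepared pie pan, tagged with a number.
#[derive(Debug, PartialEq)]
pub struct Pan(pub u32);

/// Toy model of the oven join (not Crossflow's API). It activates repeatedly
/// while both buffers hold a message: the preheated signal is fetched by clone,
/// so it stays in its buffer, while each pan is fetched by pull.
/// Returns the join outputs, one per pan that goes into the oven.
pub fn load_oven(
    preheated: &mut VecDeque<Preheated>,
    pans: &mut VecDeque<Pan>,
) -> Vec<(Preheated, Pan)> {
    let mut outputs = Vec::new();
    while !preheated.is_empty() && !pans.is_empty() {
        // Fetch by clone: copy the signal, leaving it in place for later pans.
        let signal = preheated.front().cloned().expect("buffer is non-empty");
        // Fetch by pull: remove the oldest prepared pan from its buffer.
        let pan = pans.pop_front().expect("buffer is non-empty");
        outputs.push((signal, pan));
    }
    outputs
}
```

Before preheating finishes, pans accumulate in their buffer; once the signal arrives, all waiting pans go in, and any pan prepared afterwards goes in on the next activation without preheating again.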