Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Building a Chain

Many workflows involve chaining services together rather than building complex graphs. The API examples on the previous page use the Builder API which is the most general API, able to build any kind of directed (cyclic) graph. However it is also maximally verbose, which—besides requiring more typing—can cause your workflow implementation to be scattered and difficult to follow.

We provide the Chain API as a streamlined alternative that suits cases where cycles aren’t needed. It allows you to build workflows by chaining methods together, a popular idiom in Rust. This can save typing time and also allows your code to express the structure of the workflow. Here we will recreate the workflows of the previous page, simplifying them using the Chain API.

simple sequence

Sequences of services can be chained together with a simple .then(_):

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .then(sum)
            .then(double)
            .connect(scope.terminate);
    }
);

A few notes about this example:

  • Builder::chain(input) allows us to begin creating a chain. In this case we begin the chain from scope.start, but you can begin a chain from any Output.
  • Chain::then supports the same arguments as Builder::create_node, meaning you can pass in Services or Callbacks.
  • Chain::connect takes in an InputSlot and ends the chain by feeding it into that input slot. This is useful for connecting the end of a chain into the terminate operation or looping it back to an earlier operation to create a cycle.

If you are dealing with maps and want to define in them inline when building the workflow, you can use Chain::map_block or Chain::map_async for that:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: Vec<f32>| {
                request.into_iter().fold(0.0, |a, b| a + b)
            })
            .map_block(|request: f32| {
                2.0 * request
            })
            .connect(scope.terminate);
    }
);

The Chain::map_block and Chain::map_async methods are just syntactic sugar around Chain::then that makes it easier to put maps into the chain. You can interleave calls to .then, .map_block, and .map_async however you would like when creating a sequence of nodes.

recreating fork-result

The Chain::fork_result method allows you to create two diverging branches when an output message has a Result type. It takes two closures as arguments, where each closure builds one of the diverging branches.

let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|message: Json| {
                // Try parsing the JSON with Schema V2
                serde_json::from_value::<SchemaV2>(message.clone())
                    // If an error happened, pass along the original request message
                    // so we can try parsing it with a different schema.
                    .map_err(|_| message)
            })
            .fork_result(
                |ok| {
                    // If ok, wrap the message in Ok and connect it to terminate
                    ok.map_block(|msg| Ok(msg)).connect(scope.terminate);
                },
                |err| {
                    err
                    .map_block(|message: Json| {
                        // Try parsing the JSON with Schema V1 since V2 failed.
                        serde_json::from_value::<SchemaV1>(message)
                            // If the parsing was successful, upgrade the parsed
                            // value to SchemaV2.
                            .map(|value| value.upgrade_to_schema_v2())
                    })
                    // End this branch by feeding it into the terminate operation
                    .connect(scope.terminate);
                }
            );
    }
);

The chain operation has special methods for certain message types that can further simplify how you express the chain. For example the Result type gets access to Chain::branch_for_err that isolates the err branch of a fork-result and allows the rest of the chain to proceed with the ok branch:

let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: Json| {
                // Try parsing the JSON with Schema V2
                serde_json::from_value::<SchemaV2>(request.clone())
                    // If an error happened, pass along the original request message
                    // so we can try parsing it with a different schema.
                    .map_err(|_| request)
            })
            // Create a branch that handles an Err value. This creates a
            // fork-result under the hood.
            .branch_for_err(|chain|
                chain
                .map_block(|request: Json| {
                    // Try parsing the JSON with Schema V1 since V2 failed.
                    serde_json::from_value::<SchemaV1>(request)
                        // If the parsing was successful, upgrade the parsed value to
                        // SchemaV2.
                        .map(|value| value.upgrade_to_schema_v2())
                })
                // End this branch by feeding it into the terminate operation
                .connect(scope.terminate)
            )
            // Continue the original chain, but only for Ok values.
            .map_block(|ok| Ok(ok))
            .connect(scope.terminate);
    }
);

This is typically used if the error handling branch is relatively small while the ok branch continues on for a long time.

recreating fork-option

It’s less obvious how to create cycles when using chains, but it is still completely possible! The key is to first create a Node so you can refer to the InputSlot later. Here’s an example of creating a cycle using a chain:

let workflow: Service<(), f32> = commands.spawn_io_workflow(
    |scope, builder| {
        // Make a small chain that returns a Node. We need to create an explicit
        // Node for get_random because we will need to refer to its InputSlot
        // later to create a cycle.
        let get_random: Node<(), f32> = builder
            .chain(scope.start)
            .map_block_node(|request: ()| rand::random::<f32>());

        builder
            .chain(get_random.output)
            .map_block(|value: f32| {
                if value < 0.5 {
                    Some(value)
                } else {
                    None
                }
            })
            // This creates a fork-option and sends None values back to the
            // get_random node. This creates a cycle in the workflow.
            .branch_for_none(|none| none.connect(get_random.input))
            // As we continue the chain, only Some(T) values will reach this
            // point, so the chain simplifies the Option<T> to just a T. We can
            // now connect this directly to the terminate operation.
            .connect(scope.terminate);
    }
);

A few notes about this example:

recreating racing

When you have diverging parallel branches, an easy way to create one of those branches is with a Chain::branch_clone. You just pass in a closure that builds off a new chain that will be fed a clone of the original message. The return value of branch_clone(_) will be a continuation of the original Chain that the branch_clone forked off of.

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .branch_clone(|chain|
                // This is a parallel branch fed with a clone of the scope input.
                chain
                .then(emergency_stop)
                .connect(scope.terminate)
            )
            // As we continue to build this chain, we are creating a branch that
            // will run in parallel to the one defined inside of .branch_clone(_).
            // This is where we'll define the happy path sequence of the
            // pick-and-place routine.
            .then(move_to_pick_pregrasp)
            .then(grasp_item)
            .then(move_to_placement)
            .then(release_item)
            .connect(scope.terminate);
    }
);

recreating joining

Chains even support synchronization operations like join. We can structure our fork-clone a bit differently than we did in the racing example to set it up for an easy join:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .fork_clone((
                |chain: Chain<_>| {
                    // This branch moves the robot to the elevator
                    chain
                    .then(move_robot_to_elevator)
                    .output()
                },
                |chain: Chain<_>| {
                    // This branch monitors the robot and then summons the lift
                    chain
                    .then(on_robot_near_elevator)
                    .then(send_elevator_to_location)
                    .output()
                }
            ))
            .join(builder)
            .then(use_elevator)
            .connect(scope.terminate);
    }
);

A few notes about this example:

  • Chain::fork_clone takes in a tuple of any number of closures. Each closure takes a Chain<T> as an input argument where T is the message type of the original chain.
  • You will have to explicitly specify the : Chain<_> type of the closure arguments. The Rust compiler cannot infer this automatically due to limitations in what Traits can express, but you can always use the _ filler for the generic parameter of the chain.
  • The return value of Chain::fork_clone will be a tuple wrapping up all the return values of the closures. We have each closure end with Chain::output so we can gather up the plain outputs of all the branches into one tuple.
  • The join method can be applied to tuples of Outputs, allowing us to apply a join operation at the end of these branches to synchronize them.
  • The join method gives back a chain of the joined value that we can continue to build off of.

There are many ways to use Chain to structure a workflow. Sometimes you will find it more concise and intuitive, but other times you might find it messy and confusing. You can mix and match uses of Chain with uses of Builder however you would like. Ultimately both APIs boil down to InputSlots, Outputs, and Buffers (which will be covered later), making these APIs fully interoperable. Use whichever allows your workflow to be as understandable as possible.

recreating unzipping

Chains can also create forks using the unzip operation and join them together ergonomically:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: PickupTask| {
                (
                    WorkcellTask {
                        workcell: request.workcell,
                        item: request.item,
                    },
                    MobileRobotTask {
                        vehicle: request.vehicle,
                        location: request.location,
                    }
                )
            })
            .fork_unzip((
                |workcell_branch: Chain<_>| workcell_branch.then(pick_item).output(),
                |amr_branch: Chain<_>| amr_branch.then(move_to_location).output(),
            ))
            .join(builder)
            .then(hand_off_item)
            .connect(scope.terminate);
    }
);