Section Builders

Sometimes you’ll want to provide users with a workflow element that does more than a single node can. You might want to encapsulate a complex arrangement of operations as a single unit that users can drop into their workflows without worrying about how it’s implemented. We call this kind of unit a Section.

Section builders are able to generate a web of operations connected however necessary to fulfill the purpose of the section. You can register section builders in much the same way you register node builders. Once your section builder is registered, any diagram passed to your executor can include the section in its workflow.

Caution

A section is not related to scopes even though they superficially appear similar, as they both contain an arrangement of connected operations.

When a section is put into a workflow, all operations in that section exist in the scope that the section was placed in. This has important implications for session and buffer behavior: each message that enters a scope begins a new session, whereas no new session is created when a message enters a section.

Section Builder Options

Section builder options are essentially the same as node builder options. Refer to the node builder options guide to understand the fields in SectionBuilderOptions.

Closure

Just like node builders, section builders are implemented through a closure that takes in a &mut Builder and a config. The builder is used to create and connect whatever elements your section needs. The config, just like for node builders, is any deserializable data structure that provides the information needed to configure a section.

The key difference for a section is that it does not output a Node. Instead it outputs any struct that implements the Section trait:

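use crossflow::{prelude::*, SectionBuilderOptions};
// Assumed imports for the derive macros used on UseElevatorConfig below.
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};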

/// The kind of section produced by the "use_elevator" section builder.
#[derive(Section)]
struct UseElevatorSection {
    /// Begin using the elevator by having the robot enter it.
    begin: InputSlot<()>,
    /// Signal that the robot failed to enter the elevator.
    enter_elevator_failure: Output<MoveRobotError>,
    /// Signal that the elevator failed to reach its destination.
    move_elevator_error: Output<MoveElevatorError>,
    /// Retry moving the elevator. Trigger this when a move_elevator_error is
    /// resolved.
    retry_elevator_move: InputSlot<()>,
    /// Signal that localization failed.
    localization_error: Output<LocalizationError>,
    /// Retry localizing the robot at the new floor. Trigger this when a
    /// localization_error is resolved.
    retry_localization: InputSlot<()>,
    /// Signal that the robot failed to exit the elevator.
    exit_elevator_failure: Output<MoveRobotError>,
    /// Retry exiting the elevator. Trigger this when an exit_elevator_failure
    /// is resolved.
    retry_elevator_exit: InputSlot<()>,
    /// The robot has successfully exited the elevator at the destination floor.
    success: Output<()>,
}

/// The config data structure for the "use_elevator" section builder.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
struct UseElevatorConfig {
    elevator_id: String,
    robot_id: String,
    to_floor: String,
}

registry.register_section_builder(
    SectionBuilderOptions::new("use_elevator")
        .with_default_display_text("Use Elevator")
        .with_description("Have a robot use an elevator"),
    move |builder: &mut Builder, config: UseElevatorConfig| {
        let UseElevatorConfig { elevator_id, robot_id, to_floor } = config;

        // Create a node for entering the elevator
        let enter_elevator = builder.create_node(enter_elevator(&robot_id, &elevator_id));

        // Create a fork-result that splits based on whether the robot
        // successfully entered the elevator
        let (enter_elevator_result, enter_elevator_fork) = builder.create_fork_result();
        builder.connect(enter_elevator.output, enter_elevator_result);

        // Create a node to move the elevator if the robot successfully entered
        let move_elevator = builder.create_node(move_elevator(&elevator_id, &to_floor));
        builder.connect(enter_elevator_fork.ok, move_elevator.input);

        // Create a fork-result that splits based on whether the elevator
        // successfully arrived at its destination
        let (move_elevator_result, move_elevator_fork) = builder.create_fork_result();
        builder.connect(move_elevator.output, move_elevator_result);

        // Create a node to localize the robot once the elevator arrives at the
        // correct floor
        let localize_robot = builder.create_node(localize_robot(&robot_id, &to_floor));
        builder.connect(move_elevator_fork.ok, localize_robot.input);

        // Create a fork-result that splits based on whether the robot
        // successfully localized
        let (localize_result, localize_fork) = builder.create_fork_result();
        builder.connect(localize_robot.output, localize_result);

        // Create a node to exit the elevator after the robot has localized
        let exit_elevator = builder.create_node(exit_elevator(&robot_id, &elevator_id));
        builder.connect(localize_fork.ok, exit_elevator.input);

        // Create a fork-result that splits based on whether the robot
        // successfully exited the elevator
        let (exit_elevator_result, exit_elevator_fork) = builder.create_fork_result();
        builder.connect(exit_elevator.output, exit_elevator_result);

        UseElevatorSection {
            begin: enter_elevator.input,
            enter_elevator_failure: enter_elevator_fork.err,
            move_elevator_error: move_elevator_fork.err,
            retry_elevator_move: move_elevator.input,
            localization_error: localize_fork.err,
            retry_localization: localize_robot.input,
            exit_elevator_failure: exit_elevator_fork.err,
            retry_elevator_exit: exit_elevator.input,
            success: exit_elevator_fork.ok,
        }
    }
);

In the above example we create a custom struct named UseElevatorSection to define what the inputs and outputs of our section are. The begin input begins the overall process of having a robot use an elevator. Each stage of using the elevator provides a signal to indicate if a problem has happened. Diagrams that use this section have the opportunity to handle each error however they would like, and then signal a retry_... input slot to resume the process.
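The retry behavior follows from how the section struct is wired: each retry_... input slot is the input of the node for that stage, so triggering it resumes the chain at that stage rather than starting over from begin. The sketch below models only that control flow in plain Rust. It does not use crossflow at all; Stage, run_from, and retry_after_move_failure are hypothetical names invented for this illustration, and the failure pattern is made up.

```rust
/// The four stages chained inside the section, in execution order.
/// (Hypothetical model type; not part of crossflow.)
#[derive(Clone, Copy, Debug, PartialEq)]
enum Stage {
    EnterElevator,
    MoveElevator,
    LocalizeRobot,
    ExitElevator,
}

const STAGES: [Stage; 4] = [
    Stage::EnterElevator,
    Stage::MoveElevator,
    Stage::LocalizeRobot,
    Stage::ExitElevator,
];

/// Run the chain starting at `entry`. `attempt` stands in for the real node
/// and returns true on success. Ok(()) models the `success` output firing;
/// Err(stage) models the error output of `stage` firing.
fn run_from(entry: Stage, attempt: &mut impl FnMut(Stage) -> bool) -> Result<(), Stage> {
    let start = STAGES
        .iter()
        .position(|s| *s == entry)
        .expect("every stage is listed in STAGES");
    for &stage in &STAGES[start..] {
        if !attempt(stage) {
            return Err(stage);
        }
    }
    Ok(())
}

/// Made-up scenario: the first attempt to move the elevator fails, the
/// diagram handles the error, then triggers `retry_elevator_move`.
fn retry_after_move_failure() -> (Result<(), Stage>, Result<(), Stage>, Vec<Stage>) {
    let mut log = Vec::new();
    let mut move_failures_left = 1;
    let mut attempt = |stage: Stage| {
        log.push(stage);
        if stage == Stage::MoveElevator && move_failures_left > 0 {
            move_failures_left -= 1;
            return false;
        }
        true
    };

    // A message sent to `begin` enters at the first stage.
    let first = run_from(Stage::EnterElevator, &mut attempt);
    // A message sent to `retry_elevator_move` enters at the failed stage.
    let second = run_from(Stage::MoveElevator, &mut attempt);
    (first, second, log)
}
```

In this model the robot enters the elevator only once: the log of attempted stages shows MoveElevator twice and every other stage once, which is the behavior a diagram author gets by routing an error handler back into the matching retry slot.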

Message Operation Support

When registering node builders, the Request, Response, and Streams message types also get registered. You can add support for more operations by chaining them onto the NodeRegistrationBuilder.

Sections are somewhat similar: the message type of each field in the section is automatically registered. However, it wouldn’t make sense to use chained methods to register additional operations for those message types, because a section can contain an arbitrary number of message types, and we can’t assume that every one of them supports every operation we want to add.

Instead we use Rust’s procedural macro system:

#[derive(Section)]
struct UseDoorSection {
    /// Robot and Door cannot be cloned, serialized, or deserialized, so we
    /// disable those operations. But the overall message can still be unzipped,
    /// so we enable the minimal version of unzip, which will register both Robot
    /// and Door, but without the common operations (clone, serialize, and deserialize).
    #[message(no_clone, no_serialize, no_deserialize, unzip_minimal)]
    begin: InputSlot<(Robot, Door)>,

    /// UseDoorError is cloneable, serializable, and deserializable, so we don't
    /// need to disable anything here. The overall message type is a result, so
    /// we can mark this as a result to register the fork-result operation.
    #[message(result)]
    outcome: Output<Result<(), UseDoorError>>,

    /// We can also expose buffers inside the section.
    door_state: Buffer<DoorState>,
}

Connecting to sections

The syntax for connecting to section inputs looks similar to builtin targets, except you replace "builtin" with the ops key of the section. For example:

{
  "version": "0.1.0",
  "start": { "use_elevator_West3": "begin" },
  "ops": {
    "use_elevator_West3": {
        "type": "section",
        "builder": "use_elevator",
        "config": {
            "elevator_id": "West3",
            "robot_id": "cleaning_bot_2",
            "to_floor": "L5"
        },
        "connect": {
            "exit_elevator_failure": "handle_exit_failure",
            "success": { "builtin": "terminate" }
        }
    },
    "handle_exit_failure": {
        "type": "node",
        "builder": "center_in_elevator",
        "config": {
            "elevator_id": "West3",
            "robot_id": "cleaning_bot_2"
        },
        "next": { "use_elevator_West3": "retry_elevator_exit" }
    }
  }
}

This refers to the UseElevatorSection example above. We can create a workflow with this section and then use its outputs and inputs to create a loop that lets us customize how elevator exit failures are handled.

To pass a message to one of the section inputs we use the syntax { "use_elevator_West3": _ } where _ is the name of one of the input fields of UseElevatorSection. While the example above doesn’t include any buffers, the syntax for referencing section buffers is the same: the workflow builder knows whether you’re referring to an input slot or a buffer based on where you make the reference.

To connect the outputs of the section to other operations in the same scope as the section, use the "connect" field of the section operation. Note that all operations inside the section are in the same scope as operations outside of the section. In general, connections can only be made between operations that are in the same scope.
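The same two mechanisms can route any other error output of the section back into its matching retry slot. The fragment below is a sketch of the entries under "ops" for handling move_elevator_error. The field names move_elevator_error and retry_elevator_move come from UseElevatorSection, but the node builder "reset_elevator" and its config are hypothetical; the sketch assumes such a builder exists, accepts a MoveElevatorError, and outputs the () message that retry_elevator_move expects.

```json
{
  "use_elevator_West3": {
    "type": "section",
    "builder": "use_elevator",
    "config": {
      "elevator_id": "West3",
      "robot_id": "cleaning_bot_2",
      "to_floor": "L5"
    },
    "connect": {
      "move_elevator_error": "handle_move_error",
      "success": { "builtin": "terminate" }
    }
  },
  "handle_move_error": {
    "type": "node",
    "builder": "reset_elevator",
    "config": { "elevator_id": "West3" },
    "next": { "use_elevator_West3": "retry_elevator_move" }
  }
}
```

The output side goes through "connect" on the section operation, while the input side uses the { "use_elevator_West3": ... } target form from the handler’s "next" field.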