Connecting Nodes
The examples on the previous page are completely trivial workflows. In each one we’re just starting the workflow, running one service (or provider), and then terminating the workflow with the output of the service. The real value of a workflow is being able to assemble multiple services together into something more complex.
Here is an example of creating two nodes and chaining them:
let workflow = commands.spawn_io_workflow(
|scope, builder| {
let sum_node = builder.create_map_block(|request: Vec<f32>| {
request.into_iter().fold(0.0, |a, b| a + b)
});
let double_node = builder.create_map_block(|request: f32| {
2.0 * request
});
builder.connect(scope.start, sum_node.input);
builder.connect(sum_node.output, double_node.input);
builder.connect(double_node.output, scope.terminate);
}
);
A common pattern when building workflows is to declare the nodes or operations at the top and then connect them together below.
The above workflow still doesn’t accomplish anything that we couldn’t get from running a Series instead. It’s just a sequence of actions that feed into each other. What makes workflows interesting is their ability to branch, cycle, and flow freely through a directed graph structure.
Conditional Branching
One useful kind of control flow is conditional branching. Conditional branching is when the activity in a workflow reaches a “fork in the road” where a message must go down one branch or another but not both.
fork-result
Commonly used for error handling in workflows, the fork-result operation will take in Result messages and activate one of two branches—
the ok branch or the err branch—depending on whether the input message had an Ok or Err value.
let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
|scope, builder| {
let parse_schema_v2 = builder.create_map_block(|request: Json| {
// Try parsing the JSON with Schema V2
serde_json::from_value::<SchemaV2>(request.clone())
// If an error happened, pass along the original request message
// so we can try parsing it with a different schema.
.map_err(|_| request)
});
let parse_schema_v1 = builder.create_map_block(|request: Json| {
// Try parsing the JSON with Schema V1 since V2 failed.
serde_json::from_value::<SchemaV1>(request)
// If the parsing was successful, upgrade the parsed value to
// SchemaV2.
.map(|value| value.upgrade_to_schema_v2())
});
let to_ok = builder.create_map_block(|request: SchemaV2| {
Ok(request)
});
// Create a fork-result operation. We get back a tuple whose first element
// is an InputSlot that lets us feed messages into the fork-result, and
// whose second element is a struct containing two fields: ok and err,
// each representing a different Output and therefore diverging branches
// in the workflow.
let (fork_result_input, fork_result) = builder.create_fork_result();
builder.connect(scope.start, parse_schema_v2.input);
builder.connect(parse_schema_v2.output, fork_result_input);
// If parsing SchemaV2 was successful, wrap it back in Ok and terminate
builder.connect(fork_result.ok, to_ok.input);
builder.connect(to_ok.output, scope.terminate);
// If we failed to parse the Json as SchemaV2 then try using SchemaV1 instead
builder.connect(fork_result.err, parse_schema_v1.input);
// If parsing SchemaV1 also fails then we have no more fallback, so just
// pass back the result, whether it was successful or failed.
builder.connect(parse_schema_v1.output, scope.terminate);
}
);
Note how in this example both branches converge back to the same terminate operation.
fork-option
Another common branching operation is fork-option.
Similar to fork-result, this creates two branches.
One sends the value contained inside Some inputs while the other will produce a trigger () when the input value was None.
let workflow: Service<(), f32> = commands.spawn_io_workflow(
|scope, builder| {
let get_random = builder.create_map_block(|request: ()| {
// Generate some random number between 0.0 and 1.0
rand::random::<f32>()
});
let less_than_half = builder.create_map_block(|value: f32| {
if value < 0.5 {
Some(value)
} else {
None
}
});
// Create a fork-option operation. We get back a tuple whose first element
// is an InputSlot that lets us feed messages into the fork-option, and
// whose second element is a struct containing two fields: some and none,
// each representing a different Output and therefore diverging branches
// in the workflow.
let (fork_option_input, fork_option) = builder.create_fork_option();
// Chain the three operations together.
builder.connect(scope.start, get_random.input);
builder.connect(get_random.output, less_than_half.input);
builder.connect(less_than_half.output, fork_option_input);
// Trigger the randomizer again if the value was not less than one-half.
// This creates a cycle in the workflow.
builder.connect(fork_option.none, get_random.input);
// Terminate the workflow if it was less than one-half.
// The value produced by the randomizer will be the workflow's output.
builder.connect(fork_option.some, scope.terminate);
}
);
Parallel Branches
Some kinds of forks result in parallel activity instead of conditionally activating branches. This is one of the most powerful features of workflows: being able to easily juggle large amounts of parallel activity.
racing
The fork-clone operation allows a message to be cloned and simultaneously activate multiple branches. Once activated, each branch will run independently and concurrently. This is useful if you have multiple separate concerns that need to be handled simultaneously.
Here is an example of a pick-and-place workflow where we have a parallel node that monitors the safety of the operation.
While the pick-and-place sequence is being executed, the emergency_stop service will watch the state of the workcell and issue an output if anything threatens the safety of the operation.
The pick-and-place operation and the emergency stop both connect to the terminate operation.
Whichever yields an output first will end the workflow. This is known as a race.
let workflow = commands.spawn_io_workflow(
|scope, builder| {
// Create nodes for performing a pick and place
let move_to_pick_pregrasp = builder.create_node(move_to_pick_pregrasp);
let grasp_item = builder.create_node(grasp_item);
let move_to_placement = builder.create_node(move_to_placement);
let release_item = builder.create_node(release_item);
// Also create a node that monitors whether an emergency stop is needed
let emergency_stop = builder.create_node(emergency_stop);
// Create a fork-clone operation. We get back a tuple whose first element
// is an InputSlot that lets us feed messages into the fork-clone, and
// whose second element is a struct that allows us to spawn outputs for
// the fork.
let (fork_clone_input, fork_clone) = builder.create_fork_clone();
// Send the scope input message to be cloned
builder.connect(scope.start, fork_clone_input);
// When the scope starts, begin moving the robot to the pregrasp pose
let cloned = fork_clone.clone_output(builder);
builder.connect(cloned, move_to_pick_pregrasp.input);
// When the scope starts, also start monitoring whether an emergency
// stop is needed. If this gets triggered it will terminate the workflow
// immediately.
let cloned = fork_clone.clone_output(builder);
builder.connect(cloned, emergency_stop.input);
// Connect the happy path together
builder.connect(move_to_pick_pregrasp.output, grasp_item.input);
builder.connect(grasp_item.output, move_to_placement.input);
builder.connect(move_to_placement.output, release_item.input);
builder.connect(release_item.output, scope.terminate);
// Connect the emergency stop to terminate
builder.connect(emergency_stop.output, scope.terminate);
}
);
joining
Another way to use fork-clone is to activate parallel branches that each control a different agent. This is often done when a process needs two agents to work independently until they both reach a synchronization point is reached. In that case instead of racing the two branches we will join them.
Here is an example of a robot that needs to use an elevator. The robot will start moving to the elevator lobby, and at the same time we will run a branch that watches the robot’s progress. When the robot is close enough to the elevator lobby, we will summon the elevator to come pick up the robot. When the robot and elevator both arrive in the elevator lobby, we will have the robot use the elevator.
let workflow = commands.spawn_io_workflow(
|scope, builder| {
// Create nodes for moving the robot and summoning the elevator.
let move_robot_to_elevator = builder.create_node(move_robot_to_elevator);
let on_robot_near_elevator = builder.create_node(on_robot_near_elevator);
let send_elevator_to_level = builder.create_node(send_elevator_to_location);
let use_elevator = builder.create_node(use_elevator);
// Create a fork-clone operation. We get back a tuple whose first element
// is an InputSlot that lets us feed messages into the fork-clone, and
// whose second element is a struct that allows us to spawn outputs for
// the fork.
let (fork_clone_input, fork_clone) = builder.create_fork_clone();
// Send the scope input message to be cloned
builder.connect(scope.start, fork_clone_input);
// When the scope starts, begin sending the robot to the elevator
let cloned = fork_clone.clone_output(builder);
builder.connect(cloned, move_robot_to_elevator.input);
// When the scope starts, also start detecting whether the robot is
// near the elevator so we know when to summon the elevator
let cloned = fork_clone.clone_output(builder);
builder.connect(cloned, on_robot_near_elevator.input);
// When the robot has made it close enough to the elevator, begin
// summoning the elevator
builder.connect(on_robot_near_elevator.output, send_elevator_to_level.input);
// Create a join operation that will activate when the robot has reached
// the elevator lobby and the elevator has arrived on the correct floor.
let both_arrived = builder.join((
move_robot_to_elevator.output,
send_elevator_to_level.output,
))
.output();
// When the robot has reached the elevator lobby and the elevator has
// arived on the correct floor, have the robot use the elevator.
builder.connect(both_arrived, use_elevator.input);
// When the robot is done using the elevator, the workflow is finished.
builder.connect(use_elevator.output, scope.terminate);
}
);
unzipping
It’s not often that a message can be trivially fed directly from the output of one service to the input of another service.
Most of the time the services that you want to connect to each other will have slight differences between the Response type that the first produces and the Request type that you’re trying to pass it along to.
If you have two or more parallel branches that expect different inputs from each other, fork-clone might not seem like a good fit because every branch will receive the same message.
We’ve seen how blocking maps can be used to perform quick data transforms. If we combine a blocking map with the unzip operation, we can perform parallel branching where a specific message is sent down each branch:
let workflow = commands.spawn_io_workflow(
|scope, builder| {
// Create ndoes for picking an item and moving to a pickup location
let pick_item = builder.create_node(pick_item);
let move_to_location = builder.create_node(move_to_location);
let hand_off_item = builder.create_node(hand_off_item);
// Create a blocking map to transform the workflow input data into two
// separate messages to send to two different branches.
//
// This returns a tuple with two elements. We will send each element to
// a different branch at the same time.
let transform_inputs = builder.create_map_block(|request: PickupTask| {
(
WorkcellTask {
workcell: request.workcell,
item: request.item,
},
MobileRobotTask {
vehicle: request.vehicle,
location: request.location,
}
)
});
// Create the unzip forking
let (unzip_input, unzip) = builder.create_unzip();
// Destructure the unzipped outputs
let (workcell_task, mobile_robot_task) = unzip;
// Synchronize when the workcell and mobile robot are both ready for the
// item to be handed off
let both_ready = builder.join((
pick_item.output,
move_to_location.output,
))
.output();
// Connect all the nodes
builder.connect(scope.start, transform_inputs.input);
builder.connect(transform_inputs.output, unzip_input);
builder.connect(workcell_task, pick_item.input);
builder.connect(mobile_robot_task, move_to_location.input);
builder.connect(both_ready, hand_off_item.input);
builder.connect(hand_off_item.output, scope.terminate);
}
);