Connecting Output Streams
Up until now, all the examples of spawning a workflow have used `spawn_io_workflow`, which spawns a service that takes in a single request and yields a single response, with no output streams.
Just as services support output streams, so do workflows.
To stream messages out of your workflow while it is still running, you can use the stream out operation.
For starters, to stream out of a workflow you need to use the more general `spawn_workflow` method.
Unlike the `Request` and `Response` generic parameters, the Rust compiler can never infer what type you want for the `Streams` generic parameter.
This is why `spawn_io_workflow` exists: in cases where you don't need to stream out of your workflow, all the generic parameters can usually be inferred.
The easiest way to specify the streams is to do it in the `Scope`.
The `Request` and `Response` parameters can still be inferred by putting a placeholder `_` in for them:
```rust
let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, StreamOf<T>>, builder| {
        /* ... */
    }
);
```
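For contrast, here is a minimal sketch of the no-stream case, where `spawn_io_workflow` lets every generic parameter be inferred. The doubling node is hypothetical, purely for illustration:

```rust
let workflow = commands.spawn_io_workflow(|scope, builder| {
    // A hypothetical node that doubles its input. With no output streams
    // involved, the compiler can infer all the generic parameters.
    let double = builder.create_map_block(|x: i64| x * 2);
    builder.connect(scope.start, double.input);
    builder.connect(double.output, scope.terminate);
});
```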
Single Stream
If your workflow has a single output stream, you can use the `StreamOf<T>` struct to have a stream that produces messages of type `T`.
Here is how the stream out conceptual example of slicing apples would be written with the native Rust API:
```rust
let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, StreamOf<AppleSlice>>, builder| {
        // Create the service nodes that will be involved in chopping apples
        let deposit_apples = builder.create_node(deposit_apples);
        let try_take_apple = builder.create_node(try_take_apple);
        let (have_apple_input, have_apple) = builder.create_fork_option();
        let chop_apple = builder.create_node(chop_apple);

        // Create a buffer to hold apples that are waiting to be chopped, and
        // an operation to access that buffer.
        let apple_buffer = builder.create_buffer(BufferSettings::keep_all());
        let access_apple_buffer = builder.create_buffer_access(apple_buffer);

        // Connect the scope input message to the deposit_apples service
        builder.connect(scope.start, deposit_apples.input);

        // Connect the stream of incoming apples to the apple buffer
        builder.connect(deposit_apples.streams, apple_buffer.input_slot());

        // When done depositing apples, start trying to take them by accessing
        // the buffer
        builder.connect(deposit_apples.output, access_apple_buffer.input);
        builder.connect(access_apple_buffer.output, try_take_apple.input);

        // Try to take an apple and check if we ran out
        builder.connect(try_take_apple.output, have_apple_input);

        // If there's another apple, send it to be chopped
        builder.connect(have_apple.some, chop_apple.input);

        // If there are no more apples, terminate the workflow
        builder.connect(have_apple.none, scope.terminate);

        // Stream the apple slices out of the scope, and then cycle back to
        // taking another apple when the current one is finished.
        builder.connect(chop_apple.streams, scope.streams);
        builder.connect(chop_apple.output, access_apple_buffer.input);
    }
);
```
The workflow has a single stream output that produces `AppleSlice` objects.
You can also see how streams produced by the `deposit_apples` and `chop_apple` services are connected to other operations.
The `Node` struct has a `streams` field which, for these services, is a single `Output`, because the services use `StreamOf`.
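The services used above are assumed to already exist. As a rough sketch of what one of them might look like, a service like `chop_apple` could declare `StreamOf<AppleSlice>` as its stream type, which is what makes its node's `streams` field a single `Output`. The `Apple` and `AppleSlice` types and the slice count here are hypothetical:

```rust
// Hypothetical message types for this example.
struct Apple;
struct AppleSlice;

let chop_apple = commands.spawn_service(
    |In(input): BlockingServiceInput<Apple, StreamOf<AppleSlice>>| {
        // Send each slice out through the stream as it is produced.
        for _ in 0..8 {
            input.streams.send(AppleSlice);
        }
    }
);
```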
Stream Pack
If your workflow needs multiple stream outputs, you can use a custom `StreamPack`.
Here's an example of a stream pack that might be provided by a navigation workflow:
```rust
#[derive(StreamPack)]
struct NavigationStreams {
    log: String,
    location: Vec2,
    error: NavigationError,
}
```
With that defined, we can emit multiple output streams from our workflow. Here is an example of a robot navigating through a doorway and streaming out information while it goes along:
```rust
// This service will have a mobile robot approach a door.
let approach_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), NavigationStreams>| {
        input.streams.log.send(String::from("approaching door"));
        /* ... approach the door ... */
    }
);

// open_door is not a navigation service so it will only have one
// output stream: log messages.
let open_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), StreamOf<String>>| {
        input.streams.send(String::from("opening door"));
        /* ... open the door ... */
    }
);

// This service will have a mobile robot move through a door.
let move_through_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), NavigationStreams>| {
        input.streams.log.send(String::from("moving through door"));
        /* ... move through the door ... */
    }
);

// This workflow will handle the whole process of moving a robot through a door,
// while streaming navigation and log information from all the services it runs.
let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, NavigationStreams>, builder| {
        let approach_door = builder.create_node(approach_door);
        let open_door = builder.create_node(open_door);
        let move_through_door = builder.create_node(move_through_door);

        // Connect nodes together
        builder.connect(scope.start, approach_door.input);
        builder.connect(approach_door.output, open_door.input);
        builder.connect(open_door.output, move_through_door.input);
        builder.connect(move_through_door.output, scope.terminate);

        // Connect node streams to scope streams
        builder.connect(approach_door.streams.log, scope.streams.log);
        builder.connect(approach_door.streams.location, scope.streams.location);
        builder.connect(approach_door.streams.error, scope.streams.error);
        builder.connect(open_door.streams, scope.streams.log);
        builder.connect(move_through_door.streams.log, scope.streams.log);
        builder.connect(move_through_door.streams.location, scope.streams.location);
        builder.connect(move_through_door.streams.error, scope.streams.error);
    }
);
```
Tip
In the above example we are connecting node stream outputs to the scope stream input slots, but this is an illustration, not a limitation. The fields in `Node::streams` are all regular `Output`s that can be connected into any `InputSlot`, and the fields in `Scope::streams` are regular `InputSlot`s that can receive messages from any `Output`.
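To illustrate, nothing stops you from routing a stream into an ordinary node instead of (or in addition to) the scope's stream slots. In this sketch, `record_location` is a hypothetical service that accepts `Vec2` requests:

```rust
let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, NavigationStreams>, builder| {
        let approach_door = builder.create_node(approach_door);
        // Hypothetical service that logs each Vec2 it receives.
        let record_location = builder.create_node(record_location);

        builder.connect(scope.start, approach_door.input);
        builder.connect(approach_door.output, scope.terminate);

        // The location stream's Output feeds a regular InputSlot on
        // another node rather than the scope's stream InputSlot.
        builder.connect(approach_door.streams.location, record_location.input);

        // The log stream still goes out through the scope as usual.
        builder.connect(approach_door.streams.log, scope.streams.log);
    }
);
```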