Creating a Node
Most of the useful work that happens inside a workflow is done by nodes. A node can be implemented by any provider. Providers will often be services—including blocking, async, and continuous services—but could also be callbacks or maps.
From a Service
Suppose we have a sum function defined to be a blocking service:
fn sum(In(input): BlockingServiceInput<Vec<f32>>) -> f32 {
    let mut sum = 0.0;
    for value in input.request {
        sum += value;
    }
    sum
}
We can spawn this as a service and then use it to create a node inside a workflow:
// Spawn a service that we can use inside a workflow
let service = commands.spawn_service(sum);
// Spawn a workflow and use the service inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
A few things to note:
- We spawn the service outside of the workflow and then use the move keyword to move it into the closure that builds the workflow.
- We use Builder::create_node to create a workflow node that will run the sum service.
- After the node is created, we can access its input through node.input and its output through node.output.
- builder.connect(output, input) will connect an Output to an InputSlot.
- scope.start is an Output inside the scope that will fire off exactly one message per workflow session. That one message is the input message that was sent into the workflow. It gets fired off as soon as the workflow session begins.
- By connecting scope.start to node.input, we are passing the input message of the workflow directly into the node.
- To pass back the sum as the output of the workflow, we connect node.output to scope.terminate.
- As soon as any message is passed into scope.terminate, all activity in the workflow will terminate, and the first message that was passed into scope.terminate for this session will be sent out of the workflow as the workflow’s output message.
- We don’t need to explicitly specify the Request and Response types of the workflow because the compiler can infer those from the two builder.connect(_, _) calls.
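The session behavior described in the list above can be illustrated with a toy model in plain Rust. This is only a sketch and does not use crossflow: ToyTerminate, run_toy_session, and plain_sum are hypothetical names invented for the illustration. It shows two of the points above: the first message into terminate wins, and the request and response types can be inferred from how the pieces are wired together.

```rust
/// Toy stand-in for `scope.terminate` (hypothetical, not a crossflow type).
/// It keeps the first message it receives and ignores everything after it.
struct ToyTerminate<T> {
    output: Option<T>,
}

impl<T> ToyTerminate<T> {
    fn new() -> Self {
        Self { output: None }
    }

    fn send(&mut self, message: T) {
        // Only the first message becomes the session output
        if self.output.is_none() {
            self.output = Some(message);
        }
    }
}

/// Toy stand-in for one workflow session (hypothetical): the single start
/// message goes into the node, and the node's output goes into terminate.
/// `Request` and `Response` are inferred from the arguments.
fn run_toy_session<Request, Response>(
    request: Request,
    node: impl FnOnce(Request) -> Response,
) -> Option<Response> {
    let mut terminate = ToyTerminate::new();
    // scope.start -> node.input, then node.output -> scope.terminate
    terminate.send(node(request));
    terminate.output
}

/// The same summing logic as the `sum` service, without the service wrapper.
fn plain_sum(values: Vec<f32>) -> f32 {
    values.into_iter().sum()
}
```

Calling run_toy_session(vec![1.0, 2.0, 3.5], plain_sum) needs no type annotations, which mirrors how the workflow's types fall out of the two connect calls.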
Spawning a service inside the workflow building closure
The recommended practice is to spawn services outside of the workflow building closure and then move them into it. This makes services more reusable, since the same service can be copied into multiple workflows. However, if you do want to spawn the service from inside the workflow building closure, that option is available:
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        // Spawn a service using the builder's commands
        let service = builder.commands().spawn_service(sum);
        // Create the node using the newly spawned service
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
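The reusability argument for spawning outside the closure rests on the service handle being cheap to copy. The sketch below illustrates that idea with plain Rust and no crossflow types: ToyService is a hypothetical stand-in for a spawned service handle, and the two closures stand in for two workflow building closures. It assumes the handle is Copy, as "copied into multiple different workflows" suggests.

```rust
/// Hypothetical stand-in for a spawned service handle.
/// Assumption: the real handle is similarly cheap to copy.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ToyService(u32);

/// Build two closures that each capture the same service by `move`.
/// Because `ToyService` is `Copy`, each closure gets its own copy and the
/// original binding stays usable after the first `move`.
fn build_two(service: ToyService) -> (impl Fn() -> ToyService, impl Fn() -> ToyService) {
    let first = move || service;
    let second = move || service;
    (first, second)
}
```

A service spawned inside one closure, by contrast, is only visible to that closure unless it is passed out some other way.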
From a Callback
Using a callback instead of a service looks much the same. The only difference is that callbacks don’t need to be spawned:
// Define a closure to perform a sum
let f = |request: Vec<f32>| -> f32 {
    request.into_iter().fold(0.0, |a, b| a + b)
};
// Convert the closure into a Callback
let callback = f.into_blocking_callback();
// Spawn a workflow and use the callback inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(callback);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
The fact that it was implemented as a callback instead of as a service makes no difference to the workflow builder.
It still gets created as a Node with InputSlot and Output types that match the Request and Response types, respectively, of the callback.
From a Map
Maps are an extremely common element in workflows. Their ability to perform quick data transformations makes them invaluable for bridging the gap between different services. They are common enough that crossflow provides two special APIs to make them easier to write:
create_map_block
A blocking map is a short-lived closure that performs a quick calculation or data transformation. These are very useful for adapting message types in-between services that are chained together.
To create a blocking map you can simply pass a closure into the Builder::create_map_block method:
let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_block(|request: Vec<f32>| {
            request.into_iter().fold(0.0, |a, b| a + b)
        });
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
You might notice that maps are often defined within the workflow building closure itself even though Services and Callbacks are usually created outside. This isn’t a requirement, but it is usually the most ergonomic way to add a map. Reusability is less of a concern with maps than it is for Services or Callbacks.
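The kind of transformation a blocking map carries is usually small enough to test as an ordinary function first. The sketch below is plain Rust with no crossflow types: parse_values is a hypothetical adapter that turns a comma-separated string into the Vec<f32> that the sum examples expect, and sum_of_line chains the two steps the way a map would sit between two services.

```rust
/// Hypothetical adapter: turn "1.5, 2, 4" into a Vec<f32>.
/// Entries that fail to parse are skipped.
fn parse_values(line: &str) -> Vec<f32> {
    line.split(',')
        .filter_map(|entry| entry.trim().parse::<f32>().ok())
        .collect()
}

/// The same fold used in the blocking map example.
fn sum_values(request: Vec<f32>) -> f32 {
    request.into_iter().fold(0.0, |a, b| a + b)
}

/// Chain the adapter and the sum, with the adapter bridging the type gap.
fn sum_of_line(line: &str) -> f32 {
    sum_values(parse_values(line))
}
```

Once the logic is verified this way, a closure with the same body as parse_values should be a natural candidate for a blocking map placed in front of the sum node.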
create_map_async
An async map is simply used to run a basic async function. Suppose you have an async function like:
async fn get_page_title(url: String) -> Option<String> {
    let http_response = trpl::get(&url).await;
    let response_text = http_response.text().await;
    trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
}
You can use this async function as a map by simply passing its name into Builder::create_map_async:
let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_async(get_page_title);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
Alternatively, you can define the async map inline as a closure.
Just take note that async closures (async |x| { ... }) were only stabilized in Rust 1.85, so the approach that also works on older toolchains is an ordinary closure that returns an async move { ... } block, like this:
let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_async(|url: String| {
            async move {
                let http_response = trpl::get(&url).await;
                let response_text = http_response.text().await;
                trpl::Html::parse(&response_text)
                    .select_first("title")
                    .map(|title| title.inner_html())
            }
        });
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
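The pattern of an ordinary closure returning an async move block can be tried without crossflow or a network connection. The sketch below uses only the standard library: block_on is a deliberately minimal executor, run_async_map is a hypothetical stand-in for whatever drives the map's future, and extract_title is a simplified replacement for the trpl calls. None of these names come from crossflow.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for illustration: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
            return output;
        }
    }
}

/// Hypothetical stand-in for a node that runs an async map: it calls the
/// closure with the request and drives the returned future to completion.
fn run_async_map<Request, Task>(
    map: impl FnOnce(Request) -> Task,
    request: Request,
) -> Task::Output
where
    Task: Future,
{
    block_on(map(request))
}

/// Simplified title extraction that works on an in-memory string.
fn extract_title(html: &str) -> Option<String> {
    let start = html.find("<title>")? + "<title>".len();
    let end = html[start..].find("</title>")? + start;
    Some(html[start..end].to_string())
}

/// An ordinary closure that returns an `async move` block, driven to completion.
fn title_of(html: String) -> Option<String> {
    let map = |html: String| async move { extract_title(&html) };
    run_async_map(map, html)
}
```

The async move block takes ownership of html, so the returned future does not borrow from the closure's arguments. That is the property that lets the future outlive the closure call.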
Chain
Creating a map through the Builder API is necessary if you need to connect multiple outputs into the same map node, but in most cases you’ll want to create a map that just transforms data as it passes from one operation to another.
A more convenient way to do that is with a Chain, discussed later.