Nodes
You will typically need to supply your registry with node builders in order to execute any useful JSON diagrams. The registry will maintain a dictionary of the node builders that you give to it. When a diagram is built into a workflow, the registered node builders will be used to construct the workflow.
This page will teach you all the details of how to register node builders.
Node Builder Options
Each time you register a node builder you will need to set its NodeBuilderOptions.
The only required field in NodeBuilderOptions is the id:
let options = NodeBuilderOptions::new("multiply_by");
Display Text
This ID must be unique for each node builder added to the registry. Registering a second node builder with the same ID as an earlier one will remove the earlier one from the registry.
When you only provide the ID, graphical editors will typically use that ID as display text for its associated nodes when visualizing a diagram. This isn’t always a good idea since the unique ID could be an unintelligible UUID, or it could be mangled with namespaces or version numbers. Instead you can add a default display text to the node builder options:
let options = NodeBuilderOptions::new("multiply_by")
.with_default_display_text("Multiply By");
The diagram element registry can be serialized into JSON and exported from the executor. The serialized registry data can be provided to a diagram editor or visualization frontend, which can then look up the display text and render it to the user.
Description
When someone is manually editing or viewing a workflow diagram, the purpose of a node might not be obvious from the ID or the display text. Therefore your node builder options can also include a description for the nodes that will be built:
let description = "Multiply the input value by the configured value.";
let options = NodeBuilderOptions::new("multiply_by")
.with_default_display_text("Multiply By")
.with_description(description);
Similar to the display text, the description will be included in the serialized registry, allowing diagram editors or visualizers to render the description for each node.
Closure
Along with the node builder options, you need to provide a closure when registering a node builder.
The closure is what does the heavy lifting of creating the node for the workflow.
It will be provided with two arguments: a &mut Builder and a config.
The Builder API allows your closure to create any kind of node.
You can use it to call create_map_block or create_map_async for simple functions.
For services or callbacks you can call create_node.
If you need to spawn a service within the closure, you can use Builder::commands to get a &mut Commands.
Just keep in mind that services do not get automatically despawned when no longer used, so you should avoid spawning a new service for each node that gets created.
Unless you do something to periodically clean up those services, you could end up with an arbitrarily growing number of services in your executor.
Tip
The closure is expected to be
FnMut, which means you can cache data inside of it that can be reused or updated each time the closure gets run.
The return type of your closure must be Node<Request, Response, Streams>.
That happens to be the return type of create_map_block, create_map_async, and create_node, so your entire closure could be as simple as calling one of those methods on Builder:
registry.register_node_builder(
NodeBuilderOptions::new("add"),
|builder, config: f32| {
builder.create_map_block(move |request: f32| {
request + config
})
}
);
The Request, Response, and Streams of the Node that you return will be automatically detected by the registry.
They will be recorded as the input, output, and stream message types for nodes created by your node builder.
Technically the closure is not limited to only creating a single node.
You could have the closure additionally create buffers or anything else, as long as the final return value is a Node.
But it falls on you to ensure that the connected collection of elements that your closure builds can operate similar to a node, or else it may inflict confusing behavior on users and visualization tools.
Tip
If you need to generate something more complex than a simple
Node, consider registering a section builder instead.
Config
Each node builder can decide on its own config data structure.
This config will be the second argument passed to the closure.
In the earlier example the config data structure is a simple floating point value, f32.
Any type that implements Deserialize and JsonSchema can be used as a config type.
For complex node configurations, you can define your own custom struct and derive the necessary traits:
/// A custom node builder config for a greeting node
#[derive(Deserialize, JsonSchema)]
struct GreetConfig {
/// How should the person be greeted?
greeting: String,
/// Should the greeting be printed out?
print: bool,
}
registry.register_node_builder(
NodeBuilderOptions::new("greet"),
|builder, config: GreetConfig| {
let GreetConfig { greeting, print } = config;
builder.create_map_block(
move |name: String| {
let message = format!("{greeting}{name}");
if print {
println!("{message}");
}
message
}
)
}
);
Deriving JsonSchema for the config type allows us to save a schema for the config in the registry.
This helps graphical editing tools and diagram generation tools to ensure that all the nodes have valid configurations.
You could even auto-generate UIs tailored to the config of each node builder.
Tip
If your node builder doesn’t need any
configinformation then just use the unit-type()as the config type.
Examples
Despite being provided with a config schema, human users may still struggle to figure out how to correctly configure a node to get what they want out of it. To mitigate this problem, you can provide example configurations in your node builder options:
use crossflow::{prelude::*, ConfigExample};
use serde::{Serialize, Deserialize};
/// A custom node builder config for a greeting node
#[derive(Serialize, Deserialize, JsonSchema)]
struct GreetConfig {
/// How should the person be greeted?
greeting: String,
/// Should the greeting be printed out?
print: bool,
}
let examples = vec![
ConfigExample::new(
"Say hello and print the message",
GreetConfig {
greeting: String::from("Hello, "),
print: true,
}
),
ConfigExample::new(
"Say guten tag and do not print the message",
GreetConfig {
greeting: String::from("Guten tag "),
print: false,
}
),
];
registry.register_node_builder(
NodeBuilderOptions::new("greet")
.with_description("Turn a name into a greeting")
.with_config_examples(examples),
|builder, config: GreetConfig| {
let GreetConfig { greeting, print } = config;
builder.create_map_block(
move |name: String| {
let message = format!("{greeting}{name}");
if print {
println!("{message}");
}
message
}
)
}
);
Crossflow graphical editing tools are encouraged to make these examples visible to users, and allow users to copy/paste the examples.
Tip
If you want something more flexible than a static struct for your config, you can always just use
JsonMessageas the config type. Just be sure to include comprehensive examples so human users know what a valid config would be.
Fallible
There might be cases where a node builder cannot successfully build a node.
Maybe there is a semantic error in the config (even though the parsing was successful), or maybe some resource needed by the builder has become unavailable.
The node builder API above assumes that building the node will always be successful. To allow the node building to fail, you can use the fallible API instead:
use anyhow::anyhow;
registry.register_node_builder_fallible(
NodeBuilderOptions::new("divide_by"),
|builder, config: f64| {
if config == 0.0 {
return Err(anyhow!("Cannot divide by zero"));
}
let node = builder.create_map_block(move |request: f64| request / config);
Ok(node)
}
);
You can return any error that can be converted into an anyhow::Error.
The easiest option is to use the anyhow! macro.
Note
If the
"config"in the diagram cannot be successfully deserialized into the data structure of your closure’sconfig, this will be automatically caught by the registry, and aConfigErrorwill be returned instead of aService. Your node builder does not have to handle this error mode.
Message Operation Support
When you use register_node_builder from DiagramElementRegistry, the message types of Request, Response, and Streams will automatically be registered.
By default all registered messages will also register the ability to serialize, deserialize, and clone, which allows those message types to support operations like fork-clone and conversion to/from JsonMessage.
For certain message types there may be additional operations that can be performed on them.
For example if a node returns a Result type then users should be able to apply a fork-result to it.
A tuple message should be able to unzip.
Unfortunately the Rust programming language does not yet support specialization as a stable feature.
To get around this, the register_node_builder returns a NodeRegistrationBuilder that lets you register support for additional operations.
At the end of registering a node, you can chain support for additional operators:
/// A request to get some information from a web page.
///
/// We implement Joined so this struct can be created by the join operation.
///
/// We implement Clone, Serialize, Deserialize, and JsonSchema so this struct
/// can support the default message operations.
#[derive(Joined, Clone, Serialize, Deserialize, JsonSchema)]
struct WebPageQuery {
url: String,
element: String,
}
registry
.register_node_builder(
NodeBuilderOptions::new("get_url_header"),
|builder, config: ()| {
builder.create_map_async(|query: WebPageQuery| {
async move {
let page = fetch_content_from_url(query.url).await?;
page
.get(&query.element)
.cloned()
.ok_or_else(|| FetchError::ElementMissing(query.element))
}
})
}
)
.with_join()
.with_result();
In the above example WebPageQuery implements the Joined trait, which means it can be the output of a join operation.
To register that ability for the message, we chain .with_join() after .register_node_builder().
At the same time the return type of node is a Result.
We can also chain .with_result() to register fork-result support for the node’s output message type.
When adding support for an operation, the message we are adding support for must be compatible with the operation.
This is ensured by the compiler.
If the node in the example above were not returning a Result type then the Rust compiler would emit a compilation error when we try to register .with_result() for it.
Each operation support that we add using NodeRegistrationBuilder will only apply to either the input message type or the output message type depending on whether the operation would produce the message or consume the message, respectively.
For finer grain control over exactly what operations are registered for each message type, continue to the next section.
Special-case Message Registration
In most cases your messages types and their supported operations can be registered as described above—by registering a node and then chaining on any additional operations needed by the messages. However there are some special cases that don’t quite fit that pattern.
opt-out
Crossflow diagrams can support message types that don’t implement clone or support serialization/deserialization. By default the node registration API will assume that your node’s messages do support all of those traits, because those are very typical traits for “plain old data” types. We think if those operations weren’t registered by default, there is a high likelihood that users would forget to register the operations, so by default we try to register them.
If your node produces or consumes messages that are not plain old data, you will need to explicitly opt out of some traits in order to register the node builder. If you forget to opt-out of the default operations for message types that don’t support them, you’ll see a compilation error.
Here’s an example of a node with an input value that supports none of the default operations:
use tokio::sync::mpsc::UnboundedReceiver;
registry
.opt_out()
.no_cloning()
.no_serializing()
.no_deserializing()
.register_node_builder(
NodeBuilderOptions::new("stream_out"),
|builder, config: ()| {
builder.create_map(|input: AsyncMap<UnboundedReceiver<f32>, StreamOf<f32>>| {
async move {
let mut receiver = input.request;
let stream = input.streams;
while let Some(msg) = receiver.recv().await {
stream.send(msg);
}
}
})
}
)
.with_common_response();
The UnboundedReceiver does not support serialization, deserialization, or even cloning.
To register this node builder, we first need to use .opt_out() and then specify
With those opted out, we can use the UnboundedReceiver as a message in the node.
To opt back into the default operations for the response, we apply .with_common_response().
A similar method exists if we were opting back in for the request type instead.
Streams
Registering additional operations for request and response message types is fairly straightforward, but there isn’t a clean way to do this for the message types that are present in output streams. If you have output streams that produce custom data structures, they will be registered with the default message operations (unless you opted out), but to register any additional operations for those types, you will need to do it directly.
Instead of using the NodeRegistrationBuilder API, you will need to use register_message:
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
struct State {
position: [f32; 2],
battery_level: f32,
}
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
enum Error {
LostConnection
}
registry
.register_node_builder(
NodeBuilderOptions::new("navigate_to"),
|builder, config: ()| {
builder.create_map(
|input: AsyncMap<[f32; 2], StreamOf<Result<State, Error>>>| {
async move {
let destination = input.request;
let stream = input.streams;
let mut update_receiver = navigate_to(destination);
while let Some(update) = update_receiver.recv().await {
stream.send(update);
}
}
}
)
}
);
// Explicitly register the message of the stream so we can add .with_result to it.
registry
.register_message::<Result<State, Error>>()
.with_result();