
What is Crossflow?

Crossflow is a general-purpose Rust library for reactive and async programming. It simplifies the challenges of implementing highly async software systems that may have parallel activities with interdependencies, conditional branching, and/or cycles. Its specialty is creating event-driven multi-agent state machines.

Implemented in Rust on top of the Bevy ECS, crossflow offers high performance and guaranteed-safe parallelism, making it suitable for highly responsive low-level state machines as well as high-level, visually programmed device orchestrators.

Services

The basic unit of work in crossflow is encapsulated by a service. Services take an input message and eventually yield an output message—perhaps immediately or perhaps after some long-running routine has finished.

[Figure: apple-pie-service]

In crossflow, services are defined by Bevy systems that take an input and produce an output. As Bevy systems, crossflow services can integrate into an application by interacting with the Bevy World through the entities, components, and resources of Bevy's ECS. This gives services enormous versatility in how they are implemented, while still being highly parallelizable, memory-safe, and efficient.

A service can be executed by sending it a request at any time using this line of code:

let outcome = commands.request(input, service).outcome();

This line is non-blocking, meaning the service will be executed concurrently with the rest of the application’s activity. The outcome is a receiver that can be polled—or better yet awaited in an async context—until the service has sent its response or has been cancelled.
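The shape of this poll-until-ready flow can be sketched with only std threads and channels, where an mpsc receiver stands in for the outcome receiver (the actual crossflow receiver type differs, but the pattern is the same):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// A "service" running concurrently. The mpsc receiver stands in for the
// outcome receiver returned by `.outcome()`.
fn run_request(input: Vec<f32>) -> mpsc::Receiver<f32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Pretend this is a long-running service computing its response.
        let _ = tx.send(input.into_iter().sum());
    });
    rx
}

// Poll until the response arrives. In a real crossflow app this loop would
// be an `app.update()` cycle or, better, an `.await` in an async context.
fn poll_outcome(rx: &mpsc::Receiver<f32>) -> Option<f32> {
    for _ in 0..1000 {
        if let Ok(response) = rx.try_recv() {
            return Some(response);
        }
        thread::sleep(Duration::from_millis(1));
    }
    None
}

fn main() {
    let outcome = run_request(vec![20.0, 30.0]);
    assert_eq!(poll_outcome(&outcome), Some(50.0));
    println!("outcome received");
}
```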

Workflows

Best practice when creating complex systems is to encapsulate services into the simplest possible building blocks and assemble those blocks into increasingly sophisticated structures. This is where workflows come in.

[Figure: sense-think-act]

Workflows allow you to assemble services into a directed graph—cycles are allowed—that forms a more complex behavior, feeding the output of each service as input to another service. Workflows are excellent for defining state machines that have async state transitions or that have lots of parallel activity that needs to be managed and synchronized.

When you create a workflow you will ultimately be creating yet another service that can be treated exactly the same as a service created using a Bevy System. This workflow-based service can even be used as a node inside of another workflow. In other words, you can build hierarchical workflows.

Execution

You can run as many service requests as you want at the same time for as many services or workflows as you want, including making multiple simultaneous requests for the same service or workflow. Each time request is called, a new “session” will be spawned that executes the workflow or service independently from any other that is running. However, if any of the services or workflows interact with “the world” (either the Bevy World, the actual physical world, or some external resource) then those services or workflows may indirectly interact with each other.

Tip

Check out the live web demo to get a sense for what a workflow might look like.

Try passing in [20, 30] as the request message and run the workflow to see the message get split and calculated.

Getting Started

To get you started with crossflow, the next chapter will teach you how to spawn a basic service. After that you will see how to run a service, then how to assemble services into a workflow (which is itself a service) and execute it.

To learn the fundamental concepts around what a “workflow” is in the crossflow library, see the Introduction to Workflows chapter.

How to Spawn a Service

For crossflow to be useful, you first need to have a Service to run. You may be able to find third-party libraries that provide some services for you to use, but this chapter will teach you how to spawn them yourself just in case you need to start from scratch.

What is a service?

In its most distilled essence, a service is something that can take an input message (request) and produce an output message (response):

[Figure: apple-pie-service]

Each service expresses its request and response types as generic parameters in the Service struct. These Request and Response parameters can be any data structures that can be passed between threads.

Tip

We mean “data structures” in the broadest possible sense, not only “plain data”. For example, you can pass around utilities like channels and publishers as messages, or use them as fields inside of messages. Any valid Rust struct that can be safely moved between threads can be used as a message.
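Here is a self-contained sketch of that idea using only std types (not crossflow's message traits): a message that carries a reply channel as one of its fields.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// A message that is more than "plain data": it carries a channel sender so
// whoever receives the message can reply through it. Any struct that can be
// safely moved between threads works as a message.
struct DoubleRequest {
    value: i32,
    reply_to: Sender<i32>,
}

// Send a request to a worker thread and wait for its reply.
fn round_trip(value: i32) -> i32 {
    let (req_tx, req_rx) = channel::<DoubleRequest>();
    let (reply_tx, reply_rx) = channel::<i32>();

    // The "service" side: receive the message, reply through the sender it carries.
    let worker = thread::spawn(move || {
        let req = req_rx.recv().unwrap();
        req.reply_to.send(req.value * 2).unwrap();
    });

    req_tx
        .send(DoubleRequest { value, reply_to: reply_tx })
        .unwrap();
    let reply = reply_rx.recv().unwrap();
    worker.join().unwrap();
    reply
}

fn main() {
    assert_eq!(round_trip(21), 42);
    println!("reply received through the channel carried by the message");
}
```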

The Service data structure itself is much like a function pointer. It contains nothing besides an Entity (a lightweight identifier that points to the service’s location in the World) and the type information that ensures you send it the correct Request type and that you know what Response type to expect from it. When you copy or clone a Service, you are really just making a copy of this identifier. The underlying service implementation that will be called for the copy is the exact same as the original.
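A toy sketch of this shape (hypothetical types, not crossflow's real definition) shows why copying is cheap: only the identifier and zero-sized type information are duplicated.

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for crossflow's Service struct: an entity id plus
// zero-sized type information tying the handle to its Request/Response types.
struct ServiceHandle<Request, Response> {
    entity: u64, // stands in for Bevy's Entity identifier
    _types: PhantomData<fn(Request) -> Response>,
}

// Manual impls so the handle is Copy even when Request/Response are not.
impl<Req, Res> Clone for ServiceHandle<Req, Res> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<Req, Res> Copy for ServiceHandle<Req, Res> {}

fn main() {
    let original: ServiceHandle<Vec<f32>, f32> = ServiceHandle {
        entity: 7,
        _types: PhantomData,
    };
    let copy = original; // duplicates only the identifier
    assert_eq!(original.entity, copy.entity);
    println!("both handles point at entity {}", copy.entity);
}
```

Note the manual Clone and Copy impls: a derived Copy would wrongly require Request and Response themselves to be Copy, even though only the id is stored.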

Spawn a Blocking Service

The simplest type of service to spawn is called a “blocking” service. These services are much like ordinary functions: They receive an input, run once for that input, and immediately return an output value. Much as the name implies, a blocking service will block all other activity in the schedule until it is done running. Therefore blocking services must be short-lived.

To define a blocking service, create a function whose input argument is a BlockingServiceInput:

fn sum(In(input): BlockingServiceInput<Vec<f32>>) -> f32 {
    let mut sum = 0.0;
    for value in input.request {
        sum += value;
    }
    sum
}

This function will define the behavior of our service: The request (input) message is passed in through the BlockingServiceInput argument. The request type is a Vec<f32>, and the purpose of the function is to sum up the elements in that vector. The output of the service is a simple f32.

Before we can run this function as a service, we need to spawn an instance of it. We can use the AddServicesExt trait for this:

let sum_service: Service<Vec<f32>, f32> = app.spawn_service(sum);

We can spawn the service while building our Bevy App to make sure that it’s available whenever we need it. A common practice is to save your services into Components or Resources so they can be accessed at runtime when needed.

Now that you’ve seen how to spawn a service, you could move on to How to Run a Service. Or you can continue on this page to learn about the more sophisticated abilities of services.

Services as Bevy Systems

A crucial concept in Bevy is systems. Bevy uses an Entity-Component-System (ECS) architecture for structuring applications. Systems are how an ECS architecture queries and modifies the data in an application. In crossflow, services are Bevy systems, except instead of being scheduled like most systems, a service only gets run when needed, taking in an input argument (request) and returning an output value (response). Since they are Bevy systems, services can query and modify the entities, components, and resources in the World.

In the example from the previous section, you may have noticed In(input): .... That In is a special type of SystemParam for a value that is being directly passed into the system rather than being accessed from the world. For blocking services we pass in a BlockingService as the input, which contains the request data, output streams, and some other fields that represent metadata about the service.

Just like any other Bevy system, you can add as many system params to your service as you would like. Here is an example of a blocking service that includes a Query:

#[derive(Component, Deref)]
struct Offset(Vec2);

fn apply_offset(
    In(input): BlockingServiceInput<Vec2>,
    offsets: Query<&Offset>,
) -> Vec2 {
    let offset = offsets
        .get(input.provider)
        .map(|offset| **offset)
        .unwrap_or(Vec2::ZERO);

    input.request + offset
}

First we define a component struct named Offset which simply stores a Vec2. The job of our apply_offset service is to query the Offset stored for this service and apply it to the incoming Vec2 of the request.

To query the Offset we add offsets: Query<&Offset> as an argument (a.k.a. system param) for our service function (a.k.a. system). One of the fields in BlockingService is provider, which is the Entity that provides this service. You can use this entity to store data that allows the behavior of the service to be configured externally. In this case we will store an Offset component in the provider to externally configure what kind of Offset is being applied.

When spawning the service, you can use .with to initialize the provider entity:

let apply_offset_service: Service<Vec2, Vec2> = app.spawn_service(
    apply_offset
    .with(|mut srv: EntityWorldMut| {
        srv.insert(Offset(Vec2::new(-2.0, 3.0)));
    })
);

In general you can use Service::provider to access the provider entity at any time, and use all the normal Bevy mechanisms for managing the components of an entity.

Full Example

use crossflow::bevy_app::App;
use crossflow::prelude::*;

use bevy_derive::*;
use bevy_ecs::prelude::*;
use glam::Vec2;

fn main() {
    let mut app = App::new();
    app.add_plugins(CrossflowExecutorApp::default());

    let offset = Vec2::new(-2.0, 3.0);

    let service = app.spawn_service(apply_offset.with(|mut srv: EntityWorldMut| {
        srv.insert(Offset(offset));
    }));

    let mut outcome = app
        .world_mut()
        .command(|commands| commands.request(Vec2::ZERO, service).outcome());

    for _ in 0..5 {
        if let Some(response) = outcome.try_recv() {
            let response = response.unwrap();
            assert_eq!(response, offset);
            println!("Successfully applied offset: {response:?}");
            return;
        }

        app.update();
    }

    panic!("Service failed to run after multiple updates");
}

#[derive(Component, Deref)]
struct Offset(Vec2);

fn apply_offset(In(input): BlockingServiceInput<Vec2>, offsets: Query<&Offset>) -> Vec2 {
    let offset = offsets
        .get(input.provider)
        .map(|offset| **offset)
        .unwrap_or(Vec2::ZERO);

    input.request + offset
}

More kinds of services

If you are interested in non-blocking service types, continue on to Async Services and Continuous Services.

If you need your service to be a portable object that isn’t associated with an entity, take a look at Callbacks. If you don’t care about your service being a Bevy system at all (i.e. it should just be a plain function with a single input argument) then take a look at Maps.

If blocking services are enough to get you started, then you can skip ahead to How to Run a Service.

Async Services

We’ve seen how to spawn a blocking service, but blocking services have an important drawback: While a blocking service is running, no other systems or services in the system schedule can run. This allows blocking services to have unfettered, instant access to all resources in the Bevy World, but long-running blocking services would disrupt the app schedule. This does not mean that blocking services should be avoided—they are the fastest and most CPU-efficient choice for any short-lived service, especially when accessing Bevy Components or Resources. Each kind of service fits a different shape of usage, so go ahead and use blocking services when they fit.

When it comes to long-running services, it’s likely that an async service is what you want. In crossflow, async services allow you to take full advantage of the async/await language feature of Rust when implementing a service. For a detailed explanation of the language feature you might want to look at the official Rust handbook. Using async services does not require a deep understanding of async in Rust, but some peculiarities might make more sense if you have a better grasp of it.

What is an async service?

A normal Bevy app has a system schedule which can be thought of as the main event loop of the application. The system schedule is able to run many systems at once as long as those systems do not have any read/write conflicts in which world resources they need to access. Bevy will automatically identify which systems can be run in parallel and run them together unless you specify otherwise.

Some systems demand exclusive world access, meaning no other systems can run alongside them. While this reduces opportunities for parallel processing, it empowers exclusive systems to dynamically run other systems whose data access requirements are not known when the schedule is first being built.

The flush_execution system of crossflow drives all the services that need to be executed. Since we never know ahead of time which services might need to be run or what they will need to access from the world, flush_execution is an exclusive system.

Inside of flush_execution we will execute any services that are ready to be executed—one at a time since we can’t be sure which might have read/write conflicts with each other. When we execute a blocking service, we pass the request into it and get back the response immediately. We can then pass that response message along to its target service, and then immediately execute that target service. An arbitrarily long chain of blocking services can all be executed within a single run of flush_execution, unless flush_loop_limit is set.

[Figure: async-task-pool]

If a blocking service runs for a very long time, the entire system schedule will be held up, which could be detrimental to how the application behaves. In a GUI application, users would see the window freeze up. In a workflow execution application, clients would think that all execution has frozen. This means blocking services are not suitable for any service that represents a physical process or involves i/o with external resources.

Fortunately, Bevy provides an async task pool that runs in parallel to the system schedule and supports async functions. An async service takes advantage of this by producing a Future instead of immediately returning a response. Crossflow sends that Future into the async task pool to be processed efficiently alongside other async tasks. Once the Future has yielded its final response, flush_execution will receive the response message and pass it along to the next service in the chain.

We get two advantages with async services:

  • They are processed in the async task pool, allowing them to run in parallel to the system schedule, no matter which systems are running at any given time.
  • They can use await in their Future, which is a powerful language feature of Rust that allows efficient and ergonomic use of i/o and multi-threading.

However there are some disadvantages to be mindful of:

  • Sending the Future to the async task pool and receiving the response has some overhead (albeit relatively small in most cases).
  • The response of an async service will generally not arrive within the same schedule update that the service was activated. This means a chain of N async services will typically take at least N schedule updates before finishing.
  • Each time the Future of an async service needs to access the world (using its async channel) it takes up to one whole schedule update to get that access.

Spawn an Async Service

Spawning an async service is similar to spawning a blocking service, except that your function should take in an AsyncServiceInput<Request> and be async:

async fn trivial_async_service(In(srv): AsyncServiceInput<String>) -> String {
    return srv.request;
}

If your function matches these requirements, then you can spawn it with the exact same API as the blocking service:

let async_service: Service<String, String> = app.spawn_service(trivial_async_service);

Notice that even though this is an async service and has some different behavior than blocking services under the hood, it is still captured as a Service. Once spawned as a service, blocking and async services will appear exactly the same to users.

await

Since async services are defined via async functions, they are able to use Rust’s await feature, which is often used for network i/o and communicating between parallel threads. Here’s an example of a service that gets the title of a webpage based on a URL passed in as the request message:

async fn page_title_service(In(srv): AsyncServiceInput<String>) -> Option<String> {
    let response = trpl::get(&srv.request).await;
    let response_text = response.text().await;
    trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
}

This example is adapted from the Rust handbook. Note that this service returns an Option<String> since we can’t guarantee that the input URL points to an actual website or that we will successfully retrieve its title even if it does.

Use the Async Channel

One drawback of using an async service is that it doesn’t have free access to the World (particularly Components and Resources) the way a blocking service does. It would be impossible to provide that kind of access for something running in the async task pool because any time it tries to access data that’s stored in the world, it could encounter a conflict with a scheduled system that’s accessing the same data.

Nevertheless, async services may need to push or pull data into/from the World while they make progress through their async task. To accommodate this, we provide async services with a Channel that supports querying or sending commands to the World:

/// A component that stores a web page title inside an Entity
#[derive(Component)]
struct PageTitle(String);

/// A request that specifies a URL whose page title should be stored inside an Entity
struct PageTitleRequest {
    url: String,
    insert_into: Entity,
}

/// A service that fetches the page title of a URL and stores that into an Entity.
async fn insert_page_title(
    In(srv): AsyncServiceInput<PageTitleRequest>
) -> Result<(), ()> {
    let response = trpl::get(&srv.request.url).await;
    let response_text = response.text().await;
    let title = trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
        .ok_or(())?;

    let insert_into = srv.request.insert_into;
    // Use the async channel to insert a PageTitle component into the entity
    // specified by the request, then await confirmation that the command is finished.
    srv.channel.commands(
        move |commands| {
            commands.entity(insert_into).insert(PageTitle(title));
        }
    )
        .await;

    Ok(())
}

In the above example, the insert_page_title service ends by inserting the result of its HTTP request into a component of an entity. This is done by invoking srv.channel.commands(_) and passing in a closure that says what should be done with the Bevy commands.

Note that this command will itself be run asynchronously. It will be executed the next time that flush_execution is run by the system schedule. If we want to make sure our service does not return its response until the command is carried out, we should .await the output of srv.channel.commands(_). The command will eventually be carried out even if we return without awaiting it, but if the next service in the chain assumes the command has already finished then you could experience race conditions.

Warning

Each time you await srv.channel it may take up to an entire update cycle of the schedule before your response arrives. It’s a good idea to batch as many queries or commands as you can before awaiting to avoid wasting update cycles.

Async Services as Bevy Systems

It’s been mentioned that the task (a.k.a. Future) of an async service can only access the Bevy World through the async channel that it’s provided with. However, prior to its task being spawned, an async service does have a way to immediately access the entire World. This is useful when there is some data you know your task will need. You can query it from the World at no cost as soon as your async service is activated.

However there is a catch: Bevy system parameters (such as Query and Res) are not compatible with the async fn syntax that is commonly used to define async functions. The incompatibility is related to a logical conflict between the way Bevy manages borrows of World data and the way async fn creates a portable state machine that can be executed by an async task pool. The details of this conflict aren’t important for using crossflow, but suffice it to say you can’t use most Bevy system params and async fn in the same function.

But there is a workaround! You can create an async function without using the async fn syntax. You can use the regular fn syntax and use a return type of impl Future<Output = Response>:

#[derive(Clone, Component, Deref)]
struct Url(String);

use std::future::Future;
fn fetch_page_title(
    In(srv): AsyncServiceInput<Entity>,
    url: Query<&Url>,
) -> impl Future<Output = Result<String, ()>> + use<> {
    // Use a query to get the Url component of this entity
    let url = url.get(srv.request).cloned();

    async move {
        // Make sure the query for the Url component was successful
        let url = url.map_err(|_| ())?.0;

        // Fetch the page title of the website stored in the Url component of
        // the requested entity.
        let response = trpl::get(&url).await;
        let response_text = response.text().await;
        trpl::Html::parse(&response_text)
            .select_first("title")
            .map(|title| title.inner_html())
            .ok_or(())
    }
}

At the start of your function it will behave just like a normal Bevy system. Your arguments can be any Bevy system parameters that you’d like, and you can use them freely in the initial lines of your function. But to satisfy the signature of the function, you eventually have to return something that implements the Future<Output = Response> trait.

The easiest way to create a Future is with an async block. That block produces an anonymous data structure that implements Future<Output = Response>, where Response is whatever value the block yields. By ending our function with an async move { ... }, it behaves like a regular blocking function at the start and like an async function at the end. From the outside, it is indistinguishable from an async function.
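This split between synchronous setup and deferred async work can be demonstrated with only std (no Bevy or crossflow): the synchronous part runs when the function is called, while the async move block runs only when the returned future is polled. The minimal waker below exists only so the example can poll the future without a full executor.

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A plain `fn` that returns a future: the synchronous part runs immediately,
// while the `async move` block runs only when the future is polled.
fn greet(name: String) -> impl Future<Output = String> {
    // Synchronous setup, analogous to reading Bevy system params and cloning data.
    let upper = name.to_uppercase();
    // The async part takes ownership of the prepared data.
    async move { format!("Hello, {upper}!") }
}

// Poll a future that needs no wakeups (it has no `.await` points inside),
// using a no-op waker built from the raw vtable API.
fn poll_once<F: Future>(fut: F) -> F::Output {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw()) };
    let mut cx = Context::from_waker(&waker);
    match Box::pin(fut).as_mut().poll(&mut cx) {
        Poll::Ready(output) => output,
        Poll::Pending => unreachable!("no await points, so it resolves on the first poll"),
    }
}

fn main() {
    let future = greet("crossflow".to_string()); // nothing async has run yet
    assert_eq!(poll_once(future), "Hello, CROSSFLOW!");
    println!("future resolved");
}
```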

Tip

If you need your async service to start by querying the World, it’s a good idea to always follow this template exactly:

fn my_async_service(
    In(srv): AsyncServiceInput<Request>,
    /* ... Bevy System Params Go Here ... */
) -> impl Future<Output = Response> + use<> {
    /* ... Use Bevy System params, cloning data as needed ... */
    async move {
        /* ... Perform async operations ... */
    }
}

You can deviate from this template if you know what you are doing, but first make sure you have a sound understanding of how async works in Rust or else you might get unexpected behavior.

Full Example

Here is an example of an async service that uses everything discussed in this chapter.

Tip

This example shows how a tokio runtime can be used with an async service. Crossflow does not use tokio itself, since Bevy’s async execution is based on smol instead.

Many async functions in the Rust ecosystem depend on a tokio runtime, so this example may help you find ways to incorporate tokio into your async services.

use crossflow::bevy_app::App;
use crossflow::prelude::*;

use bevy_derive::*;
use bevy_ecs::prelude::*;
use clap::Parser;
use std::{future::Future, sync::Arc};
use tokio::runtime::Runtime;

/// Program that demonstrates an async service in crossflow
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
    /// Url that we should fetch a page title from
    url: String,
}

fn main() {
    let args = Args::parse();
    let mut app = App::new();
    app.add_plugins(CrossflowExecutorApp::default());

    let service = app.spawn_service(update_page_title);

    let entity = app.world_mut().spawn(Url(args.url)).id();

    let mut outcome = app
        .world_mut()
        .command(|commands| commands.request(entity, service).outcome());

    // Create a tokio runtime and drive it on another thread
    let (finish, finished) = tokio::sync::oneshot::channel();
    let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
    app.world_mut()
        .insert_resource(TokioRuntime(Arc::clone(&rt)));
    let tokio_thread = std::thread::spawn(move || {
        let _ = rt.block_on(finished);
    });

    let start = std::time::Instant::now();
    let time_limit = std::time::Duration::from_secs(5);
    while std::time::Instant::now() - start < time_limit {
        if let Some(response) = outcome.try_recv() {
            match response {
                Ok(_) => {
                    let title = app.world().get::<PageTitle>(entity).unwrap();
                    println!("Fetched title: {}", **title);
                }
                Err(err) => {
                    println!("Error encountered while trying to update title: {err}");
                }
            }

            let _ = finish.send(());
            let _ = tokio_thread.join();
            return;
        }

        app.update();
    }

    panic!("Service failed to run within time limit of {time_limit:?}");
}

/// A component that stores a web page title inside an Entity.
#[derive(Component, Deref)]
struct PageTitle(String);

/// A component that stores what url is assigned to an Entity.
#[derive(Clone, Component, Deref)]
struct Url(String);

/// A resource that provides access to a tokio runtime
#[derive(Resource, Deref)]
struct TokioRuntime(Arc<Runtime>);

/// A service that checks the Url assigned to an entity and then updates the page
/// title set for that entity.
fn update_page_title(
    In(srv): AsyncServiceInput<Entity>,
    url: Query<&Url>,
    runtime: Res<TokioRuntime>,
) -> impl Future<Output = Result<(), ()>> + use<> {
    // Use a query to get the Url component of this entity
    let url = url.get(srv.request).cloned();
    let rt = Arc::clone(&**runtime);

    async move {
        // Make sure the query for the Url component was successful
        let url = url.map_err(|_| ())?.0;

        // Fetch the page title of the website stored in the Url component of
        // the requested entity. This is run inside a tokio runtime because the
        // trpl library uses reqwest, which is a tokio-based http implementation.
        let title = rt
            .spawn(async move {
                let response = trpl::get(&url).await;
                let response_text = response.text().await;
                trpl::Html::parse(&response_text)
                    .select_first("title")
                    .map(|title| title.inner_html())
                    .ok_or(())
            })
            .await
            .map_err(|_| ())??;

        let entity = srv.request;
        srv.channel
            .commands(move |commands| {
                commands.entity(entity).insert(PageTitle(title));
            })
            .await;

        Ok(())
    }
}

Continuous Services

Most of the time a service can be written as a blocking service or an async service depending on whether the service is known to be short-lived or long-lived and whether it needs to use the async feature of Rust. But there’s one more shape a service can take on: continuous.

One thing that blocking and async services share in common is that they are each defined by a function that takes in a request as an argument and eventually yields a response. On the other hand a continuous service is defined by a Bevy system that runs continuously in the system schedule. Each time the continuous service is woken up in the schedule, it can make some incremental progress on the set of active requests assigned to it—which we call the order queue.

What is a Continuous Service?

Each service—whether blocking, async, or continuous—is associated with a unique Entity that we refer to as the service provider. For blocking and async services the main purpose of the service provider is to

  1. store the system that implements the service, and
  2. allow users to configure the service by inserting components on the service provider entity.

For continuous services the service provider has one additional purpose: to store the order queue component. This component is used to keep track of its active requests—i.e. the requests aimed at the continuous service which have not received a response yet. When a request for the continuous service shows up, flush_execution will send that request into the order queue component of the service that it’s meant for.

[Figure: continuous-service-schedule]

Each continuous service will be run in the regular Bevy system schedule just like every regular Bevy system in the application. This means continuous services can run in parallel to each other, as well as run in parallel to all other regular Bevy systems, as long as there are no read/write conflicts in what the systems need to access.

Each time a continuous service is woken up by the schedule, it should check its order queue component and look through any active requests it has. It should try to make progress on those tasks according to whatever its intended behavior is—perhaps try to complete one at a time or try to complete all of them at once. There is no requirement to complete any number of the orders on any given wakeup. There is no time limit or iteration limit for completing any orders. Any orders that do not receive a response will simply carry over to the next schedule update.

Once an order is complete, the continuous service can issue a response to it. After issuing a response, the request of the order will immediately become inaccessible, even within the current update cycle. Responding to an order will fully drop the order from the perspective of the continuous service. This prevents confusion that might otherwise lead to accidentally sending two responses for the same order. Nevertheless the continuous service can respond to any number of unique orders within a single update cycle.
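The lifecycle described above can be sketched with plain std types (a toy stand-in, not crossflow's actual order queue): pending requests persist across updates, each update may respond to any number of them, and responding drops the order from the queue.

```rust
use std::collections::VecDeque;

// A toy order queue mimicking the continuous-service lifecycle: orders that
// do not receive a response simply carry over to the next update.
struct GreeterService {
    orders: VecDeque<String>,
    responses: Vec<String>,
}

impl GreeterService {
    fn new() -> Self {
        Self { orders: VecDeque::new(), responses: Vec::new() }
    }

    // Each wakeup completes at most one order; the rest wait for later updates.
    fn update(&mut self) {
        if let Some(name) = self.orders.pop_front() {
            // Responding drops the order, so it can never be answered twice.
            self.responses.push(format!("Hello, {name}!"));
        }
    }
}

fn main() {
    let mut service = GreeterService::new();
    service.orders.push_back("Alice".to_string());
    service.orders.push_back("Bob".to_string());

    service.update();
    assert_eq!(service.responses, vec!["Hello, Alice!"]);
    assert_eq!(service.orders.len(), 1); // Bob's order carried over

    service.update();
    assert_eq!(service.responses.len(), 2);
    println!("all orders answered");
}
```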

Spawn a Continuous Service

Spawning a continuous service has some crucial differences from spawning a blocking or an async service. These differences come from two facts:

  1. The request information (order queue) persists between service updates as a component
  2. The service needs to be added to the app’s system schedule

The first difference means that continuous services must query for their order queue. In the hello_continuous_service example below, you can see the query argument takes in a ContinuousQuery whose generic parameters perfectly match the generic parameters of the ContinuousServiceInput above it. This pattern is mandatory for continuous services so they can access their order queue.

fn hello_continuous_service(
    In(srv): ContinuousServiceInput<String, String>,
    mut query: ContinuousQuery<String, String>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has despawned, so we can no longer do anything
        return;
    };

    orders.for_each(|order| {
        let name = order.request();
        let response = format!("Hello, {name}!");
        order.respond(response);
    });
}

Unlike other service types, the srv input argument contains nothing but a key that must be used by query to access the orders belonging to this service. This access is fallible because it’s possible for a user or a system to despawn the provider of this service while the service is still inside the system schedule. In most cases when the order queue can no longer be accessed, your continuous service can just return immediately. Any requests sent to it will be automatically cancelled.

When you do get access to your orders—in this case through the orders variable—you can iterate through each order, using .request() to borrow the request value and then .respond(_) to send your response for the order.

Inside the System Schedule

The other important difference with continuous services is that they need to be added to the app’s system schedule. While blocking and async services can be spawned anywhere that you have access to a Commands, continuous services can only be spawned when you have access to an App:

let continuous_service: Service<String, String> = app.spawn_continuous_service(
    Update,
    hello_continuous_service
);

Instead of spawn_service, which gets used by blocking and async services, continuous services require you to call spawn_continuous_service. You'll also notice that an additional argument is needed here: the label of the schedule in which the continuous service should run.

Note

Currently Bevy does not support dynamically adding systems to the schedule, so continuous services will generally need to be added during App startup.

Despite these differences in how continuous services get spawned, you will still ultimately receive a Service&lt;Request, Response&gt;, exactly the same as for blocking and async services. From the outside, continuous services appear no different from other kinds of services; the differences in how they behave are just implementation details.

Serial Orders

In the hello_continuous_service example above we respond to each request on the same update frame that the request arrives. Very often, however, continuous services manage a process that needs to be spread out across multiple frames. The example below responds to a sequence of orders one at a time, where each order might need multiple frames before it's complete.

#[derive(Resource, Deref, DerefMut)]
struct Position(Vec2);

#[derive(Component, Deref)]
struct Speed(f32);

fn move_base_vehicle_to_target(
    In(srv): ContinuousServiceInput<Vec2, Result<(), ()>>,
    mut query: ContinuousQuery<Vec2, Result<(), ()>>,
    speeds: Query<&Speed>,
    mut base_position: ResMut<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has been despawned, so this continuous service
        // can no longer function.
        return;
    };

    // Get the oldest active request for this service. Orders will be indexed
    // from 0 to N-1 from oldest to newest. When an order is completed, all
    // orders after it will shift down by an index on the next update cycle.
    let Some(order) = orders.get_mut(0) else {
        // There are no active requests, so no need to do anything
        return;
    };

    let dt = time.delta_secs_f64() as f32;
    let Ok(speed) = speeds.get(srv.key.provider()) else {
        // The Speed component has been removed from the service provider,
        // so it can no longer complete requests.
        order.respond(Err(()));
        return;
    };

    let target = *order.request();
    match move_to(**base_position, target, **speed, dt) {
        Ok(_) => {
            // The vehicle arrived
            **base_position = target;
            println!("Base vehicle arrived at {target}");
            order.respond(Ok(()));
        }
        Err(new_position) => {
            // The vehicle made progress but did not arrive
            **base_position = new_position;
        }
    }
}

On each update of the continuous service we only look at the oldest request in the order queue, ignoring all others. This effectively makes the service "serial", meaning that no matter how many requests are coming into it, they will only be handled one at a time.

Parallel Orders

One of the advantages that continuous services have over blocking and async services is the ability to handle many requests at once, potentially with interactions happening between them. The example below shows how orders.for_each(_) can be used to iterate over all active requests in one update frame, making incremental progress on all of them at once.

#[derive(Clone, Copy)]
struct DroneRequest {
    target: Vec2,
    speed: f32,
}

fn send_drone_to_target(
    In(srv): ContinuousServiceInput<DroneRequest, ()>,
    mut query: ContinuousQuery<DroneRequest, ()>,
    mut drone_positions: Local<HashMap<Entity, Vec2>>,
    base_position: Res<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        return;
    };

    orders.for_each(|order| {
        let DroneRequest { target, speed } = *order.request();
        let position = drone_positions.entry(order.id()).or_insert_with(|| {
            println!(
                "Drone {} taking off from {}, heading to {target}",
                order.id().index(),
                **base_position,
            );
            **base_position
        });
        let dt = time.delta_secs_f64() as f32;
        match move_to(*position, target, speed, dt) {
            Ok(_) => {
                println!("Drone {} arrived at {target}", order.id().index());
                order.respond(());
            }
            Err(new_position) => {
                *position = new_position;
            }
        }
    });

    // Remove any old task IDs that are no longer in use
    drone_positions.retain(|id, _| orders.iter().any(|order| order.id() == *id));
}

Full Example

Here is an example that incorporates everything above and also demonstrates how continuous services can be configured in the schedule just like regular Bevy systems. Notice the use of .configure(_) when spawning move_base and send_drone. This lets us make sure the base vehicle's position is up to date before the drone service is updated.

use crossflow::bevy_app::{App, Update};
use crossflow::prelude::*;

use bevy_derive::*;
use bevy_ecs::prelude::*;
use bevy_time::{Time, TimePlugin};
use glam::Vec2;

use std::collections::HashMap;

#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
struct MoveBaseVehicle;

fn main() {
    let mut app = App::new();
    app.add_plugins((CrossflowExecutorApp::default(), TimePlugin::default()))
        .insert_resource(Position(Vec2::ZERO));

    let move_base = app.spawn_continuous_service(
        Update,
        move_base_vehicle_to_target
            .with(|mut srv: EntityWorldMut| {
                // Set the speed component for this service provider
                srv.insert(Speed(1.0));
            })
            .configure(|config| {
                // Put this service into a system set so that we can order other
                // services before or after it.
                config.in_set(MoveBaseVehicle)
            }),
    );
    let send_drone = app.spawn_continuous_service(
        Update,
        send_drone_to_target.configure(|config| {
            // This service depends on side-effects from move_base, so we should
            // always schedule it afterwards.
            config.after(MoveBaseVehicle)
        }),
    );

    let move_vehicle_to_random_position = move |app: &mut App| {
        app.world_mut()
            .command(|commands| commands.request(random_vec2(20.0), move_base).outcome())
    };

    let launch_drone_to_random_position = move |app: &mut App| {
        app.world_mut().command(|commands| {
            let request = DroneRequest {
                target: random_vec2(20.0),
                speed: 1.0,
            };
            commands.request(request, send_drone).detach();
        });
    };

    let mut base_moving = move_vehicle_to_random_position(&mut app);
    let mut last_launch = std::time::Instant::now();
    loop {
        app.update();

        if base_moving.is_available() {
            // Send the base to a new location
            base_moving = move_vehicle_to_random_position(&mut app);
        }

        if last_launch.elapsed() > std::time::Duration::from_secs(1) {
            launch_drone_to_random_position(&mut app);
            last_launch = std::time::Instant::now();
        }
    }
}

fn random_vec2(width: f32) -> Vec2 {
    width * Vec2::new(rand::random::<f32>(), rand::random::<f32>())
}

#[derive(Resource, Deref, DerefMut)]
struct Position(Vec2);

#[derive(Component, Deref)]
struct Speed(f32);

fn move_base_vehicle_to_target(
    In(srv): ContinuousServiceInput<Vec2, Result<(), ()>>,
    mut query: ContinuousQuery<Vec2, Result<(), ()>>,
    speeds: Query<&Speed>,
    mut base_position: ResMut<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has been despawned, so this continuous service
        // can no longer function.
        return;
    };

    // Get the oldest active request for this service. Orders will be indexed
    // from 0 to N-1 from oldest to newest. When an order is completed, all
    // orders after it will shift down by an index on the next update cycle.
    let Some(order) = orders.get_mut(0) else {
        // There are no active requests, so no need to do anything
        return;
    };

    let dt = time.delta_secs_f64() as f32;
    let Ok(speed) = speeds.get(srv.key.provider()) else {
        // The Speed component has been removed from the service provider,
        // so it can no longer complete requests.
        order.respond(Err(()));
        return;
    };

    let target = *order.request();
    match move_to(**base_position, target, **speed, dt) {
        Ok(_) => {
            // The vehicle arrived
            **base_position = target;
            println!("Base vehicle arrived at {target}");
            order.respond(Ok(()));
        }
        Err(new_position) => {
            // The vehicle made progress but did not arrive
            **base_position = new_position;
        }
    }
}

#[derive(Clone, Copy)]
struct DroneRequest {
    target: Vec2,
    speed: f32,
}

fn send_drone_to_target(
    In(srv): ContinuousServiceInput<DroneRequest, ()>,
    mut query: ContinuousQuery<DroneRequest, ()>,
    mut drone_positions: Local<HashMap<Entity, Vec2>>,
    base_position: Res<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        return;
    };

    orders.for_each(|order| {
        let DroneRequest { target, speed } = *order.request();
        let position = drone_positions.entry(order.id()).or_insert_with(|| {
            println!(
                "Drone {} taking off from {}, heading to {target}",
                order.id().index(),
                **base_position,
            );
            **base_position
        });
        let dt = time.delta_secs_f64() as f32;
        match move_to(*position, target, speed, dt) {
            Ok(_) => {
                println!("Drone {} arrived at {target}", order.id().index());
                order.respond(());
            }
            Err(new_position) => {
                *position = new_position;
            }
        }
    });

    // Remove any old task IDs that are no longer in use
    drone_positions.retain(|id, _| orders.iter().any(|order| order.id() == *id));
}

fn move_to(current: Vec2, target: Vec2, speed: f32, dt: f32) -> Result<(), Vec2> {
    let dx = f32::max(0.0, speed * dt);
    let dp = target - current;
    let distance = dp.length();
    if distance <= dx {
        return Ok(());
    }

    let Some(u) = dp.try_normalize() else {
        // If dp can't be normalized then it's close to zero, so there's
        // nowhere to go.
        return Ok(());
    };

    return Err(current + u * dx);
}

Output Streams

So far we’ve seen services that return a single output as a final response to each request it receives. This is enough for chaining together sequential atomic services, but some actions are carried out over a period of time and involve stages or incremental progress that you may want to track.

Crossflow services have the option of using output streams to transmit messages while processing a request. Before the final response is sent out, a service can stream out any number of messages over any number of output streams. Each stream can have its own message type.

Streams can be anonymous, having no name and identified simply by their output type, but best practice is to use named streams. Giving names to streams adds clarity to the purpose or meaning of each stream.

Tip

If you want to know how to receive data from output streams, see Receiving from Output Streams.

To see how to connect output streams inside a workflow, see Connecting Output Streams.

Anonymous Streams

While it’s not the recommended approach, we’ll start by explaining how anonymous streams work. If you are confident that your service will only ever produce one stream or that the type information is enough to make the purpose of the stream clear, then anonymous streams may be acceptable for your use case.

In this example we implement a Fibonacci sequence service that streams out the values of the sequence and then ultimately just returns a trigger () to indicate that the service is finished running:

fn fibonacci_example(
    In(input): BlockingServiceInput<u32, StreamOf<u32>>
) {
    let order = input.request;
    let stream = input.streams;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        stream.send(current);

        let sum = current + next;
        current = next;
        next = sum;
    }
}

We use StreamOf&lt;u32&gt; to define an anonymous (unnamed) stream of u32. This stream lets us emit a sequence of outputs where each message is a new item in the Fibonacci sequence, as opposed to the conventional "final response" that would require us to return a single Vec&lt;u32&gt; message. While a Fibonacci sequence is a trivial use case, this same approach can be used to transform any single input into a stream of any number of outputs.

Multiple Anonymous Streams

In the previous example you might have noticed

let stream = input.streams;

The plural input.streams is bound to a singular stream variable. This is because the streams API is designed to support multiple streams in general; in the case of a lone StreamOf&lt;T&gt;, the stream pack reduces to just one stream.

To get multiple anonymous streams, you can use a tuple. Here’s a slightly tweaked version of the previous example that additionally streams out a stringified version of the sequence:

fn fibonacci_string_example(
    In(input): BlockingServiceInput<u32, (StreamOf<u32>, StreamOf<String>)>,
) {
    let order = input.request;
    let u32_stream = input.streams.0;
    let string_stream = input.streams.1;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        u32_stream.send(current);
        string_stream.send(format!("{current}"));

        let sum = current + next;
        current = next;
        next = sum;
    }
}

Now input.streams contains two streams, packed together as a tuple. Since they are packed into a tuple, they continue to be anonymous (unnamed), but they can be accessed separately and send different outputs.

Warning

When you use multiple anonymous output streams, you must not include the same message type multiple times, e.g. (StreamOf&lt;u32&gt;, StreamOf&lt;u32&gt;). This cannot be caught at compile time and will lead to confusing behavior: when the service is spawned it will appear to have multiple output streams, but only one of the Outputs will receive all of the messages, even if the service sends messages to both.

Stream Pack

While anonymous streams are an option, it’s best to always use named streams, especially if you are creating services for a diagram.

To create named streams, you should define a struct where each field represents a different output stream. The data type of each field will be the message type of its associated stream. Simply apply #[derive(StreamPack)] to that struct, and now you can use those streams in your system:

#[derive(StreamPack)]
struct FibonacciStreams {
    integers: u32,
    strings: String,
}

fn fibonacci_stream_pack_example(
    In(input): BlockingServiceInput<u32, FibonacciStreams>,
) {
    let order = input.request;
    let streams = input.streams;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        streams.integers.send(current);
        streams.strings.send(format!("{current}"));

        let sum = current + next;
        current = next;
        next = sum;
    }
}

Now input.streams contains a separate named field for each of the streams in the stream pack. Each of those fields lets you send output messages for each stream. The compiler will ensure that the messages you send are compatible with the stream’s type.

Tip

Unlike with anonymous streams, you can have multiple named streams with the same message type.

Async Streams

The previous examples show how to use streams in a blocking service to generate a stream of data from a single input. While that is a valid use of streams, there is another conceptually important use: streaming updates or products out of an ongoing live service. Due to the nature of blocking services, the streams that come out of one will be delivered at the same time as the final response, so live updates are only relevant for async and continuous services.

Below is a stub of a navigation service. It takes in a NavigationRequest that includes a destination and a BufferKey for the robot’s position. While the robot makes progress towards its destination, the service will emit NavigationStreams including location data, log data, and any errors that come up along the way.

#[derive(StreamPack)]
struct NavigationStreams {
    log: String,
    location: Vec2,
    error: NavigationError,
}

#[derive(Clone)]
struct NavigationRequest {
    destination: Vec2,
    robot_position_key: BufferKey<Vec2>,
}

fn navigate(
    In(input): AsyncServiceInput<NavigationRequest, NavigationStreams>,
    nav_graph: Res<NavigationGraph>,
) -> impl Future<Output = Result<(), NavigationError>> + use<> {
    // Clone the navigation graph resource so we can move the clone into the
    // async block.
    let nav_graph = (*nav_graph).clone();

    // Create a callback for fetching the latest position
    let get_position = |
        In(key): In<BufferKey<Vec2>>,
        access: BufferAccess<Vec2>,
    | {
        access.get_newest(&key).cloned()
    };
    let get_position = get_position.into_blocking_callback();

    // Unpack the input into simpler variables
    let NavigationRequest { destination, robot_position_key } = input.request;
    let location_stream = input.streams.location;

    // Begin an async block that will run in the AsyncComputeTaskPool
    async move {
        loop {
            // Fetch the latest position using the async channel
            let position = input.channel.request_outcome(
                robot_position_key.clone(),
                get_position.clone()
            )
                .await
                .ok()
                .flatten();

            let Some(position) = position else {
                // Position has not been reported yet, so just try again later.
                continue;
            };

            // Send the current position out over an async stream
            location_stream.send(position);

            // TODO: Command the robot to proceed towards its destination
            // TODO: Break the loop when the robot arrives at its destination
        }

        Ok(())
    }
}

A few important details of the above example:

  • We use the fn _(AsyncServiceInput) -> impl Future<Output = _> syntax so we can have an async service that accesses the NavigationGraph resource at startup.
  • We use the async move { ... } syntax to create the long-running Future that will run in the AsyncComputeTaskPool.
  • All relevant input data is unpacked and then moved into the async block.
  • We create a callback to periodically fetch data from the position buffer while our async block is running using the async channel. This is the most effective way for async tasks to access Bevy ECS data.
  • The streams provided to an async service can be moved into an async block, allowing messages to be streamed out while the async task is still being processed.

Continuous Streams

The output streams of blocking and async services have similar ergonomics, even though their behavior has some notable differences. For continuous services, output streams likewise allow you to stream out messages while a request is still in flight, similar to async services.

However the API has an important difference: a continuous service can see all its active requests (orders) at once, and its streams are isolated per order:

fn continuous_navigate(
    In(srv): ContinuousServiceInput<NavigationRequest, Result<(), NavigationError>, NavigationStreams>,
    mut continuous: ContinuousQuery<NavigationRequest, Result<(), NavigationError>, NavigationStreams>,
    position_access: BufferAccess<Vec2>,
    nav_graph: Res<NavigationGraph>,
) {
    let Some(mut orders) = continuous.get_mut(&srv.key) else {
        // The service provider has despawned, so we can no longer do anything
        return;
    };

    orders.for_each(|order| {
        let NavigationRequest { destination, robot_position_key } = order.request().clone();
        let Some(position) = position_access.get_newest(&robot_position_key) else {
            // Position is not available yet
            return;
        };

        order.streams().location.send(*position);

        // TODO: Command the robot to proceed towards its destination
        // TODO: Use order.respond(Ok(())) when the robot arrives at its destination
    });
}

In the above example you can see that the output streams are accessed through order.streams() instead of input.streams. Each order comes from a different request message passed into the continuous service—potentially from many different workflows or sessions at once. Forcing you to send streams through the OrderMut API ensures that the messages you stream out are only going to the specific order that they’re meant for.

Delivery Instructions

A common problem with highly parallelized async systems is when multiple instances of one service are active at the same time, but the service is designed for exclusive usage. This can happen when a service interacts with global resources like a mouse cursor or physical assets like the joints of a robot. If multiple conflicting requests are being made for the same set of resources, there could be deadlocks, misbehaviors, or even critical failures.

Resource contention is a broad and challenging problem with no silver bullet solution, but crossflow offers a delivery instructions mechanism that can help to handle simple cases. With the help of delivery instructions, you can implement your service to not worry about exclusive access, and then wrap the service in instructions that tell crossflow to only execute the service once-at-a-time.

More advanced use of delivery instructions allows specific sets of requests for a service to run in serial while other sets run in parallel at the same time: requests in the same queue run in serial, while the queues themselves run in parallel.

Tip

Delivery instructions are only relevant for async and continuous services. They have no effect on blocking services since blocking services always run in serial no matter what.

Always-Serial Services

Warning

Always-serial services are also serial across different workflows and different sessions of the same workflow. When a service gets used in multiple different nodes, those nodes will only execute once-at-a-time, even if the nodes belong to different workflows or are being triggered across different sessions.

You can choose for an async or continuous service to always be run in serial, meaning when multiple requests are sent to it, they will always be run once-at-a-time. This does not block any other services from running at the same time.

To make any service always run serially, just add .serial() while spawning it:

let service = commands.spawn_service(
    my_service.serial()
);

Note

Apply .serial() to the service itself, inside the parentheses of .spawn_service(_). You cannot chain it outside the parentheses of .spawn_service(_).

Delivery Label

Always-serial is often too restrictive. When a service is used across different workflows or in multiple nodes at the same time, the requests being sent to it might be accessing unrelated resources. In this case, you can assign delivery instructions to the service before making the request.

The first step is to create a delivery label:

#[derive(Debug, Clone, PartialEq, Eq, Hash, DeliveryLabel)]
struct MyDeliveryLabel {
    set: String,
}

A delivery label can be any struct that implements Debug, Clone, PartialEq, Eq, Hash, and DeliveryLabel. #[derive(DeliveryLabel)] will fail at compile time if any of the required traits are missing, so the compiler helps ensure that your struct has all the traits it needs. Beyond those traits, there is no restriction on what can be used as a delivery label.

To use a delivery label, spawn a service as normal, but apply instructions with .instruct(_) when passing it into a request:

// Spawn the service as normal
let service = commands.spawn_service(my_service);

// Create delivery instructions
let instructions = DeliveryInstructions::new(
    MyDeliveryLabel {
        set: String::from("my_set")
    }
);

// Add the instructions while requesting
let outcome = commands.request(
    String::from("hello"),
    service.instruct(instructions),
).outcome();

To create the DeliveryInstructions, you must pass in an instance of your delivery label.

Every request made with a matching delivery label value will be put into the same queue and run in serial with each other. Requests made with different delivery labels will be put in separate queues which can run in parallel.

Pre-empt

Sometimes waiting for a service to execute once-at-a-time isn’t really what you want. When multiple requests are contending for the same resource, sometimes what you really want is to discard the earlier requests so the newest one can take over.

We call this pre-emption, and provide it as an option for DeliveryInstructions:

// Create a label
let label = MyDeliveryLabel {
    set: String::from("my_set")
};

// Make instructions that include preempting
let preempt_instructions = DeliveryInstructions::new(label).preempt();

let outcome = commands.request(
    String::from("hello"),
    service.instruct(preempt_instructions)
).outcome();

When .preempt() is applied to DeliveryInstructions, that request will discard all requests that were queued up before it. Even a live task that is in the process of being executed will be cancelled when a pre-empting request arrives.

Note

Earlier we said that delivery instructions have no effect on blocking services, but this is not entirely true in the case of pre-emption. If multiple requests with the same delivery label have been queued up for a blocking service before it has had a chance to flush, a pre-empting request will clear out the earlier requests before they can ever execute.

Warning

When a service being used by a workflow gets pre-empted, that will result in a disposal.

Ensure

Sometimes pre-emption may be too much of a scorched-earth approach. If some of your requests should be pre-emptible while others are too crucial to discard, you can apply .ensure() to the crucial ones. An ensured request will not be discarded when a pre-empting request arrives: it remains in the queue and will still be delivered.

let instructions = DeliveryInstructions::new(label)
    .preempt()
    .ensure();

let outcome = commands.request(
    String::from("hello"),
    service.instruct(instructions)
).outcome();

Tip

.preempt() and .ensure() can be used independently of each other per request. A request can be pre-emptive but not ensured, or ensured but not pre-emptive, or it can be both pre-emptive and ensured.

Full Example

use async_std::future::{pending, timeout};
use crossflow::bevy_app::App;
use crossflow::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash, DeliveryLabel)]
struct MyDeliveryLabel {
    set: String,
}

impl MyDeliveryLabel {
    fn new(set: String) -> Self {
        Self { set }
    }
}

fn main() {
    let mut app = App::new();
    app.add_plugins(CrossflowExecutorApp::default());

    let waiting_time = std::time::Duration::from_secs(2);

    let waiting_service =
        app.world_mut()
            .spawn_service(move |In(input): AsyncServiceInput<String>| async move {
                // Create a future that will never finish
                let never = pending::<()>();
                // Wait on the never-ending future until a timeout finishes.
                // This creates an artificial delay for the async service.
                let _ = timeout(waiting_time, never).await;

                println!("{}", input.request);
            });

    // We will fire off 10 requests at once for three different sets where each
    // set has delivery instructions, making them have serial (one-at-a-time)
    // delivery within each set. Since the sets themselves have independent labels,
    // the requests in each set can be processed in parallel.
    //
    // The service itself does nothing but waits two seconds before printing its input message.
    for set in ["A", "B", "C"] {
        let instructions = DeliveryInstructions::new(MyDeliveryLabel::new(set.to_string()));
        for i in 1..=10 {
            let message = format!("Message #{i} for set {set}");
            let service = waiting_service.instruct(instructions.clone());
            app.world_mut().command(|commands| {
                commands.request(message, service).detach();
            });
        }
    }

    app.run();
}

Tip

To set delivery instructions for the services used in nodes, simply apply .instruct(_) to the service before passing it into builder.create_node(_).

Callbacks

When you spawn a service you get back a Service instance. The implementation of that Service will be stored inside the Bevy ECS until you choose to despawn it.

In some cases you might not want the implementation to exist inside the Bevy ECS. You might prefer an object that you can pass around and which will have RAII—freeing its memory when it is no longer needed.

Callbacks are an alternative to services: their lifecycle is managed outside of the Bevy ECS, but they still act as Bevy systems, able to interact with entities, components, and resources. They also fulfill the role of services, taking in a Request message and passing back a Response message, potentially with output streams as well.

There are three key differences between a service and a callback:

  • Callbacks do not need to be spawned with Commands.
  • Callbacks are not associated with any Entity and therefore do not have any provider that you can store components on.
  • A callback will automatically deallocate when all references to it are dropped.

Tip

The more general term that we use to refer to services and service-like things—such as callbacks—is provider.

How to use

To use a callback, simply define the callback either as a fn or a closure, and then apply .as_callback() to its name. Note the use of BlockingCallbackInput instead of BlockingServiceInput:

// We can access this resource from the callback
#[derive(Resource)]
struct Greeting {
    prefix: String,
}

// Make an fn that defines the callback implementation
fn perform_greeting(
    In(input): BlockingCallbackInput<String>,
    greeting: Res<Greeting>,
) -> String {
    let name = input.request;
    let prefix = &greeting.prefix;
    format!("{prefix}{name}")
}

// Convert the fn into a callback.
// This is necessary to initialize the fn as a bevy system.
let callback = perform_greeting.as_callback();

// Use the callback in a request
let outcome = commands.request(String::from("Bob"), callback).outcome();

Async

Async callbacks are implemented in much the same way as async services, just replacing AsyncServiceInput with AsyncCallbackInput:

async fn page_title_callback(In(srv): AsyncCallbackInput<String>) -> Option<String> {
    let response = trpl::get(&srv.request).await;
    let response_text = response.text().await;
    trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
}

let callback = page_title_callback.as_callback();
let outcome = commands.request(String::from("https://example.com"), callback).outcome();

Just as with blocking callbacks, you turn the fn definition into a callback by applying .as_callback() to it.

Note

There is no callback equivalent to continuous services. Continuous services must exist inside the Bevy ECS. There is currently no way around this.

Closures

You can also turn a closure into a callback. Sometimes the syntax for this is confusing, but the easiest way to make it work is to first define the closure as a variable and then convert that variable into a callback:

// Make a closure that defines the callback implementation
let perform_greeting = |
    In(input): BlockingCallbackInput<String>,
    greeting: Res<Greeting>,
| {
    let name = input.request;
    let prefix = &greeting.prefix;
    format!("{prefix}{name}")
};

// Convert the closure into a callback.
// This is necessary to initialize the closure as a Bevy system.
let callback = perform_greeting.as_callback();

// Use the callback in a request
let outcome = commands.request(String::from("Bob"), callback).outcome();

This also works for async callbacks, but you need to use the async block syntax:

let page_title_callback = |In(srv): AsyncCallbackInput<String>| {
    async move {
        let response = trpl::get(&srv.request).await;
        let response_text = response.text().await;
        trpl::Html::parse(&response_text)
            .select_first("title")
            .map(|title| title.inner_html())
    }
};

let callback = page_title_callback.as_callback();
let outcome = commands.request(String::from("https://example.com"), callback).outcome();

Service/Callback Agnostic Implementation

In some cases you might want to implement a Bevy system provider but don’t want to commit to choosing a service or a callback. For that, you can create a regular Bevy system that takes an input and then convert it into a service or callback later.

Whether you make it a service or a callback, the Request message type will match the input of the Bevy system, and the Response message type will match either its return value (for blocking) or the output of its Future (for async).

Here’s an example of converting a blocking function into a service and a callback:

fn perform_greeting(
    In(name): In<String>,
    greeting: Res<Greeting>,
) -> String {
    let prefix = &greeting.prefix;
    format!("{prefix}{name}")
}

// Use as service
let greeting_service = commands.spawn_service(
    perform_greeting.into_blocking_service()
);
let outcome = commands.request(String::from("Bob"), greeting_service).outcome();

// Use as callback
let greeting_callback = perform_greeting.into_blocking_callback();
let outcome = commands.request(String::from("Bob"), greeting_callback).outcome();

And here’s an example for an async function:

fn get_page_element(
    In(element): In<String>,
    url: Res<Url>,
) -> impl Future<Output = Option<String>> + use<> {
    let url = (**url).clone();
    async move {
        let content = fetch_content_from_url(url).await?;
        content.get(&element).cloned()
    }
}

let element = String::from("title");

// Use as service
let title_service = commands.spawn_service(
    get_page_element.into_async_service()
);
let outcome = commands.request(element.clone(), title_service).outcome();

// Use as callback
let title_callback = get_page_element.into_async_callback();
let outcome = commands.request(element, title_callback).outcome();

Caution

.into_async_callback() does not work for systems whose only system parameter is the input. Trying to convert such a function into a callback will result in a compilation error. This problem is being tracked by #159.

Maps

Services and callbacks are two kinds of providers that can both function as Bevy systems. Bevy systems have the benefit of granting you full access to the Bevy ECS, but there is a small amount of overhead involved in initializing and running systems. If you want a provider that does not need access to Bevy’s ECS, then you should consider using a map instead.

Maps are providers that have the minimum possible overhead. They are defined as functions that don’t have any Bevy system parameters. Maps are good for quick transformations of data or for calling non-Bevy async functions.

Warning

Since blocking maps do not have any system parameters, they cannot access data inside of buffers.

Async maps can use the async channel to access buffer data, but each query needs to wait until the next execution flush takes place.

Simple usage

When building a series of services you can use Series::map_block(_) or Series::map_async(_) to quickly and easily create a map.

Similarly, when chaining services in a workflow you can use Chain::map_block(_) or Chain::map_async(_).

These functions allow you to convert simple blocking or async functions (or closures) into providers with no additional steps.
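The kind of function these helpers accept is ordinary Rust with no system parameters. For example, the sort of transformation you might hand to map_block could look like this (a plain-Rust sketch with the crossflow machinery omitted):

```rust
// Plain data-in, data-out transformations: no Bevy system parameters and
// short-lived, which makes them suitable as blocking maps.
fn to_text(value: f64) -> String {
    value.to_string()
}

fn parse_back(text: &str) -> Result<f64, std::num::ParseFloatError> {
    text.parse::<f64>()
}
```
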

Streams and Async Channel

Even though maps cannot directly access any Bevy system params, they can still have output streams. Async maps even get access to the async channel which means they can query for and modify Bevy ECS data while running in the async task pool.

To get access to these, you must define your function explicitly as a blocking or async map:

fn fibonacci_map_example(input: BlockingMap<u32, StreamOf<u32>>) {
    let order = input.request;
    let stream = input.streams;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        stream.send(current);

        let sum = current + next;
        current = next;
        next = sum;
    }
}

let outcome = commands.request(10, fibonacci_map_example.as_map()).outcome();

Before passing your function into commands.request(_) you must apply .as_map() to it to convert it into a map.
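The streaming logic above can be checked in plain Rust by collecting into a Vec instead of sending on a stream (a sketch for illustration, not crossflow code):

```rust
// Pure-Rust version of the fibonacci streaming logic, with the stream
// replaced by a Vec so the sequence can be inspected directly.
fn fibonacci_sequence(order: u32) -> Vec<u32> {
    let mut out = Vec::new();
    let (mut current, mut next) = (0u32, 1u32);
    for _ in 0..order {
        out.push(current); // where the map would call stream.send(current)
        let sum = current + next;
        current = next;
        next = sum;
    }
    out
}
```
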

Tip

If your map is part of a series or a chain, you can use Series::map(_) or Chain::map(_) instead to avoid the need to apply .as_map(). This is useful if you want to define your map as a closure inline in the series or chain.

Blocking maps do not get access to the async channel—neither do blocking services nor blocking callbacks—but async maps do. Just like with streams, you need to make it explicit that you have an async map:

async fn navigate(
    input: AsyncMap<NavigationRequest, NavigationStreams>,
) -> Result<(), NavigationError> {
    // Clone the navigation graph resource so we can move the clone into the
    // async block.
    let nav_graph = input
        .channel
        .world(|world| {
            world.resource::<NavigationGraph>().clone()
        })
        .await;

    // Create a callback for fetching the latest position
    let get_position = |
        In(key): In<BufferKey<Vec2>>,
        access: BufferAccess<Vec2>,
    | {
        access.get_newest(&key).cloned()
    };
    let get_position = get_position.into_blocking_callback();

    // Unpack the input into simpler variables
    let NavigationRequest { destination, robot_position_key } = input.request;
    let location_stream = input.streams.location;

    loop {
        // Fetch the latest position using the async channel
        let position = input.channel.request_outcome(
            robot_position_key.clone(),
            get_position.clone()
        )
            .await
            .ok()
            .flatten();

        let Some(position) = position else {
            // Position has not been reported yet, so just try again later.
            continue;
        };

        // Send the current position out over an async stream
        location_stream.send(position);

        // TODO: Command the robot to proceed towards its destination
        // TODO: Break the loop when the robot arrives at its destination
    }

    Ok(())
}

With that, navigate can be passed into commands.request(_, navigate.as_map()) or into series.map(navigate) or chain.map(navigate).

Provider Overview

There are three large categories of providers:

  • Services
  • Callbacks
  • Maps

and up to three flavors within each category:

  • Blocking
  • Async
  • Continuous

All of these different types of providers can be used as elements of a series or workflow, but they all have slightly different characteristics. For an in-depth explanation of the differences between them, you can look at the intro section for Callbacks and Maps, but the table on this page provides a quick overview to help remind you at a glance.

|            | Advantage | Caveat | Providers |
|------------|-----------|--------|-----------|
| Blocking   | No thread-switching overhead<br><br>Instant access to all data in the Bevy ECS world<br><br>Sequences of blocking providers finish in a single flush | While running, they block the progress of all series, workflows, and scheduled systems in the app<br><br>(they do not block async providers) | ✅ Service<br>✅ Callback<br>✅ Map |
| Async      | Executed in parallel in the async thread pool<br><br>Can query and modify the world asynchronously from the thread pool<br><br>Can use .await | Queries and modifications of the world take time to flush<br><br>Moving data between threads and spawning async tasks has non-zero overhead | ✅ Service<br>✅ Callback<br>✅ Map |
| Continuous | Run every system schedule update cycle, making them good for incremental actions<br><br>Run in parallel with other Bevy systems in the schedule<br><br>Instant access to all data in the Bevy ECS world | They wake up every system schedule update cycle, even if there are no requests queued for them | ✅ Service<br>❌ Callback<br>❌ Map |

Service

  • Spawned with Commands and stored in the Bevy ECS.
  • Associated with an Entity (its provider) that you can store components on.
  • Remains available until explicitly despawned.
  • Can be used as a Bevy system.

Callbacks

  • Stored in a Callback object—not stored in the Bevy ECS.
  • Can be passed around as an object and cloned.
  • Drops when no longer used anywhere.
  • Can be used as a Bevy system.

Maps

  • Minimal execution overhead.
  • Cannot be used as a Bevy system.

All

  • Can be used as an element of a series or a workflow.

How to Run a Service

Once you have spawned a service or have some other type of “provider” available [1][2][3], you can run it by passing in a request. This can be done from inside of any Bevy system by including Commands:

let mut outcome = commands.request(request_msg, service).outcome();

The .request(_, _) method comes from the RequestExt trait provided by crossflow. This method takes in a request_msg (the input message for the service) and any type of “provider”, which is usually a Service.

The simplest thing to do with a request is to take the outcome using .outcome(). This will provide you with an Outcome which you can use to receive the response of the service once it finishes.

Sync Outcome

You can use an Outcome in a sync (blocking, non-async) function using try_recv:

match outcome.try_recv() {
    Some(Ok(response)) => {
        println!("The final response is {response}");
    }
    Some(Err(cancellation)) => {
        println!("The request was cancelled or undeliverable: {cancellation}");
    }
    None => {
        println!("The request is still being processed, try again later")
    }
}

Warning

Using outcomes in sync code has a crucial disadvantage: you need to repeatedly poll the outcome to know when it has finished. In most cases this is inefficient busywork.

We recommend awaiting outcomes in async code instead.
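The busywork of polling can be sketched with a std mpsc channel standing in for an Outcome (illustrative only; an Outcome is not actually an mpsc receiver):

```rust
use std::sync::mpsc;

// A std channel standing in for an Outcome: try_recv returns immediately,
// so sync code must call it over and over until the value shows up.
fn poll_until_ready(rx: &mpsc::Receiver<i32>) -> Option<i32> {
    loop {
        match rx.try_recv() {
            Ok(value) => return Some(value),                      // response arrived
            Err(mpsc::TryRecvError::Empty) => continue,           // busywork: spin
            Err(mpsc::TryRecvError::Disconnected) => return None, // "cancelled"
        }
    }
}
```

An awaited Outcome avoids this loop entirely: the async task sleeps until the response is delivered.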

Async Outcome

The most efficient and ergonomic way to use an Outcome is to .await it in an async function. Awaiting the Outcome will consume it and return its final result as soon as that final result is available:

match outcome.await {
    Ok(response) => {
        println!("The final response is {response}");
    }
    Err(cancellation) => {
        println!("The request was cancelled: {cancellation}");
    }
}

The result will either be the final response from the service or an error explaining why the request was cancelled.

More Ways to Manage Requests

There will often be times when you want to immediately feed the result of one service into another in a chain of service calls. We call this a Series, and you can continue to the next page to find out how to do this.

Some services have output streams in addition to a response, and you may need to receive data from those. You can learn about how to receive from output streams in Receiving from Output Streams.

If simply receiving the final response of a service is enough for your needs, then you can move along to the Introduction to Workflows section.

How to Run a Series

Very often when running a service you may want to feed its output directly into another service in order to emulate a more complex service or a multi-stage action. For one-off requests you can chain services together by building a Series.

Tip

If you need to assemble services into a more complex structure than a linear sequence (e.g. parallel threads, conditional branching, loops), you can build a workflow instead.

The advantage of using a Series over a workflow is that you can run a series once and forget about it (cleanup is automatic), whereas when you build a workflow you will spawn a Service that needs to be managed afterwards.

Every time you run .request(_, _), you immediately receive a Series. When you use .request(_, _).take_response() you are actually calling Series::take_response which terminates the construction of the series after a single service call.

To build a more complex series you can use the chaining methods provided by the Series struct:

  • .then(_): Specify a service (or other kind of provider) to pass the last response into, getting back a new response.
  • .map_block(_): Specify a FnOnce(T) -> U to transform the last response into a new value. The function you provide will block the system schedule from running, so it should be short-lived.
  • .map_async(_): Specify a FnOnce(T) -> impl Future<Output=U> to transform the last response into a new value. The Future will be evaluated in the async task pool, so put all long-running routines into the async block.

The following example shows a simple series that feeds the output of one service into another service with a .map_block between them to transform the first service’s output into a data type that can be consumed by the second service.

let storage = commands.spawn(()).id();

commands
    // Ask for three values to be summed
    .request(vec![-1.1, 5.0, 3.1], sum_service)
    // Convert the resulting value to a string
    .map_block(|value| value.to_string())
    // Send the string through the parsing service
    // which may produce streams of u32, i32, and f32
    .then(parsing_service)
    // Collect the parsed values in an entity
    .collect_streams(storage)
    // Detach this series so we can safely drop the tail
    .detach();

Dependencies and Detachment

You may notice the .detach() at the end of the previous example series. Ordinarily a series will automatically stop executing if its result is no longer needed, which we detect based on whether the Outcome of the final response gets dropped. This allows us to avoid running services needlessly. However, sometimes you want services to run even if you won’t observe their final results, because you are interested in their side effects rather than their final responses. You can insert .detach() anywhere in a series to ensure that everything before the .detach() gets run even if the part of the series after the .detach() gets dropped.

There are several ways to terminate a series, and each has drop conditions that affect what happens to the series when it gets dropped. You can find a table of the different terminating operations and their drop conditions in the Series::detach() documentation.

Receiving from Output Streams

Some services also have output streams that you may want to receive data from. In that case you will need to take a Capture instead of only taking the Outcome:

let mut capture = commands.request(String::from("-3.14"), parsing_service).capture();

The parsing_service provides this ParsedStreams stream output pack:

#[derive(StreamPack)]
struct ParsedStreams {
    /// Values that were successfully parsed to u32
    parsed_as_u32: u32,
    /// Values that were successfully parsed to i32
    parsed_as_i32: i32,
    /// Values that were successfully parsed to f32
    parsed_as_f32: f32,
}

The string value "-3.14" should be able to parse to f32 but not u32 or i32. We can receive whichever values were produced with the following code:

let _ = capture.outcome.await;
println!("The service has finished running.");
while let Some(value) = capture.streams.parsed_as_u32.recv().await {
    println!("Parsed an unsigned integer: {value}");
}

while let Some(value) = capture.streams.parsed_as_i32.recv().await {
    println!("Parsed a signed integer: {value}");
}

while let Some(value) = capture.streams.parsed_as_f32.recv().await {
    println!("Parsed a floating point number: {value}");
}

The streams field of Capture will itself contain one field for each field of the service’s stream pack, and the names of those fields will match the field names in the stream pack.

Each field in Capture::streams will be a receiver that can be used to receive values streamed out by the requested service, either in a sync or async context.

In the snippet above, .recv() will provide a Future that can efficiently wait for the next value from the stream receiver that it’s called on. Calling .await on that Future will let the async task rest until the next value is available. Calling .recv().await in a while-loop expression ensures that we keep draining the streams until all messages have been received.

The Capture also has an outcome field, which the above snippet uses to detect that the service has finished running. In general the outcome will provide the final output message of the service, but for parsing_service that is just a unit trigger ().

You are not required to await the response or the streams in any particular order. You could await or poll the streams while the request is still being processed in order to receive live updates from long-running requests. There are many tools in the Rust ecosystem that allow for sophisticated async programming patterns, such as tokio’s select macro that allows you to await on multiple streams at once and immediately receive the next message that emerges from any one of them:

use tokio::select;

let next_u32 = capture.streams.parsed_as_u32.recv();
let next_i32 = capture.streams.parsed_as_i32.recv();
let next_f32 = capture.streams.parsed_as_f32.recv();
select! {
    recv = next_u32 => {
        if let Some(value) = recv {
            println!("Received an unsigned integer: {value}");
        }
    }
    recv = next_i32 => {
        if let Some(value) = recv {
            println!("Received a signed integer: {value}");
        }
    }
    recv = next_f32 => {
        if let Some(value) = recv {
            println!("Received a floating point number: {value}");
        }
    }
}

Collecting Streams

If you are not interested in managing stream channels and just want to store the stream outputs for later use, you can use Series::collect_streams to store all the stream outputs in an entity of your choosing:

// Spawn an entity to be used to store the values coming out of the streams.
let storage = commands.spawn(()).id();

// Request the service, but set an entity to collect the streams before we
// take the response.
let outcome = commands
    .request(String::from("-5"), parsing_service)
    .collect_streams(storage)
    .outcome();

// Save the entity in a resource to keep track of it.
// You could also save this inside a component or move it
// into a callback, or anything else that suits your needs.
commands.insert_resource(StreamStorage(storage));

Then you can schedule a system to query the Collection component of that entity to inspect the streams. When using a stream pack with named streams, make sure to use NamedValue<T> for the inner type of the Collection:

use crossflow::Collection;

#[derive(Resource, Deref)]
struct StreamStorage(Entity);

fn print_streams(
    storage: Res<StreamStorage>,
    mut query_u32: Query<&mut Collection<NamedValue<u32>>>,
    mut query_i32: Query<&mut Collection<NamedValue<i32>>>,
    mut query_f32: Query<&mut Collection<NamedValue<f32>>>,
) {
    if let Ok(mut collection_u32) = query_u32.get_mut(**storage) {
        for item in collection_u32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }

    if let Ok(mut collection_i32) = query_i32.get_mut(**storage) {
        for item in collection_i32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }

    if let Ok(mut collection_f32) = query_f32.get_mut(**storage) {
        for item in collection_f32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }
}

Introduction to Workflows

If you need to assemble services in a more complex way than a series, you can build a workflow. Building a workflow will ultimately leave you with a Service which you can use to run the workflow.

Fundamentally, workflows define how the output of one service should connect to the input of another service. Along the way, the data that is being passed might undergo transformations.

service-chain

There are three strategies available for creating workflows, each with its own benefits and use-cases:

| Strategy | Description | Benefits |
|----------|-------------|----------|
| native Rust API | Write Rust code to define nodes and workflows using the native Rust API of crossflow | 🠚 compile-time validation<br>🠚 access to all native features<br>🠚 easily import plugins |
| runtime generation | Generate a JSON diagram based on the output of a planner or a description of some process | 🠚 runtime validation of diagram<br>🠚 visualize generated diagrams<br>🠚 implement highly dynamic systems |
| visual editor | Visually design and configure a workflow with a graphical editor | 🠚 no-code programming<br>🠚 validate diagram while editing<br>🠚 runtime validation of diagram |

This chapter will introduce concepts that are relevant to all three. For building workflows using the native Rust API, you can go to the How to Build a Workflow chapter. To learn about runtime generation and visual (no-code) editing of workflows, go to JSON Diagrams.

Tip

See our live web demo of the open source crossflow diagram editor.

Node

To put a service into a workflow you create a node by specifying a service that will be run when the node is given an input:

workflow-node

In this case we’ve created a node that will run the bake_pie service, taking an apple and turning it into a pie. You can include the same service any number of times in a workflow by creating a node for each place that you want to run the service.

Input Slots and Outputs

A node has one input slot and one final output. There must be at least one output connected into a node’s input slot in order for the node to ever be activated, but there is no upper limit on how many outputs can connect to an input slot. The node will simply be activated any time a message is sent to its input slot, no matter what the origin of the message is.

The output of a node must be connected to no more than one input slot or operation. If an output is not connected to anything, we call that output “disposed”. If you want to connect your node’s output to multiple input slots, you will need to pass it through an operation like clone, unzip, or split depending on how you want the message’s data to be distributed.

To connect an output to an input slot, the data type of the output must match the data type expected by the input slot. When using the native Rust API, you can use a map node to transform an output message into a compatible input message. When building a JSON diagram, you can use the transform operation. A data type mismatch will either cause a compilation error (native Rust API) or a workflow building error (JSON Diagram). Either way, the workflow will not be allowed to run until the mismatch is resolved.

Streams

If the service used by your node has output streams then you will receive a separate Output for each stream:

output-streams

In this example, the pollinate service has a side effect of producing more flowers and producing honey. We can represent these side effects as output streams of the service.

Each of these streamed outputs can be connected to a separate input slot or operation. Each stream can carry a different data type, so make sure that you are connecting each stream to a compatible input slot or operation.

When building a workflow, streamed outputs behave essentially the same as the regular “final” output. There are just two practical characteristics that make streamed outputs different:

  • Output streams will only produce messages while the node is active. After the final output message has been sent, no more stream messages can appear until the node is activated again by a new input message.
  • For as long as the node is active, an output stream may produce any number of messages, including zero. This means you cannot rely on getting any messages from a stream unless you know something about the implementation of the service that the node is using.

Operations

Besides just running services, workflows need to manage the flow of data between those services. That includes conditional branching, parallelism, and synchronization. Continue to the next page to learn about the other kinds of operations that you can use in a workflow to achieve the behavior that you want.

Branching

What distinguishes a workflow from a series is that workflows support additional operations besides just chaining services together. For example, fork-result creates a fork (a point with diverging branches) in the workflow where one of two branches will be run based on the value of the input message:

fork-result

The input message of a fork-result must be the Result<T, E> type. Result is an enum with two variants: Ok and Err. The Ok variant will contain an object of type T while Err will contain a type E. T and E can be any two data structures of your choosing.

Each variant gets its own branch coming out of the fork-result operation. Each time a message is sent into the fork-result operation, exactly one branch will activate depending on whether the message is an Ok or an Err variant. In Rust, an instance of an enum can only have a value of exactly one of its variants.

We say that fork-result does “conditional branching” because it creates a fork in the workflow where one or more branches might not be activated when a message arrives.
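The routing that fork-result performs amounts to ordinary Rust pattern matching on a Result; this sketch (the branch names are illustrative) shows that exactly one arm runs per message:

```rust
// Plain-Rust sketch of fork-result routing: exactly one branch activates
// per message, chosen by the Result variant.
fn route(message: Result<u32, String>) -> &'static str {
    match message {
        Ok(_value) => "ok-branch",   // the Ok variant carries a T (here u32)
        Err(_error) => "err-branch", // the Err variant carries an E (here String)
    }
}
```
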

Trigger

Another conditional branching operation is fork-option. Similar to Result<T, E>, the Option<T> type in Rust is an enum with two variants: Some and None. What makes Option different from Result is that the None variant has no inner value. An Option<T> either contains some value T or it contains no value at all (similar to std::optional in C++).

When we fork an Option we still produce two possible branches:

fork-option

The branch for Some will carry the inner T message that the Option contained, but the None branch has no value to carry. Instead it contains a message of the unit type represented in Rust code as an empty tuple ().

The unit type concept is a useful one in workflows. It’s a way of sending a signal that some branch in the workflow should activate but there is no meaningful data to transmit. We refer to this pattern as a “trigger”, and operations that produce a unit type are called triggers.

In the above example we check whether an apple is available. If it is, we send it to the kitchen to be baked into a pie. If no apple was available then we go to the supermarket and bring back an apple to send to the kitchen. The context of the workflow is enough for us to know that we need to pick up an apple from the supermarket (this information could be embedded in the service configuration), so a trigger with no message data is sufficient to activate that service.
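The forking behavior can be sketched in plain Rust: the Some branch carries the inner value onward, while the None branch carries only a unit trigger:

```rust
// fork-option sketched in plain Rust: Some routes the inner value onward,
// while None becomes a unit trigger () with no data—just a signal that the
// empty branch should activate.
fn fork_option(input: Option<String>) -> Result<String, ()> {
    input.ok_or(())
}
```
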

Note

If you have a custom enum type that you would like to use for forking, we have an open issue ticket to add support for this.

Parallelism

Clone

Besides conditionally running one branch or another, some operations can run multiple branches in parallel. For example, the fork-clone operation takes in any cloneable message and then sends one copy down each of the branches coming out of it:

fork-clone

Each branch coming out of the fork-clone will run independently and in parallel. Unlike in a behavior tree, these branches are completely decoupled from here on out, unless you choose to synchronize them later. You are free to do any other kind of branching, cycling, or connecting for each of these branches, without needing to consider what any of the other branches are doing.

Unzip

Another way to create parallel branches is to unzip a tuple message into its individual elements:

fork-unzip

The tuple can have any number of elements (up to 12), and the fork-unzip will have as many output branches as its tuple had elements. Just like fork-clone, each branch will be fully independent.
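At the message level, unzip is plain tuple destructuring: each element, with its own type, heads down its own branch. This sketch (not crossflow API) shows three hypothetical branches each applying their own transformation:

```rust
// An unzipped (u32, String, f32) message yields one independent branch per
// element; here each branch applies its own stand-in "service".
fn process_branches(message: (u32, String, f32)) -> (u32, usize, f32) {
    let (count, label, weight) = message; // one output per tuple element
    (count * 2, label.len(), weight + 1.0)
}
```
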

Note

If you would like to unzip a regular struct with fields, where each field gets sent down a different branch, issue #145 is open to track this.

Spread

Warning

At the time of this writing, the spread operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Tip

A single node can be activated any number of times simultaneously.

Parallelism in workflows is not limited to forking into parallel branches. A single branch can support any amount of parallel activity. In other words, a single node can be activated multiple times simultaneously, with each run of its service being processed independently. One easy way to parallelize a single branch is with the spread operation:

spread

The spread operation takes an iterable message type (e.g. Vec, array, or anything that implements IntoIterator) and spreads it out into N separate messages where N is however many elements were in the collection.

All of the messages coming out of the spread operation will be sent to the same input slot. In principle they are all sent to the input slot “at the same time” although the exact outcome will vary based on what kind of operation they are being sent to:

  • If they are sent to a blocking service then they will be processed one-at-a-time but all of them will be processed within the same “flush” of the workflow (i.e. within one update frame of the Bevy schedule).
  • If they are sent to an async service then the service will be activated with one message at a time, but the Futures of all the calls will be processed in parallel by the async compute task pool.
  • If they are sent to a continuous service then they will all be queued up for the continuous service together, and the continuous service will see all of them in its queue the next time it gets run.

The inverse of the spread operation is the collect operation.
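The data-level effect of spread can be sketched in plain Rust: one iterable message becomes N messages delivered to the same input slot, each processed independently (here by a stand-in "double it" service):

```rust
// Spread sketched in plain Rust: an iterable message fans out into N
// messages, and the same downstream "service" handles each one.
fn spread_then_process<I: IntoIterator<Item = u32>>(message: I) -> Vec<u32> {
    message.into_iter().map(|element| element * 2).collect()
}
```
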

Split

One very useful operation can do conditional branching, parallel branching, and spreading all at once. That operation is called split:

split-keyed

Split takes in anything splittable, such as collections and maps, and sends its elements down different branches depending on whether the element is associated with a certain key or location in a sequence:

split-sequence

In a sequenced split, the first element in the collection will go down the seq: 0 branch, the second element will go down the seq: 1 branch, and so on, with all the rest going down the remaining branch. The meaning of “first”, “second”, etc. is determined by how the splittable trait was implemented for the input message. For vectors, arrays, etc. this will naturally be the order that the elements appear in the container. For ordered sets and maps (e.g. BTreeSet and BTreeMap) it will be based on the sorted order of the element or key, respectively. For unordered sets and maps (e.g. HashSet and HashMap) the ordering will be arbitrary, and may be different in each run, even if the input value of the message is the same.

Tip

Keys and sequences can be used at the same time in one split operation. Elements will prefer to go down a branch that has a matching key, regardless of where the element would be in a sequence. If an element gets matched to a branch with a key, then that element will not count as part of the “sequence” at all, meaning the next element that doesn’t match any keyed branch will take its place in the sequence.

As a result, the branch for seq: n will never receive a message until all lower sequence branches up to seq: n-1 have received a message first.

Any elements that don’t match any of the keyed or sequenced branches will be sent along the remaining branch. If multiple elements in the collection match the same key, or if multiple elements fail to match any keyed or sequenced branch, then multiple messages will be sent down the same branch, similar to the spread operation.
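
These routing rules can be sketched as a plain function. This is an illustration of the rules described above, not the crossflow implementation; the `route` helper and its signature are hypothetical.

```rust
use std::collections::HashMap;

// Route (key, value) elements the way split does: keyed branches match
// first, unmatched elements fill sequence slots in order, and everything
// left over goes down the remaining branch.
fn route(
    elements: Vec<(&str, i32)>,
    keyed: &[&str],
    num_seq: usize,
) -> (HashMap<String, Vec<i32>>, Vec<Vec<i32>>, Vec<i32>) {
    let mut by_key: HashMap<String, Vec<i32>> = HashMap::new();
    let mut seq: Vec<Vec<i32>> = vec![Vec::new(); num_seq];
    let mut remaining = Vec::new();
    let mut next_seq = 0;
    for (key, value) in elements {
        if keyed.contains(&key) {
            // A keyed match does not consume a sequence slot.
            by_key.entry(key.to_string()).or_default().push(value);
        } else if next_seq < num_seq {
            seq[next_seq].push(value);
            next_seq += 1;
        } else {
            remaining.push(value);
        }
    }
    (by_key, seq, remaining)
}

fn main() {
    let (by_key, seq, remaining) =
        route(vec![("a", 1), ("x", 2), ("y", 3), ("z", 4)], &["a"], 2);
    assert_eq!(by_key["a"], [1]); // matched the keyed branch
    assert_eq!(seq, [vec![2], vec![3]]); // "x" took seq: 0 in place of "a"
    assert_eq!(remaining, [4]);
}
```

Note how the element matching key `"a"` does not occupy a sequence slot, so the next unmatched element takes its place in the sequence.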

The split operation also behaves similarly to conditional branching, because there may be keyed or sequenced branches that have no matching element in the collection. When this happens, no message will be sent down those unmatched branches. It is also possible that every element will be matched to a keyed or sequenced branch, in which case no messages will be sent down the remaining branch.

Note

Splitting only works for data structures that can be split into a collection of messages that all share the same type. One potential workaround is to serialize into a JsonMessage, split it into elements of JsonMessage, and then deserialize those messages later.

For splitting apart the fields of data structures without needing to serialize or deserialize, see issue #145.

Synchronization

Unfettered parallelism empowers your workflow to aggressively carry out its tasks without being concerned about what other activities it may be carrying out simultaneously. However, sometimes those parallel threads of activity relate to each other in important ways. Sometimes a service needs to obtain results from multiple different branches before it can run.

When multiple separate branches need to weave back together, or concurrent activity needs to be gathered up, we call that synchronization.

This chapter will explain how to synchronize the parallel or concurrent activity within your workflow using various builtin mechanisms.

Buffers

When services are connected together in a workflow, the next service in a chain will receive input messages as soon as they’re produced. But sometimes when a service is finished, the message that it produces can’t be used right away. It may need to wait for some other service to finish first, so their results can be combined.

For example, if we want to bake an apple pie, we don’t need to wait for the oven to preheat before we chop the apples, or vice versa. These can be done in parallel, but we need both to be done before we can begin baking the pie:

buffer-bake

To make this work, we capture the output of each service—chop_apple and preheat_oven—in a separate buffer. A buffer is a workflow element that can capture messages for later use. There are numerous ways and reasons to use a buffer, but some examples include:

  • Hold messages until the workflow is ready to consume them (similar to “places” in Petri Nets)
  • Track the internal state of the workflow (similar to the “blackboard” in behavior trees)

In our apple pie use case, we want to see that the “apple chopped” and “temperature ready” buffers both have a message before we pull those messages out of the buffers and pass the apples along to be baked.

Tip

Buffer settings allow you to specify how many messages a buffer can retain. You can also specify whether to discard the oldest or the newest messages when the limit is reached.

This allows buffers to handle buildups of data if one branch is generating messages at a higher frequency than another branch that it needs to sync with.
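A minimal sketch of these retention settings, using only std types (the `Buffer` and `Discard` names here are illustrative, not crossflow's API):

```rust
use std::collections::VecDeque;

// Which message to discard once the buffer is at its retention limit.
enum Discard {
    Oldest,
    Newest,
}

struct Buffer<T> {
    queue: VecDeque<T>,
    limit: usize,
    policy: Discard,
}

impl<T> Buffer<T> {
    fn new(limit: usize, policy: Discard) -> Self {
        Buffer { queue: VecDeque::new(), limit, policy }
    }

    fn push(&mut self, msg: T) {
        if self.queue.len() == self.limit {
            match self.policy {
                // Make room by dropping the oldest retained message.
                Discard::Oldest => { self.queue.pop_front(); }
                // Drop the incoming message instead.
                Discard::Newest => return,
            }
        }
        self.queue.push_back(msg);
    }
}

fn main() {
    let mut keep_newest = Buffer::new(2, Discard::Oldest);
    for n in [1, 2, 3] {
        keep_newest.push(n);
    }
    assert_eq!(keep_newest.queue.iter().copied().collect::<Vec<_>>(), [2, 3]);

    let mut keep_oldest = Buffer::new(2, Discard::Newest);
    for n in [1, 2, 3] {
        keep_oldest.push(n);
    }
    assert_eq!(keep_oldest.queue.iter().copied().collect::<Vec<_>>(), [1, 2]);
}
```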

Certain operations take buffers instead of messages as inputs. Those operations will be activated on any change in any of the buffers connected to them, although the exact behavior depends on the operation. Some examples are join and listen, covered in the next two pages.

Join

A common use of buffers is to join together the results of two or more parallel branches. Crossflow provides a builtin join operation that can have two or more buffers connected into it. As soon as at least one message is present in each and every buffer connected to the join operation, the oldest message will be pulled from each buffer and combined into a single message that gets sent as the output of the join operation.
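The join condition can be sketched as a plain function over two queues. This is only an illustration of the rule, assuming two connected buffers; the real operation supports any number of them.

```rust
use std::collections::VecDeque;

// Once every connected buffer holds at least one message, pull the oldest
// message from each and combine them into one output.
fn try_join<A, B>(a: &mut VecDeque<A>, b: &mut VecDeque<B>) -> Option<(A, B)> {
    if a.is_empty() || b.is_empty() {
        return None;
    }
    Some((a.pop_front()?, b.pop_front()?))
}

fn main() {
    let mut apples: VecDeque<&str> = VecDeque::from(["chopped"]);
    let mut oven: VecDeque<u32> = VecDeque::new();

    // No join yet: the oven buffer is still empty.
    assert_eq!(try_join(&mut apples, &mut oven), None);

    oven.push_back(180);
    // Both buffers now hold a message, so the join fires and drains them.
    assert_eq!(try_join(&mut apples, &mut oven), Some(("chopped", 180)));
    assert!(apples.is_empty() && oven.is_empty());
}
```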

Join into struct

There are two ways to join buffers depending on whether you want to produce a struct or a collection. To join into a struct, you will need to specify a key (name) for each buffer connection. Each key will put the buffer value into a matching field in the struct.

join-keyed

The type of struct that gets created by the join operation is automatically inferred by what node the output of the join operation is connected to.

Note

A keyed join can be used to produce more than static structs. The values can also be gathered into a map, such as HashMap, BTreeMap, or JSON where the key name of the connection will be used as the buffer value’s key in the map.

Each key can only be used once per join operation. If a buffer is used in multiple join operations, it can have a different key for each join.

No matter what struct you intend to create through the join, the workflow builder will ensure that every necessary field has a corresponding buffer connected with a matching key name and matching data type. When building workflows with the native Rust API, any mismatch will produce a compilation error. When building a workflow from a JSON diagram, you will get an IncompatibleLayout error.

Join into sequence

While the keyed join is generally the recommended way to do a join (explicit key names have more semantic value), it is also possible to join into a sequence, such as an array, Vec, tuple, or JSON. For each connected buffer, specify a sequencing index that will determine where in the sequence that element belongs.

join-sequence

When joining into an array or Vec, all the buffer values must have the same data type. Joining into a tuple allows their data types to be mixed. Joining into JSON requires the data to be serialized. Similar to error handling for the keyed join, any incompatibility will produce a compilation error for the native Rust API and an IncompatibleLayout error when building from a JSON diagram.

Fetch by clone

When the conditions are met for a join operation to activate (at least one message is present in every connected buffer), the join operation will construct its output message by “fetching” the oldest message from each of its connected buffers. By default, “fetching” means to “pull” the message, removing it from the buffer and moving it into the output message.

Sometimes there may be a reason to clone the message out of the buffer instead of pulling it. For example if the buffer represents a checkpoint in your process that doesn’t need to be repeated, you can clone from the buffer to retain the checkpoint.

Suppose we want to make many apple pies with the same oven.

fetch-by-clone

We can’t bake any pies until the oven is preheated, but once the oven has reached the right temperature, we do not need to preheat it again. We can have our ingredient preparation branch repeatedly prepare the pie pans for baking, and all of those pans will be put in the oven as soon as the oven has finished preheating once. Any new pans that are prepared after the preheating is finished can go into the oven right away.

To express this behavior, we use fetch-by-clone for the buffer that tracks whether the oven is preheated, while we do the regular fetch-by-pull for the prepared pan. The “finished preheating” buffer will be able to retain its knowledge that the preheating has finished, while the prepared pan buffer will have its pans consumed (moved into the oven) each time that it can happen.
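
The difference between the two fetch modes can be sketched with std queues (the `fetch_pull` and `fetch_clone` helpers are illustrative, not crossflow's API):

```rust
use std::collections::VecDeque;

// Fetch-by-pull removes the oldest message from the buffer.
fn fetch_pull<T>(buf: &mut VecDeque<T>) -> Option<T> {
    buf.pop_front()
}

// Fetch-by-clone copies the oldest message and leaves the buffer intact,
// which is how a "checkpoint" buffer keeps its value across repeated joins.
fn fetch_clone<T: Clone>(buf: &VecDeque<T>) -> Option<T> {
    buf.front().cloned()
}

fn main() {
    let mut preheated = VecDeque::from([true]);
    let mut pans = VecDeque::from(["pan #1", "pan #2"]);

    // Bake two pies: clone the checkpoint, pull a pan each time.
    for expected in ["pan #1", "pan #2"] {
        assert_eq!(fetch_clone(&preheated), Some(true));
        assert_eq!(fetch_pull(&mut pans), Some(expected));
    }
    // The checkpoint survives; the pans are consumed.
    assert_eq!(preheated.len(), 1);
    assert!(pans.is_empty());
}
```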

Join Rates

The default behavior that you get from joining two buffers fits typical fork-then-join use cases:

  • Each buffer will hold up to one message. Older messages get dropped when new messages arrive.
  • When the join is performed, the single message in each buffer will be pulled out, leaving all the buffers empty.

However, this might not be the behavior you want in all use cases of join. Sometimes branches that lead into a join will each be streaming messages at different rates. The settings you choose for your buffers and your join operation could yield different outcomes depending on the message rates of the different branches.

Here’s an example of a branch that’s producing localization data (top) being joined with another branch producing camera data (bottom):

join-1-pull-1-pull

With both buffers having keep_last: 1 and being joined by pulling (rather than cloning) from both, you can see that some of the location data—which is being streamed down the branch faster—will be dropped because new location samples will enter the buffer before the older samples get pulled out for a join.

Suppose we have the opposite case where our robot moves infrequently and we want to pair up incoming camera samples with whatever the last known location happens to be. For that we can stick with the keep_last: 1 setting for both buffers, but use fetch-by-clone for the location data:

join-1-clone-1-pull

The above two setups work well enough if incoming samples are fungible, meaning we don’t care about exactly which messages get paired up between the branches. They might not work so well if there are specific messages that are supposed to be paired across the branches.

Suppose we have two branches that are each processing a batch of sensor data. The batch of sensor data contains pairs of samples that are related by a timestamp or some other important context. The data gets unzipped and sent down different branches to be processed based on the type of data it is:

join-1-pull-1-pull-bad

If one branch finishes processing its batch of data faster than the other branch, the join operation could accidentally pair up unrelated samples. In the above example, lidar sample #3 and camera sample #5 will end up discarded. Meanwhile lidar samples #4 and #5 will be paired with the wrong camera samples.

If you want to ensure that messages from the two branches are always paired up sequentially, you can simply set the buffer to use keep_all:

join-all-pull-all-pull

Warning

When using keep_all, make sure that the number of messages arriving from each branch will eventually equalize or else one buffer will grow unbounded, and may take up an excessive amount of RAM.

Tip

If you need more sophisticated logic to pair up samples across different branches—e.g. comparing their timestamp fields before deciding whether to join them—then you will need to use a custom listener instead of join.

Listen

While join can handle basic synchronization between branches, sometimes the buffers need to be managed with more nuanced logic than simply pulling out their oldest values. This is especially true if your workflow expresses a state machine where different state transitions may need to take place based on the combined values of multiple buffers.

Suppose a vehicle is approaching an intersection with a traffic signal. While we approach the intersection we’ll monitor the traffic signal, streaming the latest detection into a buffer. At the same time, the robot will approach the intersection.

listen-stoplight

Once the vehicle is close enough to the intersection, a decision must be made: Should the vehicle stop before reaching the intersection, or just drive through it? If the traffic signal is red, we will ask the vehicle to stop, but then once the signal turns green we will need to tell the vehicle to proceed.

To express this in a workflow we create two buffers: latest_signal and arriving. We create a listen operation (listener) that connects to both buffers. Every time a change is made to either buffer, the listener will output a message containing a key for each buffer. Those buffer keys allow a service to freely access the contents of the buffers and even make changes to the contents of each.

Let’s translate these requirements into how proceed_or_stop should manipulate the buffers when activated under different circumstances:

  • If the arriving buffer is empty then do nothing because the vehicle is not near the intersection yet (or has already passed the intersection).
  • If the arriving buffer has a value and latest_signal is red, leave the arriving buffer alone and command the vehicle to come to a stop. By leaving the arriving buffer alone, we can continue to listen for latest_signal to turn green.
  • If the arriving buffer has a value and latest_signal is green, drain the arriving buffer and command the vehicle to proceed. With the arriving buffer now empty, the listener will no longer react to any updates to latest_signal.
  • (Edge case) If the arriving buffer has a value and latest_signal is empty, treat latest_signal as though it were red (come to a stop) to err on the side of caution.
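
The decision table above can be sketched as a plain function over the two buffer states. This is a hypothetical illustration using Option values in place of real buffer keys; the enum and function names are assumptions, not crossflow's API.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Signal {
    Red,
    Green,
}

#[derive(PartialEq, Debug)]
enum Action {
    DoNothing,
    Stop,
    Proceed,
}

// Returns the command for the vehicle, plus whether the arriving buffer
// should be drained.
fn proceed_or_stop(arriving: Option<()>, signal: Option<Signal>) -> (Action, bool) {
    match (arriving, signal) {
        // Not near the intersection (or already past it).
        (None, _) => (Action::DoNothing, false),
        // Green: proceed, and drain `arriving` so we stop listening.
        (Some(()), Some(Signal::Green)) => (Action::Proceed, true),
        // Red, or no signal observed yet: stop, and leave `arriving`
        // alone so we keep listening for the signal to turn green.
        (Some(()), _) => (Action::Stop, false),
    }
}

fn main() {
    assert_eq!(proceed_or_stop(None, Some(Signal::Red)), (Action::DoNothing, false));
    assert_eq!(proceed_or_stop(Some(()), Some(Signal::Red)), (Action::Stop, false));
    assert_eq!(proceed_or_stop(Some(()), None), (Action::Stop, false));
    assert_eq!(proceed_or_stop(Some(()), Some(Signal::Green)), (Action::Proceed, true));
}
```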

If we had tried to use the join operation for this logic, we would have drained the arriving buffer the first time that both buffers had a value. If the value in latest_signal were red then we would be prematurely emptying the arriving buffer, and then we would no longer be waiting for the green traffic signal.

Note

A listen operation (listener) will be activated each time any one of the buffers connected to it gets modified. The listener will pass along buffer keys that allow services to read and write to those connected buffers. Listeners will not be activated when a buffer is modified using one of the listener’s own buffer keys. This prevents infinite loops where a listener endlessly gets woken up by a downstream modification. You can choose to turn off this safety mechanism with allow_closed_loops.

Accessor

Just like any other operation in a workflow, the listen operation produces a message that can be connected to an input slot of a compatible node or other kind of operation. Similar to join, the listen operation can infer what message type it should produce based on what downstream operation it’s connected to. However the listen operation specifically creates an Accessor—a data type that gives access to one or more buffers within the workflow.

The most basic accessor is a BufferKey<T> which gives access to a buffer containing messages of type T. There are some opaque buffer keys like JsonBufferKey and AnyBufferKey which do not reveal the underlying message type within the buffer but allow you to interact with the buffer data within the limitations of JsonBufferMut and AnyBufferMut respectively.

In many cases you will want to receive multiple keys from a listener because it is often useful to listen to multiple buffers at once. The keys you get from the listener might be for buffers with different message types, and each key might have its own particular purpose or identity. You can define a custom accessor type in Rust using the Accessor macro:

listen-accessor

Simply create a struct whose fields are all buffer key types (which may include JsonBufferKey and AnyBufferKey). Derive Accessor and Clone for your custom struct. Use this custom struct as the input type of your service. When you connect a listener to a node that uses your service, it will know that it needs to create this struct for you.

When using a custom accessor, the buffers connected to the listener will need to specify a key name for their connection, much like they do when joining into a struct. That key name tells the listen operation which field that buffer’s key should be placed in. If there is ever a mismatch between key names or buffer types, or if any field is missing a connection or has multiple connections, then you will get an IncompatibleLayout error when building from a JSON diagram. When using the native Rust API any incompatibility will produce a compilation error.

Tip

To learn how to use an accessor within your service, see Using an Accessor.

Buffer Access

There may be times when a service needs more information about what is going on than what can be provided by the upstream service that connects into it. As we’ve seen, workflows store their state information in buffers. Typically we can design our workflow so that any additional information needed by a service can be sourced from one or more of the workflow’s buffers. This can function similarly to a blackboard in a Behavior Tree.

Buffer access refers to a workflow operation that will take any input and combine it with a buffer Accessor before passing the combined message along to a node or other operation. For example, suppose a node that does task planning outputs a target destination for a robot to reach. For a path planner to determine how the robot should reach the target, it will also need to know the robot’s current location:

buffer-access

With the buffer access operation, we can take the target destination and combine it with a buffer key or other accessor that the plan_path service can use to check the latest location provided by the localization node. The output type of the buffer access operation is a tuple whose first element is the original output and the second element is a key or accessor that combines all buffers connected to the access operation.

Just like listen, buffer access can form its associated buffers into any Accessor that the downstream node asks for, as long as the data types and buffer key names are set correctly. The difference between listen and buffer access is that the buffer access operation does not get triggered when buffer values are changed. It only gets triggered when an input value is passed to it.

Tip

To learn how to use an accessor within your service, see Using an Accessor.

Collect

Warning

At the time of this writing, the collect operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Recall from the parallelism chapter that there are two ways to have parallel activity within a workflow: branching and spreading. Join and listen allow us to synchronize activity across multiple branches, but they aren’t necessarily suited for managing parallel activity happening along a single branch, which is what the spread operation creates.

collect

The inverse of the spread operation is collect. The collect operation will monitor all workflow activity happening upstream of it. Messages sent to the collect operation will be held onto until all upstream activity has finished, then they will be collected into a single message (such as Vec) and sent out.

Maximum

You can also choose to collect until a certain number of elements has been reached. Then regardless of the upstream activity, all the gathered elements will be sent out as one message once the maximum has been reached. If more messages arrive later then a new collection will start, and they will also be sent out once they reach the maximum or all upstream activity has finished.
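
A sketch of this maximum behavior, written with std types only (the `Collector` struct and its methods are illustrative, not the crossflow implementation):

```rust
// Emit a batch as soon as the maximum is reached; messages that arrive
// afterward start a new collection.
struct Collector<T> {
    pending: Vec<T>,
    max: usize,
}

impl<T> Collector<T> {
    fn new(max: usize) -> Self {
        Collector { pending: Vec::new(), max }
    }

    // Returns Some(batch) when the maximum has been reached.
    fn push(&mut self, msg: T) -> Option<Vec<T>> {
        self.pending.push(msg);
        if self.pending.len() == self.max {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    // Called once all upstream activity has finished.
    fn finish(&mut self) -> Vec<T> {
        std::mem::take(&mut self.pending)
    }
}

fn main() {
    let mut collector = Collector::new(2);
    assert_eq!(collector.push(1), None);
    assert_eq!(collector.push(2), Some(vec![1, 2])); // maximum reached
    assert_eq!(collector.push(3), None); // a new collection begins
    assert_eq!(collector.finish(), vec![3]); // upstream activity finished
}
```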

Minimum

If the collection needs to reach a certain number of elements, you can set that as a minimum. If all upstream activity ceases before the collect operation reaches the minimum—meaning it is impossible to ever reach the minimum—then all the gathered messages will be discarded and the collect operation will signal that it has disposed a message, which could lead to a cancellation.

Catching Unreachability

In crossflow a “disposal” happens any time one or more outputs of an operation will not carry any message after the operation was activated. Whenever a disposal happens we will check if it is still possible for the workflow to finish. If a workflow can no longer finish because of a disposal then it will be cancelled with an Unreachability error.

Consider the nature of the collect operation: It monitors upstream activity. One way it identifies when upstream activity has finished is to monitor when disposals happen. With each disposal, the collect operation will calculate its own “reachability”—i.e. whether or not it is possible for the collect operation to be reached by any of the ongoing activity in the workflow. Once the collect operation is no longer reachable, it will send out its collection as long as the collection has at least the minimum number of elements; otherwise it will emit a disposal itself.

This means the collect operation has the ability to catch cases where some part of your workflow may become unreachable. A node becoming unreachable is a natural thing to happen in a workflow that contains any conditional branching, but sometimes you may want to respond to that unreachability in a certain way, perhaps by performing a fallback action. By inserting a collect operation with a minimum of 0 and a maximum of 1, you can catch when that part of the workflow becomes unreachable by checking whether the output of the collect operation is empty or has 1 element.

Circularity

It is okay to put a collect operation inside of a cycle. In this case, “upstream” of the collect does include any operations downstream that can lead back to the collect operation. When calculating reachability, the collect operation will simply prune itself to prevent an infinite graph search.

single-collect-cycle

However, there is an edge case to be wary of. If you have two collect operations inside of one loop, then there is no logical way to resolve the reachability for either of the collect operations.

double-collect-cycle

Suppose all other activity within this cycle comes to a stop. Both collect operations will want to check their own reachabilities, and each will want to send out a message if it’s not reachable. If the left collect operation assumes that the right collect operation can still send out a message then the left collect operation should consider itself reachable. At the same time if the right collect operation assumes that the left can still send out a message then the right should consider itself reachable. If both see themselves as reachable then neither will ever decide to send out a message, but this directly contradicts the assumption that both made. The result will be that neither ever sends out a message, which means both collect operations are failing to do their jobs.

On the other hand if the left collect operation decides that the right will not send out a message then the left will decide that it should send a message. At the same time, the right collect operation would make the same assumption about the left and also decide to send out a message. Now both collect operations will produce messages every time the activity in the cycle settles down, meaning there will be potentially infinite empty messages being produced by these operations over the life of the workflow. Churning out infinite empty messages despite no actual new activity would also violate the purpose of the collect operation.

No matter which assumption is used to implement the collect operation, there is no way to get meaningful behavior. Instead the workflow will detect this when it attempts to run and cancel with a CircularCollect error. Currently there is no mechanism to detect this type of error when compiling or spawning the workflow. This issue is tracked by #148.

Scoping

It’s still possible to have two collect operations in the same cycle, but you need something to disrupt the circular dependency. You can use the scope operation to isolate one of the collect operations to focus exclusively on one portion of the cycle, e.g. the portion between the left and right collect operations:

scoped-collect-cycle

This effectively makes both collect operations invisible to each other. The left collect operation will only check if the scope to its right has any active sessions. The right collect operation will only check if there is any activity inside of its own scope.

Strictly speaking this structure is not logically equivalent to the original circular dependency structure—and that is a good thing, because there is no way to resolve that structure!—but it does allow us to have a pattern of multiple spread🠚collect or stream🠚collect sections within one cycle of a workflow. We just need to isolate each collect to a specific scope of activity.

Channels

Buffers are a useful way to encode the synchronization of workflow activities in the structure of the workflow itself. However, sometimes a deeper coupling between services is needed.

Recall that “messages” that are passed between operations in a workflow can be any data type that can be safely moved between threads. This includes endpoints of channels—e.g. Sender and Receiver. By streaming out a Receiver as a message, the service that gets the Receiver can listen to ongoing activity in your service while both services run in parallel. Or by streaming out a Sender, the service that gets the Sender can send data back to your service while both services run in parallel.

channel-receiver

Suppose we have a trajectory controller service and a motor controller service in a workflow. The trajectory controller might update at a frequency of 50Hz while the motor controller updates at a frequency of 1000Hz. There isn’t a natural way for the structure of the workflow itself to synchronize these services, but the trajectory controller could send a Receiver to the motor controller service, allowing the motor controller to run its feedback loop at its own rate and receive new targets as they arrive.

We use a stream to send out the receiver because streams can send messages out of the service while the service continues to run. This means the trajectory_controller service and motor_controller service can run simultaneously as async services, and those specific service sessions can communicate via the channel of the Receiver.
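
The same pattern can be sketched with std::sync::mpsc and plain threads. The `run_controllers` function and the hard-coded targets here are illustrative stand-ins for the two services:

```rust
use std::sync::mpsc;
use std::thread;

// The "trajectory controller" streams targets into a Sender while the
// "motor controller" drains the matching Receiver at its own pace.
fn run_controllers() -> f64 {
    let (target_tx, target_rx) = mpsc::channel::<f64>();

    let trajectory = thread::spawn(move || {
        for target in [1.0, 2.0, 3.0] {
            target_tx.send(target).unwrap();
        }
        // Dropping the Sender closes the channel, ending the motor loop.
    });

    let motor = thread::spawn(move || {
        let mut last_target = 0.0;
        // The motor loop runs independently, adopting each new target
        // whenever one arrives.
        while let Ok(target) = target_rx.recv() {
            last_target = target;
        }
        last_target
    });

    trajectory.join().unwrap();
    motor.join().unwrap()
}

fn main() {
    assert_eq!(run_controllers(), 3.0);
}
```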

This can also be used to draw data into a running service. Suppose we want to consider environmental hazards that should alter the path of the trajectory or maybe even bring the robot to a stop. We could introduce a safety_monitor service:

channel-sender

The trajectory_controller can send out a Sender over a second output stream. This sender is used by the safety_monitor service to feed moving obstacle information back into the trajectory_controller. Rather than outright killing the trajectory_controller service to respond to obstacles, we can feed the service with the information it needs to respond to obstacles.

Caution

Channels are a powerful way to set up long-running one-way or two-way communication between async services in a workflow that run in parallel, but there is a notable drawback. If you want to visualize the execution of the workflow, data sent between services over channels will not be traceable. If you care about traceability, you should consider copying all channel data to an output stream for logging.

Cycles

Cycles are commonly used in programming to define routines that need to repeat until some condition is met. Similarly a workflow may need to contain a cycle of operations if some subroutine needs to keep running until a condition is met.

cycle

Cycles are not a special or specific operation in crossflow. A cycle is simply what happens when the output of an operation is connected to the input slot of an operation upstream of it.

When building a workflow, you are free to connect an output to the input slot of any operation that expects a compatible message type. There are no restrictions on how these connections are laid out, except that each output connects to exactly one input slot—but each input slot can take in any number of outputs, leaving an opening for downstream operations to connect their outputs back upstream.
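
The control flow that a cycle expresses can be sketched as plain Rust: an operation's output is fed back in as its next input until an exit condition is met. The `run_until` helper is a hypothetical illustration of the shape, not a crossflow API.

```rust
// Keep feeding the step's output back as its next input until `done`
// says to exit, just as a cycle routes an output back upstream.
fn run_until<T>(mut state: T, step: impl Fn(T) -> T, done: impl Fn(&T) -> bool) -> T {
    while !done(&state) {
        state = step(state); // the "output" loops back as the next "input"
    }
    state
}

fn main() {
    // Keep adding 3 until the value reaches at least 10.
    assert_eq!(run_until(0, |x| x + 3, |x| *x >= 10), 12);
}
```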

Scopes

Every time a workflow is run, a session is started. That session is contained within some scope. Each scope has a start operation (green play symbol) and a terminate operation (golden star).

workflow-scope

Start

The start operation has one simple purpose: deliver the user’s request message into this session. Each time a new session starts, exactly one message will be produced by the start operation, and it will be whatever request message started this run of the workflow. The start operation will never output a second message for the same session.

Terminate

Naturally the terminate operation serves the opposite purpose of the start operation. The terminate operation has an input slot and no output slots, so it will never output a message within its own scope. Instead it takes the first message that gets passed to it for each session and then sets that as the final response of that workflow session. Any additional messages passed to it for the same session will simply be discarded.
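
This first-message-wins behavior can be sketched in a few lines (the `Terminate` struct here is an illustration, not the crossflow implementation):

```rust
// Only the first message for a session becomes the final response;
// later messages are simply discarded.
struct Terminate<T> {
    response: Option<T>,
}

impl<T> Terminate<T> {
    fn new() -> Self {
        Terminate { response: None }
    }

    fn send(&mut self, msg: T) {
        // get_or_insert only stores `msg` if no response was set yet.
        self.response.get_or_insert(msg);
    }
}

fn main() {
    let mut terminate = Terminate::new();
    terminate.send("first response");
    terminate.send("ignored");
    assert_eq!(terminate.response, Some("first response"));
}
```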

Immediately after the terminate operation is activated, it will trigger the scope cleanup to begin. This ensures that all activity happening inside the workflow is brought to a clean and complete finish before the outside world is told that the workflow is done.

As soon as the cleanup process is finished, the first output that was passed to the terminate operation will be sent out as the final output of the workflow session.

Scope Cleanup

Scope cleanup can be broken down into two major parts that happen in sequence: operation cleanup followed by buffer cleanup.

Operation Cleanup

There are many kinds of operations that can exist within a workflow. Once the terminate operation is reached, we want to wind down that session as quickly as possible to avoid doing useless work—once the final output of the workflow is determined, no other work should matter in principle. This process of winding down is called the operation cleanup. There are four different ways that operations get cleaned up, depending on the kind of operation:

  • Blocking: The input message storage of the operation is cleared out for this session. Even if the operation awakens, it will quit early when it sees that it has no more input messages.
  • Async: The input message storage of the operation is cleared out, and any Tasks that were spawned for this operation are cancelled. The cleanup of this operation is considered finished when we are notified that the task was successfully dropped. At that point, there cannot be any more side-effects produced by the task.
  • Continuous: The order queue of the operation is cleared out for this session. The next time the service runs, it will no longer see any orders related to this session.
  • Workflow: The input message storage of the operation is cleared out, and the inner workflow is sent a cancellation signal. Any uninterruptible scopes within the workflow will be brought to a finish, and cleanup will be performed on the workflow’s operations and buffers. The cleanup of this operation is considered finished when we are notified that the inner workflow’s cleanup has finished.

Buffer Cleanup

Warning

At the time of this writing, buffer cleanup is not yet available for JSON diagrams. This is being tracked by #59. In the meantime, buffer data will simply be dropped when a scope terminates.

After operation cleanup is finished, there may still be data lingering in the buffers for this session. Often it would be fine to just discard that data without any further action, but sometimes the lingering buffer data is significant. Maybe some buffer data represents ownership of a resource that needs to be released, or contains an error that needs to be resolved before the session should end, or maybe there is a sign-off that should be performed before dropping the whole workflow session.

The buffer cleanup phase acts like a user-defined destructor for your workflow. You can define any number of buffer cleanup workflows for your workflow—you read that right, you can define workflows to clean up the data in the buffers of your workflow.

The input message for each cleanup workflow is an Accessor for the buffers that it’s cleaning up. You can choose any in-scope buffers that you would like the Accessor to contain when you set the cleanup workflow. You can also specify if each cleanup workflow should be run only when the parent workflow was prematurely cancelled, successfully terminated, or either.

Note

You can use the same buffer across multiple cleanup workflows, but be mindful of how those separate workflows might interfere with each other. All the cleanup workflows will run in parallel.

As soon as operation cleanup is finished, all the buffer cleanup workflows will be started at once, given access to whichever buffers they requested. The cleanup workflows are allowed to have cleanup workflows themselves, and so can the cleanup workflows of your cleanup workflows, etc. It is not possible to build a workflow with infinitely recursive cleanup workflows, because the attempt to build such a workflow would require infinite memory.

The buffer cleanup phase is finished once all cleanup workflows have terminated or cancelled, including any inner cleanup workflows that the cleanup workflows may contain. Any data belonging to this session that is still lingering in any of the buffers will simply be dropped.
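The "user-defined destructor" analogy maps directly onto plain Rust's Drop trait: Drop does for a single value what a buffer cleanup workflow does for a whole session. Here is a minimal std-only sketch of the idea, where lingering buffer data represents ownership of a resource that must be released. This is an illustration, not crossflow API; `LeaseGuard` and the release log are hypothetical names:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical resource lease; releasing it on drop mirrors what a buffer
// cleanup workflow would do with lingering buffer data that owns a resource.
struct LeaseGuard {
    name: &'static str,
    released: Arc<Mutex<Vec<&'static str>>>,
}

impl Drop for LeaseGuard {
    fn drop(&mut self) {
        // Runs whether the owning scope ended normally or via an early exit,
        // just as buffer cleanup runs on termination or cancellation.
        self.released.lock().unwrap().push(self.name);
    }
}

fn main() {
    let released = Arc::new(Mutex::new(Vec::new()));
    {
        let _lease = LeaseGuard {
            name: "charger-7",
            released: Arc::clone(&released),
        };
        // ... the session does its work ...
    } // the lease is dropped here, releasing the resource
    assert_eq!(*released.lock().unwrap(), vec!["charger-7"]);
}
```

The key difference is that a buffer cleanup workflow can itself run arbitrary async operations before the session is considered finished, whereas Drop is synchronous.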

Reachability

An important thing to consider for the lifecycle of a workflow is whether it’s even possible for the workflow to end. If a workflow is allowed to keep running indefinitely with no possibility of reaching the terminate operation, then the caller who made the request will be left hanging forever. This can lead to undesirable program behaviors like deadlocks.

To mitigate this problem, crossflow calculates the reachability of the terminate operation each time an event occurs that could influence whether the terminate operation can be reached. If at any point the terminate operation is no longer reachable, then the session of the scope will automatically be cancelled.

Note

An operation is reachable if there exists at least one plausible path to the operation from a currently active operation.
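Conceptually, this check is an ordinary graph reachability problem: start from the set of currently active operations, follow the directed connections, and see whether the terminate operation can be visited. The following is a simplified std-only sketch of that idea, not crossflow's actual algorithm; the workflow is modeled as a plain adjacency map:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Returns true if `target` can be reached from any operation in `active`,
/// following the directed connections in `edges`.
fn is_reachable(
    edges: &HashMap<&str, Vec<&str>>,
    active: &[&str],
    target: &str,
) -> bool {
    let mut seen: HashSet<&str> = active.iter().copied().collect();
    let mut queue: VecDeque<&str> = active.iter().copied().collect();
    while let Some(op) = queue.pop_front() {
        if op == target {
            return true;
        }
        for &next in edges.get(op).into_iter().flatten() {
            if seen.insert(next) {
                queue.push_back(next);
            }
        }
    }
    false
}

fn main() {
    // Two islands: start feeds a, but terminate is only fed by b.
    let mut edges = HashMap::new();
    edges.insert("start", vec!["a"]);
    edges.insert("b", vec!["terminate"]);
    // Unreachable: this would be detected before anything runs.
    assert!(!is_reachable(&edges, &["start"], "terminate"));

    // Connect a -> terminate and the workflow becomes viable.
    edges.insert("a", vec!["terminate"]);
    assert!(is_reachable(&edges, &["start"], "terminate"));
}
```

In the real system the "active" set changes as operations run and messages are disposed, which is why the check is re-evaluated whenever a relevant event occurs.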

Inherent Unreachability

One kind of unreachability is inherent, meaning the very structure of the workflow makes it impossible for the terminate operation to ever be reached.

One way to get inherent unreachability is if nothing at all is connected to the terminate operation:

inherent-unreachability

A slightly more subtle version of inherent unreachability is when there is no path from the start operation to the terminate operation because each is part of a separate island of operations:

inherent-unreachability-island

In either case, this unreachability will be detected immediately. Before the initial message is even sent out by the start operation, crossflow will detect that the terminate operation cannot be reached from the start operation, and the workflow will be instantly cancelled without any operations running.

Conditional Unreachability

Inherent unreachability fully prevents a workflow from running, but in most cases where we need to think about unreachability, it depends on runtime conditions. Depending on which branch(es) an operation in the workflow activates, the terminate operation might become unreachable.

Consider this simple fork-result example:

conditional-unreachability

The branch going to node B will lead to the terminate operation whereas the branch going to node C never will. If the message produced by node A has an Ok value then the workflow will have no problem reaching the terminate operation. But for an Err value:

conditional-unreachability-bad

In this case crossflow will detect that the Ok message was “disposed” by the fork-result operation and immediately perform a reachability check for the terminate operation. Before node C even receives the message out of A, the workflow will be cancelled because the terminate operation is not reachable.

Fixing this problem is relatively simple. You just need to connect node D to the terminate operation:

conditional-unreachability-fixed

There may be cases where you actually do want the Err branch to cancel your workflow because only node B can correctly provide the final output of the workflow, but you nevertheless want nodes C and D to run before the workflow finishes running.

One way to accomplish this is by using the cleanup feature. You can pass the Err result into a buffer and then define a cleanup workflow for that buffer. Nodes C and D go into the cleanup workflow to guarantee that they get run even if the workflow gets cancelled.

conditional-unreachability-cleanup

The cleanup workflow can be designed so that nodes C and D do nothing if there was no message in the buffer. Alternatively you can set the cleanup settings so that this cleanup workflow only gets run during a cancellation and not during a successful termination.

Note

You might find that this solution is technically what you need, but the use of the cleanup feature does not express your workflow the way you would like. To provide better support for this use case, ticket #150 proposes a new cancellation operation that would prevent the unreachability test from prematurely cancelling the workflow.

Cancellation

There are times when it becomes impossible for a workflow to terminate successfully, or when a condition is met that requires the workflow to stop before it can terminate. The potential causes of a cancellation are enumerated in CancellationCause.

When a workflow is cancelled, the caller will never receive a Response message; instead, the Outcome will have an Err(_) value. Inside the Err will be a Cancellation with some information about why the cancellation happened.

There are a few potential causes of cancellation that are worth being mindful of:

  • Unreachability - The terminate operation is no longer reachable. You will receive a list of the disposals that happened during the session which led to the unreachability.
  • Triggered Cancellation - A cancel operation was explicitly triggered by the workflow itself. You might receive a string that serializes the message that triggered the cancellation from inside the workflow.
  • Filtered - The condition of a filtering operation failed to pass, so the workflow was cancelled.

Disposal

When a scope nested inside of another scope gets cancelled, the parent scope will see that as a disposal, meaning the node will simply never yield a final message for that session of the nested scope.

Tip

It is generally discouraged to use cancellation in the happy path of your workflow. The cancellation data received by the caller is meant for debugging purposes, not to be used as regular service output. Instead, if your workflow is known to be fallible, it should return a Result type.

In crossflow, disposals are managed automatically. There is currently no operation that lets users react to a disposal explicitly, so when the service of a node gets cancelled, this is generally invisible to the parent workflow until it escalates into a cancellation itself. The collect operation can be used to catch unreachability, but significant information may be lost by the time the workflow reaches that point. It is therefore best practice for fallible workflows to return Result types instead of cancelling.

Stream Out

In crossflow all services are able to have output streams, which allow services to produce output messages while still running. These are different from the final output in two ways:

  • A service can have multiple output streams with different names and message types, whereas its final output has one fixed message type.
  • Each output stream can produce any number of output messages per activation of the service, including 0—outputting no message at all—whereas the final output always produces exactly one output message per service activation, unless the service gets cancelled.

When you spawn a workflow it will be encapsulated as a service. The stream out operation is how your workflow can provide output streams as a service. This operation will take any messages passed to it and forward them out of the workflow session.

For example suppose you have a workflow that takes in a basket of apples and chops them one at a time:

scope-stream

You can design the workflow so that each time an apple is chopped, the slices are sent out through an output stream, and then the workflow continues on to chop the next apple. The workflow ends when there are no apples left, and the final output of the workflow is just a trigger message.
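The shape of that workflow can be mimicked with a plain std channel standing in for the output stream. This is only an analogy for how streams differ from the final output, not the crossflow stream API:

```rust
use std::sync::mpsc;

// Stand-in for the chopping workflow: each chopped apple emits one stream
// message (its slices); the final output is just a unit "done" trigger.
fn chop_basket(apples: Vec<&str>, slices_out: mpsc::Sender<String>) {
    for apple in apples {
        // Stream out: any number of messages per activation, including zero.
        slices_out.send(format!("slices of {apple}")).unwrap();
    }
    // Final output: exactly one per activation (here, just returning ()).
}

fn main() {
    let (tx, rx) = mpsc::channel();
    chop_basket(vec!["fuji", "gala"], tx);
    let streamed: Vec<String> = rx.into_iter().collect();
    assert_eq!(streamed, vec!["slices of fuji", "slices of gala"]);
}
```

Note how an empty basket would produce zero stream messages but still exactly one final output, matching the two differences listed above.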

Scope Operation

So far we’ve talked about how every workflow has a scope with start, terminate, and optionally stream out operations. But sometimes it’s useful for a set of operations within a workflow to be scoped.

The scope operation allows you to nest a self-contained workflow inside of another workflow. It is fully equivalent to spawning the sub-workflow into a service and then inserting that service into a node in your workflow, except that there’s no intermediate service created: the sub-workflow lives entirely inside the scope operation of your workflow.

Every time the scope operation is activated, a new child session is started inside the scope of the sub-workflow. Just like sessions of a workflow, the sessions of a scope operation are independent of each other and non-interfering.

Inside this new session, the incoming message that activated the scope operation will be handed off by the scope’s start operation. From there, the sub-workflow inside the scope will execute as normal until the scope’s terminate operation is reached.

Racing

One common use case for a scope operation is to conduct a race. Suppose we want a letter delivered as fast as possible but we can’t predict what means of delivery will get it there fastest.

scope-racing

We can create a scope that copies the letter and then sends each copy to a different transporter: a bicycle and a race car.

If the letter only needs to be delivered a few blocks from the sender then the bicycle might get it to the destination faster, as the race car will lose time while looking for parking. On the other hand, if the destination is many miles away, the race car will easily overtake the bicycle and arrive first.

The service that finishes first will trigger the terminate operation of the scope, and the other service will be told to simply drop its task to avoid wasted effort.

Isolation

Another way to use scopes is to isolate sets of parallel activities. As demonstrated by the spread operation, it is possible for a branch or any set of elements in a workflow to have multiple simultaneous activations within one session. In some cases, it’s important to organize that activity into separate sessions to avoid cross-contamination of data.

For example, suppose we have a workflow that takes in a list of cloud resources along with what directory each resource should be saved to. We can use the spread operation to convert this list into an internal stream of messages so all the assets can be fetched in parallel.

scope-isolation

Suppose the service that fetches the cloud resource takes in only the URL and gives back the raw data received. We will need to unzip the directory information from the URL and save the directory information in a buffer while the resource is fetched. Without a scope operation we would be in a dangerous situation: What if the resources finish being fetched in a different order than the directory information is saved in its buffer? When we rejoin the directory information with the fetched data, we could end up sending files to the wrong directory.

Tip

Buffers inside the scope operation store their data separately per session of the scope.

By encapsulating the buffers, unzip, and join operations inside of a scope, we ensure that the fetched data gets sent to the correct directory no matter what order it arrives in. Each time a message starts a new session of the scope, the directory buffer will only contain the directory information from the message that started the session.

Outside of the scope, we will collect the final outputs of the scope operation to ensure that the workflow keeps running until all resources are fetched and saved.

Streams

The scope operation also supports output streams. In this case the stream out operation sends messages out to the parent scope.

Reflection

Reflective programming—also referred to as reflection—is when a program can introspect and modify its own structure or behavior. Crossflow does not currently support generalized reflection, which would imply that a workflow could change its connections or add new nodes and operations at runtime. However, it does support a few reflective operations which are able to inspect or modify the overall state of the workflow.

Most operations in crossflow are “localized”, meaning they don’t know anything about the workflow that they are in, except for the immediate neighbors that they are connected to. The reflective operations covered in this chapter have a broader view of the workflow that they exist in. They can be used to assess or modify the execution of the workflow at runtime.

Note

Theoretically it is possible to implement generalized reflection in crossflow. The main challenge is how to design an API that does not leave loose ends dangling while modifications are being made, or an API that can protect the user from unintuitive race conditions that may happen as the workflow transitions from one structure to another.

Trim

Warning

At the time of this writing, the trim operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Sometimes unbridled parallelism is a liability. If multiple branches want to make use of the same services, there could be destructive interference between the branches, depending on the nature of the services they are using.

Suppose we want to define a workflow for sending a robot to a location, but we need the workflow to check if the location is available before sending the robot. We are operating in a multi-robot environment, so we need to make sure we are not sending multiple robots to the same location at the same time.

We’ve been provided with a reserve_location service that takes in a target location request and tries to reserve that location for our robot. If the location is not available right away, then reserve_location will first stream out a detour location for the robot to start moving towards. This detour location will be a parking spot that is as close as possible to the final target location.

trim

While the robot is heading to its detour location, the reserve_location service will remain active until it gets a confirmation that the target location is successfully reserved for our robot. Then the service will finish, passing along the target location to a path planner which passes along a path to a drive service. Once the robot reaches its target location, the drive service will finish and terminate the workflow.

But what would happen if the drive services of both branches end up running at the same time? The two simultaneously running services could end up fighting each other to send the same robot towards different locations. To prevent this we can use the trim operation.

As shown in the diagram above, before starting towards the target location, we will apply the trim operation to the detour branch. All the operations that are selected for trimming will undergo operation cleanup, meaning whatever they happen to be doing will be brought to a stop. The trim operation will wait until it gets notified that all the relevant operations have finished their cleanup, and then trim will forward along its input message as output. This ensures that it is impossible for multiple drive services to be running at the same time.

Gate

Warning

At the time of this writing, the gate operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Trim allows you to stop ongoing activity in a node, but there is also an operation that allows you to prevent activity from starting in the first place. The gate close operation can be applied to a set of buffers to block any connected join and listen operations from waking up when those buffers are modified. The gate open counterpart undoes the effect of gate close, allowing the join and listen operations to resume.

Note

Closing a buffer gate does not block the buffer access operation, and it does not prevent buffer keys from working.

Whenever a buffer’s gate transitions from closed to open status, any attached join and listen operations will be activated, whether or not any new messages arrived. If the conditions of the join operation are not met, it simply won’t produce any message, but the listen operation will always produce its accessor message after a gate opens, because the gate status is considered part of the buffer’s state and therefore may be relevant to a listener.

Tip

A service that has a buffer key can check that buffer's gate status using BufferGateView and can modify it using BufferGateMut.

This gating feature can be used to allow one branch of a workflow to manage the activity of another branch of the workflow. For example, suppose we are running a pie bakery with an online ordering system. We can have a service that watches a clock to stream out when the kitchen opening and closing times have arrived. Those streams can trigger our pie order buffer to pause or resume being sent to the kitchen:

gate

Any new orders that come in after closing hours will be placed in the buffer instead of being sent to the kitchen. Once the kitchen opens, all the queued orders will become visible to the bake_pie service. While the kitchen is open, new orders will be sent through immediately.
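The buffer-plus-gate behavior in this example can be sketched with a queue and a boolean gate. This is a simplified analogy; `GatedBuffer` is a hypothetical name, not a crossflow type:

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for a buffer with a gate. Pushes always succeed;
// listeners only see messages while the gate is open.
struct GatedBuffer<T> {
    queue: VecDeque<T>,
    open: bool,
}

impl<T> GatedBuffer<T> {
    fn new() -> Self {
        Self { queue: VecDeque::new(), open: false }
    }
    fn push(&mut self, order: T) {
        // Closing the gate never blocks pushes, only visibility.
        self.queue.push_back(order);
    }
    // Drains everything a listener may react to right now.
    fn visible(&mut self) -> Vec<T> {
        if self.open { self.queue.drain(..).collect() } else { Vec::new() }
    }
}

fn main() {
    let mut orders = GatedBuffer::new();
    orders.push("cherry pie");            // arrives after closing hours
    assert!(orders.visible().is_empty()); // kitchen closed: nothing visible

    orders.open = true;                   // kitchen opens: gate opens
    assert_eq!(orders.visible(), vec!["cherry pie"]);
}
```

In the real system the gate transition itself wakes the attached join and listen operations, whereas here the listener has to poll `visible`.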

Note

Closing the gate of a buffer does not prevent new messages from being pushed into the buffer, but it will prevent join and listen operations from being aware of the push.

Inject

Warning

At the time of this writing, the inject operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Sometimes a single fixed workflow structure is not sufficient to define the behavior of a dynamic or intelligent system. A flexible state machine may need to decide its execution structure at runtime; perhaps the state machine itself needs to reconfigure its own execution structure during a state transition.

While crossflow does not support fully generalized reflection, it does support an inject operation that allows you to spawn a service at runtime and inject it into the node of a workflow from inside the workflow itself. The service that you spawn could itself be a workflow whose entire structure is decided by another service.

For example an ordinary path planning service might produce a simple path or trajectory for a controller to track, but a more intelligent path planning service might want to include instructions for how the robot should interact with other devices—such as doors, elevators, and other robots. A trajectory would not be sufficient to describe these interactions, but a runtime generated workflow would do the trick.

inject

Instead of a trajectory message, the plan_path service will, itself, produce a Service that it passes along as a message. That service will be combined with a buffer key that gives access to the buffer that stores the robot’s current location—which is what the generated service needs as an input message—and then passed into the inject operation. From there, the inject operation will pass the buffer key into the generated service and run it as if it were a regular node in the workflow.

When the generated service finishes running it will produce a Result which is Ok if the plan was successfully followed, otherwise an Err. In the Ok case we simply terminate, while for an Err we will cycle back and ask for a new plan. The new plan that gets generated after an Err could be a completely different workflow than what previously ran, freshly generated to deal with the latest circumstances.

In general the inject operation allows workflows to define state transitions that completely change the behavior of the state machine from transition to transition. In most cases this will be implemented in a two-tier structure with a fixed cyclical structure for the top-level state machine that dynamically generates and executes a lower-level state machine to define the evolving situational logic.
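The two-tier pattern can be sketched in plain Rust by having a planner return a freshly built closure (the runtime-generated "workflow") that the outer loop then runs, cycling back on Err. This is an analogy for inject, not the crossflow API; `plan_path` here is an ordinary function:

```rust
// The planner generates a new "workflow" each time it is asked: here, a
// boxed closure standing in for a runtime-built Service.
fn plan_path(attempt: u32) -> Box<dyn Fn(&str) -> Result<String, String>> {
    Box::new(move |location| {
        if attempt == 0 {
            // The first plan fails partway through.
            Err(format!("blocked near {location}"))
        } else {
            Ok(format!("arrived via plan #{attempt} from {location}"))
        }
    })
}

fn main() {
    let mut attempt = 0;
    let outcome = loop {
        // "Inject" the freshly generated plan and run it like a node.
        let plan = plan_path(attempt);
        match plan("dock-3") {
            Ok(done) => break done, // terminate
            Err(_) => attempt += 1, // cycle back and ask for a new plan
        }
    };
    assert_eq!(outcome, "arrived via plan #1 from dock-3");
}
```

The outer loop is the fixed top-level state machine; each closure returned by the planner is the disposable lower-level state machine generated for the latest circumstances.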

Collect

Warning

At the time of this writing, the collect operation is not yet available as a JSON diagram operation. This is being tracked by #59. In the meantime it can be put into a JSON diagram via the section builder operation.

Collect was already covered under synchronization, but it can also be considered a reflective operation. It creates a point in the workflow where no further progress will be made until all upstream activity has finished.

How to Build a Workflow

Note

This chapter will be about building workflows using the native Rust API. That means writing Rust code to build the workflow, which will be compiled and embedded in an application. If you are instead interested in building workflows using JSON diagrams, then you can skip ahead to the JSON Diagrams chapter.

Spawning

You can spawn a workflow anywhere that you can access Bevy Commands through the trait SpawnWorkflowExt. This can be done while setting up your application or during runtime.

This is an example of spawning an input/output (i/o) workflow—a workflow that doesn’t have any output streams, just one input message and one final output message:

let workflow: Service<Request, Response> = commands.spawn_io_workflow(
    |scope: Scope<Request, Response>, builder: &mut Builder| {
        builder.connect(scope.start, scope.terminate);
    }
);

Notice some key details:

  • The output of spawn_io_workflow is a Service. This is what you will use to refer to the workflow after it has been spawned.
  • The input argument of spawn_io_workflow is a closure.
  • The input arguments of the closure are a Scope and a Builder.
  • The generic parameters of the Scope match those of the Service.
  • The Scope has a start and a terminate field. These represent the input and output of the overall workflow, and therefore match the request and response types of the Service. In this case Request and Response are actually aliases of the same type.
  • The Builder can make a connection between an output and an input.

Very often the Rust compiler can infer the generic types of the service and scope, so the above example can usually be reduced to:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder.connect(scope.start, scope.terminate);
    }
);

The spawn_io_workflow command exists to make this inference easy and concise. When streams are used this kind of inference will not work as easily. This will be discussed more in the Output Streams section. In the meantime, move on to the next page to see how to put nodes into your workflow.

Creating a Node

Most of the useful work that happens inside a workflow is done by nodes. A node can be implemented by any provider. Providers will often be services—including blocking, async, and continuous services—but could also be callbacks or maps.

From a Service

Suppose we have a sum function defined to be a blocking service:

fn sum(In(input): BlockingServiceInput<Vec<f32>>) -> f32 {
    let mut sum = 0.0;
    for value in input.request {
        sum += value;
    }
    sum
}

We can spawn this as a service and then use it to create a node inside a workflow:

// Spawn a service that we can use inside a workflow
let service = commands.spawn_service(sum);

// Spawn a workflow and use the service inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

A few things to note:

  • We spawn the service outside of the workflow and then use the move keyword to move it into the closure that builds the workflow.
  • We use Builder::create_node to create a workflow node that will run the sum service.
  • After the node is created, we can access its input through node.input and its output through node.output.
  • builder.connect(output, input) will connect an Output to an InputSlot
    • scope.start is an Output inside the scope that will fire off exactly one message per workflow session. That one message is the input message that was sent into the workflow. It gets fired off as soon as the workflow session begins.
    • By connecting scope.start to node.input, we are passing the input message of the workflow directly into the node.
    • To pass back the sum as the output of the workflow, we connect node.output to scope.terminate.
    • As soon as any message is passed into scope.terminate, all activity in the workflow will terminate, and the first message that was passed into scope.terminate for this session will be sent out of the workflow as the workflow’s output message.
  • We don’t need to explicitly specify the Request and Response types of the workflow because the compiler can infer those from the two builder.connect(_, _) calls.

Spawning a service inside the workflow building closure

Recommended practice is to spawn services outside of the workflow building closure and then move them into the closure. This allows services to have greater reusability, as they could be copied into multiple different workflows. However if you do want to spawn the service from inside the workflow building closure, that option is available:

let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        // Spawn a service using the builder's commands
        let service = builder.commands().spawn_service(sum);

        // Create the node using the newly spawned service
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

From a Callback

Using a callback instead of a service looks much the same. The only difference is that callbacks don’t need to be spawned:

// Define a closure to perform a sum
let f = |request: Vec<f32>| -> f32 {
    request.into_iter().fold(0.0, |a, b| a + b)
};
// Convert the closure into a Callback
let callback = f.into_blocking_callback();

// Spawn a workflow and use the callback inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(callback);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

The fact that it was implemented as a callback instead of as a service makes no difference to the workflow builder. It still gets created as a Node with InputSlot and Output types that match the Request and Response types, respectively, of the callback.

From a Map

Maps are an extremely common element in workflows. Their ability to perform quick data transformations makes them invaluable for bridging the gap between different services. They are common enough that crossflow provides two special APIs to make them easier to write:

create_map_block

A blocking map is a short-lived closure that performs a quick calculation or data transformation. These are very useful for adapting message types in-between services that are chained together.

To create a blocking map you can simply pass a closure into the Builder::create_map_block method:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_block(|request: Vec<f32>| {
            request.into_iter().fold(0.0, |a, b| a + b)
        });

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

You might notice that maps are often defined within the workflow building closure itself even though Services and Callbacks are usually created outside. This isn’t a requirement, but it is usually the most ergonomic way to add a map. Reusability is less of a concern with maps than it is for Services or Callbacks.

create_map_async

An async map is simply used to run a basic async function. Suppose you have an async function like:

async fn get_page_title(url: String) -> Option<String> {
    let http_response = trpl::get(&url).await;
    let response_text = http_response.text().await;
    trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
}

You can use this async function as a map by simply passing its name into Builder::create_map_async:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {

        let node = builder.create_map_async(get_page_title);

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

Alternatively you can define the async map inline as a closure. Just take note that async closures were only stabilized in recent Rust versions, so on older toolchains you will need your closure to return an async move { ... } block, like this:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_async(|url: String| {
            async move {
                let http_response = trpl::get(&url).await;
                let response_text = http_response.text().await;
                trpl::Html::parse(&response_text)
                    .select_first("title")
                    .map(|title| title.inner_html())
            }
        });

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

Chain

Creating a map through the Builder API is necessary if you need to connect multiple outputs into the same map node, but in most cases you’ll want to create a map that just transforms data as it passes from one operation to another. A more convenient way to do that is with a Chain, discussed later.

Connecting Nodes

The examples on the previous page are completely trivial workflows. In each one we’re just starting the workflow, running one service (or provider), and then terminating the workflow with the output of the service. The real value of a workflow is being able to assemble multiple services together into something more complex.

Here is an example of creating two nodes and chaining them:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let sum_node = builder.create_map_block(|request: Vec<f32>| {
            request.into_iter().fold(0.0, |a, b| a + b)
        });
        let double_node = builder.create_map_block(|request: f32| {
            2.0 * request
        });

        builder.connect(scope.start, sum_node.input);
        builder.connect(sum_node.output, double_node.input);
        builder.connect(double_node.output, scope.terminate);
    }
);

A common pattern when building workflows is to declare the nodes or operations at the top and then connect them together below.

The above workflow still doesn’t accomplish anything that we couldn’t get from running a Series instead. It’s just a sequence of actions that feed into each other. What makes workflows interesting is their ability to branch, cycle, and flow freely through a directed graph structure.

Conditional Branching

One useful kind of control flow is conditional branching. Conditional branching is when the activity in a workflow reaches a “fork in the road” where a message must go down one branch or another but not both.

fork-result

Commonly used for error handling in workflows, the fork-result operation will take in Result messages and activate one of two branches—the ok branch or the err branch—depending on whether the input message had an Ok or Err value.

let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
    |scope, builder| {
        let parse_schema_v2 = builder.create_map_block(|request: Json| {
            // Try parsing the JSON with Schema V2
            serde_json::from_value::<SchemaV2>(request.clone())
                // If an error happened, pass along the original request message
                // so we can try parsing it with a different schema.
                .map_err(|_| request)
        });

        let parse_schema_v1 = builder.create_map_block(|request: Json| {
            // Try parsing the JSON with Schema V1 since V2 failed.
            serde_json::from_value::<SchemaV1>(request)
                // If the parsing was successful, upgrade the parsed value to
                // SchemaV2.
                .map(|value| value.upgrade_to_schema_v2())
        });

        let to_ok = builder.create_map_block(|request: SchemaV2| {
            Ok(request)
        });

        // Create a fork-result operation. We get back a tuple whose first element
        // is an InputSlot that lets us feed messages into the fork-result, and
        // whose second element is a struct containing two fields: ok and err,
        // each representing a different Output and therefore diverging branches
        // in the workflow.
        let (fork_result_input, fork_result) = builder.create_fork_result();

        builder.connect(scope.start, parse_schema_v2.input);
        builder.connect(parse_schema_v2.output, fork_result_input);

        // If parsing SchemaV2 was successful, wrap it back in Ok and terminate
        builder.connect(fork_result.ok, to_ok.input);
        builder.connect(to_ok.output, scope.terminate);

        // If we failed to parse the Json as SchemaV2 then try using SchemaV1 instead
        builder.connect(fork_result.err, parse_schema_v1.input);

        // If parsing SchemaV1 also fails then we have no more fallback, so just
        // pass back the result, whether it was successful or failed.
        builder.connect(parse_schema_v1.output, scope.terminate);
    }
);

Note how in this example both branches converge back to the same terminate operation.
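
To see the branching without the workflow machinery, here is the same fallback logic written as sequential plain Rust. The parse_v2 and parse_v1_upgraded functions below are hypothetical stand-ins for the serde_json calls so the sketch is self-contained:

```rust
#[derive(Debug, PartialEq)]
struct SchemaV2 { version: u32 }

// Hypothetical stand-in for serde_json::from_value::<SchemaV2>. On failure,
// pass along the original input so we can try a different schema.
fn parse_v2(input: &str) -> Result<SchemaV2, &str> {
    if input.starts_with("v2:") { Ok(SchemaV2 { version: 2 }) } else { Err(input) }
}

// Hypothetical stand-in for parsing SchemaV1 and upgrading it to SchemaV2.
fn parse_v1_upgraded(input: &str) -> Result<SchemaV2, String> {
    if input.starts_with("v1:") {
        Ok(SchemaV2 { version: 2 })
    } else {
        Err(format!("unrecognized schema: {input}"))
    }
}

// The workflow's fork-result, written as sequential control flow: try V2
// first, and only on the err branch fall back to V1.
fn parse_with_fallback(input: &str) -> Result<SchemaV2, String> {
    match parse_v2(input) {
        Ok(parsed) => Ok(parsed),                     // fork_result.ok -> terminate
        Err(original) => parse_v1_upgraded(original), // fork_result.err -> parse_schema_v1
    }
}

fn main() {
    assert!(parse_with_fallback("v2:{}").is_ok());
    assert!(parse_with_fallback("v1:{}").is_ok());
    assert!(parse_with_fallback("garbage").is_err());
}
```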

fork-option

Another common branching operation is fork-option. Similar to fork-result, it creates two branches: one sends along the value contained inside Some inputs, while the other produces a trigger () when the input value is None.

let workflow: Service<(), f32> = commands.spawn_io_workflow(
    |scope, builder| {
        let get_random = builder.create_map_block(|request: ()| {
            // Generate some random number between 0.0 and 1.0
            rand::random::<f32>()
        });

        let less_than_half = builder.create_map_block(|value: f32| {
            if value < 0.5 {
                Some(value)
            } else {
                None
            }
        });

        // Create a fork-option operation. We get back a tuple whose first element
        // is an InputSlot that lets us feed messages into the fork-option, and
        // whose second element is a struct containing two fields: some and none,
        // each representing a different Output and therefore diverging branches
        // in the workflow.
        let (fork_option_input, fork_option) = builder.create_fork_option();

        // Chain the three operations together.
        builder.connect(scope.start, get_random.input);
        builder.connect(get_random.output, less_than_half.input);
        builder.connect(less_than_half.output, fork_option_input);

        // Trigger the randomizer again if the value was not less than one-half.
        // This creates a cycle in the workflow.
        builder.connect(fork_option.none, get_random.input);

        // Terminate the workflow if it was less than one-half.
        // The value produced by the randomizer will be the workflow's output.
        builder.connect(fork_option.some, scope.terminate);
    }
);
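
The cycle above is the workflow equivalent of a rejection-sampling loop. Here is a plain-Rust sketch of the same logic; a small deterministic generator stands in for the rand crate so the example is self-contained:

```rust
// A minimal linear congruential generator standing in for rand::random::<f32>().
// Produces values in the range [0.0, 1.0).
fn next_random(state: &mut u32) -> f32 {
    *state = state.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
    (*state >> 8) as f32 / (1u32 << 24) as f32
}

// The fork-option cycle as sequential control flow: keep looping (the none
// branch) until the value is less than one-half (the some branch terminates).
fn random_below_half(state: &mut u32) -> f32 {
    loop {
        let value = next_random(state);
        if value < 0.5 {
            return value; // fork_option.some -> scope.terminate
        }
        // fork_option.none -> back to get_random.input
    }
}

fn main() {
    let mut state = 12345;
    let value = random_below_half(&mut state);
    assert!(value >= 0.0 && value < 0.5);
}
```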

Parallel Branches

Some kinds of forks result in parallel activity instead of conditionally activating branches. This is one of the most powerful features of workflows: being able to easily juggle large amounts of parallel activity.

racing

The fork-clone operation allows a message to be cloned and simultaneously activate multiple branches. Once activated, each branch will run independently and concurrently. This is useful if you have multiple separate concerns that need to be handled simultaneously.

Here is an example of a pick-and-place workflow where we have a parallel node that monitors the safety of the operation. While the pick-and-place sequence is being executed, the emergency_stop service will watch the state of the workcell and issue an output if anything threatens the safety of the operation. The pick-and-place operation and the emergency stop both connect to the terminate operation. Whichever yields an output first will end the workflow. This is known as a race.

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        // Create nodes for performing a pick and place
        let move_to_pick_pregrasp = builder.create_node(move_to_pick_pregrasp);
        let grasp_item = builder.create_node(grasp_item);
        let move_to_placement = builder.create_node(move_to_placement);
        let release_item = builder.create_node(release_item);

        // Also create a node that monitors whether an emergency stop is needed
        let emergency_stop = builder.create_node(emergency_stop);

        // Create a fork-clone operation. We get back a tuple whose first element
        // is an InputSlot that lets us feed messages into the fork-clone, and
        // whose second element is a struct that allows us to spawn outputs for
        // the fork.
        let (fork_clone_input, fork_clone) = builder.create_fork_clone();

        // Send the scope input message to be cloned
        builder.connect(scope.start, fork_clone_input);

        // When the scope starts, begin moving the robot to the pregrasp pose
        let cloned = fork_clone.clone_output(builder);
        builder.connect(cloned, move_to_pick_pregrasp.input);

        // When the scope starts, also start monitoring whether an emergency
        // stop is needed. If this gets triggered it will terminate the workflow
        // immediately.
        let cloned = fork_clone.clone_output(builder);
        builder.connect(cloned, emergency_stop.input);

        // Connect the happy path together
        builder.connect(move_to_pick_pregrasp.output, grasp_item.input);
        builder.connect(grasp_item.output, move_to_placement.input);
        builder.connect(move_to_placement.output, release_item.input);
        builder.connect(release_item.output, scope.terminate);

        // Connect the emergency stop to terminate
        builder.connect(emergency_stop.output, scope.terminate);
    }
);

joining

Another way to use fork-clone is to activate parallel branches that each control a different agent. This is often done when a process needs two agents to work independently until they both reach a synchronization point. In that case, instead of racing the two branches we will join them.

Here is an example of a robot that needs to use an elevator. The robot will start moving to the elevator lobby, and at the same time we will run a branch that watches the robot’s progress. When the robot is close enough to the elevator lobby, we will summon the elevator to come pick up the robot. When the robot and elevator both arrive in the elevator lobby, we will have the robot use the elevator.

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        // Create nodes for moving the robot and summoning the elevator.
        let move_robot_to_elevator = builder.create_node(move_robot_to_elevator);
        let on_robot_near_elevator = builder.create_node(on_robot_near_elevator);
        let send_elevator_to_level = builder.create_node(send_elevator_to_location);
        let use_elevator = builder.create_node(use_elevator);

        // Create a fork-clone operation. We get back a tuple whose first element
        // is an InputSlot that lets us feed messages into the fork-clone, and
        // whose second element is a struct that allows us to spawn outputs for
        // the fork.
        let (fork_clone_input, fork_clone) = builder.create_fork_clone();

        // Send the scope input message to be cloned
        builder.connect(scope.start, fork_clone_input);

        // When the scope starts, begin sending the robot to the elevator
        let cloned = fork_clone.clone_output(builder);
        builder.connect(cloned, move_robot_to_elevator.input);

        // When the scope starts, also start detecting whether the robot is
        // near the elevator so we know when to summon the elevator
        let cloned = fork_clone.clone_output(builder);
        builder.connect(cloned, on_robot_near_elevator.input);

        // When the robot has made it close enough to the elevator, begin
        // summoning the elevator
        builder.connect(on_robot_near_elevator.output, send_elevator_to_level.input);

        // Create a join operation that will activate when the robot has reached
        // the elevator lobby and the elevator has arrived on the correct floor.
        let both_arrived = builder.join((
            move_robot_to_elevator.output,
            send_elevator_to_level.output,
        ))
        .output();

        // When the robot has reached the elevator lobby and the elevator has
        // arrived on the correct floor, have the robot use the elevator.
        builder.connect(both_arrived, use_elevator.input);

        // When the robot is done using the elevator, the workflow is finished.
        builder.connect(use_elevator.output, scope.terminate);
    }
);

unzipping

It’s rare that a message can be fed directly from the output of one service to the input of another. Most of the time the services you want to connect will have slight differences between the Response type that the first produces and the Request type that the second expects. If you have two or more parallel branches that each expect a different input, fork-clone might not seem like a good fit because every branch receives the same message.

We’ve seen how blocking maps can be used to perform quick data transforms. If we combine a blocking map with the unzip operation, we can perform parallel branching where a specific message is sent down each branch:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        // Create nodes for picking an item and moving to a pickup location
        let pick_item = builder.create_node(pick_item);
        let move_to_location = builder.create_node(move_to_location);
        let hand_off_item = builder.create_node(hand_off_item);

        // Create a blocking map to transform the workflow input data into two
        // separate messages to send to two different branches.
        //
        // This returns a tuple with two elements. We will send each element to
        // a different branch at the same time.
        let transform_inputs = builder.create_map_block(|request: PickupTask| {
            (
                WorkcellTask {
                    workcell: request.workcell,
                    item: request.item,
                },
                MobileRobotTask {
                    vehicle: request.vehicle,
                    location: request.location,
                }
            )
        });

        // Create the unzip forking
        let (unzip_input, unzip) = builder.create_unzip();
        // Destructure the unzipped outputs
        let (workcell_task, mobile_robot_task) = unzip;

        // Synchronize when the workcell and mobile robot are both ready for the
        // item to be handed off
        let both_ready = builder.join((
            pick_item.output,
            move_to_location.output,
        ))
        .output();

        // Connect all the nodes
        builder.connect(scope.start, transform_inputs.input);
        builder.connect(transform_inputs.output, unzip_input);
        builder.connect(workcell_task, pick_item.input);
        builder.connect(mobile_robot_task, move_to_location.input);
        builder.connect(both_ready, hand_off_item.input);
        builder.connect(hand_off_item.output, scope.terminate);
    }
);
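
Under the hood, unzip splits a tuple message so that each element continues down its own branch. Here is a plain-Rust sketch of the transform-then-unzip step, using minimal stand-ins for the task types above:

```rust
// Minimal stand-ins for the message types used in the workflow above.
struct PickupTask { workcell: u32, item: u32, vehicle: u32, location: u32 }

#[derive(Debug, PartialEq)]
struct WorkcellTask { workcell: u32, item: u32 }

#[derive(Debug, PartialEq)]
struct MobileRobotTask { vehicle: u32, location: u32 }

// The same transform as the blocking map in the workflow: one input message
// becomes a tuple with one element per branch.
fn transform_inputs(request: PickupTask) -> (WorkcellTask, MobileRobotTask) {
    (
        WorkcellTask { workcell: request.workcell, item: request.item },
        MobileRobotTask { vehicle: request.vehicle, location: request.location },
    )
}

fn main() {
    let task = PickupTask { workcell: 1, item: 2, vehicle: 3, location: 4 };
    // Unzipping is just destructuring: each element heads to its own branch.
    let (workcell_task, mobile_robot_task) = transform_inputs(task);
    assert_eq!(workcell_task, WorkcellTask { workcell: 1, item: 2 });
    assert_eq!(mobile_robot_task, MobileRobotTask { vehicle: 3, location: 4 });
}
```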

Building a Chain

Many workflows involve chaining services together rather than building complex graphs. The API examples on the previous page use the Builder API which is the most general API, able to build any kind of directed (cyclic) graph. However it is also maximally verbose, which—besides requiring more typing—can cause your workflow implementation to be scattered and difficult to follow.

We provide the Chain API as a streamlined alternative that suits cases where cycles aren’t needed. It allows you to build workflows by chaining methods together, a popular idiom in Rust. This can save typing time and also allows your code to express the structure of the workflow. Here we will recreate the workflows of the previous page, simplifying them using the Chain API.

simple sequence

Sequences of services can be chained together with a simple .then(_):

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .then(sum)
            .then(double)
            .connect(scope.terminate);
    }
);

A few notes about this example:

  • Builder::chain(input) allows us to begin creating a chain. In this case we begin the chain from scope.start, but you can begin a chain from any Output.
  • Chain::then supports the same arguments as Builder::create_node, meaning you can pass in Services or Callbacks.
  • Chain::connect takes in an InputSlot and ends the chain by feeding it into that input slot. This is useful for connecting the end of a chain into the terminate operation or looping it back to an earlier operation to create a cycle.

If you are dealing with maps and want to define them inline when building the workflow, you can use Chain::map_block or Chain::map_async for that:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: Vec<f32>| {
                request.into_iter().fold(0.0, |a, b| a + b)
            })
            .map_block(|request: f32| {
                2.0 * request
            })
            .connect(scope.terminate);
    }
);

The Chain::map_block and Chain::map_async methods are just syntactic sugar around Chain::then that makes it easier to put maps into the chain. You can interleave calls to .then, .map_block, and .map_async however you would like when creating a sequence of nodes.

recreating fork-result

The Chain::fork_result method allows you to create two diverging branches when an output message has a Result type. It takes two closures as arguments, where each closure builds one of the diverging branches.

let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|message: Json| {
                // Try parsing the JSON with Schema V2
                serde_json::from_value::<SchemaV2>(message.clone())
                    // If an error happened, pass along the original request message
                    // so we can try parsing it with a different schema.
                    .map_err(|_| message)
            })
            .fork_result(
                |ok| {
                    // If ok, wrap the message in Ok and connect it to terminate
                    ok.map_block(|msg| Ok(msg)).connect(scope.terminate);
                },
                |err| {
                    err
                    .map_block(|message: Json| {
                        // Try parsing the JSON with Schema V1 since V2 failed.
                        serde_json::from_value::<SchemaV1>(message)
                            // If the parsing was successful, upgrade the parsed
                            // value to SchemaV2.
                            .map(|value| value.upgrade_to_schema_v2())
                    })
                    // End this branch by feeding it into the terminate operation
                    .connect(scope.terminate);
                }
            );
    }
);

Chain has special methods for certain message types that can further simplify how you express the chain. For example, the Result type gets access to Chain::branch_for_err, which isolates the err branch of a fork-result and allows the rest of the chain to proceed with the ok branch:

let workflow: Service<Json, Result<SchemaV2, Error>> = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: Json| {
                // Try parsing the JSON with Schema V2
                serde_json::from_value::<SchemaV2>(request.clone())
                    // If an error happened, pass along the original request message
                    // so we can try parsing it with a different schema.
                    .map_err(|_| request)
            })
            // Create a branch that handles an Err value. This creates a
            // fork-result under the hood.
            .branch_for_err(|chain|
                chain
                .map_block(|request: Json| {
                    // Try parsing the JSON with Schema V1 since V2 failed.
                    serde_json::from_value::<SchemaV1>(request)
                        // If the parsing was successful, upgrade the parsed value to
                        // SchemaV2.
                        .map(|value| value.upgrade_to_schema_v2())
                })
                // End this branch by feeding it into the terminate operation
                .connect(scope.terminate)
            )
            // Continue the original chain, but only for Ok values.
            .map_block(|ok| Ok(ok))
            .connect(scope.terminate);
    }
);

This is typically used if the error handling branch is relatively small while the ok branch continues on for a long time.

recreating fork-option

It’s less obvious how to create cycles when using chains, but it is still completely possible! The key is to first create a Node so you can refer to the InputSlot later. Here’s an example of creating a cycle using a chain:

let workflow: Service<(), f32> = commands.spawn_io_workflow(
    |scope, builder| {
        // Make a small chain that returns a Node. We need to create an explicit
        // Node for get_random because we will need to refer to its InputSlot
        // later to create a cycle.
        let get_random: Node<(), f32> = builder
            .chain(scope.start)
            .map_block_node(|request: ()| rand::random::<f32>());

        builder
            .chain(get_random.output)
            .map_block(|value: f32| {
                if value < 0.5 {
                    Some(value)
                } else {
                    None
                }
            })
            // This creates a fork-option and sends None values back to the
            // get_random node. This creates a cycle in the workflow.
            .branch_for_none(|none| none.connect(get_random.input))
            // As we continue the chain, only Some(T) values will reach this
            // point, so the chain simplifies the Option<T> to just a T. We can
            // now connect this directly to the terminate operation.
            .connect(scope.terminate);
    }
);

A few notes about this example:

  • Chain::map_block_node works like Chain::map_block, except that it ends the chain and returns a Node, giving us access to the InputSlot we need to create the cycle.
  • Chain::branch_for_none isolates the none branch of a fork-option. Only Some(T) values continue down the original chain, which simplifies the message type from Option<T> to T.

recreating racing

When you have diverging parallel branches, an easy way to create one of those branches is with Chain::branch_clone. You pass in a closure that builds on a new chain, which will be fed a clone of the original message. The return value of branch_clone(_) is a continuation of the original Chain that the branch_clone forked off of.

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .branch_clone(|chain|
                // This is a parallel branch fed with a clone of the scope input.
                chain
                .then(emergency_stop)
                .connect(scope.terminate)
            )
            // As we continue to build this chain, we are creating a branch that
            // will run in parallel to the one defined inside of .branch_clone(_).
            // This is where we'll define the happy path sequence of the
            // pick-and-place routine.
            .then(move_to_pick_pregrasp)
            .then(grasp_item)
            .then(move_to_placement)
            .then(release_item)
            .connect(scope.terminate);
    }
);

recreating joining

Chains even support synchronization operations like join. We can structure our fork-clone a bit differently than we did in the racing example to set it up for an easy join:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .fork_clone((
                |chain: Chain<_>| {
                    // This branch moves the robot to the elevator
                    chain
                    .then(move_robot_to_elevator)
                    .output()
                },
                |chain: Chain<_>| {
                    // This branch monitors the robot and then summons the lift
                    chain
                    .then(on_robot_near_elevator)
                    .then(send_elevator_to_location)
                    .output()
                }
            ))
            .join(builder)
            .then(use_elevator)
            .connect(scope.terminate);
    }
);

A few notes about this example:

  • Chain::fork_clone takes in a tuple of any number of closures. Each closure takes a Chain<T> as an input argument where T is the message type of the original chain.
  • You will have to explicitly specify the : Chain<_> type of the closure arguments. The Rust compiler cannot infer this automatically due to limitations in what traits can express, but you can always use the _ placeholder for the generic parameter of the chain.
  • The return value of Chain::fork_clone will be a tuple wrapping up all the return values of the closures. We have each closure end with Chain::output so we can gather up the plain outputs of all the branches into one tuple.
  • The join method can be applied to tuples of Outputs, allowing us to apply a join operation at the end of these branches to synchronize them.
  • The join method gives back a chain of the joined value that we can continue to build off of.

There are many ways to use Chain to structure a workflow. Sometimes you will find it more concise and intuitive, but other times you might find it messy and confusing. You can mix and match uses of Chain with uses of Builder however you would like. Ultimately both APIs boil down to InputSlots, Outputs, and Buffers (which will be covered later), making these APIs fully interoperable. Use whichever allows your workflow to be as understandable as possible.

recreating unzipping

Chains can also create forks using the unzip operation and join them together ergonomically:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .map_block(|request: PickupTask| {
                (
                    WorkcellTask {
                        workcell: request.workcell,
                        item: request.item,
                    },
                    MobileRobotTask {
                        vehicle: request.vehicle,
                        location: request.location,
                    }
                )
            })
            .fork_unzip((
                |workcell_branch: Chain<_>| workcell_branch.then(pick_item).output(),
                |amr_branch: Chain<_>| amr_branch.then(move_to_location).output(),
            ))
            .join(builder)
            .then(hand_off_item)
            .connect(scope.terminate);
    }
);

Connecting Output Streams

Up until now all the examples of spawning a workflow have used spawn_io_workflow which spawns a service that takes in a single request and yields a single response with no output streams. Just as services support output streams, so do workflows. To stream messages out of your workflow while it is still running, you can use the stream out operation.

For starters, to stream out of a workflow you need to use the more general spawn_workflow method. Unlike the Request and Response generic parameters, it is never possible for the Rust compiler to infer what type you want for the Streams generic parameter. This is why spawn_io_workflow exists: in cases where you don’t need to stream out of your workflow, all the generic parameters can usually be inferred.

The easiest way to specify the streams is to do it in the Scope. The Request and Response parameters can still be inferred by putting a placeholder _ in for them:

let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, StreamOf<T>>, builder| {
        /* ... */
    }
);

Single Stream

If your workflow has a single output stream, you can use the StreamOf<T> struct to get a stream that produces messages of type T.

Here is how the stream out conceptual example of slicing apples would be written with the native Rust API:

let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, StreamOf<AppleSlice>>, builder| {
        // Create the service nodes that will be involved in chopping apples
        let deposit_apples = builder.create_node(deposit_apples);
        let try_take_apple = builder.create_node(try_take_apple);
        let (have_apple_input, have_apple) = builder.create_fork_option();
        let chop_apple = builder.create_node(chop_apple);

        // Create a buffer to hold apples that are waiting to be chopped, and
        // an operation to access that buffer.
        let apple_buffer = builder.create_buffer(BufferSettings::keep_all());
        let access_apple_buffer = builder.create_buffer_access(apple_buffer);

        // Connect the scope input message to the deposit_apples service
        builder.connect(scope.start, deposit_apples.input);

        // Connect the stream of incoming apples to the apple buffer
        builder.connect(deposit_apples.streams, apple_buffer.input_slot());

        // When done depositing apples, start trying to take them by accessing
        // the buffer
        builder.connect(deposit_apples.output, access_apple_buffer.input);
        builder.connect(access_apple_buffer.output, try_take_apple.input);

        // Try to take an apple and check if we ran out
        builder.connect(try_take_apple.output, have_apple_input);
        // If there's another apple, send it to be chopped
        builder.connect(have_apple.some, chop_apple.input);
        // If there are no more apples, terminate the workflow
        builder.connect(have_apple.none, scope.terminate);

        // Stream the apple slices out of the scope, and then cycle back to
        // taking another apple when the current one is finished.
        builder.connect(chop_apple.streams, scope.streams);
        builder.connect(chop_apple.output, access_apple_buffer.input);
    }
);

The workflow has a single stream output that produces AppleSlice objects. You can also see how streams produced by the deposit_apples and chop_apple services are connected to other operations. The Node struct has a streams field which, for these services, is a single Output, because the services are using StreamOf.

Stream Pack

If your workflow needs multiple stream outputs, you can use a custom StreamPack. Here’s an example of a stream pack that might be provided by a navigation workflow:

#[derive(StreamPack)]
struct NavigationStreams {
    log: String,
    location: Vec2,
    error: NavigationError,
}

With that defined, we can emit multiple output streams from our workflow. Here is an example of a robot navigating through a doorway and streaming out information while it goes along:

// This service will have a mobile robot approach a door.
let approach_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), NavigationStreams>| {
        input.streams.log.send(String::from("approaching door"));
        /* ... approach the door ... */
    }
);

// open_door is not a navigation service so it will only have one
// output stream: log messages.
let open_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), StreamOf<String>>| {
        input.streams.send(String::from("opening door"));
        /* ... open the door ... */
    }
);

// This service will have a mobile robot move through a door.
let move_through_door = commands.spawn_service(
    |In(input): BlockingServiceInput<(), NavigationStreams>| {
        input.streams.log.send(String::from("moving through door"));
        /* ... move through the door ... */
    }
);

// This workflow will handle the whole process of moving a robot through a door,
// while streaming navigation and log information from all the services it runs.
let workflow = commands.spawn_workflow(
    |scope: Scope<_, _, NavigationStreams>, builder| {
        let approach_door = builder.create_node(approach_door);
        let open_door = builder.create_node(open_door);
        let move_through_door = builder.create_node(move_through_door);

        // Connect nodes together
        builder.connect(scope.start, approach_door.input);
        builder.connect(approach_door.output, open_door.input);
        builder.connect(open_door.output, move_through_door.input);
        builder.connect(move_through_door.output, scope.terminate);

        // Connect node streams to scope streams
        builder.connect(approach_door.streams.log, scope.streams.log);
        builder.connect(approach_door.streams.location, scope.streams.location);
        builder.connect(approach_door.streams.error, scope.streams.error);

        builder.connect(open_door.streams, scope.streams.log);

        builder.connect(move_through_door.streams.log, scope.streams.log);
        builder.connect(move_through_door.streams.location, scope.streams.location);
        builder.connect(move_through_door.streams.error, scope.streams.error);
    }
);

Tip

In the above example we are connecting node stream outputs to the scope stream input slots, but this is an illustration, not a limitation. The fields in Node::streams are all regular Outputs that can be connected into any InputSlot, and the fields in Scope::streams are regular InputSlots that can receive messages from any Output.

Using Buffers

Buffers are an important element of complex workflows. They are the key to both synchronizing parallel activity inside of a workflow and also tracking the overall state of the workflow.

Crossflow uses Bevy’s ECS to manage the data inside of buffers. This means if you want a node to directly access the data inside of a buffer you will need to implement the node with a service or a callback and use an accessor, which is explained on the next page.

joining

In an earlier example we saw how to join two branches by connecting their outputs with the Builder::join method. In reality, joining two or more outputs is implemented by creating a buffer with default settings for each output and then performing the join operation on that set of buffers.

Tip

For a conceptual review of the different buffer and join settings, visit the Join Rates chapter.

The automatic conversion from Output to Buffer is great for ergonomics, especially when building chains, but it’s important to know how to explicitly create buffers when needed.

keep_all

Suppose we want to process batches of lidar data and camera data in order to perform localization. Each kind of data is processed at different rates, but there are pairs of data between the types that need to be bundled back together, like in this figure:

join-all-pull-all-pull

To set this up in a workflow, we can explicitly create each of the buffers with the keep_all setting as seen in this code example:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let lidar_buffer = builder.create_buffer(BufferSettings::keep_all());
        let camera_buffer = builder.create_buffer(BufferSettings::keep_all());

        let localization_data = builder.join(
            LocalizationData::select_buffers(lidar_buffer, camera_buffer)
        );
    }
);

The data from the buffers will be joined into LocalizationData, defined here:

#[derive(Joined)]
struct LocalizationData {
    lidar: LidarData,
    camera: CameraData,
}

Notice that LocalizationData has derived the Joined trait which allows it to be an output of the join operation. Deriving Joined also implements the select_buffers method for the struct, allowing you to easily set the buffer that will feed into each field.

fetch-by-clone

By default the join operation will always pull messages out of their buffers once all the buffers are ready to be joined. In some cases you may want to clone one of the messages out instead of pulling it. For example, here we want to stamp each camera image with whatever last reported location the robot had:

join-1-clone-1-pull

Since keep_last: 1 is the default buffer setting, we can just use BufferSettings::default() when creating the buffers. What we need to do differently is apply .join_by_cloning() to the location_buffer when creating the join operation:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let location_buffer = builder.create_buffer(BufferSettings::default());
        let camera_buffer = builder.create_buffer(BufferSettings::default());

        let image_stamped = builder.join(ImageStamped::select_buffers(
            // Use .join_by_cloning() to have the location data cloned instead
            // of pulled for each join operation.
            location_buffer.join_by_cloning(),
            camera_buffer,
        ));
    }
);

Note

The .join_by_cloning() method can be used on any buffer whose message type implements the Clone trait. The choice to clone instead of pull is made per buffer in each join operation that gets created. The same buffer can take part in multiple join operations with different clone/pull settings for each of those joins.

Using an Accessor

Buffers can act similar to the “blackboard” concept in behavior trees—a repository of data shared across the different services in your workflow. To achieve thread-safe high-performance access to buffer data, we rely on Bevy’s ECS.

Buffers are implemented as entities with certain components attached. This means their data is stored in Bevy’s highly optimized memory management system, which can automatically provide safe parallel access if you use continuous services. The catch is that to access this data you need to use services or callbacks, which support system parameters. In particular you need to use the BufferAccess system parameter for read-only parallelizable access, or BufferAccessMut for exclusive read/write access.

Note

Maps cannot access buffer data. In general maps cannot use system parameters, making them slightly more efficient than services or callbacks, but leaving them unable to do as much.

With Access

To grant a service access to a buffer, you must provide it with a BufferKey. The two typical ways to obtain a buffer key are through the listen operation or the buffer access operation.

  • The listen operation will send out a message containing one or more buffer keys when the buffers attached to it are modified.
  • The buffer access operation will append a buffer key to a message that’s in flight from one node to another.

The important thing is that a BufferKey is somehow present inside the message data that gets passed into the service. The following example shows the Chain::with_access method adding a buffer key to messages that are being passed into some services. It also shows how those services use BufferAccess and BufferAccessMut to view and modify the contents of a buffer.

use crossflow::{prelude::*, testing::*};

/// Use mutable access (BufferAccessMut) to push values into a buffer. The
/// values are guaranteed to be present in the buffer before this service
/// finishes running. Therefore any service that accesses the buffer after this
/// service finishes is guaranteed to find the values present inside.
fn push_values(
    In(input): In<(Vec<i32>, BufferKey<i32>)>,
    mut access: BufferAccessMut<i32>,
) {
    let Ok(mut access) = access.get_mut(&input.1) else {
        return;
    };
    for value in input.0 {
        access.push(value);
    }
}

/// Use read-only access (BufferAccess) to look through the values in the
/// buffer. We pick out the largest value and clone it to pass it along. This
/// read-only access cannot pull or modify the data inside the buffer in any
/// way, it can only view and clone (if the data is cloneable) from the buffer.
fn get_largest_value(
    In(input): In<((), BufferKey<i32>)>,
    access: BufferAccess<i32>,
) -> Option<i32> {
    let access = access.get(&input.1).ok()?;
    access.iter().max().cloned()
}

let mut context = TestingContext::minimal_plugins();

let workflow = context.spawn_io_workflow(|scope, builder| {
    let buffer = builder.create_buffer(BufferSettings::keep_all());
    builder
        .chain(scope.start)
        .with_access(buffer)
        .then(push_values.into_blocking_callback())
        .with_access(buffer)
        .then(get_largest_value.into_blocking_callback())
        .connect(scope.terminate);
});

let mut outcome = context.command(|commands| {
    commands.request(vec![-3, 2, 10], workflow).outcome()
});

context.run_with_conditions(&mut outcome, Duration::from_secs(1));

let r = outcome.try_recv().unwrap().unwrap();
assert_eq!(r, Some(10));

Listen

The buffer access operation (with_access) only gets activated when a message is emitted by some operation output. To monitor the contents of one or more buffers directly, you need to use listen. The listen operation connects to one or more buffers and gets activated any time a modification is made to the contents of any one of the buffers connected to it.

When listen is activated, it passes along an accessor as a message. Unlike the buffer access operation, the message produced by a listener is nothing but a collection of buffer keys. For the listen operation to be useful, you need to send its message to a service that will do something with those keys.

The following code example recreates a simple intersection crossing workflow:

listen-accessor

/// Derive the Accessor trait so this struct of keys can be constructed by the
/// listen operation. Note that Accessor also requires Clone to be defined.
#[derive(Accessor, Clone)]
struct IntersectionKeys {
    signal: BufferKey<TrafficSignal>,
    arrival: BufferKey<[f32; 2]>,
}

/// Define a callback that evaluates whether or not the robot should proceed
/// across the intersection.
fn proceed_or_stop(
    In(keys): In<IntersectionKeys>,
    signal_access: BufferAccess<TrafficSignal>,
    mut arrival_access: BufferAccessMut<[f32; 2]>,
) -> Option<RobotCommand> {
    let signal_buffer = signal_access.get(&keys.signal).ok()?;
    let mut arrival_buffer = arrival_access.get_mut(&keys.arrival).ok()?;

    // Get a reference to the newest message if one is available
    let signal = signal_buffer.newest()?;
    // Pull the value from this buffer if one is available. The leading
    // underscore marks the pulled value as intentionally unused.
    let _arrived = arrival_buffer.pull()?;

    match signal {
        TrafficSignal::Green => Some(RobotCommand::Go),
        TrafficSignal::Red => Some(RobotCommand::Stop),
    }
}

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        // Create buffers to store the two state variables:
        // * what the current traffic signal is
        // * whether the robot has reached the intersection
        let signal = builder.create_buffer(Default::default());
        let arrived = builder.create_buffer(Default::default());

        // Create services to update the two state variables
        let traffic_signal_service = builder.create_node(traffic_signal_service);
        let approach_intersection = builder.create_node(approach_intersection);

        // Activate both of the state update services at startup
        builder
            .chain(scope.start)
            .fork_clone((
                |chain: Chain<_>| chain.connect(traffic_signal_service.input),
                |chain: Chain<_>| chain.connect(approach_intersection.input),
            ));

        // Connect the services to their respective buffers. For the traffic
        // signal we connect its stream, because the signal will be changing
        // over time. For the arrival state we connect the output of the
        // approach_intersection service because the robot will only ever
        // approach the intersection once.
        builder.connect(traffic_signal_service.streams, signal.input_slot());
        builder.connect(approach_intersection.output, arrived.input_slot());

        builder
            // Create a listen operation that will activate whenever either the
            // traffic signal or arrival buffer has an update
            .listen(IntersectionKeys::select_buffers(signal, arrived))
            // When an update happens, provide both buffer keys to a node that
            // will evaluate if the robot is ready to cross
            .then(proceed_or_stop.into_blocking_callback())
            // If no decision can be made yet (e.g. one of the buffers is
            // unavailable) then just dispose this message
            .dispose_on_none()
            // If a decision was made, send a command to the robot
            .then(send_robot_command)
            // The previous service will return None if the command was to stop.
            // If the command was to go, then it will return Some after the robot
            // finishes crossing the intersection.
            .dispose_on_none()
            // Terminate when the robot has finished crossing.
            .connect(scope.terminate);
    }
);

Gate

Besides being able to access the data in buffers, you can also use a buffer key to open and close the gate of the buffer. When the gate of a buffer is closed, listeners will not be notified when the content of the buffer gets modified. However, this does not prevent the buffer from being accessed by any services that have a key for it.

Note

Closing a buffer gate has no effect on the buffer access operation.

fn manage_opening_time(
    In(input): In<(SystemTime, BufferKey<Order>)>,
    mut gate: BufferGateAccessMut,
    hours: Res<WorkingHours>,
) {
    let Ok(mut gate) = gate.get_mut(&input.1) else {
        return;
    };

    let time = input.0;
    if time < hours.open || hours.close < time {
        gate.close_gate();
    } else {
        gate.open_gate();
    }
}

Workflow Settings

In general, services allow you to specify delivery settings, which affect whether the service can run in parallel across one or more sessions or whether it can only run once at a time across all sessions. There are also scope settings, which determine whether a given scope is “uninterruptible”, meaning it cannot be cancelled from the outside—only an internal cancellation or successful termination can end the scope session.

Both of these settings are relevant to a workflow. A workflow is ultimately a service, and therefore supports delivery settings. A workflow has a root scope, and that root scope can have scope settings. Both of these types of settings are bundled into WorkflowSettings.

For blocking, async, and continuous services, you would set delivery instructions via the ServiceBuilder API, which allows you to chain .serial() or .parallel() onto the service as you spawn it. Workflows instead let you specify their settings by returning one of the following from the closure that you use to spawn the workflow:

  • WorkflowSettings: Specify all the workflow settings that you want.
  • DeliverySettings: Specify the delivery settings and use the default scope settings (interruptible).
  • ScopeSettings: Specify the scope settings and use the default delivery settings (parallel).
  • (): Use the default delivery settings (parallel) and scope settings (interruptible). This is what your closure will return if you don’t explicitly return anything.

Here are examples of each:

WorkflowSettings

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder.connect(scope.start, scope.terminate);

        // Return explicit workflow settings.
        WorkflowSettings::new()
            .uninterruptible()
            .with_delivery(DeliverySettings::Serial)
    }
);

DeliverySettings

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder.connect(scope.start, scope.terminate);

        // Return explicit delivery settings.
        // The scope settings will be the default (interruptible).
        DeliverySettings::Serial
    }
);

ScopeSettings

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder.connect(scope.start, scope.terminate);

        // Return explicit scope settings.
        // The delivery settings will be the default (parallel).
        ScopeSettings::uninterruptible()
    }
);

Default

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder.connect(scope.start, scope.terminate);

        // Simply don't return anything from the closure
        // to get the default workflow settings.
    }
);

Nested Scope

When you use the scope operation inside of a workflow, that nested scope can have its own scope settings, independent from the rest of the workflow. This allows you to set specific clusters of operations as uninterruptible.

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        builder
            .chain(scope.start)
            .then_io_scope(
                |scope, builder| {
                    builder.connect(scope.start, scope.terminate);

                    // Set only this nested scope to be uninterruptible.
                    ScopeSettings::uninterruptible()
                }
            )
            .connect(scope.terminate);
    }
);

JSON Diagrams

The Rust API for building workflows is a good choice for embedding workflows natively into applications, especially Bevy-based applications. The Rust compiler will do a lot of heavy lifting to ensure the connections between your operations are compatible and that your workflow is being built correctly.

Another powerful feature of crossflow is the ability to dynamically build an executable workflow from a JSON Diagram at runtime. These JSON diagrams can be hand-written, created with GUI tools, or generated by automatic planners. The diagrams can then be sent to an executor to be run.

make-diagrams

This chapter will explain the syntax of Crossflow JSON Diagrams and how to put together an executor.

To use JSON diagrams, remember to enable the diagram feature of crossflow:

# Cargo.toml

[dependencies]
crossflow = { version = "*", features = ["diagram"] }

Diagram Syntax

Crossflow’s JSON diagrams have a schema that’s meant to balance human and machine readability. The schema is freely available, enabling developers to validate diagrams and ensure correctness in their tools.

The schema is automatically generated from the Rust-native structs inside of crossflow. The library schemars is used for the generation. Whenever crossflow code is changed in a way that could affect the schema, the schema can be updated inside the crossflow repo by running

cargo run -F=diagram generate_schema

If the generated schema is ever out of date with the structs inside the library, an automated test in the crossflow repo will catch this if you run

cargo test -F=diagram

Diagram

The root of the crossflow diagram schema is the Diagram struct. Below is a minimal example of a diagram that works with the calculator demo:

{
  "$schema": "https://raw.githubusercontent.com/open-rmf/crossflow/refs/heads/main/diagram.schema.json",
  "version": "0.1.0",
  "description": "Basic workflow that multiplies the integer input by 3.",
  "input_examples": [
    {
      "description": "Multiply 123 by 3 to get 369",
      "value": "123"
    },
    {
      "description": "Multiply 456 by 3 to get 1368",
      "value": "456"
    }
  ],
  "start": "mul3",
  "ops": {
    "mul3": {
      "type": "node",
      "builder": "mul",
      "config": 3,
      "next": { "builtin": "terminate" }
    }
  }
}

Here’s a breakdown of the fields in the example:

  • "$schema" is an optional field that simply helps some JSON-related tools to validate the rest of the file. This is not a built-in part of the crossflow diagram schema, but may be helpful to include.
  • "version" declares which version of the diagram schema the file was written for, preventing mistakes that may happen as the schema progresses over time.
  • "start" indicates which operation in the diagram to send the input of the workflow to. The operation named here can be thought of as the starting point of the workflow.
  • "ops" is a dictionary of the operations and buffers present in the workflow. The connections between these operations are specified inside the operation definitions.

Operations

Inside the "ops" field is a dictionary of the operations that exist in the workflow. Each key is a unique identifier for an operation instance in the workflow. The value of each dictionary entry is the definition of the operation instance.

The unique identifier (key) is used to refer to the operation instance in other parts of the diagram. For example, "start": "mul3" above indicates that the workflow starts from the operation in "ops" whose key is "mul3".

Here is an example of chaining operations together:

{
  "version": "0.1.0",
  "start": "mul3",
  "ops": {
    "mul3": {
      "type": "node",
      "builder": "mul",
      "config": 3,
      "next": "minus_1"
    },
    "minus_1": {
      "type": "node",
      "builder": "sub",
      "config": 1,
      "next": { "builtin": "terminate" }
    }
  }
}

The workflow starts at "mul3" and then "mul3" has a "next" field that says to pass its output to "minus_1". Inside "minus_1" the "next" field refers to { "builtin": "terminate" }. When you see { "builtin": _ }, that’s a reference to a builtin target, explained below.

Builtin Targets

Builtin targets are operation targets that are always available to pass outputs to without being instantiated in an "ops" dictionary. They are referred to with a JSON object syntax that looks like { "builtin": _ } where _ is one of the builtin targets:

  • "terminate": Use the output to terminate the current scope. The value passed into this operation will be the return value of the scope.
  • "dispose": Dispose of the output. This is used to explicitly indicate that you are okay with an output not being passed along to any operation—instead its messages will just be dropped after they are sent out. The diagram schema assumes that unconnected outputs are likely to be a mistake, similar to an unused variable in source code. The dispose operation essentially says that you are intentionally not using the output.
  • "cancel": Use the output to cancel the current scope. If the message can be converted to a string then its string representation will be included in the cancellation error message.
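
As an illustration, here is a sketch of a diagram that disposes of a side output while the main output terminates the scope. The "mul" builder comes from the calculator demo shown earlier, while the "log" builder is a hypothetical side-effect node invented purely for this example:

```json
{
  "version": "0.1.0",
  "start": "split",
  "ops": {
    "split": {
      "type": "fork_clone",
      "next": ["mul3", "log_input"]
    },
    "mul3": {
      "type": "node",
      "builder": "mul",
      "config": 3,
      "next": { "builtin": "terminate" }
    },
    "log_input": {
      "type": "node",
      "builder": "log",
      "config": null,
      "next": { "builtin": "dispose" }
    }
  }
}
```

Sending the output of "log_input" to { "builtin": "dispose" } tells the workflow builder that this node runs only for its side-effects, so its output being dropped is intentional rather than a mistake.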

Instantiated Operations

The operations inside of "ops" are instantiated specifically for the diagram. Each instance has its own configuration based on what type of operation it is.

The supported diagram operations are encompassed by the DiagramOperation enum. Each variant of that enum represents a different type of operation. The schema for an operation is internally tagged, meaning they all take on this form:

{
  "type": "_",
  ...
}

where "_" should be replaced by the operation variant name in snake-case lettering. For example a DiagramOperation::ForkClone operation would contain

{
  "type": "fork_clone",
  ...
}

whereas a DiagramOperation::Node operation would contain

{
  "type": "node",
  ...
}

The rest of the fields in the operation are based on the specific schema of the operation type.

“fork_clone”

The schema of a "fork_clone" operation is given by ForkCloneSchema, which simply contains a next: Vec<NextOperation> that indicates which operations to pass clones of the input message to. That might look something like

{
  "type": "fork_clone",
  "next": ["foo", "bar", "baz"]
}

“node”

On the other hand the schema of a "node" operation is given by NodeSchema. This is a more complex operation schema that contains several significant fields:

  • "builder" is the unique name of a Node Builder that has been registered with your workflow executor. Node builders will be covered later in the Nodes page.
  • "config" is a configuration for this operation instance. The value associated with "config" has a dynamic schema determined by the "builder" that you chose. That schema can be looked up in the diagram element registry described in the next page.
  • "next" indicates where the final output message of the operation should be sent. This is a required field because we generally assume that the output of a node is valuable information that should be passed along. If you are running a node for its side-effects and don’t need to use its final output then you can set "next" to { "builtin": "dispose" }.
  • "stream_out" is a dictionary that specifies which target each output stream of the node should be connected to. This field is optional—even if your node has streams, you don’t have to connect them anywhere. If you only need to connect some of the streams, include entries for just the ones you care about.

A node instance might look like

{
  "type": "node",
  "builder": "chop_apple",
  "config": {
    "slice_count": 6
  },
  "next": "try_take_apple",
  "stream_out": {
    "slices": "apple_slice_buffer"
  }
}

Trace Settings

All operation types include TraceSettings fields which determine how the operation gets viewed in an editor or visualizer. There are currently three trace settings fields:

  • display_text says how the name of the operation should be portrayed when the operation is visualized. Unlike the operation’s key in the "ops" dictionary, this display text does not need to be unique across the operations.
  • trace lets you toggle whether this specific operation will be traced. An operation whose trace setting is "on" will emit a signal whenever it produces a message. When set to "messages" the message data will be serialized and included in the trace signal each time. If you skip this field, the operation will follow the default trace setting at the diagram level.
  • extensions allows you to put third-party extension data into each operation. This can be used for adding editor-specific metadata such as the position where an operation should be rendered or how an operation should be displayed (e.g. color, icons, etc). The Diagram schema also has an extensions field for diagram-wide extensions.

These fields are flattened into the operation definition, so you would put them directly inside the operation definition, like this:

{
  "type": "node",
  "builder": "chop_apple",
  "config": {
    "slice_count": 6
  },
  "next": "try_take_apple",
  "stream_out": {
    "slices": "apple_slice_buffer"
  },
  "display_text": "Chop Apple",
  "trace": "messages"
}
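
The extensions field follows the same flattened placement. The structure of the data inside extensions is entirely up to the third-party tool; the editor name and position data below are purely hypothetical:

```json
{
  "type": "node",
  "builder": "chop_apple",
  "config": {
    "slice_count": 6
  },
  "next": "try_take_apple",
  "display_text": "Chop Apple",
  "extensions": {
    "my_editor": {
      "position": { "x": 120, "y": 40 }
    }
  }
}
```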

Type Inference

You’ll notice that none of the schemas in a diagram specify any input or output message types. The workflow builder is able to infer what message types need to pass between operations, based on the registered node and section builder information, which has fixed message types.

Note

There are some niche cases where buffer message types can’t be automatically inferred. We might allow message types to be explicitly set for buffers. This is being tracked by #60.

Serialization / Deserialization

When a message type M is serializable, the workflow builder will automatically insert a conversion from M to JsonMessage when an output of M is connected to an input slot of JsonMessage:

implicit-serialize

Similarly when a JsonMessage output is connected to an input slot expecting a deserializable message M, the workflow builder will automatically insert a conversion from JsonMessage to M:

implicit-deserialize

In both cases there is a risk that the serialization or deserialization will fail. This is especially a concern for deserialization, since there is a wide space of JsonMessage values that cannot be successfully deserialized into an arbitrary data type M. There are significantly fewer ways in which serialization can fail, but the possibility does still exist.

For both automatic serialization and deserialization, we call the failure case an implicit error.

Implicit Errors

Implicit errors are error-related outputs that were not explicitly created by the user but which may occur because of unexpected circumstances. They can be thought of as similar to exceptions from conventional programming. The implicit serialize and implicit deserialize operations are examples of places where implicit errors may be produced.

The diagram schema provides an on_implicit_error hook that lets you specify what should be done with implicit errors. Similarly the scope operation schema also provides on_implicit_error. You can set these fields to a valid input slot within the scope, or to a builtin target. Implicit error handling is managed per scope, so the on_implicit_error of a parent scope has no effect on its nested scopes.

If on_implicit_error for a scope is not set to anything, then the default behavior is to trigger cancellation. In the case of implicit serialization or deserialization, the serialization/deserialization failure message will be stringified and passed along as the cancellation message.
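
For example, a diagram can route implicit errors to a builtin target instead of relying on the default cancellation behavior. This sketch reuses the "mul" builder from the calculator demo and simply disposes of any implicit errors:

```json
{
  "version": "0.1.0",
  "start": "mul3",
  "on_implicit_error": { "builtin": "dispose" },
  "ops": {
    "mul3": {
      "type": "node",
      "builder": "mul",
      "config": 3,
      "next": { "builtin": "terminate" }
    }
  }
}
```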

Some operations that can produce errors will automatically connect their errors to the on_implicit_error handler if you don’t specify what should be done with it. For example the Transform schema has an optional on_error field. Setting that field will pass transformation errors along to whichever target you specify, but leaving it unset will cause the Transform operation to connect its errors to the implicit error handler of the current scope.

Execution

Crossflow is first and foremost a library to facilitate the execution of workflows, so there is not a single canonical way for JSON diagrams to be executed. However, there are certain requirements to meet and recommended code paths to follow for an application to be effective at executing diagrams.

Just like when workflows are built with the native Rust API, a crossflow JSON diagram executor needs to be built as part of a Bevy App. You can either make a Bevy app that is entirely dedicated to executing JSON diagrams, or make the execution of JSON diagrams one feature within a broader app. In order for your app to execute diagrams, you will need to create a system that can receive JSON diagrams, build the diagrams into executable workflows, and then run those workflows.

The most important piece to understand when implementing an executor app is the DiagramElementRegistry. The registry stores “builders” that allow the operations ("ops") within a JSON diagram to be translated into workflow elements that can actually be executed.

There are three types of registrations present in the registry:

  • Message registration stores information about the message types supported by the executor, including which operations can be performed on each message type and how to perform them.
  • Node builder registration stores the Node Builders that allow "type": "node" operations in a diagram to be built into workflow nodes. This registration also stores the schema of the "config" field for each unique "builder" ID.
  • Section builder registration is similar to the node builder registration, except it stores information on Section Builders.

Creating a registry

Initializing a registry is simple:

use crossflow::prelude::*;
let mut registry = DiagramElementRegistry::new();

This will create a new registry that only contains registrations for the “builtin” message types.

You’ll notice that registry is declared as mutable in the above code snippet. This is because the registry isn’t very useful until you start registering your own node builders. Without any node builders, your executor will only be able to build workflows that exclusively use the builtin message types and the basic builtin operations.

Registering a node builder will allow you to build workflows that use custom services. Use DiagramElementRegistry::register_node_builder as shown below to register a new node builder.

registry.register_node_builder(
    NodeBuilderOptions::new("add"),
    |builder, config: f32| {
        builder.create_map_block(move |request: f32| {
            request + config
        })
    }
);

Node builders are covered in more detail on the next page.

Tip

When you register a node builder, you will also automatically register any input and output message types needed by the node builder.

Building workflows with a registry

Once you’ve registered all the builders that your executor needs, you can start building workflows from diagrams. Simply create a valid Diagram instance and then call Diagram::spawn_io_workflow:

use serde_json::json;
let diagram_json = json!(
    {
        "version": "0.1.0",
        "start": "add_5",
        "ops": {
            "add_5": {
                "type": "node",
                "builder": "add",
                "config": 5,
                "next": { "builtin": "terminate" }
            }
        }
    }
);

let diagram = Diagram::from_json(diagram_json).unwrap();
let workflow = diagram.spawn_io_workflow(&mut commands, &registry).unwrap();

Note

In the above example, commands is a Commands instance. Typically you will need to create a Bevy system that receives JSON diagrams and builds them into workflows.

Executing built workflows

Once you’ve used the Diagram and registry to build the workflow, you will be holding a Service that you can execute. From there you can follow the same guidance in How to Run a Service or How to Run a Series.

Tip

To build the service and execute the workflow, your executor application will need to know the input and output message types of the diagram at compile time.

In most cases you can’t expect all incoming diagrams to have the exact same input and output message types as each other, so instead you can use JsonMessage as both the input and output message types (Request and Response) of all diagrams.

As long as the actual input and output message types of the diagrams are deserializable and serializable (respectively), the workflow builder can convert to/from JsonMessage to run the workflow and receive its response.

Premade Executor

If you would like to get started with executing crossflow diagrams with minimal effort, you can use the crossflow-diagram-editor library to quickly make a basic executor.

The calculator example shows how to use the crossflow-diagram-editor to create a workflow executor that provides simple calculator operations. For your own custom executor, you can replace the calculator node builders with your own more useful node builders.

use crossflow_diagram_editor::basic_executor::{self, DiagramElementRegistry, Error};

fn main() -> Result<(), Box<dyn Error>> {
    // Create a new registry with the default message types pre-registered.
    let mut registry = DiagramElementRegistry::new();

    // Register calculator-inspired node builders from the calculator_ops_catalog library.
    calculator_ops_catalog::register(&mut registry);

    // Run the basic executor
    basic_executor::run(registry)
}

Nodes

You will typically need to supply your registry with node builders in order to execute any useful JSON diagrams. The registry will maintain a dictionary of the node builders that you give to it. When a diagram is built into a workflow, the registered node builders will be used to construct the workflow.

This page will teach you all the details of how to register node builders.

Node Builder Options

Each time you register a node builder you will need to set its NodeBuilderOptions. The only required field in NodeBuilderOptions is the id:

let options = NodeBuilderOptions::new("multiply_by");

Display Text

This ID must be unique for each node builder added to the registry. Registering a second node builder with the same ID as an earlier one will remove the earlier one from the registry.

When you only provide the ID, graphical editors will typically use that ID as display text for its associated nodes when visualizing a diagram. This isn’t always a good idea since the unique ID could be an unintelligible UUID, or it could be mangled with namespaces or version numbers. Instead you can add a default display text to the node builder options:

let options = NodeBuilderOptions::new("multiply_by")
    .with_default_display_text("Multiply By");

The diagram element registry can be serialized into JSON and exported from the executor. The serialized registry data can be provided to a diagram editor or visualization frontend, which can then look up the display text and render it to the user.

Description

When someone is manually editing or viewing a workflow diagram, the purpose of a node might not be obvious from the ID or the display text. Therefore your node builder options can also include a description for the nodes that will be built:

let description = "Multiply the input value by the configured value.";

let options = NodeBuilderOptions::new("multiply_by")
    .with_default_display_text("Multiply By")
    .with_description(description);

Similar to the display text, the description will be included in the serialized registry, allowing diagram editors or visualizers to render the description for each node.

Closure

Along with the node builder options, you need to provide a closure when registering a node builder. The closure is what does the heavy lifting of creating the node for the workflow. It will be provided with two arguments: a &mut Builder and a config.

The Builder API allows your closure to create any kind of node. You can use it to call create_map_block or create_map_async for simple functions. For services or callbacks you can call create_node.

If you need to spawn a service within the closure, you can use Builder::commands to get a &mut Commands. Just keep in mind that services do not get automatically despawned when no longer used, so you should avoid spawning a new service for each node that gets created. Unless you do something to periodically clean up those services, you could end up with an arbitrarily growing number of services in your executor.

Tip

The closure is expected to be FnMut, which means you can cache data inside of it that can be reused or updated each time the closure gets run.
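
This caching behavior is plain Rust, so it can be sketched without crossflow at all. In the sketch below, the closure stands in for a node builder and caches a counter across invocations; the names are illustrative, not crossflow API:

```rust
fn main() {
    // State captured by the closure; it persists across calls.
    let mut builds_so_far: u32 = 0;

    // An FnMut closure, like a node builder closure, may mutate its captures,
    // letting it reuse or update cached data each time it runs.
    let mut fake_builder = |label: &str| {
        builds_so_far += 1;
        format!("{label} (build #{builds_so_far})")
    };

    assert_eq!(fake_builder("add"), "add (build #1)");
    assert_eq!(fake_builder("add"), "add (build #2)");
}
```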

The return type of your closure must be Node<Request, Response, Streams>. That happens to be the return type of create_map_block, create_map_async, and create_node, so your entire closure could be as simple as calling one of those methods on Builder:

registry.register_node_builder(
    NodeBuilderOptions::new("add"),
    |builder, config: f32| {
        builder.create_map_block(move |request: f32| {
            request + config
        })
    }
);

The Request, Response, and Streams of the Node that you return will be automatically detected by the registry. They will be recorded as the input, output, and stream message types for nodes created by your node builder.

Technically the closure is not limited to creating only a single node. You could have the closure additionally create buffers or anything else, as long as the final return value is a Node. But it falls on you to ensure that the connected collection of elements your closure builds behaves like a single node, or else it may inflict confusing behavior on users and visualization tools.

Tip

If you need to generate something more complex than a simple Node, consider registering a section builder instead.

Config

Each node builder can decide on its own config data structure. This config will be the second argument passed to the closure. In the earlier example the config data structure is a simple floating point value, f32. Any type that implements Deserialize and JsonSchema can be used as a config type.

For complex node configurations, you can define your own custom struct and derive the necessary traits:

/// A custom node builder config for a greeting node
#[derive(Deserialize, JsonSchema)]
struct GreetConfig {
    /// How should the person be greeted?
    greeting: String,
    /// Should the greeting be printed out?
    print: bool,
}

registry.register_node_builder(
    NodeBuilderOptions::new("greet"),
    |builder, config: GreetConfig| {
        let GreetConfig { greeting, print } = config;
        builder.create_map_block(
            move |name: String| {
                let message = format!("{greeting}{name}");
                if print {
                    println!("{message}");
                }

                message
            }
        )
    }
);

Deriving JsonSchema for the config type allows us to save a schema for the config in the registry. This helps graphical editing tools and diagram generation tools to ensure that all the nodes have valid configurations. You could even auto-generate UIs tailored to the config of each node builder.
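
As a point of reference, the schema saved for the GreetConfig type above would look roughly like the following. This is a hand-written approximation, not exact schemars output, which varies by version:

```json
{
    "title": "GreetConfig",
    "description": "A custom node builder config for a greeting node",
    "type": "object",
    "properties": {
        "greeting": {
            "description": "How should the person be greeted?",
            "type": "string"
        },
        "print": {
            "description": "Should the greeting be printed out?",
            "type": "boolean"
        }
    },
    "required": ["greeting", "print"]
}
```

Note how the doc comments on the struct and its fields carry over into the "description" entries, which is another reason to document your config types well.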

Tip

If your node builder doesn’t need any config information then just use the unit-type () as the config type.

Examples

Despite being provided with a config schema, human users may still struggle to figure out how to correctly configure a node to get what they want out of it. To mitigate this problem, you can provide example configurations in your node builder options:

use crossflow::{prelude::*, ConfigExample};
use serde::{Serialize, Deserialize};

/// A custom node builder config for a greeting node
#[derive(Serialize, Deserialize, JsonSchema)]
struct GreetConfig {
    /// How should the person be greeted?
    greeting: String,
    /// Should the greeting be printed out?
    print: bool,
}

let examples = vec![
    ConfigExample::new(
        "Say hello and print the message",
        GreetConfig {
            greeting: String::from("Hello, "),
            print: true,
        }
    ),
    ConfigExample::new(
        "Say guten tag and do not print the message",
        GreetConfig {
            greeting: String::from("Guten tag "),
            print: false,
        }
    ),
];

registry.register_node_builder(
    NodeBuilderOptions::new("greet")
        .with_description("Turn a name into a greeting")
        .with_config_examples(examples),
    |builder, config: GreetConfig| {
        let GreetConfig { greeting, print } = config;
        builder.create_map_block(
            move |name: String| {
                let message = format!("{greeting}{name}");
                if print {
                    println!("{message}");
                }

                message
            }
        )
    }
);

Crossflow graphical editing tools are encouraged to make these examples visible to users, and allow users to copy/paste the examples.

Tip

If you want something more flexible than a static struct for your config, you can always just use JsonMessage as the config type. Just be sure to include comprehensive examples so human users know what a valid config would be.

Fallible

There might be cases where a node builder cannot successfully build a node. Maybe there is a semantic error in the config (even though the parsing was successful), or maybe some resource needed by the builder has become unavailable.

The node builder API above assumes that building the node will always be successful. To allow the node building to fail, you can use the fallible API instead:

use anyhow::anyhow;
registry.register_node_builder_fallible(
    NodeBuilderOptions::new("divide_by"),
    |builder, config: f64| {
        if config == 0.0 {
            return Err(anyhow!("Cannot divide by zero"));
        }

        let node = builder.create_map_block(move |request: f64| request / config);
        Ok(node)
    }
);

You can return any error that can be converted into an anyhow::Error. The easiest option is to use the anyhow! macro.

Note

If the "config" in the diagram cannot be successfully deserialized into the data structure of your closure’s config, this will be automatically caught by the registry, and a ConfigError will be returned instead of a Service. Your node builder does not have to handle this error mode.

Message Operation Support

When you use register_node_builder from DiagramElementRegistry, the message types of Request, Response, and Streams will automatically be registered. By default all registered messages will also register the ability to serialize, deserialize, and clone, which allows those message types to support operations like fork-clone and conversion to/from JsonMessage.

For certain message types there may be additional operations that can be performed on them. For example, if a node returns a Result type, users should be able to apply a fork-result to it. A tuple message should be able to unzip. Unfortunately the Rust programming language does not yet support specialization as a stable feature, so these capabilities cannot be detected automatically. To get around this, register_node_builder returns a NodeRegistrationBuilder that lets you register support for additional operations.

At the end of registering a node, you can chain support for additional operators:

/// A request to get some information from a web page.
///
/// We implement Joined so this struct can be created by the join operation.
///
/// We implement Clone, Serialize, Deserialize, and JsonSchema so this struct
/// can support the default message operations.
#[derive(Joined, Clone, Serialize, Deserialize, JsonSchema)]
struct WebPageQuery {
    url: String,
    element: String,
}

registry
    .register_node_builder(
        NodeBuilderOptions::new("get_url_header"),
        |builder, config: ()| {
            builder.create_map_async(|query: WebPageQuery| {
                async move {
                    let page = fetch_content_from_url(query.url).await?;
                    page
                        .get(&query.element)
                        .cloned()
                        .ok_or_else(|| FetchError::ElementMissing(query.element))
                }
            })
        }
    )
    .with_join()
    .with_result();

In the above example WebPageQuery implements the Joined trait, which means it can be the output of a join operation. To register that ability for the message, we chain .with_join() after .register_node_builder().

At the same time, the return type of the node is a Result. We can also chain .with_result() to register fork-result support for the node’s output message type.

When adding support for an operation, the message we are adding support for must be compatible with the operation. This is ensured by the compiler. If the node in the example above were not returning a Result type then the Rust compiler would emit a compilation error when we try to register .with_result() for it.

Each operation that we add using NodeRegistrationBuilder will apply to either the input message type or the output message type, depending on whether the operation would produce or consume the message, respectively.

For finer-grained control over exactly what operations are registered for each message type, continue to the next section.

Special-case Message Registration

In most cases your message types and their supported operations can be registered as described above—by registering a node and then chaining on any additional operations needed by the messages. However, there are some special cases that don’t quite fit that pattern.

opt-out

Crossflow diagrams can support message types that don’t implement clone or support serialization/deserialization. However, by default the node registration API assumes that your node’s messages do support all of those traits, because they are very typical traits for “plain old data” types. If those operations were not registered by default, there would be a high likelihood of users forgetting to register them, so we register them by default.

If your node produces or consumes messages that are not plain old data, you will need to explicitly opt out of some traits in order to register the node builder. If you forget to opt out of the default operations for message types that don’t support them, you’ll see a compilation error.

Here’s an example of a node with an input value that supports none of the default operations:

use tokio::sync::mpsc::UnboundedReceiver;
registry
    .opt_out()
    .no_cloning()
    .no_serializing()
    .no_deserializing()
    .register_node_builder(
        NodeBuilderOptions::new("stream_out"),
        |builder, config: ()| {
            builder.create_map(|input: AsyncMap<UnboundedReceiver<f32>, StreamOf<f32>>| {
                async move {
                    let mut receiver = input.request;
                    let stream = input.streams;

                    while let Some(msg) = receiver.recv().await {
                        stream.send(msg);
                    }
                }
            })
        }
    )
    .with_common_response();

The UnboundedReceiver does not support serialization, deserialization, or even cloning. To register this node builder, we first need to use .opt_out() and then specify which default operations to disable: .no_cloning(), .no_serializing(), and .no_deserializing().

With those opted out, we can use the UnboundedReceiver as a message in the node.

To opt back into the default operations for the response, we apply .with_common_response(). A similar method exists if we were opting back in for the request type instead.

Streams

Registering additional operations for request and response message types is fairly straightforward, but there isn’t a clean way to do this for the message types that are present in output streams. If you have output streams that produce custom data structures, they will be registered with the default message operations (unless you opted out), but to register any additional operations for those types, you will need to do it directly.

Instead of using the NodeRegistrationBuilder API, you will need to use register_message:

#[derive(Clone, Serialize, Deserialize, JsonSchema)]
struct State {
    position: [f32; 2],
    battery_level: f32,
}

#[derive(Clone, Serialize, Deserialize, JsonSchema)]
enum Error {
    LostConnection
}

registry
    .register_node_builder(
        NodeBuilderOptions::new("navigate_to"),
        |builder, config: ()| {
            builder.create_map(
                |input: AsyncMap<[f32; 2], StreamOf<Result<State, Error>>>| {
                    async move {
                        let destination = input.request;
                        let stream = input.streams;

                        let mut update_receiver = navigate_to(destination);
                        while let Some(update) = update_receiver.recv().await {
                            stream.send(update);
                        }
                    }
                }
            )
        }
    );

// Explicitly register the message of the stream so we can add .with_result to it.
registry
    .register_message::<Result<State, Error>>()
    .with_result();

Section Builders

Sometimes you’ll want to provide users with a workflow element that does more than a node. Maybe you want to encapsulate a complex arrangement of operations as a single unit that users can drop into their workflows without worrying about the details of how it’s implemented. This is what we call a Section.

Section builders are able to generate a web of operations connected however necessary to fulfill the purpose of the section. You can register section builders in much the same way you register node builders. Once your section builder is registered, any diagram passed to your executor can include the section in its workflow.

Caution

A section is not a scope, even though the two superficially appear similar in that they both contain an arrangement of connected operations.

When a section is put into a workflow, all operations in that section will exist in the original scope that the section has been placed in. This has important implications for session and buffer behavior: each message that enters a scope begins a new session, whereas no new session is created when a message enters a section.

Section Builder Options

Section builder options are essentially the same as node builder options. Refer to the node builder options guide to understand the fields in SectionBuilderOptions.

Closure

Just like the closure for node builders, section builders are implemented through a closure that takes in a &mut Builder and a config. The builder is used to create and connect whatever elements your section needs. The config—just like for node builders—is any deserializable data structure that provides the information needed to configure a section.

The key difference for a section is that it does not output a Node. Instead it outputs any struct that implements the Section trait:

use crossflow::{prelude::*, SectionBuilderOptions};

/// The kind of section produced by the "use_elevator" section builder.
#[derive(Section)]
struct UseElevatorSection {
    /// Begin using the elevator by having the robot enter it.
    begin: InputSlot<()>,
    /// Signal that the robot failed to enter the elevator.
    enter_elevator_failure: Output<MoveRobotError>,
    /// Signal that the elevator failed to reach its destination.
    move_elevator_error: Output<MoveElevatorError>,
    /// Retry moving the elevator. Trigger this when a move_elevator_error is
    /// resolved.
    retry_elevator_move: InputSlot<()>,
    /// Signal that localization failed.
    localization_error: Output<LocalizationError>,
    /// Retry localizing the robot at the new floor. Trigger this when a
    /// localization_error is resolved.
    retry_localization: InputSlot<()>,
    /// Signal that the robot failed to exit the elevator.
    exit_elevator_failure: Output<MoveRobotError>,
    /// Retry exiting the elevator. Trigger this when an exit_elevator_failure
    /// is resolved.
    retry_elevator_exit: InputSlot<()>,
    /// The robot has successfully exited the elevator at the destination floor.
    success: Output<()>,
}

/// The config data structure for the "use_elevator" section builder.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
struct UseElevatorConfig {
    elevator_id: String,
    robot_id: String,
    to_floor: String,
}

registry.register_section_builder(
    SectionBuilderOptions::new("use_elevator")
        .with_default_display_text("Use Elevator")
        .with_description("Have a robot use an elevator"),
    move |builder: &mut Builder, config: UseElevatorConfig| {
        let UseElevatorConfig { elevator_id, robot_id, to_floor } = config;

        // Create a node for entering the elevator
        let enter_elevator = builder.create_node(enter_elevator(&robot_id, &elevator_id));

        // Create a fork-result that splits based on whether the robot
        // successfully entered the elevator
        let (enter_elevator_result, enter_elevator_fork) = builder.create_fork_result();
        builder.connect(enter_elevator.output, enter_elevator_result);

        // Create a node to move the elevator if the robot successfully entered
        let move_elevator = builder.create_node(move_elevator(&elevator_id, &to_floor));
        builder.connect(enter_elevator_fork.ok, move_elevator.input);

        // Create a fork-result that splits based on whether the elevator
        // successfully arrived at its destination
        let (move_elevator_result, move_elevator_fork) = builder.create_fork_result();
        builder.connect(move_elevator.output, move_elevator_result);

        // Create a node to localize the robot once the elevator arrives at the
        // correct floor
        let localize_robot = builder.create_node(localize_robot(&robot_id, &to_floor));
        builder.connect(move_elevator_fork.ok, localize_robot.input);

        // Create a fork-result that splits based on whether the robot
        // successfully localized
        let (localize_result, localize_fork) = builder.create_fork_result();
        builder.connect(localize_robot.output, localize_result);

        // Create a node to exit the elevator after the robot has localized
        let exit_elevator = builder.create_node(exit_elevator(&robot_id, &elevator_id));
        builder.connect(localize_fork.ok, exit_elevator.input);

        // Create a fork-result that splits based on whether the robot
        // successfully exited the lift
        let (exit_elevator_result, exit_elevator_fork) = builder.create_fork_result();
        builder.connect(exit_elevator.output, exit_elevator_result);

        UseElevatorSection {
            begin: enter_elevator.input,
            enter_elevator_failure: enter_elevator_fork.err,
            move_elevator_error: move_elevator_fork.err,
            retry_elevator_move: move_elevator.input,
            localization_error: localize_fork.err,
            retry_localization: localize_robot.input,
            exit_elevator_failure: exit_elevator_fork.err,
            retry_elevator_exit: exit_elevator.input,
            success: exit_elevator_fork.ok,
        }
    }
);

In the above example we create a custom struct named UseElevatorSection to define what the inputs and outputs of our section are. The begin input begins the overall process of having a robot use an elevator. Each stage of using the elevator provides a signal to indicate if a problem has happened. Diagrams that use this section have the opportunity to handle each error however they would like, and then signal a retry_... input slot to resume the process.

Message Operation Support

When registering node builders, the Request, Response, and Streams message types also get registered. You can add support for more operations by chaining them onto the NodeRegistrationBuilder.

Sections are somewhat similar: the message type of each field in the section will be automatically registered. However, it wouldn’t make sense to chain methods to register additional operations for those message types, because a section can contain an arbitrary number of message types, and we can’t assume that every one of them supports every operation we might want to add.

Instead we use Rust’s procedural macro system:

#[derive(Section)]
struct UseDoorSection {
    /// Robot and Door cannot be cloned, serialized, or deserialized, so we
    /// disable those operations. But the overall message can still be unzipped,
    /// so we enable the minimal version of unzip, which will register both Robot
    /// and Door, but without the common operations (clone, serialize, and deserialize).
    #[message(no_clone, no_serialize, no_deserialize, unzip_minimal)]
    begin: InputSlot<(Robot, Door)>,

    /// UseDoorError is cloneable, serializable, and deserializable, so we don't
    /// need to disable anything here. The overall message type is a result, so
    /// we can mark this as a result to register the fork-result operation.
    #[message(result)]
    outcome: Output<Result<(), UseDoorError>>,

    /// We can also expose buffers inside the section.
    door_state: Buffer<DoorState>,
}

Connecting to sections

The syntax for connecting to section inputs looks similar to builtin targets, except you replace "builtin" with the ops key of the section. For example:

{
  "version": "0.1.0",
  "start": { "use_elevator_West3": "begin" },
  "ops": {
    "use_elevator_West3": {
        "type": "section",
        "builder": "use_elevator",
        "config": {
            "elevator_id": "West3",
            "robot_id": "cleaning_bot_2",
            "to_floor": "L5"
        },
        "connect": {
            "exit_elevator_failure": "handle_exit_failure",
            "success": { "builtin": "terminate" }
        }
    },
    "handle_exit_failure": {
        "type": "node",
        "builder": "center_in_elevator",
        "config": {
            "elevator_id": "West3",
            "robot_id": "cleaning_bot_2"
        },
        "next": { "use_elevator_West3": "retry_elevator_exit" }
    }
  }
}

This refers to the UseElevatorSection example above. We can create a workflow with this section and then use its outputs and inputs to create a loop that lets us customize how elevator exit failures are handled.

To pass a message to one of the section inputs we use the syntax { "use_elevator_West3": _ } where _ is one of the field names of UseElevatorSection. While the example above doesn’t include any buffers, the syntax for referencing section buffers is the same—the workflow builder will know whether you’re referring to an input slot or a buffer based on where you are making the reference.

To connect the outputs of the section to other operations in the same scope as the section, use the "connect": field of the section operation. Note that all operations inside the section are in the same scope as operations outside of the section. In general, connections can only be made between operations that are in the same scope.

Section Templates

Certain structures come up frequently in diagrams, and you may want to reuse them without reimplementing them each time. Section builders allow those structures to be programmed through the native Rust API, but that locks them into working for only a static set of input and output message types. It also precludes creating the reusable section through a graphical editor, since section builders need to be compiled and registered inside the executor.

Section templates are an alternative to section builders that allow you to define reusable sections in the same way that JSON diagrams themselves are defined. That allows section templates to be defined from outside of the executor, either through a graphical editor or a tool that can generate diagrams.

Whereas section builders have static message types for their inputs and outputs, section templates support message type inference, just like regular JSON diagram operations.

Warning

Sections do not nest their operations in a new scope, so triggering terminate or cancel within a section will terminate or cancel the scope that the section was placed inside.

Section Template Schema

The SectionTemplate schema consists of four fields:

  • "inputs": an input remapping that says which input slots inside the section should be exposed to operations that are outside of the section. These are similar to the InputSlot<_> fields of a section builder.
  • "outputs": a list of exposed outputs that can be connected to the input slots of operations outside of this section. These output names can be used as targets by operations inside the "ops" section of the section template. None of the operations inside "ops" can use any of these names as a key.
  • "ops": a dictionary of operation instances that exist inside this section, along with information about how they connect to each other. The syntax of this section is exactly the same as the "ops" section of regular diagrams and scope operations.

Referencing Section Templates

Section templates are bundled inside of each Diagram in the "templates" field. Keeping templates together with the diagram that uses them allows us to ensure cohesion between these dynamically built structures.

Each template inside "templates" has a unique key. To use a template inside of an operation, create a section operation with a template configuration that references the unique key of that template:

{
    "version": "0.1.0",
    "templates": {
        "modify_by_10": {
            "inputs": ["add", "multiply"],
            "outputs": ["added", "multiplied"],
            "ops": {
                "add": {
                    "type": "node",
                    "builder": "add_by",
                    "config": 10,
                    "next": "added"
                },
                "multiply": {
                    "type": "node",
                    "builder": "multiply_by",
                    "config": 10,
                    "next": "multiplied"
                }
            }
        }
    },
    "start": { "modify": "add" },
    "ops": {
        "modify": {
            "type": "section",
            "template": "modify_by_10",
            "connect": {
                "added": { "builtin": "terminate" }
            }
        }
    }
}

In the above example we have a section template that exposes two input slots ("add" and "multiply") and two outputs ("added" and "multiplied").

We instantiate the template by creating the "modify" operation in the diagram’s "ops". Its "type" is set to "section" and then it contains the field "template": "modify_by_10" which tells the workflow builder that the section should be built using the section template whose key is "modify_by_10".

The same section template can be instantiated as many times as you want by creating a new "type": "section" operation for each instance. Type inference for each instance of the section template will be done separately, so you can have different message types passing through each section instance. This is useful for creating generic reusable workflow structures.
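
For instance, the "modify_by_10" template above could be instantiated twice within one "ops" dictionary. This fragment is only a sketch: the operation names "modify_a" and "modify_b" are hypothetical, and the connections feeding their inputs are omitted:

```json
"ops": {
    "modify_a": {
        "type": "section",
        "template": "modify_by_10",
        "connect": {
            "added": { "builtin": "terminate" }
        }
    },
    "modify_b": {
        "type": "section",
        "template": "modify_by_10",
        "connect": {
            "multiplied": { "builtin": "terminate" }
        }
    }
}
```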

Section templates can be instantiated in any "ops" dictionary, whether it belongs to a diagram, scope, or another section template. However, you cannot have a circular dependency between section templates, because that would result in infinite recursion. Any attempt at this will cause a CircularTemplateDependency error when building the workflow.
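
To sketch what gets rejected, here is a template that instantiates itself. Expanding it would recurse forever, so building a workflow from it fails with CircularTemplateDependency. The "add" builder and all names here are illustrative:

```json
"templates": {
    "recurse": {
        "inputs": ["step"],
        "outputs": ["out"],
        "ops": {
            "step": {
                "type": "node",
                "builder": "add",
                "config": 1,
                "next": { "again": "step" }
            },
            "again": {
                "type": "section",
                "template": "recurse",
                "connect": { "out": "out" }
            }
        }
    }
}
```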

Remapping Inputs

In the above example, the "inputs" field simply lists the keys of operations inside the section template whose input slots should be exposed to outside operations. For simple sections this is usually enough, but section templates are able to contain other sections. At that point it’s not sufficient to reference the operation key since a section may expose multiple different inputs.

To resolve this, there is an alternative syntax for input remapping:

{
    "version": "0.1.0",
    "templates": {
        "handle_fruit": {
            "inputs": {
                "apple": { "apple_slicer": "apple" },
                "banana": "banana_peeler"
            },
            "outputs": ["apple_slices", "peeled_bananas"],
            "ops": {
                "apple_slicer": {
                    "type": "section",
                    "template": "slice_apples",
                    "connect": {
                        "slices": "apple_slices"
                    }
                },
                "banana_peeler": {
                    "type": "node",
                    "builder": "peel_banana",
                    "next": "peeled_bananas"
                }
            }
        },
        "slice_apples": {
            "inputs": {
                "apple": "apple_buffer"
            },
            "outputs": ["slices"],
            "ops": {
                "apple_buffer": { "type": "buffer" },
                "listener": {
                    "type": "listen",
                    "buffers": ["apple_buffer"],
                    "next": "slice"
                },
                "slice": {
                    "type": "node",
                    "builder": "slice_apples",
                    "stream_out": {
                        "slices": "slices"
                    },
                    "next": { "builtin": "dispose" }
                }
            }
        }
    }
}

In the above example you can see how the "handle_fruit" template is able to redirect exposed inputs to the "apple_slicer" section nested inside it:

"inputs": {
    "apple": { "apple_slicer": "apple" },
    "banana": "banana_peeler"
},

Meanwhile the "banana" input uses the simple syntax, remapping the exposed input to an operation inside the section template that has a different name: "banana_peeler".

This approach to remapping also works for exposed buffers.

Middlewares

When using the native Rust API of crossflow, it’s straightforward to tap into whatever middlewares you need, as long as a Rust library exists for it. Just add the relevant library to your Cargo.toml and spawn a service or callback that uses the middleware’s API.

For JSON diagrams, you’ll need your executor to register node builders for each middleware that you want to use. Registering node builders that can provide comprehensive coverage of all of a middleware’s capabilities is a daunting task, so we provide out-of-the-box support for the middlewares that we anticipate will be most useful for our community of users.

The number of middlewares that crossflow supports out-of-the-box will grow over time as our user base grows. Moreover, downstream users are always welcome to write third-party libraries that register node builders for any middlewares we haven’t covered yet. Our out-of-the-box support uses the public APIs of crossflow, so there is no difference in capability between first-party and third-party support.

Support for these middlewares is not turned on by default. We use Rust’s feature system to allow downstream users to toggle the support on or off. Since each middleware brings a potentially large number of dependencies with it, making the features opt-in spares users from taking on a large volume of dependencies that they don’t need.

Here are the names of the features for activating various middlewares:

  • grpc - uses tonic and prost-reflect to provide clients for dynamically loaded gRPC services.
  • zenoh - provides full support for zenoh publishers, subscribers, and queriers. Payloads can be protobuf via prost-reflect or JSON strings.
  • ros2 (only on the ros2 branch) - provides registration helpers for ROS 2 subscriptions, publishers, service clients, and action clients via rclrs. For now each message/service/action type needs to be compiled in, but this requirement will relax when rclrs supports runtime loading of message definitions.

gRPC

Support for gRPC is implemented using tonic and prost-reflect.

Use the grpc feature flag of crossflow to enable this support:

# Cargo.toml

[dependencies]
crossflow = { version = "*", features = ["diagram", "grpc"] }

Enabling gRPC

Use registry.enable_grpc(rt) to register premade node builders that can build gRPC clients into a workflow:

use crossflow::prelude::*;
use std::sync::Arc;
use tokio::runtime::Runtime;

// Initialize a regular registry
let mut registry = DiagramElementRegistry::new();

// Use a tokio runtime to enable gRPC in the registry. Tonic is only compatible
// with tokio, so a tokio runtime is necessary to execute it.
//
// This will add gRPC node builders to the registry.
let rt = Arc::new(Runtime::new().unwrap());
registry.enable_grpc(Arc::clone(&rt));

// Run the tokio runtime on a separate thread.
//
// Bevy uses smol-rs for its async runtime, which is not directly compatible with
// tokio. We let tokio run on a separate thread and use channels to pass data
// between the runtimes.
// Hold on to exit_notifier for as long as the runtime should keep running.
// Sending to it (or dropping it) will end block_on and let the thread exit.
let (exit_notifier, exit_receiver) = tokio::sync::oneshot::channel::<()>();
std::thread::spawn(move || {
    let _ = rt.block_on(exit_receiver);
});

Descriptor Pool

Additionally you will need to use decode_global_file_descriptor_set to add your proto definitions to the global descriptor pool. The node builders will look for service and protobuf definitions in the global descriptor pool. If a descriptor isn’t present in the pool, the node builders will return a NodeBuildingError when you attempt to build a workflow from the diagram.

We leave it open-ended how descriptors are sent to the executor. There are many viable ways to do this using various middlewares or compiling the definitions into the executor itself. Users are welcome to come up with reusable third-party libraries to implement specific approaches and share them with the ecosystem.

Configuration

Regardless of client type, all gRPC nodes are configured using GrpcConfig. That config allows you to specify the service and method to call, as well as the URI of where the service should be found. You can optionally specify a timeout that will have the client quit if the response does not arrive by then.

As mentioned above whatever service type that you configure your node to use needs to be present within the executor’s global descriptor pool before you attempt to build a workflow from the diagram.
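As a rough sketch, a gRPC node configured with GrpcConfig inside a diagram might look like the following. The field names ("service", "method", "uri", "timeout_ms"), the service and method names, and the surrounding node structure are illustrative assumptions, not the definitive schema:

```json
{
    "fruit_vendor": {
        "type": "node",
        "builder": "grpc_request",
        "config": {
            "service": "example.FruitVendor",
            "method": "OrderApples",
            "uri": "http://localhost:50051",
            "timeout_ms": 5000
        },
        "next": { "builtin": "terminate" }
    }
}
```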

Unary and Server-Streaming Requests

Unary and server-streaming requests are both covered by the same node builder: "grpc_request".

Request — The grpc_request nodes take in a single JsonMessage which will be used as the single request message sent to the server. Each JsonMessage sent in activates a new gRPC client←→server session where the server will receive this single request. The gRPC client itself is instantiated when the workflow is originally built, and will be reused for all requests sent into this node. Separate sessions of the workflow will also use the same gRPC client whenever the same node is activated.

“out” — The response messages received from the gRPC server will be sent out of the stream named "out". This is the case whether the service is unary or server-streaming. This allows the node to have a single consistent structure regardless of whether the server ends up sending zero, one, or arbitrarily many response messages.

Response — The final response of the node will be a Result whose value is Ok after the server has ended the connection with no errors, or Err if an error came up during the request. The Err will contain a string rendering of a Status, based on gRPC status codes.

“canceller” — In case you want to cancel the gRPC request, you can capture this UnboundedSender, either storing it in a buffer or passing it to a node that can make a decision about when to cancel. Passing Some(String) into it will have the string included in the string-rendered Status of the final response’s Err.

grpc_request-node

Bidirectional and Client-Streaming Requests

Bidirectional and client-streaming requests are both covered by the "grpc_client" node builder. Technically this node builder can also support unary and server-streaming requests, but the “grpc_request” node builder is more ergonomic for those.

What makes "grpc_client" different is it takes in an UnboundedReceiver of JsonMessage instead of just a single JsonMessage. This allows any number of messages to be streamed from the client (inside the workflow) out to the server within a single gRPC client←→server session.

Each UnboundedReceiver sent into this node will activate a new independent gRPC client←→server session using the same client. Every message sent through the UnboundedSender associated with that receiver will be streamed out over the same gRPC client←→server session.

grpc_request-node

Zenoh

Support for zenoh is implemented using the official zenoh Rust library.

Use the zenoh feature flag of crossflow to enable this support:

# Cargo.toml

[dependencies]
crossflow = { version = "*", features = ["diagram", "zenoh"] }

Enabling zenoh

Use registry.enable_zenoh(_) to register premade node builders that can build zenoh subscribers, publishers, and queriers:

use crossflow::prelude::*;

let mut registry = DiagramElementRegistry::new();
registry.enable_zenoh(Default::default());

enable_zenoh(_) takes in a zenoh config, which you are free to customize.

Descriptor Pool

Just like for gRPC support, you will need to add your proto definitions to the global descriptor pool if you want to use protos as your payloads. If the needed proto definitions are missing from the descriptor pool, a NodeBuildingError will be produced.

JSON payloads do not require any additional steps.

Configuration

Subscribers, publishers, and queriers each have separate node builders with different configuration types tailored to the specific information needed by each.

The mandatory fields include:

key — the key expression for the connection. Used by all three.

encoder — either "json" or { "protobuf": "_" } to indicate how to encode outgoing messages. Used by publishers and queriers.

decoder — either "json" or { "protobuf": "_" } to indicate how to decode incoming messages. Used by subscribers and queriers.

Besides the mandatory fields, each config struct provides comprehensive coverage of the quality of service and other settings for each type of zenoh connection. Refer to the zenoh documentation for in-depth descriptions of the qualities of service.
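For example, a subscriber node's config might look something like this sketch. The key expression, the protobuf type name, and the surrounding node structure are hypothetical placeholders; only the "key" and "decoder" field names come from the description above:

```json
{
    "fruit_listener": {
        "type": "node",
        "builder": "zenoh_subscription",
        "config": {
            "key": "warehouse/fruit/**",
            "decoder": { "protobuf": "example.FruitMsg" }
        },
        "next": { "builtin": "dispose" }
    }
}
```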

Subscription

Subscriptions, provided by the "zenoh_subscription" node builder, allow your workflow to receive messages from anonymous publishers that publish to a key that’s compatible with the key expression of your subscription. Once the node gets activated, it will stream out any incoming messages until it gets cancelled or until the scope it’s in finishes.

Request — A simple trigger that just prompts the node to begin listening for messages and streaming them out. If the node is already active, triggering it again will have no effect. If the node was previously active but cancelled, then triggering the node again will restart it.

“out” — Any messages received by the subscription will be sent out of the "out" stream.

“out_error” — If an error happens while decoding an incoming message, the error message will be streamed from "out_error". The node will continue running as normal even if these errors occur, but each of these messages indicates an incoming message that failed to be decoded.

“canceller” — In case you want to cancel the subscription, you can capture this UnboundedSender, either storing it in a buffer or passing it to a node that can make a decision about when to cancel. Triggering this will cause the final response of the node to be Ok(msg) where msg is whatever JsonMessage you pass into this sender. Since subscriptions can last indefinitely, this is the only way to stop the node from running before the scope terminates.

Response — The final response of the node is a Result whose value is Ok if the subscription was cancelled using the "canceller", or Err if a ZenohSubscriptionError occurred.

zenoh_subscription-node

Publisher

Publishers, provided by the "zenoh_publisher" node builder, allow your workflow to send messages to anonymous subscribers whose key expressions are compatible with the "key" that you configure for your node.

The zenoh publisher will be initialized and connect when the workflow is first built, and then every message sent to this node from any workflow session will be sent out over that same publisher. Note that this has some potential side-effects. If you configure the node to support late joiners then the late joiners will receive old messages even if the workflow session that sent those messages is no longer active.

Request — Pass in a JsonMessage to publish. If your node is configured to use protobuf encoding, the node will return an error message if the input JsonMessage failed to serialize into the intended protobuf message.

Response — A Result which will be Ok if publishing the message was successful, or a ZenohPublisherError if a problem occurred.

zenoh_publisher-node

Querier

Queriers, provided by the "zenoh_querier" node builder, allow your workflow to send off a request message to a queryable, which is similar to a service. The queryable will respond to the request message with some number of responses and then end the connection.

The layout of a querier node is the same as "zenoh_subscription" except:

  • The Request is a JsonMessage. If a protobuf encoder was chosen, any failure to serialize the message into the intended protobuf type will have this node respond with an Err.
  • The response will return Ok(null) when the query is finished. It will also return Ok(msg) if some msg is sent to the UnboundedSender provided by "canceller". In the event of an error, it will return an Err containing ZenohQuerierError.

zenoh_querier-node

ROS 2

Support for ROS 2 is implemented using rclrs. We provide generic registration functions that allow you to register node builders for individual messages, services, and actions. Currently this means message, service, and action definitions need to be registered into the executor at compilation time. This restriction will be lifted once rclrs supports dynamic messages.

Use the ros2 feature flag of crossflow to enable this support:

# Cargo.toml

[dependencies]
crossflow = { version = "*", features = ["diagram", "ros2"] }

Setup

Caution

The ROS 2 support currently involves some complicated colcon setup and therefore is being kept to the ros2 branch of crossflow for now. We should be able to merge it into main once rclrs supports dynamic messages.

Follow the instructions on the README of the ros2 branch to set up a colcon workspace with the necessary dependencies.

Note

This currently uses forks for rosidl_rust and rosidl_runtime_rs to have schemars support. There will be an effort to move this support upstream to the original repos.

Registering Primitives

To register support for a specific message, service, or action, start by creating the node that will be used by the ROS 2 primitives.

Pass that node along to registry.enable_ros2(node), which will provide an API that you can chain message, service, and action registrations onto.

use crossflow::prelude::*;
use rclrs::*;

use nav_msgs::{
    msg::Path,
    srv::GetPlan,
    action::GetMap,
};

// Create an rclrs executor and node.
let context = Context::default_from_env().unwrap();
let mut executor = context.create_basic_executor();
let node = executor.create_node("crossflow_executor").unwrap();

// Use the node to register the specific message, service, and action types.
let mut registry = DiagramElementRegistry::new();
registry
    .enable_ros2(node)
    .register_ros2_message::<Path>()
    .register_ros2_service::<GetPlan>()
    .register_ros2_action::<GetMap>();

// Spin the executor on a separate thread since it needs to run alongside Bevy.
std::thread::spawn(move || {
    executor.spin(Default::default());
});

Configuration

Subscriptions, publishers, and service clients are configured using PrimitiveOptions in JSON format.

Since action clients have multiple independent QoS to consider, they are configured using ActionClientConfig in JSON format.

In both cases, the topic/service/action name is the only mandatory field. All the QoS settings are optional. Any unset QoS will use the default setting for whatever primitive it’s being applied to.
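As a sketch, a minimal PrimitiveOptions for a subscription could name only the topic, with QoS fields layered on as needed. The field names and values shown here are assumptions about the schema, not the definitive one:

```json
{
    "topic": "/planned_path",
    "reliability": "best_effort",
    "history_depth": 10
}
```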

Subscription

Subscriptions allow your workflow to receive messages from anonymous publishers that publish to the same topic that you’ve subscribed to. Once the node gets activated, it will stream out any incoming messages until it gets cancelled or until the scope it’s in finishes.

Request — A simple trigger that just prompts the node to begin listening for messages and streaming them out. Note that unlike the zenoh subscription it’s possible to activate redundant instances of this node. This issue is being tracked by #158.

“out” — Any messages received by subscription will be sent out of the "out" stream. The message type will be the rosidl message struct that was registered rather than a JsonMessage.

“canceller” — In case you want to cancel the subscription, you can capture this UnboundedSender, either storing it in a buffer or passing it to a node that can make a decision about when to cancel. Triggering this will cause the final response of the node to be Ok(msg) where msg is whatever JsonMessage you pass into this sender. Since subscriptions can last indefinitely, this is the only way to stop the node from running before the scope terminates.

Response — The final response of the node is a Result whose value is Ok if the subscription was cancelled using the "canceller", or Err if an error occurred.

ros2-subscription-node

Publisher

Publishers allow your workflow to send messages to anonymous subscribers whose topics match the topic of your publisher.

The ROS 2 publisher will be created when your workflow is created and reused across all sessions of the workflow. Note that this has some potential side-effects. If you configure the node to support late joiners then the late joiners will receive old messages even if the workflow session that sent those messages is no longer active.

Request — Pass in a message of type T to publish, where T is the message type that the node builder is meant for.

Response — A Result which will be Ok if publishing the message was successful, or Err if the message failed to publish.

ros2-publisher-node

Client

Clients (for services) allow your workflow to send a request to a service and get back a response. Clients and services have a 1-to-1 relationship with each other: For every one request you send, you get exactly one response back (in the absence of errors).

Request — Pass in the Srv::Request type associated with the service Srv that was registered for this node builder.

Response — A Result which will be Ok containing the Srv::Response from the server when the response gets delivered successfully. If the canceller is triggered before the response arrives, this will be Err containing the cancellation message. If any other error happens that interferes with the service, this will return an Err(JsonMessage::String(_)) describing the error.

“canceller” — Similar to the "canceller" for subscriptions, this allows the service to be cancelled before the response has finished arriving.

ros2-service_client-node

Action Client

ROS 2 actions represent a combination of topics and services that altogether describe an “action” (usually a physical process) which takes place over a period of time and involves incremental updates while it makes progress.

Anyone unfamiliar with ROS 2 actions is encouraged to read through the action tutorial.

Request — Pass in the A::Request type associated with action A that was registered for this node builder.

Response — A Result which will be Ok containing the A::Response and GoalStatusCode from the action server when the response gets delivered successfully. If an error happens internally that makes the result undeliverable, a string message describing the error will be returned in Err.

“feedback” — Stream of the action’s feedback messages.

“status” — Stream of the action’s goal status updates.

“canceller” — Send a request to cancel the action. Unlike subscriptions and services, an action server gets notified about a cancellation and can respond to it accordingly. Therefore triggering this does not immediately cancel the node. Instead the cancellation request will be sent to the action server, and "cancellation_response" will stream out the response from the action server, along with the JsonMessage of the cancellation request that it’s responding to. Once the action is successfully cancelled by the action server, the result and status will be sent out of the node’s final response wrapped in Ok.

“cancellation_response” — Stream of the action server’s responses to any cancellation requests sent to "canceller". The message that was passed into "canceller" will be included in the output. If the action’s communication is working correctly, there will be one message sent from this stream for every message sent through the "canceller" of an active action.

ros2-action_client-node

Multi-Agent State Machines

Listeners are excellent at multiplexing messages coming from many sources at once. This makes them well equipped to manage state machines that involve multiple independent agents that need to be orchestrated.

Suppose we have three robots:

  1. A machining robot that takes raw material and cuts (machines) it into a particular shape.
  2. A painting robot that takes machined material and applies paint to it.
  3. A tending robot that moves material around the work zone, passing it all between the other machines and the supply areas.

layout-multi-robot

We also have three supply areas within the work zone:

  1. The raw material supply is where raw material gets dropped off to be processed.
  2. The machined material supply is where machined material is kept until the painting robot is available to work on it.
  3. The finished material supply is where painted (finished) material is placed until it can be taken away.

For the overall flow we want the raw material to be moved to the machining robot, but only when the machining robot is available. We want machined material to be moved to the painting robot but only when the painting robot is available, otherwise it should be moved to the machined material supply. Finally painted material should be moved to the finished material supply. Each time material is moved to the machining robot or the painting robot, that robot should begin working on the material that it received.

We can express that with the following workflow:

listen-multi-robot

We define six buffers in total: one for each of the three robots and one for each of the three material supplies. Each of these buffers can be thought of as representing a variable in the multi-agent state machine. Each time a value changes for one of these variables, a relevant service will be activated to make decisions about what state transitions should take place:

1. send raw material for machining

Connected to the raw_material_supply, the machining_robot_state, and the tending_robot_state, this service will be activated any time one of these events happens:

  • New material is added to raw_material_supply
  • The machining robot becomes available
  • The tending robot becomes available

Technically there are other events that can activate the service (e.g. the tending robot begins a new task), but the service will exit early when evaluating irrelevant events.

When any of the above events trigger the service, it will check if all of the following conditions are met:

  • Raw material is available in the raw_material_supply
  • The machining robot is available
  • The tending robot is available

When all conditions are met this service will:

  • Claim the tending robot by setting the tending_robot_state buffer to Busy
  • Claim the machining robot by setting the machining_robot_state buffer to Busy
  • Begin this async routine:
    • Command the tending robot to pull an item from the raw material supply
    • Reduce the count in the raw_material_supply buffer
    • Place the item in the machining robot area
    • Release the tending robot by setting tending_robot_state buffer to Available
    • Command the machining robot to perform its machining process
    • When the machining robot is finished, change the value in machining_robot_state to Finished

If any one of the conditions is not met, then the service will not do anything.

2. send machined material to painting area OR into queue for later

Connected to machining_robot_state, machined_material_supply, painting_robot_state, and tending_robot_state, this service will be activated any time one of these events happens:

  • The machining robot finishes a job
  • The tending robot becomes available
  • The painting robot becomes available

Technically there are other events that can activate the service (e.g. the machining robot or tending robot begins a new task), but the service will exit early when evaluating irrelevant events.

When any of the above events trigger the service, it will check whether the conditions are met for each of the following scenarios, in order, until it finds a scenario that is satisfied. The first one satisfied will be executed and the rest will be skipped:

🠚 move material from machining robot to painting robot

Conditions:

  • machining_robot_state is Finished
  • painting_robot_state is Available
  • tending_robot_state is Available

Execution:

  • Claim the tending robot by setting tending_robot_state buffer to Busy
  • Claim the painting robot by setting painting_robot_state buffer to Busy
  • Begin this async routine:
    • Command the tending robot to pull the material from the machining robot area
    • Once the machining robot area is clear, set machining_robot_state to Available
    • Move the material to the painting robot area
    • Release the tending robot by setting tending_robot_state buffer to Available
    • Command the painting robot to perform its painting process
    • When the painting robot is finished, change the value in painting_robot_state to Finished

This transition has the benefit of efficiently making the machining robot available and beginning the painting in one fell swoop, skipping the intermediate machined material supply.

🠚 move material from machining robot to the machined_material_supply

If the first scenario was skipped then either the painting robot is busy or the machining robot had no material available. We should prioritize clearing the machining robot so more material can be machined as soon as possible.

Conditions:

  • machining_robot_state is Finished
  • machined_material_supply has capacity for more items
  • tending_robot_state is Available

Execution:

  • Claim the tending robot by setting tending_robot_state buffer to Busy
  • Begin this async routine:
    • Command the tending robot to pull the material from the machining robot area
    • Once the machining robot area is clear, set machining_robot_state to Available
    • Move the material to the machined material area
    • Increment the machined_material_supply value by one
    • Release the tending robot by setting tending_robot_state buffer to Available

🠚 move material from machined_material_supply to painting robot

If the first two scenarios were skipped then we cannot clear out the machining robot at this time. Now we should check if we can move any machined material from the machined_material_supply to the painting robot.

Conditions:

  • machined_material_supply is greater than zero
  • painting_robot_state is Available
  • tending_robot_state is Available

Execution:

  • Claim the tending robot by setting tending_robot_state buffer to Busy
  • Claim the painting robot by setting painting_robot_state buffer to Busy
  • Begin this async routine:
    • Command the tending robot to pull material from the machined_material_supply
    • Once the material is retrieved, decrement the value in the machined_material_supply buffer
    • Move the material to the painting robot area
    • Release the tending robot by setting tending_robot_state buffer to Available
    • Command the painting robot to perform its painting process
    • When the painting robot is finished, change the value in painting_robot_state to Finished

🠚 else

If none of the above scenarios are satisfied, then this service will do nothing.
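The prioritized scenario check above can be sketched in plain Rust, independent of the crossflow API. All type and function names here are illustrative assumptions, not part of the library:

```rust
// A plain-Rust sketch of how service 2 could rank its scenarios.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum RobotState {
    Available,
    Busy,
    Finished,
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Transition {
    MachiningToPainting,
    MachiningToSupply,
    SupplyToPainting,
    DoNothing,
}

fn select_transition(
    machining: RobotState,
    painting: RobotState,
    tending: RobotState,
    supply_count: usize,
    supply_capacity: usize,
) -> Transition {
    use RobotState::*;
    use Transition::*;
    // Every scenario requires the tending robot, so check it first.
    if tending != Available {
        return DoNothing;
    }
    if machining == Finished && painting == Available {
        // Scenario 1: skip the intermediate supply area entirely.
        MachiningToPainting
    } else if machining == Finished && supply_count < supply_capacity {
        // Scenario 2: prioritize clearing the machining robot.
        MachiningToSupply
    } else if supply_count > 0 && painting == Available {
        // Scenario 3: feed the painting robot from the supply.
        SupplyToPainting
    } else {
        DoNothing
    }
}

fn main() {
    // Painting robot is busy, so the machined part goes to the supply area.
    let t = select_transition(
        RobotState::Finished,
        RobotState::Busy,
        RobotState::Available,
        2,
        5,
    );
    println!("{:?}", t);
}
```

In the actual workflow this decision would run inside the listener-connected service, reading the buffer values instead of function arguments.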

3. move finished material to pickup area

The final service is responsible for clearing material from the painting robot. Connected to painting_robot_state, finished_material_supply, and tending_robot_state, this service will be activated any time one of these events happens:

  • The painting robot finishes a job
  • Material is taken from the finished_material_supply
  • The tending robot becomes available

Technically there are other events that can activate the service (e.g. the tending robot begins a new task), but the service will exit early when evaluating irrelevant events.

When any of the above events triggers the service, it will check if all of the following conditions are met:

  • finished_material_supply has capacity for more items
  • painting_robot_state is Finished
  • tending_robot_state is Available

When all conditions are met this service will:

  • Claim the tending robot by setting the tending_robot_state buffer to Busy
  • Begin this async routine:
    • Command tending robot to pull material from the painting robot area
    • Once the painting robot area is clear, set painting_robot_state to Available
    • Move the material to the finished_material_supply
    • Increment the finished_material_supply value by one
    • Release the tending robot by setting tending_robot_state buffer to Available

Conclusion

The above workflow defines a highly parallelized process involving three agents that opportunistically keeps material moving whenever the right agents are available. Underlying this workflow is a state machine that combines the state information of all three agents and all three material supply areas. Transitions for this state machine are asynchronous and can overlap without interfering with one another, because their activities are orchestrated through their shared use of the buffers.

The logic of each of the state transitions that can be performed is encapsulated by three different services that do not need to know about each other’s existence. Each of these services can be implemented as an async function or by defining a separate lower-level workflow for each of them.

This kind of free-form reactive async system cannot be expressed by most graphical programming paradigms. Behavior Trees generally cannot express this kind of open-ended reactivity, and even most Petri Net implementations cannot express async transitions that gradually modify state variables (“places” in Petri Net terminology) throughout the execution of the transition.