Creating a Node

Most of the useful work that happens inside a workflow is done by nodes. A node can be implemented by any provider. Providers will often be services—including blocking, async, and continuous services—but could also be callbacks or maps.

From a Service

Suppose we have a sum function defined to be a blocking service:

fn sum(In(input): BlockingServiceInput<Vec<f32>>) -> f32 {
    let mut sum = 0.0;
    for value in input.request {
        sum += value;
    }
    sum
}

We can spawn this as a service and then use it to create a node inside a workflow:

// Spawn a service that we can use inside a workflow
let service = commands.spawn_service(sum);

// Spawn a workflow and use the service inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

A few things to note:

  • We spawn the service outside of the workflow and then use the move keyword to move it into the closure that builds the workflow.
  • We use Builder::create_node to create a workflow node that will run the sum service.
  • After the node is created, we can access its input through node.input and its output through node.output.
  • builder.connect(output, input) connects an Output to an InputSlot:
    • scope.start is an Output inside the scope that will fire off exactly one message per workflow session. That one message is the input message that was sent into the workflow. It gets fired off as soon as the workflow session begins.
    • By connecting scope.start to node.input, we are passing the input message of the workflow directly into the node.
    • To pass back the sum as the output of the workflow, we connect node.output to scope.terminate.
    • As soon as any message is passed into scope.terminate, all activity in the workflow will terminate, and the first message that was passed into scope.terminate for this session will be sent out of the workflow as the workflow’s output message.
  • We don’t need to explicitly specify the Request and Response types of the workflow because the compiler can infer those from the two builder.connect(_, _) calls.
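To see why the types can be inferred, it may help to look at a toy model in plain Rust. The sketch below does not use crossflow at all: toy_workflow and sum_values are hypothetical names invented for this illustration, and a workflow is modeled as nothing more than a function wrapped around a single node. It only shows how the request and response types can flow from the node to the workflow without being written out.

```rust
/// Plain-Rust version of the summation performed by the `sum` service.
fn sum_values(values: Vec<f32>) -> f32 {
    values.into_iter().fold(0.0, |a, b| a + b)
}

/// Toy model (not the crossflow API) of a workflow containing one node.
/// `Request` and `Response` are never spelled out by the caller: the compiler
/// infers them from the node, much as it does from the two `connect` calls.
fn toy_workflow<Request, Response, F>(node: F) -> impl Fn(Request) -> Response
where
    F: Fn(Request) -> Response,
{
    move |request| {
        // Analogous to connecting scope.start to node.input
        let response = node(request);
        // Analogous to connecting node.output to scope.terminate
        response
    }
}
```

Calling toy_workflow(sum_values) yields a function from Vec<f32> to f32 even though neither type is named at the call site.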

Spawning a service inside the workflow building closure

The recommended practice is to spawn services outside of the workflow building closure and then move them into the closure. This makes services more reusable, since the same service can be copied into multiple workflows. However, if you do want to spawn the service from inside the workflow building closure, that option is available:

let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        // Spawn a service using the builder's commands
        let service = builder.commands().spawn_service(sum);

        // Create the node using the newly spawned service
        let node = builder.create_node(service);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

From a Callback

Using a callback instead of a service looks much the same. The only difference is that callbacks don’t need to be spawned:

// Define a closure to perform a sum
let f = |request: Vec<f32>| -> f32 {
    request.into_iter().fold(0.0, |a, b| a + b)
};
// Convert the closure into a Callback
let callback = f.into_blocking_callback();

// Spawn a workflow and use the callback inside it
let workflow = commands.spawn_io_workflow(
    move |scope, builder| {
        let node = builder.create_node(callback);
        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

The fact that it was implemented as a callback instead of as a service makes no difference to the workflow builder. It still gets created as a Node with InputSlot and Output types that match the Request and Response types, respectively, of the callback.

From a Map

Maps are an extremely common element in workflows. Their ability to perform quick data transformations makes them invaluable for bridging the gap between different services. They are common enough that crossflow provides two special APIs to make them easier to write:

create_map_block

A blocking map is a short-lived closure that performs a quick calculation or data transformation. These are very useful for adapting message types in-between services that are chained together.

To create a blocking map you can simply pass a closure into the Builder::create_map_block method:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_block(|request: Vec<f32>| {
            request.into_iter().fold(0.0, |a, b| a + b)
        });

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

You might notice that maps are often defined within the workflow building closure itself even though Services and Callbacks are usually created outside. This isn’t a requirement, but it is usually the most ergonomic way to add a map. Reusability is less of a concern with maps than it is for Services or Callbacks.
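As a sketch of the kind of type adaptation a blocking map typically performs, consider an upstream operation that produces readings as text while the sum node expects Vec<f32>. The function below is plain Rust with no crossflow dependency; parse_readings is a hypothetical name, and the choice to skip unparseable entries is an assumption made for this example.

```rust
/// Hypothetical adapter: converts text readings into numbers so that they
/// match the `Vec<f32>` request type expected by the summing node.
/// Entries that fail to parse are skipped (an assumption of this sketch).
fn parse_readings(raw: Vec<String>) -> Vec<f32> {
    raw.iter()
        .filter_map(|text| text.trim().parse::<f32>().ok())
        .collect()
}
```

Because it is a short, non-blocking transformation from one message type to another, a closure with this body is the sort of thing that would be passed to Builder::create_map_block and placed between the two operations.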

create_map_async

An async map runs an async function as a node. Suppose you have an async function like:

async fn get_page_title(url: String) -> Option<String> {
    let http_response = trpl::get(&url).await;
    let response_text = http_response.text().await;
    trpl::Html::parse(&response_text)
        .select_first("title")
        .map(|title| title.inner_html())
}

You can use this async function as a map by simply passing its name into Builder::create_map_async:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {

        let node = builder.create_map_async(get_page_title);

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);

Alternatively, you can define the async map inline as a closure. Native async closures were only stabilized in Rust 1.85, so the example below uses a form that also works on older toolchains: an ordinary closure that returns an async move { ... } block, like this:

let workflow = commands.spawn_io_workflow(
    |scope, builder| {
        let node = builder.create_map_async(|url: String| {
            async move {
                let http_response = trpl::get(&url).await;
                let response_text = http_response.text().await;
                trpl::Html::parse(&response_text)
                    .select_first("title")
                    .map(|title| title.inner_html())
            }
        });

        builder.connect(scope.start, node.input);
        builder.connect(node.output, scope.terminate);
    }
);
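The shape of that inline form, an ordinary closure whose body is an async move block, can be tried without crossflow or a network connection. The following is a minimal sketch using only the standard library. NoopWake, block_on, extract_title, and run_title_map are hypothetical helpers written for this illustration, and extract_title is a crude stand-in for the trpl HTML parsing above.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient here because the future never suspends.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only: polls a future until it completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
            return output;
        }
    }
}

/// Hypothetical stand-in for the HTML parsing in the example above.
fn extract_title(html: &str) -> Option<String> {
    let start = html.find("<title>")? + "<title>".len();
    let end = start + html[start..].find("</title>")?;
    Some(html[start..end].to_string())
}

/// Builds a closure that returns an `async move` block and drives it to completion.
fn run_title_map(html: &str) -> Option<String> {
    // Calling this ordinary closure returns a future that owns `page`.
    let title_map = |page: String| async move { extract_title(&page) };
    block_on(title_map(html.to_string()))
}
```

The async move block takes ownership of the closure's argument, so the returned future does not borrow anything from the caller and can outlive the call that created it.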

Chain

Creating a map through the Builder API is necessary if you need to connect multiple outputs into the same map node, but in most cases you’ll want to create a map that just transforms data as it passes from one operation to another. A more convenient way to do that is with a Chain, discussed later.