
Output Streams

So far we’ve seen services that return a single output as a final response to each request they receive. This is enough for chaining together sequential atomic services, but some actions are carried out over a period of time and involve stages or incremental progress that you may want to track.

Crossflow services have the option of using output streams to transmit messages while processing a request. Before the final response is sent out, a service can stream out any number of messages over any number of output streams. Each stream can have its own message type.

Streams can be anonymous, meaning they have no name and are identified only by their output type, but best practice is to use named streams. Naming a stream adds clarity about its purpose or meaning.

Tip

If you want to know how to receive data from output streams, see Receiving from Output Streams.

To see how to connect output streams inside a workflow, see Connecting Output Streams.

Anonymous Streams

While it’s not the recommended approach, we’ll start by explaining how anonymous streams work. If you are confident that your service will only ever produce one stream or that the type information is enough to make the purpose of the stream clear, then anonymous streams may be acceptable for your use case.

In this example we implement a Fibonacci sequence service that streams out the values of the sequence and then ultimately just returns a trigger (the unit type ()) to indicate that the service is finished running:

fn fibonacci_example(
    In(input): BlockingServiceInput<u32, StreamOf<u32>>
) {
    let order = input.request;
    let stream = input.streams;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        stream.send(current);

        let sum = current + next;
        current = next;
        next = sum;
    }
}

We use StreamOf<u32> to define an anonymous (unnamed) stream of u32. This stream allows us to emit a sequence of outputs where each message is a new item in the Fibonacci sequence, as opposed to the conventional “final response” that would require us to return a single message of Vec<u32>. While a Fibonacci sequence is a trivial use case, this same approach can be used to transform any single input into a stream of any number of outputs.
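To build intuition for the data flow, here is a minimal plain-Rust sketch that models the anonymous stream as a standard-library channel sender. The `fibonacci_into` function and the mpsc channel are stand-ins for illustration only, not part of the Crossflow API:

```rust
use std::sync::mpsc;

// Hypothetical stand-in for an anonymous stream: a plain channel sender.
// In Crossflow the stream handle is provided by the framework; here we
// model it with std::sync::mpsc to illustrate the one-input,
// many-outputs data flow.
fn fibonacci_into(order: u32, stream: mpsc::Sender<u32>) {
    let mut current: u32 = 0;
    let mut next: u32 = 1;
    for _ in 0..order {
        stream.send(current).unwrap();
        let sum = current + next;
        current = next;
        next = sum;
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    fibonacci_into(5, tx); // sender is dropped here, closing the channel
    let received: Vec<u32> = rx.iter().collect();
    println!("{received:?}"); // [0, 1, 1, 2, 3]
}
```

The key point the sketch captures is that one request (the order) fans out into many messages, rather than one aggregated return value.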

Multiple Anonymous Streams

In the previous example you might have noticed

let stream = input.streams;

The plural input.streams was assigned to a singular stream variable. This is because services in general need to support multiple streams; in the case of a lone StreamOf<T>, the potentially multiple streams reduce to a single stream.

To get multiple anonymous streams, you can use a tuple. Here’s a slightly tweaked version of the previous example that additionally streams out a stringified version of the sequence:

fn fibonacci_string_example(
    In(input): BlockingServiceInput<u32, (StreamOf<u32>, StreamOf<String>)>,
) {
    let order = input.request;
    let u32_stream = input.streams.0;
    let string_stream = input.streams.1;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        u32_stream.send(current);
        string_stream.send(format!("{current}"));

        let sum = current + next;
        current = next;
        next = sum;
    }
}

Now input.streams contains two streams, packed together as a tuple. Since they are packed into a tuple, they continue to be anonymous (unnamed), but they can be accessed separately and send different outputs.

Warning

When you use multiple anonymous output streams, you must not include the same message type more than once, e.g. (StreamOf<u32>, StreamOf<u32>). This mistake cannot be caught at compile time and leads to confusing behavior: when the service is spawned it will appear to have multiple output streams, but only one of the Outputs will receive all of the messages, even if the service sends messages to both.

Stream Pack

While anonymous streams are an option, it’s best to always use named streams, especially if you are creating services for a diagram.

To create named streams, you should define a struct where each field represents a different output stream. The data type of each field will be the message type of its associated stream. Simply apply #[derive(StreamPack)] to that struct, and now you can use those streams in your system:

#[derive(StreamPack)]
struct FibonacciStreams {
    integers: u32,
    strings: String,
}

fn fibonacci_stream_pack_example(
    In(input): BlockingServiceInput<u32, FibonacciStreams>,
) {
    let order = input.request;
    let streams = input.streams;

    let mut current = 0;
    let mut next = 1;
    for _ in 0..order {
        streams.integers.send(current);
        streams.strings.send(format!("{current}"));

        let sum = current + next;
        current = next;
        next = sum;
    }
}

Now input.streams contains a separate named field for each stream in the stream pack. Each field lets you send output messages over its stream. The compiler will ensure that the messages you send are compatible with the stream’s type.

Tip

Unlike with anonymous streams, you can have multiple named streams with the same message type.
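As a rough mental model, the derived stream pack behaves like a struct whose fields are independent channel senders. The following plain-Rust sketch is a hypothetical analogy, not what #[derive(StreamPack)] actually generates; the `FibonacciSenders` struct and channels are stand-ins for illustration:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical analogy for a stream pack: one named field per stream,
// each with its own message type and its own independent channel.
struct FibonacciSenders {
    integers: Sender<u32>,
    strings: Sender<String>,
}

fn fibonacci_into(order: u32, streams: FibonacciSenders) {
    let (mut current, mut next): (u32, u32) = (0, 1);
    for _ in 0..order {
        streams.integers.send(current).unwrap();
        streams.strings.send(format!("{current}")).unwrap();
        let sum = current + next;
        current = next;
        next = sum;
    }
}

fn main() {
    let (int_tx, int_rx): (Sender<u32>, Receiver<u32>) = channel();
    let (str_tx, str_rx): (Sender<String>, Receiver<String>) = channel();
    fibonacci_into(4, FibonacciSenders { integers: int_tx, strings: str_tx });
    assert_eq!(int_rx.iter().collect::<Vec<_>>(), vec![0, 1, 1, 2]);
    assert_eq!(str_rx.iter().collect::<Vec<_>>(), vec!["0", "1", "1", "2"]);
}
```

Because each field is its own channel, nothing stops two fields from carrying the same message type, which mirrors why named streams do not suffer from the duplicate-type problem of anonymous stream tuples.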

Async Streams

The previous examples show how to use streams in a blocking service to generate a stream of data from a single input. While that is a valid use of streams, there is another conceptually important use: streaming updates or products out of an ongoing live service. The streams that come out of a blocking service will be delivered at the same time as the final response due to the nature of blocking services, so live updates are only relevant for async and continuous services.

Below is a stub of a navigation service. It takes in a NavigationRequest that includes a destination and a BufferKey for the robot’s position. While the robot makes progress towards its destination, the service will emit NavigationStreams including location data, log data, and any errors that come up along the way.

#[derive(StreamPack)]
struct NavigationStreams {
    log: String,
    location: Vec2,
    error: NavigationError,
}

#[derive(Clone)]
struct NavigationRequest {
    destination: Vec2,
    robot_position_key: BufferKey<Vec2>,
}

fn navigate(
    In(input): AsyncServiceInput<NavigationRequest, NavigationStreams>,
    nav_graph: Res<NavigationGraph>,
) -> impl Future<Output = Result<(), NavigationError>> + use<> {
    // Clone the navigation graph resource so we can move the clone into the
    // async block.
    let nav_graph = (*nav_graph).clone();

    // Create a callback for fetching the latest position
    let get_position = |
        In(key): In<BufferKey<Vec2>>,
        access: BufferAccess<Vec2>,
    | {
        access.get_newest(&key).cloned()
    };
    let get_position = get_position.into_blocking_callback();

    // Unpack the input into simpler variables
    let NavigationRequest { destination, robot_position_key } = input.request;
    let location_stream = input.streams.location;

    // Begin an async block that will run in the AsyncComputeTaskPool
    async move {
        loop {
            // Fetch the latest position using the async channel
            let position = input.channel.request_outcome(
                robot_position_key.clone(),
                get_position.clone()
            )
                .await
                .ok()
                .flatten();

            let Some(position) = position else {
                // Position has not been reported yet, so just try again later.
                continue;
            };

            // Send the current position out over an async stream
            location_stream.send(position);

            // TODO: Command the robot to proceed towards its destination
            // TODO: Break the loop when the robot arrives at its destination
        }

        Ok(())
    }
}

A few important details of the above example:

  • We use the fn _(AsyncServiceInput) -> impl Future<Output = _> syntax so we can have an async service that accesses the NavigationGraph resource at startup.
  • We use the async move { ... } syntax to create the long-running Future that will run in the AsyncComputeTaskPool.
  • All relevant input data is unpacked and then moved into the async block.
  • We create a callback that the async channel uses to periodically fetch data from the position buffer while our async block is running. This is the most effective way for async tasks to access Bevy ECS data.
  • The streams provided to an async service can be moved into an async block, allowing messages to be streamed out while the async task is still being processed.
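The idea that streamed messages arrive while the task is still in flight can be sketched without the framework. The following plain-Rust analogy uses a thread in place of the AsyncComputeTaskPool and a channel in place of the stream handle; both are hypothetical stand-ins, not the Crossflow API:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical sketch of live streaming from a long-running task: updates
// are sent out mid-task, before the final response is produced, which is
// the behavior async streams enable and blocking streams cannot.
fn main() {
    let (progress_tx, progress_rx) = channel();

    let task = thread::spawn(move || {
        for step in 0..3 {
            // Stream an update mid-task, like location_stream.send(position).
            progress_tx.send(format!("step {step} done")).unwrap();
        }
        "arrived" // final response, produced after all streamed updates
    });

    // The receiver can observe updates while the task is still running.
    let updates: Vec<String> = progress_rx.iter().collect();
    let response = task.join().unwrap();
    assert_eq!(updates.len(), 3);
    assert_eq!(response, "arrived");
}
```

The contrast with a blocking service is that there, the consumer could only observe the three updates together with the final response, never before it.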

Continuous Streams

The output streams of blocking and async services have similar ergonomics, even if their behavior has some notable differences. For continuous services, output streams allow you to stream out messages while a request is still in flight, similar to async services.

However, the API has an important difference: a continuous service can see all its active requests (orders) at once, and its streams are isolated per order:

fn continuous_navigate(
    In(srv): ContinuousServiceInput<NavigationRequest, Result<(), NavigationError>, NavigationStreams>,
    mut continuous: ContinuousQuery<NavigationRequest, Result<(), NavigationError>, NavigationStreams>,
    position_access: BufferAccess<Vec2>,
    nav_graph: Res<NavigationGraph>,
) {
    let Some(mut orders) = continuous.get_mut(&srv.key) else {
        // The service provider has despawned, so we can no longer do anything
        return;
    };

    orders.for_each(|order| {
        let NavigationRequest { destination, robot_position_key } = order.request().clone();
        let Some(position) = position_access.get_newest(&robot_position_key) else {
            // Position is not available yet
            return;
        };

        order.streams().location.send(*position);

        // TODO: Command the robot to proceed towards its destination
        // TODO: Use order.respond(Ok(())) when the robot arrives at its destination
    });
}

In the above example you can see that the output streams are accessed through order.streams() instead of input.streams. Each order comes from a different request message passed into the continuous service, potentially from many different workflows or sessions at once. Routing stream messages through the OrderMut API ensures that each message goes only to the specific order it is meant for.
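Per-order isolation can be pictured as each order owning its own sender, so a message for one order can never leak into another order's stream. The sketch below is a hypothetical plain-Rust model of that guarantee; the `Orders` struct, order IDs, and channels are illustrative stand-ins, not how Crossflow is implemented:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical model of per-order stream isolation: each active order has
// its own sender, so streamed messages are routed only to the order they
// belong to, mirroring what the OrderMut API enforces.
struct Orders {
    streams: HashMap<u64, Sender<(f32, f32)>>,
}

impl Orders {
    fn send_location(&self, order_id: u64, position: (f32, f32)) {
        if let Some(stream) = self.streams.get(&order_id) {
            stream.send(position).unwrap();
        }
    }
}

fn main() {
    let (tx_a, rx_a): (Sender<(f32, f32)>, Receiver<(f32, f32)>) = channel();
    let (tx_b, rx_b) = channel();
    let orders = Orders { streams: HashMap::from([(1, tx_a), (2, tx_b)]) };

    orders.send_location(1, (0.0, 1.0));
    orders.send_location(2, (5.0, 5.0));
    drop(orders); // close all senders

    // Each receiver only ever sees its own order's messages.
    assert_eq!(rx_a.iter().collect::<Vec<_>>(), vec![(0.0, 1.0)]);
    assert_eq!(rx_b.iter().collect::<Vec<_>>(), vec![(5.0, 5.0)]);
}
```

This is why the continuous-service API hands you streams through each order rather than through a single shared input.streams handle.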