Receiving from Output Streams

Some services also have output streams that you may want to receive data from. In that case, take a Capture instead of only taking the Outcome:

let mut capture = commands.request(String::from("-3.14"), parsing_service).capture();

The parsing_service provides this ParsedStreams stream output pack:

#[derive(StreamPack)]
struct ParsedStreams {
    /// Values that were successfully parsed to u32
    parsed_as_u32: u32,
    /// Values that were successfully parsed to i32
    parsed_as_i32: i32,
    /// Values that were successfully parsed to f32
    parsed_as_f32: f32,
}

The string value "-3.14" should be able to parse to f32 but not to u32 or i32, so only the parsed_as_f32 stream should produce a value. We can receive whichever values were produced with the following code:

let _ = capture.outcome.await;
println!("The service has finished running.");
while let Some(value) = capture.streams.parsed_as_u32.recv().await {
    println!("Parsed an unsigned integer: {value}");
}

while let Some(value) = capture.streams.parsed_as_i32.recv().await {
    println!("Parsed a signed integer: {value}");
}

while let Some(value) = capture.streams.parsed_as_f32.recv().await {
    println!("Parsed a floating point number: {value}");
}
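
To see why only the f32 stream is expected to produce a value for this input, the sketch below tries the same three conversions with the standard library's str::parse. This is an assumption about how such a service might parse its input, not the actual implementation of parsing_service; the Parsed struct and the parse_all function are hypothetical names used only for illustration.

```rust
/// Hypothetical stand-in for the three streams: one optional value per
/// target type, with `None` wherever parsing failed.
#[derive(Debug, PartialEq)]
struct Parsed {
    as_u32: Option<u32>,
    as_i32: Option<i32>,
    as_f32: Option<f32>,
}

/// Attempt every conversion independently, the way a parsing service
/// might before deciding which streams to send a value to.
/// (Assumption: the real service may parse differently.)
fn parse_all(input: &str) -> Parsed {
    Parsed {
        // Fails for negative numbers and for anything with a decimal point.
        as_u32: input.parse::<u32>().ok(),
        // Accepts a leading minus sign but still rejects a decimal point.
        as_i32: input.parse::<i32>().ok(),
        // Accepts both a sign and a fractional part.
        as_f32: input.parse::<f32>().ok(),
    }
}
```

Under these assumptions, parse_all("-3.14") yields a value only for as_f32, while parse_all("-5") yields values for as_i32 and as_f32 but not for as_u32.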

The streams field of Capture contains one field for each field of the service’s stream pack, and the names of those fields match the field names in the stream pack.

Each field in Capture::streams is a receiver that can be used to receive values streamed out by the requested service, in either a sync or an async context.

In the snippet above, .recv() provides a Future that can efficiently wait for the next value from the stream receiver that it’s called on. Calling .await on that Future lets the async task rest until the next value is available. Calling .recv().await in the condition of a while-let loop keeps draining the stream until no more values can arrive, which ensures that all of its messages have been received.
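
The drain-until-closed pattern can also be tried out in a sync context without any async runtime. The sketch below uses std::sync::mpsc as a stand-in for a stream receiver; it is not the crossflow receiver type, and drain_all is a hypothetical helper. Note that the standard library's recv() returns a Result rather than an Option, so the loop matches on Ok instead of Some.

```rust
use std::sync::mpsc;
use std::thread;

/// Send every value from a producer thread, then drain the channel on the
/// calling thread until the sender is dropped and the channel closes.
fn drain_all(values: Vec<f32>) -> Vec<f32> {
    let (sender, receiver) = mpsc::channel();

    // The producer plays the role of the service writing to a stream.
    let producer = thread::spawn(move || {
        for value in values {
            sender.send(value).expect("receiver is still alive");
        }
        // `sender` is dropped here, which closes the channel.
    });

    let mut received = Vec::new();
    // Blocks for each value and ends once the channel is closed and empty.
    while let Ok(value) = receiver.recv() {
        received.push(value);
    }

    producer.join().expect("producer thread panicked");
    received
}
```

The loop ends on its own because dropping the last sender closes the channel, which mirrors how the while-let loops above stop once a stream has no more values to deliver.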

The Capture also has an outcome field, which the above snippet awaits to detect that the service has finished running. In general Capture::outcome will provide the final output message of the service, but for parsing_service that is just a unit trigger ().

You are not required to await the outcome or the streams in any particular order. You could await or poll the streams while the request is still being processed in order to receive live updates from long-running requests. There are many tools in the Rust ecosystem that allow for sophisticated async programming patterns. One example is tokio’s select macro, which lets you await multiple streams at once and immediately receive the next message that emerges from any one of them:

use tokio::select;

let next_u32 = capture.streams.parsed_as_u32.recv();
let next_i32 = capture.streams.parsed_as_i32.recv();
let next_f32 = capture.streams.parsed_as_f32.recv();
select! {
    recv = next_u32 => {
        if let Some(value) = recv {
            println!("Received an unsigned integer: {value}");
        }
    }
    recv = next_i32 => {
        if let Some(value) = recv {
            println!("Received a signed integer: {value}");
        }
    }
    recv = next_f32 => {
        if let Some(value) = recv {
            println!("Received a floating point number: {value}");
        }
    }
}
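
For the polling option mentioned earlier, a non-blocking check needs to tell apart three cases: a value is ready, nothing has arrived yet, or the stream has closed. The sketch below shows that distinction with std::sync::mpsc as a stand-in channel. The StreamStatus enum and the poll_stream function are hypothetical, and the crossflow receiver may expose polling through a different API.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical result of checking a stream once without blocking.
#[derive(Debug, PartialEq)]
enum StreamStatus<T> {
    /// A value was waiting in the channel.
    Value(T),
    /// Nothing has arrived yet, but the sender is still alive.
    Pending,
    /// The sender is gone and the channel is empty.
    Closed,
}

/// Check the receiver once and report what was found.
fn poll_stream<T>(receiver: &Receiver<T>) -> StreamStatus<T> {
    match receiver.try_recv() {
        Ok(value) => StreamStatus::Value(value),
        Err(TryRecvError::Empty) => StreamStatus::Pending,
        Err(TryRecvError::Disconnected) => StreamStatus::Closed,
    }
}
```

A caller would typically invoke poll_stream once per update cycle, keep going while it reports Pending or Value, and stop checking that stream once it reports Closed.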

Collecting Streams

If you are not interested in managing stream channels and just want to store the stream outputs for later use, you can use Series::collect_streams to store all the stream outputs in an entity of your choosing:

// Spawn an entity to be used to store the values coming out of the streams.
let storage = commands.spawn(()).id();

// Request the service, but set an entity to collect the streams before we
// take the outcome.
let outcome = commands
    .request(String::from("-5"), parsing_service)
    .collect_streams(storage)
    .outcome();

// Save the entity in a resource to keep track of it.
// You could also save this inside a component or move it
// into a callback, or anything else that suits your needs.
commands.insert_resource(StreamStorage(storage));

Then you can schedule a system to query the Collection component of that entity to inspect the streams. When using a stream pack with named streams, make sure to use NamedValue<T> for the inner type of the Collection:

use crossflow::Collection;

#[derive(Resource, Deref)]
struct StreamStorage(Entity);

fn print_streams(
    storage: Res<StreamStorage>,
    mut query_u32: Query<&mut Collection<NamedValue<u32>>>,
    mut query_i32: Query<&mut Collection<NamedValue<i32>>>,
    mut query_f32: Query<&mut Collection<NamedValue<f32>>>,
) {
    if let Ok(mut collection_u32) = query_u32.get_mut(**storage) {
        for item in collection_u32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }

    if let Ok(mut collection_i32) = query_i32.get_mut(**storage) {
        for item in collection_i32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }

    if let Ok(mut collection_f32) = query_f32.get_mut(**storage) {
        for item in collection_f32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }
}
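
The system above calls drain(..) rather than iterating by reference, which removes each item as it is reported so that a later run of the system does not print it again. The sketch below isolates that behavior with plain standard-library types. StoredItem, Storage, and take_reports are hypothetical stand-ins and do not reflect the actual layout of the crossflow Collection type.

```rust
/// Hypothetical stand-in for one stored stream item; not the crossflow type.
#[derive(Debug, Clone, PartialEq)]
struct StoredItem<T> {
    name: String,
    value: T,
    session: u64,
}

/// Hypothetical stand-in for a component that accumulates stream items.
#[derive(Debug)]
struct Storage<T> {
    items: Vec<StoredItem<T>>,
}

/// Format every stored item and remove it from storage, so that calling
/// this again only reports items that arrived in the meantime.
fn take_reports<T: std::fmt::Display>(storage: &mut Storage<T>) -> Vec<String> {
    storage
        .items
        .drain(..)
        .map(|item| {
            format!(
                "Received {} from a stream named {} in session {}",
                item.value, item.name, item.session,
            )
        })
        .collect()
}
```

Calling take_reports twice in a row returns the reports the first time and an empty list the second time, because the first call emptied the storage.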