Receiving from Output Streams
Some services also have output streams that you may want
to receive data from. In that case you will need to take a Capture
instead of only taking the Outcome:
let mut capture = commands.request(String::from("-3.14"), parsing_service).capture();
The parsing_service provides this ParsedStreams stream output pack:
#[derive(StreamPack)]
struct ParsedStreams {
    /// Values that were successfully parsed to u32
    parsed_as_u32: u32,
    /// Values that were successfully parsed to i32
    parsed_as_i32: i32,
    /// Values that were successfully parsed to f32
    parsed_as_f32: f32,
}
The string value "-3.14" should be able to parse to f32 but not to u32 or i32, since neither integer type accepts a fractional part.
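To see why only one of the three streams is expected to produce a value, here is a minimal sketch using only the standard library. It assumes the service behaves like Rust's `str::parse`; the internals of parsing_service are not shown here, and `ParseAttempts` and `try_parse_all` are hypothetical names used only for illustration.

```rust
/// Which numeric types a string could be parsed into.
/// (Hypothetical helper type, not part of crossflow.)
#[derive(Debug, PartialEq)]
struct ParseAttempts {
    as_u32: Option<u32>,
    as_i32: Option<i32>,
    as_f32: Option<f32>,
}

/// Try to parse `input` as each numeric type. `None` marks a failed parse.
/// Assumption: the real service parses in a way comparable to `str::parse`.
fn try_parse_all(input: &str) -> ParseAttempts {
    ParseAttempts {
        as_u32: input.parse::<u32>().ok(),
        as_i32: input.parse::<i32>().ok(),
        as_f32: input.parse::<f32>().ok(),
    }
}
```

Under that assumption, `try_parse_all("-3.14")` fills only `as_f32`, while a string such as "-5" fills both `as_i32` and `as_f32` but not `as_u32`.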
We can receive whichever values were produced with the following code:
let _ = capture.outcome.await;
println!("The service has finished running.");
while let Some(value) = capture.streams.parsed_as_u32.recv().await {
    println!("Parsed an unsigned integer: {value}");
}
while let Some(value) = capture.streams.parsed_as_i32.recv().await {
    println!("Parsed a signed integer: {value}");
}
while let Some(value) = capture.streams.parsed_as_f32.recv().await {
    println!("Parsed a floating point number: {value}");
}
The streams field of Capture will itself contain one field for
each field of the service’s stream pack, and the names of those fields will match
the field names in the stream pack.
Each field in Capture::streams will be a receiver that can be used to receive
values streamed out by the requested service, either in a sync or async context.
In the snippet above, .recv() will provide a “Future” that can efficiently
wait for the next value from the stream receiver that it’s called on. Calling
.await on that Future will let the async task rest until the next value is
available or the stream has closed.
Calling .recv().await in a while-loop expression ensures that we keep draining
the streams until all messages have been received.
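The reason such a loop terminates is that a closed, empty channel stops yielding values. The sketch below shows the same drain-until-closed pattern in a synchronous setting. It uses `std::sync::mpsc` purely as a stand-in: the receiver type that crossflow actually provides is not shown here and its methods may differ. `drain_until_closed` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

/// Send `values` from a background thread, then collect everything the
/// receiver yields until the channel is closed.
/// (std::sync::mpsc is a stand-in for the stream receivers, not crossflow API.)
fn drain_until_closed(values: Vec<f32>) -> Vec<f32> {
    let (sender, receiver) = mpsc::channel();

    // The producer thread owns the only sender. When the thread finishes,
    // the sender is dropped, which closes the channel.
    let producer = thread::spawn(move || {
        for value in values {
            sender.send(value).expect("receiver should still be alive");
        }
    });

    let mut received = Vec::new();
    // `recv()` blocks until a value arrives and returns `Err` once the
    // channel is both closed and empty, which ends the loop.
    while let Ok(value) = receiver.recv() {
        received.push(value);
    }

    producer.join().expect("producer thread panicked");
    received
}
```

The async `while let Some(value) = ... .recv().await` loops play the same role, except that the task yields to the executor instead of blocking a thread.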
The Capture also has an outcome field, which the receiving code above awaits
(capture.outcome.await) to detect that the service has finished running. In
general Capture::outcome will provide the final output message of the service,
but for parsing_service that is just a unit trigger ().
You are not required to await the outcome or the streams in any particular order.
You could await or poll the streams while the request is still being processed
in order to receive live updates from long-running requests. There are many
tools in the Rust ecosystem that allow for sophisticated async programming
patterns, such as tokio’s select macro, which allows you to await on multiple
streams at once and immediately receive the next message that emerges from any
one of them:
use tokio::select;
let next_u32 = capture.streams.parsed_as_u32.recv();
let next_i32 = capture.streams.parsed_as_i32.recv();
let next_f32 = capture.streams.parsed_as_f32.recv();
select! {
    recv = next_u32 => {
        if let Some(value) = recv {
            println!("Received an unsigned integer: {value}");
        }
    }
    recv = next_i32 => {
        if let Some(value) = recv {
            println!("Received a signed integer: {value}");
        }
    }
    recv = next_f32 => {
        if let Some(value) = recv {
            println!("Received a floating point number: {value}");
        }
    }
}
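If you are outside of an async runtime, a rough equivalent is to poll each receiver in turn without blocking. This is not what select does (select waits for the first ready branch, whereas this checks every receiver once), but it gives a similar "take whatever has arrived" effect. The sketch assumes `std::sync::mpsc` receivers as stand-ins for the stream receivers; `Parsed` and `poll_once` are hypothetical names.

```rust
use std::sync::mpsc::Receiver;

/// A message tagged with the stream it arrived on.
/// (Hypothetical type for illustration only.)
#[derive(Debug, PartialEq)]
enum Parsed {
    U32(u32),
    I32(i32),
    F32(f32),
}

/// Poll each receiver once without blocking and return whatever is ready.
/// `try_recv()` returns `Err` both when the channel is empty and when it is
/// closed, so either case simply contributes nothing to the result.
fn poll_once(
    u32_rx: &Receiver<u32>,
    i32_rx: &Receiver<i32>,
    f32_rx: &Receiver<f32>,
) -> Vec<Parsed> {
    let mut ready = Vec::new();
    if let Ok(value) = u32_rx.try_recv() {
        ready.push(Parsed::U32(value));
    }
    if let Ok(value) = i32_rx.try_recv() {
        ready.push(Parsed::I32(value));
    }
    if let Ok(value) = f32_rx.try_recv() {
        ready.push(Parsed::F32(value));
    }
    ready
}
```

Calling `poll_once` repeatedly, for example once per frame or per iteration of a worker loop, would pick up live updates as they arrive without waiting for the request to finish.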
Collecting Streams
If you are not interested in managing stream channels and just want to store the
stream outputs for later use, you can use Series::collect_streams to store all
the stream outputs in an entity of your choosing:
// Spawn an entity to be used to store the values coming out of the streams.
let storage = commands.spawn(()).id();
// Request the service, but set an entity to collect the streams before we
// take the outcome.
let outcome = commands
    .request(String::from("-5"), parsing_service)
    .collect_streams(storage)
    .outcome();
// Save the entity in a resource to keep track of it.
// You could also save this inside a component or move it
// into a callback, or anything else that suits your needs.
commands.insert_resource(StreamStorage(storage));
Then you can schedule a system to query the Collection
component of that entity to inspect the streams. When using a stream pack with
named streams, make sure to use NamedValue<T>
for the inner type of the Collection:
use crossflow::Collection;
#[derive(Resource, Deref)]
struct StreamStorage(Entity);
fn print_streams(
    storage: Res<StreamStorage>,
    mut query_u32: Query<&mut Collection<NamedValue<u32>>>,
    mut query_i32: Query<&mut Collection<NamedValue<i32>>>,
    mut query_f32: Query<&mut Collection<NamedValue<f32>>>,
) {
    if let Ok(mut collection_u32) = query_u32.get_mut(**storage) {
        for item in collection_u32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }
    if let Ok(mut collection_i32) = query_i32.get_mut(**storage) {
        for item in collection_i32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }
    if let Ok(mut collection_f32) = query_f32.get_mut(**storage) {
        for item in collection_f32.items.drain(..) {
            println!(
                "Received {} from a stream named {} in session {}",
                item.data.value,
                item.data.name,
                item.session,
            );
        }
    }
}
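The three branches of print_streams differ only in the value type, so the draining logic could be factored into one generic helper. The sketch below illustrates the idea with simplified stand-in types that only mirror the fields used above (items, data.name, data.value, session). They are not the real crossflow definitions: in particular, `Stored` is a hypothetical name, the session is shown as a plain `u64`, and the name as a `String`.

```rust
use std::fmt::Display;

/// Stand-in for a named stream value (assumed shape, not the crossflow type).
struct NamedValue<T> {
    name: String,
    value: T,
}

/// Stand-in for one stored entry. `session` is a u64 here for simplicity.
struct Stored<T> {
    session: u64,
    data: T,
}

/// Stand-in for the Collection component.
struct Collection<T> {
    items: Vec<Stored<T>>,
}

/// Drain every stored item and describe it in one line of text.
/// The collection is left empty afterwards, as with `drain(..)` above.
fn drain_report<T: Display>(collection: &mut Collection<NamedValue<T>>) -> Vec<String> {
    collection
        .items
        .drain(..)
        .map(|item| {
            format!(
                "Received {} from a stream named {} in session {}",
                item.data.value, item.data.name, item.session,
            )
        })
        .collect()
}
```

With a helper along these lines, each branch of the system would reduce to a single call on the collection it queried, assuming the real types expose the same fields.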