Continuous Services
Most of the time a service can be written as a blocking service or an async service, depending on whether the service is known to be short-lived or long-lived and whether it needs Rust's async features. But there is one more shape a service can take on: continuous.
One thing that blocking and async services have in common is that each is defined by a function that takes in a request as an argument and eventually yields a response. A continuous service, on the other hand, is defined by a Bevy system that runs continuously in the system schedule. Each time the continuous service is woken up in the schedule, it can make incremental progress on the set of active requests assigned to it, which we call the order queue.
What is a Continuous Service?
Each service—whether blocking, async, or continuous—is associated with a unique Entity that we refer to as the service provider. For blocking and async services the main purpose of the service provider is to
- store the system that implements the service, and
- allow users to configure the service by inserting components on the service provider entity.
For continuous services the service provider has one additional purpose: to store the order queue component. This component keeps track of the service's active requests, i.e. the requests aimed at the continuous service that have not received a response yet. When a request for a continuous service shows up, flush_execution will send that request into the order queue component of the service it's meant for.
Each continuous service will be run in the regular Bevy system schedule just like any other Bevy system in the application. This means continuous services can run in parallel to each other, as well as in parallel to all other regular Bevy systems, as long as there are no read/write conflicts in what the systems need to access.
Each time a continuous service is woken up by the schedule, it should check its order queue component and look through any active requests it has. It should try to make progress on those orders according to whatever its intended behavior is—perhaps completing one at a time or trying to complete all of them at once. There is no requirement to complete any number of the orders on any given wakeup, and no time limit or iteration limit for completing them. Any orders that do not receive a response will simply carry over to the next schedule update.
Once an order is complete, the continuous service can issue a response to it. After issuing a response, the order's request immediately becomes inaccessible, even within the current update cycle. Responding to an order fully drops the order from the perspective of the continuous service, which prevents confusion that might otherwise lead to accidentally sending two responses for the same order. Nevertheless, a continuous service can respond to any number of unique orders within a single update cycle.
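This bookkeeping can be illustrated with a toy model in plain Rust, with no Bevy or crossflow types involved. The Order struct and update function below are invented for illustration only, not the library's actual implementation: every wakeup makes incremental progress on all active orders, responds to the finished ones (which drops them from the queue), and carries the rest over to the next update.

```rust
use std::collections::VecDeque;

// A toy "order": a request plus the amount of work left before it can
// complete. This is an illustrative model, not the real order queue component.
struct Order {
    request: &'static str,
    ticks_remaining: u32,
}

// One wakeup of a toy continuous service: make incremental progress on every
// active order, respond to the finished ones, and carry the rest over.
fn update(queue: &mut VecDeque<Order>, responses: &mut Vec<String>) {
    for order in queue.iter_mut() {
        order.ticks_remaining = order.ticks_remaining.saturating_sub(1);
    }
    // Responding removes the order from the queue, so the same order can
    // never be answered twice.
    queue.retain(|order| {
        if order.ticks_remaining == 0 {
            responses.push(format!("done: {}", order.request));
            false
        } else {
            true
        }
    });
}

fn main() {
    let mut queue = VecDeque::from([
        Order { request: "a", ticks_remaining: 1 },
        Order { request: "b", ticks_remaining: 2 },
    ]);
    let mut responses = Vec::new();

    update(&mut queue, &mut responses);
    // "a" finished on the first update; "b" carried over with no deadline.
    assert_eq!(responses, vec!["done: a".to_string()]);
    assert_eq!(queue.len(), 1);

    update(&mut queue, &mut responses);
    assert!(queue.is_empty());
    println!("{responses:?}");
}
```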
Spawn a Continuous Service
Spawning a continuous service has some crucial differences from spawning a blocking or an async service. These differences come from two facts:
- The request information (order queue) persists between service updates as a component
- The service needs to be added to the app’s system schedule
The first difference means that continuous services must query for their order queue. In the hello_continuous_service example below, you can see the query argument takes in a ContinuousQuery whose generic parameters exactly match the generic parameters of the ContinuousServiceInput above it. This pattern is mandatory for continuous services so they can access their order queue.
```rust
fn hello_continuous_service(
    In(srv): ContinuousServiceInput<String, String>,
    mut query: ContinuousQuery<String, String>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has despawned, so we can no longer do anything
        return;
    };

    orders.for_each(|order| {
        let name = order.request();
        let response = format!("Hello, {name}!");
        order.respond(response);
    });
}
```
Unlike other service types, the srv input argument contains nothing but a key that must be used by the query to access the orders belonging to this service. This access is fallible because it's possible for a user or a system to despawn the provider of this service while the service is still inside the system schedule. In most cases when the order queue can no longer be accessed, your continuous service can just return immediately. Any requests sent to it will be automatically cancelled.

When you do get access to your orders (in this case through the orders variable) you can iterate through each order, using .request() to borrow the request value and then .respond(_) to send your response for the order.
Inside the System Schedule
The other important difference with continuous services is that they need to be added to the app's system schedule. While blocking and async services can be spawned anywhere you have access to a Commands, continuous services can only be spawned when you have access to an App:
```rust
let continuous_service: Service<String, String> = app.spawn_continuous_service(
    Update,
    hello_continuous_service,
);
```
Instead of the spawn_service method used by blocking and async services, continuous services require you to call spawn_continuous_service. You'll also notice that an additional argument is needed here: the label of the schedule that the continuous service should run in.
Note
Currently Bevy does not support dynamically adding systems to the schedule, so continuous services will generally need to be added during App startup.
Despite these differences in how continuous services get spawned, you will still ultimately receive a Service<Request, Response>, exactly the same as blocking and async services. From the outside, continuous services appear no different from other kinds of services; the differences in how they behave are just implementation details.
Serial Orders
In the hello_continuous_service example above we respond to each request on the same update frame that the request arrives. Very often, though, continuous services manage a process that needs to be spread out across multiple frames. The example below responds to a sequence of orders one at a time, where each order might need multiple frames before it's complete.
```rust
#[derive(Resource, Deref, DerefMut)]
struct Position(Vec2);

#[derive(Component, Deref)]
struct Speed(f32);

fn move_base_vehicle_to_target(
    In(srv): ContinuousServiceInput<Vec2, Result<(), ()>>,
    mut query: ContinuousQuery<Vec2, Result<(), ()>>,
    speeds: Query<&Speed>,
    mut base_position: ResMut<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has been despawned, so this continuous service
        // can no longer function.
        return;
    };

    // Get the oldest active request for this service. Orders will be indexed
    // from 0 to N-1 from oldest to newest. When an order is completed, all
    // orders after it will shift down by an index on the next update cycle.
    let Some(order) = orders.get_mut(0) else {
        // There are no active requests, so no need to do anything
        return;
    };

    let dt = time.delta_secs_f64() as f32;
    let Ok(speed) = speeds.get(srv.key.provider()) else {
        // The Speed component has been removed from the service provider, so
        // it can no longer complete requests.
        order.respond(Err(()));
        return;
    };

    let target = *order.request();
    match move_to(**base_position, target, **speed, dt) {
        Ok(_) => {
            // The vehicle arrived
            **base_position = target;
            println!("Base vehicle arrived at {target}");
            order.respond(Ok(()));
        }
        Err(new_position) => {
            // The vehicle made progress but did not arrive
            **base_position = new_position;
        }
    }
}
```
On each update of the continuous service we only look at the oldest request in the order queue, ignoring all others. This effectively makes the service "serial": no matter how many requests come in, they will only be handled one at a time.
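The head-of-queue pattern boils down to a few lines of plain Rust. The Order struct and update_serial function here are invented for illustration (they are not part of the library): only the oldest order makes progress on each update, so later orders simply wait in line until the head is responded to and dropped.

```rust
// Toy model of a serial continuous service: only the oldest order makes
// progress each update, no matter how many orders have accumulated.
struct Order {
    remaining_work: u32,
}

fn update_serial(queue: &mut Vec<Order>, completed: &mut u32) {
    // Look only at the head of the queue, like orders.get_mut(0) above.
    let Some(head) = queue.first_mut() else {
        return;
    };
    head.remaining_work -= 1;
    if head.remaining_work == 0 {
        // Responding drops the order; the next-oldest becomes the new head.
        queue.remove(0);
        *completed += 1;
    }
}

fn main() {
    // Two orders, each needing two updates' worth of work.
    let mut queue = vec![Order { remaining_work: 2 }, Order { remaining_work: 2 }];
    let mut completed = 0;

    for _ in 0..2 {
        update_serial(&mut queue, &mut completed);
    }
    assert_eq!(completed, 1); // only the first order has finished

    for _ in 0..2 {
        update_serial(&mut queue, &mut completed);
    }
    assert_eq!(completed, 2); // now the second is done too
    println!("completed {completed} orders");
}
```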
Parallel Orders
One of the advantages that continuous services have over blocking and async services is the ability to handle many requests at once, potentially with interactions between them. The example below shows how orders.for_each(_) can be used to iterate over all active requests in one update frame, making incremental progress on all of them at once.
```rust
#[derive(Clone, Copy)]
struct DroneRequest {
    target: Vec2,
    speed: f32,
}

fn send_drone_to_target(
    In(srv): ContinuousServiceInput<DroneRequest, ()>,
    mut query: ContinuousQuery<DroneRequest, ()>,
    mut drone_positions: Local<HashMap<Entity, Vec2>>,
    base_position: Res<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        return;
    };

    orders.for_each(|order| {
        let DroneRequest { target, speed } = *order.request();
        let position = drone_positions.entry(order.id()).or_insert_with(|| {
            println!(
                "Drone {} taking off from {}, heading to {target}",
                order.id().index(),
                **base_position,
            );
            **base_position
        });

        let dt = time.delta_secs_f64() as f32;
        match move_to(*position, target, speed, dt) {
            Ok(_) => {
                println!("Drone {} arrived at {target}", order.id().index());
                order.respond(());
            }
            Err(new_position) => {
                *position = new_position;
            }
        }
    });

    // Remove any old task IDs that are no longer in use
    drone_positions.retain(|id, _| orders.iter().any(|order| order.id() == *id));
}
```
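The Local<HashMap> plus retain pattern above is worth noting on its own: any per-order state a continuous service keeps outside the order queue must be pruned once the corresponding order is gone (responded to or cancelled), or the map accumulates stale entries forever. Here is a minimal std-only sketch of that cleanup, with hypothetical u64 order ids standing in for Bevy Entity ids:

```rust
use std::collections::HashMap;

// Prune per-order scratch state once its order is no longer active. Order
// ids are modeled as plain u64 values here instead of Bevy Entity ids.
fn prune_stale(state: &mut HashMap<u64, f32>, live_orders: &[u64]) {
    state.retain(|id, _| live_orders.contains(id));
}

fn main() {
    let mut state = HashMap::from([(1, 0.5), (2, 0.9), (3, 0.1)]);
    // Orders 1 and 3 have been responded to; only order 2 is still active.
    prune_stale(&mut state, &[2]);
    assert_eq!(state.len(), 1);
    assert!(state.contains_key(&2));
    println!("{state:?}");
}
```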
Full Example
Here is an example that incorporates everything above and also demonstrates how continuous services can be configured in the schedule just like regular Bevy systems. Notice the use of .configure(_) when spawning move_base and send_drone. This lets us make sure the base vehicle's position is up to date before the drone service updates.
```rust
use crossflow::bevy_app::{App, Update};
use crossflow::prelude::*;
use bevy_derive::*;
use bevy_ecs::prelude::*;
use bevy_time::{Time, TimePlugin};
use glam::Vec2;
use std::collections::HashMap;

#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
struct MoveBaseVehicle;

fn main() {
    let mut app = App::new();
    app.add_plugins((CrossflowExecutorApp::default(), TimePlugin::default()))
        .insert_resource(Position(Vec2::ZERO));

    let move_base = app.spawn_continuous_service(
        Update,
        move_base_vehicle_to_target
            .with(|mut srv: EntityWorldMut| {
                // Set the speed component for this service provider
                srv.insert(Speed(1.0));
            })
            .configure(|config| {
                // Put this service into a system set so that we can order other
                // services before or after it.
                config.in_set(MoveBaseVehicle)
            }),
    );

    let send_drone = app.spawn_continuous_service(
        Update,
        send_drone_to_target.configure(|config| {
            // This service depends on side-effects from move_base, so we should
            // always schedule it afterwards.
            config.after(MoveBaseVehicle)
        }),
    );

    let move_vehicle_to_random_position = move |app: &mut App| {
        app.world_mut()
            .command(|commands| commands.request(random_vec2(20.0), move_base).outcome())
    };

    let launch_drone_to_random_position = move |app: &mut App| {
        app.world_mut().command(|commands| {
            let request = DroneRequest {
                target: random_vec2(20.0),
                speed: 1.0,
            };
            commands.request(request, send_drone).detach();
        });
    };

    let mut base_moving = move_vehicle_to_random_position(&mut app);
    let mut last_launch = std::time::Instant::now();
    loop {
        app.update();

        if base_moving.is_available() {
            // Send the base to a new location
            base_moving = move_vehicle_to_random_position(&mut app);
        }

        if last_launch.elapsed() > std::time::Duration::from_secs(1) {
            launch_drone_to_random_position(&mut app);
            last_launch = std::time::Instant::now();
        }
    }
}

fn random_vec2(width: f32) -> Vec2 {
    width * Vec2::new(rand::random::<f32>(), rand::random::<f32>())
}

#[derive(Resource, Deref, DerefMut)]
struct Position(Vec2);

#[derive(Component, Deref)]
struct Speed(f32);

fn move_base_vehicle_to_target(
    In(srv): ContinuousServiceInput<Vec2, Result<(), ()>>,
    mut query: ContinuousQuery<Vec2, Result<(), ()>>,
    speeds: Query<&Speed>,
    mut base_position: ResMut<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        // The service provider has been despawned, so this continuous service
        // can no longer function.
        return;
    };

    // Get the oldest active request for this service. Orders will be indexed
    // from 0 to N-1 from oldest to newest. When an order is completed, all
    // orders after it will shift down by an index on the next update cycle.
    let Some(order) = orders.get_mut(0) else {
        // There are no active requests, so no need to do anything
        return;
    };

    let dt = time.delta_secs_f64() as f32;
    let Ok(speed) = speeds.get(srv.key.provider()) else {
        // The Speed component has been removed from the service provider, so
        // it can no longer complete requests.
        order.respond(Err(()));
        return;
    };

    let target = *order.request();
    match move_to(**base_position, target, **speed, dt) {
        Ok(_) => {
            // The vehicle arrived
            **base_position = target;
            println!("Base vehicle arrived at {target}");
            order.respond(Ok(()));
        }
        Err(new_position) => {
            // The vehicle made progress but did not arrive
            **base_position = new_position;
        }
    }
}

#[derive(Clone, Copy)]
struct DroneRequest {
    target: Vec2,
    speed: f32,
}

fn send_drone_to_target(
    In(srv): ContinuousServiceInput<DroneRequest, ()>,
    mut query: ContinuousQuery<DroneRequest, ()>,
    mut drone_positions: Local<HashMap<Entity, Vec2>>,
    base_position: Res<Position>,
    time: Res<Time>,
) {
    let Some(mut orders) = query.get_mut(&srv.key) else {
        return;
    };

    orders.for_each(|order| {
        let DroneRequest { target, speed } = *order.request();
        let position = drone_positions.entry(order.id()).or_insert_with(|| {
            println!(
                "Drone {} taking off from {}, heading to {target}",
                order.id().index(),
                **base_position,
            );
            **base_position
        });

        let dt = time.delta_secs_f64() as f32;
        match move_to(*position, target, speed, dt) {
            Ok(_) => {
                println!("Drone {} arrived at {target}", order.id().index());
                order.respond(());
            }
            Err(new_position) => {
                *position = new_position;
            }
        }
    });

    // Remove any old task IDs that are no longer in use
    drone_positions.retain(|id, _| orders.iter().any(|order| order.id() == *id));
}

fn move_to(current: Vec2, target: Vec2, speed: f32, dt: f32) -> Result<(), Vec2> {
    let dx = f32::max(0.0, speed * dt);
    let dp = target - current;
    let distance = dp.length();
    if distance <= dx {
        return Ok(());
    }

    let Some(u) = dp.try_normalize() else {
        // If dp can't be normalized then it's close to zero, so there's
        // nowhere to go.
        return Ok(());
    };

    Err(current + u * dx)
}
```
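To see the stepping math in move_to at work outside of Bevy, here is a self-contained version with a minimal stand-in for glam's Vec2 (the real Vec2 provides the same length, try_normalize, and arithmetic operators used here). Each Err carries the partial step, and Ok signals arrival, just as in the services above.

```rust
// A minimal stand-in for glam's Vec2, just enough to exercise move_to's
// stepping logic in isolation.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Vec2 {
    x: f32,
    y: f32,
}

impl Vec2 {
    fn new(x: f32, y: f32) -> Self {
        Self { x, y }
    }
    fn length(self) -> f32 {
        (self.x * self.x + self.y * self.y).sqrt()
    }
    fn try_normalize(self) -> Option<Self> {
        let len = self.length();
        if len > 1e-6 {
            Some(Vec2::new(self.x / len, self.y / len))
        } else {
            None
        }
    }
}

impl std::ops::Sub for Vec2 {
    type Output = Vec2;
    fn sub(self, o: Vec2) -> Vec2 {
        Vec2::new(self.x - o.x, self.y - o.y)
    }
}
impl std::ops::Add for Vec2 {
    type Output = Vec2;
    fn add(self, o: Vec2) -> Vec2 {
        Vec2::new(self.x + o.x, self.y + o.y)
    }
}
impl std::ops::Mul<f32> for Vec2 {
    type Output = Vec2;
    fn mul(self, s: f32) -> Vec2 {
        Vec2::new(self.x * s, self.y * s)
    }
}

// Same logic as the move_to in the full example: Ok(()) means arrived,
// Err(position) carries the partial step toward the target.
fn move_to(current: Vec2, target: Vec2, speed: f32, dt: f32) -> Result<(), Vec2> {
    let dx = f32::max(0.0, speed * dt);
    let dp = target - current;
    if dp.length() <= dx {
        return Ok(());
    }
    let Some(u) = dp.try_normalize() else {
        return Ok(());
    };
    Err(current + u * dx)
}

fn main() {
    // Step toward (3, 0) at speed 1 with dt = 1: two partial steps, then
    // the third call reports arrival.
    let target = Vec2::new(3.0, 0.0);
    let mut position = Vec2::new(0.0, 0.0);
    let mut steps = 0;
    while let Err(next) = move_to(position, target, 1.0, 1.0) {
        position = next;
        steps += 1;
    }
    assert_eq!(steps, 2);
    println!("arrived after {steps} partial steps");
}
```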