Delivery Instructions
A common problem with highly parallelized async systems is when multiple instances of one service are active at the same time, but the service is designed for exclusive usage. This can happen when a service interacts with global resources like a mouse cursor or physical assets like the joints of a robot. If multiple conflicting requests are being made for the same set of resources, there could be deadlocks, misbehaviors, or even critical failures.
Resource contention is a broad and challenging problem with no silver-bullet solution, but crossflow offers a delivery instructions mechanism that can help handle simple cases. With delivery instructions, you can implement your service without worrying about exclusive access, and then wrap the service in instructions that tell crossflow to only execute it once-at-a-time.
More advanced use of delivery instructions allows specific sets of requests for a service to run in serial while other sets run in parallel. You can have specific queues where requests in the same queue run in serial while the queues themselves run in parallel.
Tip
Delivery instructions are only relevant for async and continuous services. They have no effect on blocking services since blocking services always run in serial no matter what.
Always-Serial Services
Warning
Always-serial services are also serial across different workflows and different sessions of the same workflow. When a service gets used in multiple different nodes, those nodes will only execute once-at-a-time, even if the nodes belong to different workflows or are being triggered across different sessions.
You can choose for an async or continuous service to always be run in serial, meaning when multiple requests are sent to it, they will always be run once-at-a-time. This does not block any other services from running at the same time.
To make any service always run serially, just add .serial() while spawning it:
let service = commands.spawn_service(
    my_service
        .serial()
);
Note
Apply .serial() to the name of the service itself, inside the parentheses of .spawn_service(_). You cannot chain it outside the parentheses of .spawn_service(_).
Delivery Label
Always-serial is often too restrictive. When a service is used across different workflows or in multiple nodes at the same time, the requests being sent to it might be accessing unrelated resources. In this case, you can assign delivery instructions to the service before making the request.
The first step is to create a delivery label:
#[derive(Debug, Clone, PartialEq, Eq, Hash, DeliveryLabel)]
struct MyDeliveryLabel {
    set: String,
}
A delivery label can be any struct that implements Debug, Clone, PartialEq, Eq, Hash, and DeliveryLabel.
The #[derive(DeliveryLabel)] macro will fail at compile time if any of the required traits is missing, so the compiler helps ensure that your struct has all the traits it needs.
Other than those traits, there is no restriction on what can be used as a delivery label.
To use a delivery label, spawn a service as normal, but apply instructions with .instruct(_) when passing it into a request:
// Spawn the service as normal
let service = commands.spawn_service(my_service);
// Create delivery instructions
let instructions = DeliveryInstructions::new(
    MyDeliveryLabel {
        set: String::from("my_set")
    }
);
// Add the instructions while requesting
let outcome = commands.request(
    String::from("hello"),
    service.instruct(instructions),
).outcome();
To create the DeliveryInstructions, you must pass in an instance of your delivery label.
Every request made with a matching delivery label value will be put into the same queue and run in serial with each other. Requests made with different delivery labels will be put in separate queues which can run in parallel.
Pre-empt
Sometimes waiting for a service to execute once-at-a-time isn’t really what you want. When multiple requests are contending for the same resource, sometimes what you really want is to discard the earlier requests so the newest one can take over.
We call this pre-emption, and provide it as an option for DeliveryInstructions:
// Create a label
let label = MyDeliveryLabel {
    set: String::from("my_set")
};
// Make instructions that include preempting
let preempt_instructions = DeliveryInstructions::new(label).preempt();
let outcome = commands.request(
    String::from("hello"),
    service.instruct(preempt_instructions)
).outcome();
When .preempt() is applied to DeliveryInstructions, that request will discard all requests that were queued up before it.
Even a live task that is in the process of being executed will be cancelled when a pre-empting request arrives.
Note
Earlier we said that delivery instructions don’t have an effect on blocking services, but this is not true in the case of pre-emption. If multiple requests with the same delivery label have gotten queued up for a blocking service before it’s had a chance to flush, a pre-empting request will clear out the earlier requests before they can ever execute.
Warning
When a service being used by a workflow gets pre-empted, that will result in a disposal.
Ensure
Sometimes pre-emption may be too much of a scorched-earth approach.
If you have some requests that should be pre-empted while others are too crucial to pre-empt, you can apply .ensure():
let instructions = DeliveryInstructions::new(label)
    .preempt()
    .ensure();
let outcome = commands.request(
    String::from("hello"),
    service.instruct(instructions)
).outcome();
Tip
.preempt() and .ensure() can be used independently of each other per request. A request can be pre-emptive but not ensured, or ensured but not pre-emptive, or it can be both pre-emptive and ensured.
Full Example
use async_std::future::{pending, timeout};
use crossflow::bevy_app::App;
use crossflow::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash, DeliveryLabel)]
struct MyDeliveryLabel {
    set: String,
}

impl MyDeliveryLabel {
    fn new(set: String) -> Self {
        Self { set }
    }
}

fn main() {
    let mut app = App::new();
    app.add_plugins(CrossflowExecutorApp::default());

    let waiting_time = std::time::Duration::from_secs(2);
    let waiting_service =
        app.world_mut()
            .spawn_service(move |In(input): AsyncServiceInput<String>| async move {
                // Create a future that will never finish
                let never = pending::<()>();
                // Wait on the never-ending future until a timeout finishes.
                // This creates an artificial delay for the async service.
                let _ = timeout(waiting_time, never).await;
                println!("{}", input.request);
            });

    // We will fire off 10 requests at once for three different sets where each
    // set has delivery instructions, making them have serial (one-at-a-time)
    // delivery within each set. Since the sets themselves have independent labels,
    // the requests in each set can be processed in parallel.
    //
    // The service itself does nothing but wait two seconds before printing its input message.
    for set in ["A", "B", "C"] {
        let instructions = DeliveryInstructions::new(MyDeliveryLabel::new(set.to_string()));
        for i in 1..=10 {
            let message = format!("Message #{i} for set {set}");
            let service = waiting_service.instruct(instructions.clone());
            app.world_mut().command(|commands| {
                commands.request(message, service).detach();
            });
        }
    }

    app.run();
}
Tip
To set delivery instructions for the services used in nodes, simply apply .instruct(_) to the service before passing it to builder.create_node(_)