Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Zenoh

Support for zenoh is implemented using

Use the zenoh feature flag of crossflow to enable this support:

# Cargo.toml

[dependencies]
crossflow = { version = "*", features = ["diagram", "zenoh"] }

Enabling zenoh

Use registry.enable_zenoh(_) to register premade node builders that can build zenoh subscribers, publishers, and queriers:

use crossflow::prelude::*;

let mut registry = DiagramElementRegistry::new();
registry.enable_zenoh(Default::default());

enable_zenoh(_) takes in a zenoh config which you can feel free to customize.

Descriptor Pool

Just like for gRPC support, you will need to add your proto definitions to the global descriptor pool if you want to use protos as your payloads. If the needed proto definitions are missing from the descriptor pool, a NodeBuildingError will be produced.

JSON payloads do not require any additional steps.

Configuration

Subscribers, publishers, and queriers each have separate node builders with different configuration types tailored to the specific information needed by each:

The mandatory fields include:

key — the key expression for the connection. Used by all three.

encoder — either "json" or { "protobuf": "_" } to indicate how to encode outgoing messages. Used by publishers and queriers.

decoder — either "json" or { "protobuf": "_" } to indicate how to decode incoming messages. Used by subscriptions and queriers.

Besides the mandatory fields, each config struct provides comprehensive coverage of the quality of service and other settings for each type of zenoh connection. Refer to the zenoh documentation for in-depth descriptions of the qualities of service.

Subscription

Subscriptions, provided by the "zenoh_subscription" node builder, allow your workflow to receive messages from anonymous publishers that publish to a key that’s compatible with the key expression of your subscription. Once the node gets activated, it will stream out any incoming messages until it gets cancelled or until the scope it’s in finishes.

Request — A simple trigger that just prompts the node to begin listening for messages and streaming them out. If the node is already active, triggering it again will have no effect. If the node was previously active but cancelled, then triggering the node again will restart it.

“out” — Any messages received by the subscription will be sent out of the "out" stream.

“out_error” — If an error happens while decoding an incoming message, the error message will be streamed from "out_error". The node will continue running as normal even if these errors occur, but each of these messages indicates an incoming message that failed to be decoded.

“canceller” — In case you want to cancel the subscription, you can capture this UnboundedSender, either storing it in a buffer or passing it to a node that can make a decision about when to cancel. Triggering this will cause the final response of the node to be Ok(msg) where msg is whatever JsonMessage you pass into this sender. Since subscriptions can last indefinitely, this is the only way to stop the node from running before the scope terminates.

Response — The final response of the node is a Result whose value is Ok if the subscription was cancelled using the "canceller", or Err if a ZenohSubscriptionError occurred.

zenoh_subscription-node

Publisher

Publishers, provided by the "zenoh_publisher" node builder, allow your workflow to send messages to anonymous subscribers whose key expressions are compatible with the "key" that you configure for your node.

The zenoh publisher will be initialized and connect when the workflow is first built, and then every message sent to this node from any workflow session will be sent out over that same publisher. Note that this has some potential side-effects. If you configure the node to support late joiners then the late joiners will receive old messages even if the workflow session that sent those messages is no longer active.

Request — Pass in a JsonMessage to publish. If your node is configured to use protobuf encoding, the node will return an error message if the input JsonMessage failed to serialize into the intended protobuf message.

Response — A Result which will be Ok if publishing the message was successful, or a ZenohPublisherError if a problem occurred.

zenoh_publisher-node

Querier

Queriers, provided by the "zenoh_querier" node builder, allow your workflow to send off a request message to a queryable, which is similar to a service. The queryable will respond to the request message with some number of responses and then end the connection.

The layout of a querier node is the same as "zenoh_subscription" except

  • The Request is a JsonMessage. If a protobuf encoder was chosen, any failure to serialize the message into the intended protobuf type will have this node respond with an Err.
  • The response will return Ok(null) when the query is finished. It will also return Ok(msg) if some msg is sent to the UnboundedSender provided by "canceller". In the event of an error, it will return an Err containing ZenohQuerierError.

zenoh_querier-node