A pattern that’s quite useful in Rust when doing asynchronous programming with a resource which should only be accessed by one thread is to use Rust channels and enums for communication with that dedicated thread.
There are two instances where I have used this pattern in Eclipse uProtocol:
up-streamer
crate, detailed in this post.up-transport-vsomeip-rust
crate.Let’s discuss the general principle and then I’ll show the specifics of the up-transport-vsomeip-rust
crate in a follow-up post.
Let’s say that we have some resource which we either must or want to have accessible. It could be that it’s a struct or trait object which is not thread safe, for example.
In that case, we can leverage Rust’s robust ecosystem around channels and Rust’s enum capabilities to create rather robust control mechanisms around that resource.
Imagine we are commanding a grasping robot arm and wish to have only a single thread have control over the actuation for safety purposes.
First we can define one or more enums, nesting them to describe what the commands are and what information they must command.
We might come up with enums which look like the following.
enum Finger {
Thumb,
Pointer,
Middle,
Index,
Pinky
}
enum HandCommand {
FlexWrist(f64), // number of radians to flex wrist
TapFinger(Finger),
// ... more ...
}
enum ArmCommand {
RotateBase(f32), // radians to swivel base of arm
// ... more ...
}
enum ActuationCommand {
Hand(HandCommand),
Arm(ArmCommand)
}
Let’s imagine we have an Actuator
which can perform the commands.
struct Actuator {
// ... snip ...
}
impl Actuator {
fn flex_wrist(flex: f64) { /* snip */ }
fn tap_finger(to_tap: Finger) { /* snip */ }
fn rotate_base(swivel: f32) { /* snip */ }
}
The previous post gave an example using async-std
. The real-world example I’ll give later will use tokio
, so let’s use tokio
for this general principle section. Here we’re going to spawn a thread with a dedicated tokio
Runtime
to process commands received.
I’ll add some comments starting with PELE
to explain what’s going on.
// PELE: We take a Receiver which is the receiving end of a Rust channel
// There are many channel libraries which could serve this purpose, e.g.
// tokio::sync::mpsc::Receiver
fn actuation_command_loop(rx_cmd: Receiver<ActuationCommand>) {
// PELE: Spawn a dedicated thread onto which to run the command loop
thread.spawn(move || {
// PELE: Make a tokio Runtime which is dedicated to the spawned thread
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create tokio runtime");
// PELE: Perform a blocking operation on this thread-dedicated Runtime
runtime.block_on(async move {
// PELE: ... and create our thread-local Actuator
let actuator = Actuator::new(/* ... */);
// PELE: ... to then go into the infinite loop processing commands
// using the Actuator
while let Some(cmd) = rx_cmd.recv().await {
// PELE: We handle the commands by then dispatching
// into the Actuator functions
match cmd {
ActuationCommand::Hand(hand_cmd) => {
match hand_cmd {
HandCommand::FlexWrist(flex) => {
actuator.flex_wrist(flex);
}
HandCommand::TapFinger(finger) => {
actuator.tap_finger(finger);
}
}
}
ActuationCommand::Arm(arm_cmd) => {
match arm_cmd {
ArmCommand::RotateBase(swivel) => {
actuator.rotate_base(swivel);
}
}
}
}
}
});
});
}
We need to create a channel Sender
and Receiver
through which to communicate commands before starting the command loop. Feel free to use whichever channel library you would like, there are many choices. Here I’m using tokio::sync::mpsc::channel
.
let (rx, tx) = channel(10000);
// PELE: Transfer ownership of the Receiver into the command loop
actuation_command_loop(rx);
We can now send a command using the tx
Sender
.
tx.send(ActuationCommand::Hand(HandCommand::TapFinger(Finger::Pointer))).await;
The command loop will process the commands in the order they are sent. In fact, we can clone the Sender
and pass it off to multiple different threads, since I imagine that we’ll have different parts of the system which need to send ActuationCommands
.
Now this will work so long as we only need to fire off commands to the command loop. However, most of the time we’ll want to receive back something such as a status or some retrieved value.
Let’s flesh the example out a bit further by considering that by handing over a one-shot use channel within the enum members they can then use to communicate back. Here again, there are different options to choose from. I’ll use a tokio::sync::oneshot::channel
.
struct Status {
InvalidCommandState,
// ...
}
type Pressure = f64;
enum Finger {
Thumb,
Pointer,
Middle,
Index,
Pinky
}
enum HandCommand {
FlexWrist(f64, oneshot::Receiver<Result<(), Status>>), // number of radians to flex wrist
TapFinger(Finger, oneshot::Receiver<Result<Pressure, Status>>),
// ... more ...
}
enum ArmCommand {
RotateBase(f32, oneshot::Receiver<Result<(), Status>>), // radians to swivel base of arm
// ... more ...
}
enum ActuationCommand {
Hand(HandCommand),
Arm(ArmCommand)
}
The sending of the command itself doesn’t change much. What does change is the semantics we want to have around waiting on receiving the result back from the command loop. What to do here can depend on the specifics of your application. One option is to await for a certain time and then fail if timed out.
const MAX_WAIT_DUR_MS: u64 = 30;
let (res_rx, res_tx) = oneshot::channel();
tx.send(ActuationCommand::Hand(HandCommand::TapFinger(Finger::Pointer, res_tx))).await;
match timeout(Duration::from_millis(MAX_WAIT_DUR_MS), res_rx) {
Ok(Ok(pressure)) => {/* do something with returned pressure */}
Ok(Err(e)) => {/* do something like log error we failed to receive status back */}
Err(e) => {/* here we know the deadline exceeded */}
}
You will likely want another command which, when received, will notify the command loop to gracefully close down. Doing so is left as an exercise to the reader!
I’ve used this pattern two times when trying to protect a resource which was not thread-safe and it worked well in both cases. Sometimes when interacting with those resources it’s possible to make the resource thread-safe. Sometimes it’s not, like when binding to and using another library through FFI. Nice to have another tool in the tool belt.