# Senders
The one communication-related thing we haven't touched on yet is how to do request-response style communication among Actors. The "ask" pattern gave us request-response between an Actor and some arbitrary (non-pool) thread, and ports basically give us a form of request-response between request and indication events (with some broadcasting semantic caveats, of course). But for Actor-to-Actor communication, we have not seen anything of this sort yet. In fact, you may have noticed that `receive_local(...)` does not actually give us any sender information, such as an `ActorRef`. Neither is this available via the component context, as it would be in Akka.

In Kompact, for local messages at least, sender information must be passed explicitly. This is for two reasons:
- It avoids creating an `ActorRef` for every message when it's not needed, since actor references are not trivially cheap to create.
- It allows the sender reference to be typed with the appropriate message type.
This design leaves us with basically two ways to do request-response. If we know we are always going to respond to the same component instance, the most efficient option is to get a reference to it once and keep it around as part of our internal state. This avoids constantly creating actor references. If, however, we must respond to multiple different actors, which is often the case, we must make the sender reference part of the request message. We can do that either by adding a field to our custom message type, or simply by wrapping our custom message type in the Kompact-provided `WithSender` struct. `WithSender` is really the same idea as `Ask`, replacing the `KPromise<Response>` with an `ActorRef<Response>` (yes, there is also a `WithSenderStrong`, which uses an `ActorRefStrong` instead).
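To make the idea concrete, here is a small Kompact-free model of what `WithSender` provides: a request bundled with a typed reply handle. This is only an illustration; `WithReplyTo` and `reply_to` are our own names, and a standard library channel stands in for an `ActorRef`.

```rust
use std::sync::mpsc::{channel, Sender};

// Simplified stand-in for Kompact's WithSender: the request content plus a
// typed handle the receiver can use to reply. Not Kompact's actual API.
struct WithReplyTo<Req, Resp> {
    content: Req,
    reply_to: Sender<Resp>,
}

impl<Req, Resp> WithReplyTo<Req, Resp> {
    fn reply(self, response: Resp) {
        // Kompact would do an ActorRef::tell here; we just send on the channel.
        let _ = self.reply_to.send(response);
    }
}

fn main() {
    let (tx, rx) = channel::<u64>();
    let msg = WithReplyTo { content: 21u64, reply_to: tx };
    // The receiver computes a response from the content and replies on the
    // typed handle; the compiler enforces the response type (u64 here).
    let response = msg.content * 2;
    msg.reply(response);
    assert_eq!(rx.recv().unwrap(), 42);
}
```

The key point the types make: the receiver cannot reply with anything but the declared response type, which is exactly what `ActorRef<Response>` buys over an untyped sender.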
## Workers with Senders
To illustrate this mechanism, we are going to rewrite the Workers example from the previous sections to use `WithSender` instead of the `WorkerPort` communication. We use `WithSender` here, rather than a stored manager actor reference, to illustrate the point, but it should be clear that the latter would be more efficient, since we always reply to the manager anyway.
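The stored-reference alternative can be modeled the same way as before: the responder acquires a typed handle once and keeps it in its own state, so nothing extra travels with each message. Again a channel `Sender` stands in for an `ActorRefStrong`; the names are ours, not Kompact's.

```rust
use std::sync::mpsc::{channel, Sender};

// Modeling the stored-reference alternative: the worker keeps a typed handle
// to the manager in its own state, acquired once at construction time.
struct Worker {
    manager: Sender<u64>, // stand-in for a stored ActorRefStrong
}

impl Worker {
    fn handle_part(&self, data: &[u64]) {
        let sum: u64 = data.iter().sum();
        // Reply via the stored handle; no sender travels with the message.
        let _ = self.manager.send(sum);
    }
}

fn main() {
    let (tx, rx) = channel();
    let worker = Worker { manager: tx };
    worker.handle_part(&[1, 2, 3]);
    assert_eq!(rx.recv().unwrap(), 6);
}
```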
First we remove all mentions of `WorkerPort`, of course. Then we change the worker's `Message` type to `WithSender<WorkPart, ManagerMessage>`. Why `ManagerMessage` and not `WorkResult`? Well, since all communication with the manager now happens via messages, we need to differentiate between messages from the main thread, which are of type `Ask<Work, WorkResult>`, and messages from the workers, which are of type `WorkResult`. Since we can only have a single `Message` type, `ManagerMessage` is simply an enum of both options.
```rust
#[derive(Debug)]
struct WorkResult(u64);

#[derive(Debug)]
enum ManagerMessage {
    Work(Ask<Work, WorkResult>),
    Result(WorkResult),
}
```
Thus, when the worker wants to `reply(...)` with a `WorkResult`, it actually needs to wrap it in a `ManagerMessage` instance, or the compiler is going to reject it.
```rust
impl Actor for Worker {
    type Message = WithSender<WorkPart, ManagerMessage>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        let my_slice = &msg.data[msg.range.clone()];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        msg.reply(ManagerMessage::Result(WorkResult(res)));
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
```
In the manager we must first update our state to reflect the new message (and thus reference) types.
```rust
#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WithSender<WorkPart, ManagerMessage>>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
```
We also remove the port connection logic from the `ComponentLifecycle` handler. Then we change the `Message` type of the manager to `ManagerMessage` and match on the `ManagerMessage` variants in the `receive_local(...)` function. For the `ManagerMessage::Work` variant, we basically do the same thing as in the old `receive_local(...)` function, except that we construct a `WithSender` instance from the `WorkPart` instead of sending it directly to the worker. We then simply copy the code from the old `WorkResult` handler into the branch for `ManagerMessage::Result`.
```rust
impl Actor for Manager {
    type Message = ManagerMessage;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        match msg {
            ManagerMessage::Work(msg) => {
                assert!(
                    self.outstanding_request.is_none(),
                    "One request at a time, please!"
                );
                let work: &Work = msg.request();
                if self.num_workers == 0 {
                    // manager gotta work itself -> very unhappy manager
                    let res = work.data.iter().fold(work.neutral, work.merger);
                    msg.reply(WorkResult(res)).expect("reply");
                } else {
                    let len = work.data.len();
                    let stride = len / self.num_workers;
                    let mut start = 0usize;
                    let mut index = 0;
                    while start < len && index < self.num_workers {
                        let end = len.min(start + stride);
                        let range = start..end;
                        info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                        let msg = WorkPart::from(work, range);
                        let worker = &self.worker_refs[index];
                        worker.tell(WithSender::from(msg, self));
                        start += stride;
                        index += 1;
                    }
                    if start < len {
                        // manager just does the rest itself
                        let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                        self.result_accumulator.push(res);
                    } else {
                        // just put a neutral element in there, so our count is right in the end
                        self.result_accumulator.push(work.neutral);
                    }
                    self.outstanding_request = Some(msg);
                }
            }
            ManagerMessage::Result(msg) => {
                if self.outstanding_request.is_some() {
                    self.result_accumulator.push(msg.0);
                    if self.result_accumulator.len() == (self.num_workers + 1) {
                        let ask = self.outstanding_request.take().expect("ask");
                        let work: &Work = ask.request();
                        let res = self
                            .result_accumulator
                            .iter()
                            .fold(work.neutral, work.merger);
                        self.result_accumulator.clear();
                        let reply = WorkResult(res);
                        ask.reply(reply).expect("reply");
                    }
                } else {
                    error!(
                        self.log(),
                        "Got a response without an outstanding promise: {:?}", msg
                    );
                }
            }
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
```
The `receive_local(...)` function is getting pretty long, so we should probably decompose it into smaller private functions if we actually wanted to maintain this code.
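A Kompact-free sketch of that kind of decomposition: the top-level handler only dispatches, and each variant gets its own private method. The helper names `on_work` and `on_result`, and the simplified types, are hypothetical.

```rust
// Simplified message and state types, just to show the dispatch pattern.
enum Message {
    Work(Vec<u64>),
    Result(u64),
}

struct Handler {
    accumulator: Vec<u64>,
}

impl Handler {
    // The top-level handler stays short: it only matches and delegates.
    fn receive(&mut self, msg: Message) {
        match msg {
            Message::Work(data) => self.on_work(data),
            Message::Result(value) => self.on_result(value),
        }
    }

    fn on_work(&mut self, data: Vec<u64>) {
        let sum: u64 = data.iter().sum();
        self.accumulator.push(sum);
    }

    fn on_result(&mut self, value: u64) {
        self.accumulator.push(value);
    }
}

fn main() {
    let mut h = Handler { accumulator: Vec::new() };
    h.receive(Message::Work(vec![1, 2, 3]));
    h.receive(Message::Result(4));
    assert_eq!(h.accumulator, vec![6, 4]);
}
```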
Now, finally, when we want to send the `Ask` from the main thread, we also need to wrap it in `ManagerMessage::Work`. This prevents us from simply using `ActorRef::ask`, as it only produces an `Ask` instance, not our wrapper `ManagerMessage`. This gets us back to the previously mentioned `ActorRef::ask_with` function, which allows us to construct the `Ask` instance and put it into our wrapper ourselves. If we were to use this construction in many places throughout our code, it would likely be a good idea to add a constructor function on `ManagerMessage` that maps the `promise` and the `work` values to the proper structure.
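The shape such a constructor could take, sketched with local stand-ins for Kompact's `KPromise` and `Ask` types so the snippet is self-contained (this is not Kompact's actual API, and the `work` constructor name is our invention):

```rust
use std::marker::PhantomData;

// Local stand-ins for the Kompact types, just to show the constructor's shape.
struct KPromise<T>(PhantomData<T>);
struct Ask<Req, Resp> {
    promise: KPromise<Resp>,
    request: Req,
}
struct Work;
struct WorkResult(u64);

enum ManagerMessage {
    Work(Ask<Work, WorkResult>),
    Result(WorkResult),
}

impl ManagerMessage {
    // Hide the Ask construction and variant wrapping behind one call.
    fn work(promise: KPromise<WorkResult>, request: Work) -> Self {
        ManagerMessage::Work(Ask { promise, request })
    }
}

fn main() {
    let msg = ManagerMessage::work(KPromise(PhantomData), Work);
    assert!(matches!(msg, ManagerMessage::Work(_)));
}
```

With a constructor like this, the `ask_with` call site could shrink to something along the lines of `manager_ref.ask_with(|promise| ManagerMessage::work(promise, work))`.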
```rust
fn run_task(num_workers: usize, data_size: usize) {
    let system = KompactConfig::default().build().expect("system");
    let manager = system.create(move || Manager::new(num_workers));
    system.start(&manager);
    let manager_ref = manager.actor_ref().hold().expect("live");
    let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
    let work = Work::with(data, overflowing_sum, 0u64);
    println!("Sending request...");
    let res = manager_ref
        .ask_with(|promise| ManagerMessage::Work(Ask::new(promise, work)))
        .wait();
    println!("*******\nGot result: {}\n*******", res.0);
    assert_eq!(triangular_number(data_size as u64), res.0);
    system.shutdown().expect("shutdown");
}
```
At this point we should be able to run the example again and see the same behaviour as before.
Note: As before, if you have checked out the examples folder you can run the concrete binary with:

```bash
cargo run --release --bin workers_sender 4 100000
```