Messages and Events
To begin with, we must decide what we want to send as events and what as messages, and how exactly our message and event types should look.
Messages
For the incoming work assignments, we want some kind of request-response-style communication pattern, so we can reply once the aggregation is complete.
So we will use Actor communication for this part of the example, so that we can later use the “ask”-pattern again from the main-thread. For now, we know that the result of the work is a u64
, which we will wrap in a WorkResult
struct for clarity of purpose.
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};
/// A complete aggregation job: fold `data` with `merger`, starting from `neutral`.
///
/// `data` lives in an `Arc` so it can later be shared with workers without
/// copying; `merger` is a plain function pointer with exactly the signature
/// that `Iterator::fold` accepts.
struct Work {
    data: Arc<[u64]>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl Work {
    /// Wraps an owned vector into a shareable `Work` request.
    fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
        Work {
            // Vec<u64> -> Arc<[u64]> conversion via the std From impl.
            data: data.into(),
            merger,
            neutral,
        }
    }
}
impl fmt::Debug for Work {
    // Manual impl: function pointers have no useful Debug output and the
    // data array may be huge, so only its length is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let len = self.data.len();
        write!(
            f,
            "Work{{
data=<data of length={}>,
merger=<function>,
neutral={}
}}",
            len, self.neutral
        )
    }
}
/// One worker's share of a `Work` job: the same shared data plus the index
/// `range` this particular worker is responsible for folding.
struct WorkPart {
    data: Arc<[u64]>,
    range: Range<usize>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl WorkPart {
    /// Derives a part from a full job; cloning the `Arc` only bumps a
    /// reference count, the underlying data is never copied.
    fn from(work: &Work, range: Range<usize>) -> Self {
        let Work { data, merger, neutral } = work;
        WorkPart {
            data: Arc::clone(data),
            range,
            merger: *merger,
            neutral: *neutral,
        }
    }
}
impl fmt::Debug for WorkPart {
    // Same reasoning as Work's Debug impl: print the data length only and a
    // placeholder for the function pointer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let len = self.data.len();
        write!(
            f,
            "WorkPart{{
data=<data of length={}>,
range={:?},
merger=<function>,
neutral={}
}}",
            len, self.range, self.neutral
        )
    }
}
/// Result of folding a (partial or complete) data slice. Used both as the
/// port event from workers to the manager and as the reply type of the
/// manager's Ask. Clone + Debug are required for Kompact events.
#[derive(Clone, Debug)]
struct WorkResult(u64);
/// Port on which workers indicate their partial results to the manager.
/// Work is assigned via actor messages instead of port requests, hence the
/// uninhabited `Never` request type.
struct WorkerPort;
impl Port for WorkerPort {
type Indication = WorkResult;
type Request = Never;
}
/// Coordinates a pool of Worker components: splits an incoming Work request,
/// farms the parts out via actor messages, and merges the partial results.
#[derive(ComponentDefinition)]
struct Manager {
ctx: ComponentContext<Self>,
// Required side of WorkerPort: receives WorkResult indications from all workers.
worker_port: RequiredPort<WorkerPort>,
num_workers: usize,
// Owns the worker components; stopped and dropped again in on_stop.
workers: Vec<Arc<Component<Worker>>>,
// Strong actor refs so tell() is guaranteed deliverable while workers live.
worker_refs: Vec<ActorRefStrong<WorkPart>>,
// The single in-flight request; replied to once all partial results are in.
outstanding_request: Option<Ask<Work, WorkResult>>,
// Partial results; complete at num_workers + 1 entries
// (one per worker plus the manager's own share).
result_accumulator: Vec<u64>,
}
impl Manager {
fn new(num_workers: usize) -> Self {
Manager {
ctx: ComponentContext::uninitialised(),
worker_port: RequiredPort::uninitialised(),
num_workers,
workers: Vec::with_capacity(num_workers),
worker_refs: Vec::with_capacity(num_workers),
outstanding_request: None,
// + 1 for the manager's own contribution.
result_accumulator: Vec::with_capacity(num_workers + 1),
}
}
}
impl ComponentLifecycle for Manager {
    fn on_start(&mut self) -> Handled {
        // Spawn the worker pool and wire every worker up to our required port.
        for _ in 0..self.num_workers {
            let worker = self.ctx.system().create(Worker::new);
            worker.connect_to_required(self.worker_port.share());
            self.worker_refs.push(worker.actor_ref().hold().expect("live"));
            self.ctx.system().start(&worker);
            self.workers.push(worker);
        }
        Handled::Ok
    }
    fn on_stop(&mut self) -> Handled {
        // Release the strong actor refs first, then stop every worker component.
        self.worker_refs.clear();
        let system = self.ctx.system();
        for worker in self.workers.drain(..) {
            system.stop(&worker);
        }
        Handled::Ok
    }
    fn on_kill(&mut self) -> Handled {
        // Being killed cleans up exactly like being stopped.
        self.on_stop()
    }
}
impl Require<WorkerPort> for Manager {
// Collects partial results from the workers (the manager's own share is
// pushed in receive_local). Once num_workers + 1 results have arrived,
// they are folded once more with the request's merger and the Ask is answered.
fn handle(&mut self, event: WorkResult) -> Handled {
if self.outstanding_request.is_some() {
self.result_accumulator.push(event.0);
if self.result_accumulator.len() == (self.num_workers + 1) {
let ask = self.outstanding_request.take().expect("ask");
let work: &Work = ask.request();
// Merge the partial results exactly like the raw data was merged.
let res = self
.result_accumulator
.iter()
.fold(work.neutral, work.merger);
self.result_accumulator.clear();
let reply = WorkResult(res);
ask.reply(reply).expect("reply");
}
} else {
// A result arrived while no request was pending; log it and drop it.
error!(
self.log(),
"Got a response without an outstanding promise: {:?}", event
);
}
Handled::Ok
}
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    /// Splits an incoming Work request into one WorkPart per worker and
    /// records the Ask so it can be answered once all partial results are in.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // This manager only supports a single in-flight request.
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 || work.data.is_empty() {
            // No workers to delegate to, or nothing to split up: fold locally
            // and reply at once. Handling the empty-data case here is
            // important — otherwise no WorkParts would be sent below, the
            // accumulator could never reach num_workers + 1 entries, and the
            // Ask would never be replied to (the caller would hang forever).
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
            // stride is 0 when len < num_workers; surplus workers then get
            // empty ranges, which keeps the result count correct in the end.
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
#[derive(ComponentDefinition)]
struct Worker {
ctx: ComponentContext<Self>,
worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
fn new() -> Self {
Worker {
ctx: ComponentContext::uninitialised(),
worker_port: ProvidedPort::uninitialised(),
}
}
}
ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);
impl Actor for Worker {
type Message = WorkPart;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
let my_slice = &msg.data[msg.range];
let res = my_slice.iter().fold(msg.neutral, msg.merger);
self.worker_port.trigger(WorkResult(res));
Handled::Ok
}
fn receive_network(&mut self, _msg: NetMessage) -> Handled {
unimplemented!("Still ignoring networking stuff.");
}
}
/// CLI entry point: `<program> <num_workers> <data_size>`.
pub fn main() {
    let args: Vec<String> = env::args().collect();
    assert_eq!(
        3,
        args.len(),
        "Invalid arguments! Must give number of workers and size of the data array."
    );
    let (num_workers, data_size): (usize, usize) = (
        args[1].parse().expect("number"),
        args[2].parse().expect("number"),
    );
    run_task(num_workers, data_size);
}
// Sets up a Kompact system with one Manager (which spawns num_workers
// workers), sends it a summation job over 1..=data_size, blocks on the
// reply, and checks the result against the closed-form solution.
fn run_task(num_workers: usize, data_size: usize) {
let system = KompactConfig::default().build().expect("system");
let manager = system.create(move || Manager::new(num_workers));
system.start(&manager);
let manager_ref = manager.actor_ref().hold().expect("live");
// Summing 1..=data_size, so the expected total is the data_size-th triangular number.
let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
let work = Work::with(data, overflowing_sum, 0u64);
println!("Sending request...");
// Blocking ask: resolves once the Manager has merged all partial results.
let res = manager_ref.ask(work).wait();
println!("*******\nGot result: {}\n*******", res.0);
assert_eq!(triangular_number(data_size as u64), res.0);
system.shutdown().expect("shutdown");
}
/// Closed-form sum of 1 + 2 + … + n (the n-th triangular number).
fn triangular_number(n: u64) -> u64 {
    let successor = n + 1;
    n * successor / 2
}

/// Wrapping addition in the `fn(u64, &u64) -> u64` shape that `fold` expects.
fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
    lhs.wrapping_add(*rhs)
}
#[cfg(test)]
mod tests {
use super::*;
// End-to-end smoke test: 3 workers summing 1..=1000; the correctness
// assertion against triangular_number lives inside run_task itself.
#[test]
fn test_workers() {
run_task(3, 1000);
}
}
We also know that we need to pass some data array and an aggregation function when we make a request for work to be done. Since we will want to share the data later with our workers, we’ll put it into an atomically reference-counted slice, i.e. Arc<[u64]>
. For the aggregation function, we’ll simply pass a function pointer of type fn(u64, &u64) -> u64
, which is the signature accepted by the fold
function on an iterator. However, in order to start a fold
, we also need a neutral element, which depends on the aggregation function. So we add that to the work request as a field as well.
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};
struct Work {
data: Arc<[u64]>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl Work {
fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
Work {
data: moved_data,
merger,
neutral,
}
}
}
impl fmt::Debug for Work {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Work{{
data=<data of length={}>,
merger=<function>,
neutral={}
}}",
self.data.len(),
self.neutral
)
}
}
struct WorkPart {
data: Arc<[u64]>,
range: Range<usize>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl WorkPart {
fn from(work: &Work, range: Range<usize>) -> Self {
WorkPart {
data: work.data.clone(),
range,
merger: work.merger,
neutral: work.neutral,
}
}
}
impl fmt::Debug for WorkPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"WorkPart{{
data=<data of length={}>,
range={:?},
merger=<function>,
neutral={}
}}",
self.data.len(),
self.range,
self.neutral
)
}
}
#[derive(Clone, Debug)]
struct WorkResult(u64);
struct WorkerPort;
impl Port for WorkerPort {
type Indication = WorkResult;
type Request = Never;
}
#[derive(ComponentDefinition)]
struct Manager {
ctx: ComponentContext<Self>,
worker_port: RequiredPort<WorkerPort>,
num_workers: usize,
workers: Vec<Arc<Component<Worker>>>,
worker_refs: Vec<ActorRefStrong<WorkPart>>,
outstanding_request: Option<Ask<Work, WorkResult>>,
result_accumulator: Vec<u64>,
}
impl Manager {
fn new(num_workers: usize) -> Self {
Manager {
ctx: ComponentContext::uninitialised(),
worker_port: RequiredPort::uninitialised(),
num_workers,
workers: Vec::with_capacity(num_workers),
worker_refs: Vec::with_capacity(num_workers),
outstanding_request: None,
result_accumulator: Vec::with_capacity(num_workers + 1),
}
}
}
impl ComponentLifecycle for Manager {
fn on_start(&mut self) -> Handled {
// set up our workers
for _i in 0..self.num_workers {
let worker = self.ctx.system().create(Worker::new);
worker.connect_to_required(self.worker_port.share());
let worker_ref = worker.actor_ref().hold().expect("live");
self.ctx.system().start(&worker);
self.workers.push(worker);
self.worker_refs.push(worker_ref);
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
// clean up after ourselves
self.worker_refs.clear();
let system = self.ctx.system();
self.workers.drain(..).for_each(|worker| {
system.stop(&worker);
});
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
self.on_stop()
}
}
impl Require<WorkerPort> for Manager {
fn handle(&mut self, event: WorkResult) -> Handled {
if self.outstanding_request.is_some() {
self.result_accumulator.push(event.0);
if self.result_accumulator.len() == (self.num_workers + 1) {
let ask = self.outstanding_request.take().expect("ask");
let work: &Work = ask.request();
let res = self
.result_accumulator
.iter()
.fold(work.neutral, work.merger);
self.result_accumulator.clear();
let reply = WorkResult(res);
ask.reply(reply).expect("reply");
}
} else {
error!(
self.log(),
"Got a response without an outstanding promise: {:?}", event
);
}
Handled::Ok
}
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    /// Splits an incoming Work request into one WorkPart per worker and
    /// records the Ask so it can be answered once all partial results are in.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // This manager only supports a single in-flight request.
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 || work.data.is_empty() {
            // No workers to delegate to, or nothing to split up: fold locally
            // and reply at once. Handling the empty-data case here is
            // important — otherwise no WorkParts would be sent below, the
            // accumulator could never reach num_workers + 1 entries, and the
            // Ask would never be replied to (the caller would hang forever).
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
            // stride is 0 when len < num_workers; surplus workers then get
            // empty ranges, which keeps the result count correct in the end.
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
#[derive(ComponentDefinition)]
struct Worker {
ctx: ComponentContext<Self>,
worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
fn new() -> Self {
Worker {
ctx: ComponentContext::uninitialised(),
worker_port: ProvidedPort::uninitialised(),
}
}
}
ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);
impl Actor for Worker {
type Message = WorkPart;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
let my_slice = &msg.data[msg.range];
let res = my_slice.iter().fold(msg.neutral, msg.merger);
self.worker_port.trigger(WorkResult(res));
Handled::Ok
}
fn receive_network(&mut self, _msg: NetMessage) -> Handled {
unimplemented!("Still ignoring networking stuff.");
}
}
pub fn main() {
let args: Vec<String> = env::args().collect();
assert_eq!(
3,
args.len(),
"Invalid arguments! Must give number of workers and size of the data array."
);
let num_workers: usize = args[1].parse().expect("number");
let data_size: usize = args[2].parse().expect("number");
run_task(num_workers, data_size);
}
fn run_task(num_workers: usize, data_size: usize) {
let system = KompactConfig::default().build().expect("system");
let manager = system.create(move || Manager::new(num_workers));
system.start(&manager);
let manager_ref = manager.actor_ref().hold().expect("live");
let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
let work = Work::with(data, overflowing_sum, 0u64);
println!("Sending request...");
let res = manager_ref.ask(work).wait();
println!("*******\nGot result: {}\n*******", res.0);
assert_eq!(triangular_number(data_size as u64), res.0);
system.shutdown().expect("shutdown");
}
fn triangular_number(n: u64) -> u64 {
(n * (n + 1u64)) / 2u64
}
fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
lhs.overflowing_add(*rhs).0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_workers() {
run_task(3, 1000);
}
}
We will also use message-based communication for the work assignments for the individual workers in the pool. Since we want to send a different message to each worker, message addressing is a better fit here than component broadcasting. The WorkPart
message is really basically the same as the Work
message, except that we add the range, that this particular worker is supposed to aggregate, to it.
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};
struct Work {
data: Arc<[u64]>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl Work {
fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
Work {
data: moved_data,
merger,
neutral,
}
}
}
impl fmt::Debug for Work {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Work{{
data=<data of length={}>,
merger=<function>,
neutral={}
}}",
self.data.len(),
self.neutral
)
}
}
struct WorkPart {
data: Arc<[u64]>,
range: Range<usize>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl WorkPart {
fn from(work: &Work, range: Range<usize>) -> Self {
WorkPart {
data: work.data.clone(),
range,
merger: work.merger,
neutral: work.neutral,
}
}
}
impl fmt::Debug for WorkPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"WorkPart{{
data=<data of length={}>,
range={:?},
merger=<function>,
neutral={}
}}",
self.data.len(),
self.range,
self.neutral
)
}
}
#[derive(Clone, Debug)]
struct WorkResult(u64);
struct WorkerPort;
impl Port for WorkerPort {
type Indication = WorkResult;
type Request = Never;
}
#[derive(ComponentDefinition)]
struct Manager {
ctx: ComponentContext<Self>,
worker_port: RequiredPort<WorkerPort>,
num_workers: usize,
workers: Vec<Arc<Component<Worker>>>,
worker_refs: Vec<ActorRefStrong<WorkPart>>,
outstanding_request: Option<Ask<Work, WorkResult>>,
result_accumulator: Vec<u64>,
}
impl Manager {
fn new(num_workers: usize) -> Self {
Manager {
ctx: ComponentContext::uninitialised(),
worker_port: RequiredPort::uninitialised(),
num_workers,
workers: Vec::with_capacity(num_workers),
worker_refs: Vec::with_capacity(num_workers),
outstanding_request: None,
result_accumulator: Vec::with_capacity(num_workers + 1),
}
}
}
impl ComponentLifecycle for Manager {
fn on_start(&mut self) -> Handled {
// set up our workers
for _i in 0..self.num_workers {
let worker = self.ctx.system().create(Worker::new);
worker.connect_to_required(self.worker_port.share());
let worker_ref = worker.actor_ref().hold().expect("live");
self.ctx.system().start(&worker);
self.workers.push(worker);
self.worker_refs.push(worker_ref);
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
// clean up after ourselves
self.worker_refs.clear();
let system = self.ctx.system();
self.workers.drain(..).for_each(|worker| {
system.stop(&worker);
});
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
self.on_stop()
}
}
impl Require<WorkerPort> for Manager {
fn handle(&mut self, event: WorkResult) -> Handled {
if self.outstanding_request.is_some() {
self.result_accumulator.push(event.0);
if self.result_accumulator.len() == (self.num_workers + 1) {
let ask = self.outstanding_request.take().expect("ask");
let work: &Work = ask.request();
let res = self
.result_accumulator
.iter()
.fold(work.neutral, work.merger);
self.result_accumulator.clear();
let reply = WorkResult(res);
ask.reply(reply).expect("reply");
}
} else {
error!(
self.log(),
"Got a response without an outstanding promise: {:?}", event
);
}
Handled::Ok
}
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    /// Splits an incoming Work request into one WorkPart per worker and
    /// records the Ask so it can be answered once all partial results are in.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // This manager only supports a single in-flight request.
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 || work.data.is_empty() {
            // No workers to delegate to, or nothing to split up: fold locally
            // and reply at once. Handling the empty-data case here is
            // important — otherwise no WorkParts would be sent below, the
            // accumulator could never reach num_workers + 1 entries, and the
            // Ask would never be replied to (the caller would hang forever).
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
            // stride is 0 when len < num_workers; surplus workers then get
            // empty ranges, which keeps the result count correct in the end.
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
#[derive(ComponentDefinition)]
struct Worker {
ctx: ComponentContext<Self>,
worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
fn new() -> Self {
Worker {
ctx: ComponentContext::uninitialised(),
worker_port: ProvidedPort::uninitialised(),
}
}
}
ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);
impl Actor for Worker {
type Message = WorkPart;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
let my_slice = &msg.data[msg.range];
let res = my_slice.iter().fold(msg.neutral, msg.merger);
self.worker_port.trigger(WorkResult(res));
Handled::Ok
}
fn receive_network(&mut self, _msg: NetMessage) -> Handled {
unimplemented!("Still ignoring networking stuff.");
}
}
pub fn main() {
let args: Vec<String> = env::args().collect();
assert_eq!(
3,
args.len(),
"Invalid arguments! Must give number of workers and size of the data array."
);
let num_workers: usize = args[1].parse().expect("number");
let data_size: usize = args[2].parse().expect("number");
run_task(num_workers, data_size);
}
fn run_task(num_workers: usize, data_size: usize) {
let system = KompactConfig::default().build().expect("system");
let manager = system.create(move || Manager::new(num_workers));
system.start(&manager);
let manager_ref = manager.actor_ref().hold().expect("live");
let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
let work = Work::with(data, overflowing_sum, 0u64);
println!("Sending request...");
let res = manager_ref.ask(work).wait();
println!("*******\nGot result: {}\n*******", res.0);
assert_eq!(triangular_number(data_size as u64), res.0);
system.shutdown().expect("shutdown");
}
fn triangular_number(n: u64) -> u64 {
(n * (n + 1u64)) / 2u64
}
fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
lhs.overflowing_add(*rhs).0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_workers() {
run_task(3, 1000);
}
}
Note: Both Actor messages and events must implement the
std::fmt::Debug
trait in Kompact. Since both Work
and WorkPart
contain function pointers, which do not have a sensible Debug
representation, we implement it manually instead of deriving it, and simply put a "<function>"
placeholder in its stead. Since our data arrays can be really big, we also only print their length.
Events
We will use Kompics-style communication for the results going from the workers to the manager. However, these are of the same type as the final result; a u64
wrapped in a WorkResult
. So all we have to do is add the std::clone::Clone
trait, which is required for events.
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};
struct Work {
data: Arc<[u64]>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl Work {
fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
Work {
data: moved_data,
merger,
neutral,
}
}
}
impl fmt::Debug for Work {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Work{{
data=<data of length={}>,
merger=<function>,
neutral={}
}}",
self.data.len(),
self.neutral
)
}
}
struct WorkPart {
data: Arc<[u64]>,
range: Range<usize>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl WorkPart {
fn from(work: &Work, range: Range<usize>) -> Self {
WorkPart {
data: work.data.clone(),
range,
merger: work.merger,
neutral: work.neutral,
}
}
}
impl fmt::Debug for WorkPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"WorkPart{{
data=<data of length={}>,
range={:?},
merger=<function>,
neutral={}
}}",
self.data.len(),
self.range,
self.neutral
)
}
}
#[derive(Clone, Debug)]
struct WorkResult(u64);
struct WorkerPort;
impl Port for WorkerPort {
type Indication = WorkResult;
type Request = Never;
}
#[derive(ComponentDefinition)]
struct Manager {
ctx: ComponentContext<Self>,
worker_port: RequiredPort<WorkerPort>,
num_workers: usize,
workers: Vec<Arc<Component<Worker>>>,
worker_refs: Vec<ActorRefStrong<WorkPart>>,
outstanding_request: Option<Ask<Work, WorkResult>>,
result_accumulator: Vec<u64>,
}
impl Manager {
fn new(num_workers: usize) -> Self {
Manager {
ctx: ComponentContext::uninitialised(),
worker_port: RequiredPort::uninitialised(),
num_workers,
workers: Vec::with_capacity(num_workers),
worker_refs: Vec::with_capacity(num_workers),
outstanding_request: None,
result_accumulator: Vec::with_capacity(num_workers + 1),
}
}
}
impl ComponentLifecycle for Manager {
fn on_start(&mut self) -> Handled {
// set up our workers
for _i in 0..self.num_workers {
let worker = self.ctx.system().create(Worker::new);
worker.connect_to_required(self.worker_port.share());
let worker_ref = worker.actor_ref().hold().expect("live");
self.ctx.system().start(&worker);
self.workers.push(worker);
self.worker_refs.push(worker_ref);
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
// clean up after ourselves
self.worker_refs.clear();
let system = self.ctx.system();
self.workers.drain(..).for_each(|worker| {
system.stop(&worker);
});
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
self.on_stop()
}
}
impl Require<WorkerPort> for Manager {
fn handle(&mut self, event: WorkResult) -> Handled {
if self.outstanding_request.is_some() {
self.result_accumulator.push(event.0);
if self.result_accumulator.len() == (self.num_workers + 1) {
let ask = self.outstanding_request.take().expect("ask");
let work: &Work = ask.request();
let res = self
.result_accumulator
.iter()
.fold(work.neutral, work.merger);
self.result_accumulator.clear();
let reply = WorkResult(res);
ask.reply(reply).expect("reply");
}
} else {
error!(
self.log(),
"Got a response without an outstanding promise: {:?}", event
);
}
Handled::Ok
}
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    /// Splits an incoming Work request into one WorkPart per worker and
    /// records the Ask so it can be answered once all partial results are in.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // This manager only supports a single in-flight request.
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 || work.data.is_empty() {
            // No workers to delegate to, or nothing to split up: fold locally
            // and reply at once. Handling the empty-data case here is
            // important — otherwise no WorkParts would be sent below, the
            // accumulator could never reach num_workers + 1 entries, and the
            // Ask would never be replied to (the caller would hang forever).
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
            // stride is 0 when len < num_workers; surplus workers then get
            // empty ranges, which keeps the result count correct in the end.
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
#[derive(ComponentDefinition)]
struct Worker {
ctx: ComponentContext<Self>,
worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
fn new() -> Self {
Worker {
ctx: ComponentContext::uninitialised(),
worker_port: ProvidedPort::uninitialised(),
}
}
}
ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);
impl Actor for Worker {
type Message = WorkPart;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
let my_slice = &msg.data[msg.range];
let res = my_slice.iter().fold(msg.neutral, msg.merger);
self.worker_port.trigger(WorkResult(res));
Handled::Ok
}
fn receive_network(&mut self, _msg: NetMessage) -> Handled {
unimplemented!("Still ignoring networking stuff.");
}
}
pub fn main() {
let args: Vec<String> = env::args().collect();
assert_eq!(
3,
args.len(),
"Invalid arguments! Must give number of workers and size of the data array."
);
let num_workers: usize = args[1].parse().expect("number");
let data_size: usize = args[2].parse().expect("number");
run_task(num_workers, data_size);
}
fn run_task(num_workers: usize, data_size: usize) {
let system = KompactConfig::default().build().expect("system");
let manager = system.create(move || Manager::new(num_workers));
system.start(&manager);
let manager_ref = manager.actor_ref().hold().expect("live");
let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
let work = Work::with(data, overflowing_sum, 0u64);
println!("Sending request...");
let res = manager_ref.ask(work).wait();
println!("*******\nGot result: {}\n*******", res.0);
assert_eq!(triangular_number(data_size as u64), res.0);
system.shutdown().expect("shutdown");
}
fn triangular_number(n: u64) -> u64 {
(n * (n + 1u64)) / 2u64
}
fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
lhs.overflowing_add(*rhs).0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_workers() {
run_task(3, 1000);
}
}
We also need a port on which the WorkResult
can travel; let’s call it a WorkerPort
. Based on the naming, say a Worker
provides the WorkerPort
service, so messages from Worker
to Manager
are indications. Since we are using messages for requesting work to be done, we don’t need any request event on the WorkerPort
and will just use the empty Never
type again.
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};
struct Work {
data: Arc<[u64]>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl Work {
fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
Work {
data: moved_data,
merger,
neutral,
}
}
}
impl fmt::Debug for Work {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Work{{
data=<data of length={}>,
merger=<function>,
neutral={}
}}",
self.data.len(),
self.neutral
)
}
}
struct WorkPart {
data: Arc<[u64]>,
range: Range<usize>,
merger: fn(u64, &u64) -> u64,
neutral: u64,
}
impl WorkPart {
fn from(work: &Work, range: Range<usize>) -> Self {
WorkPart {
data: work.data.clone(),
range,
merger: work.merger,
neutral: work.neutral,
}
}
}
impl fmt::Debug for WorkPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"WorkPart{{
data=<data of length={}>,
range={:?},
merger=<function>,
neutral={}
}}",
self.data.len(),
self.range,
self.neutral
)
}
}
#[derive(Clone, Debug)]
struct WorkResult(u64);
struct WorkerPort;
impl Port for WorkerPort {
type Indication = WorkResult;
type Request = Never;
}
#[derive(ComponentDefinition)]
struct Manager {
ctx: ComponentContext<Self>,
worker_port: RequiredPort<WorkerPort>,
num_workers: usize,
workers: Vec<Arc<Component<Worker>>>,
worker_refs: Vec<ActorRefStrong<WorkPart>>,
outstanding_request: Option<Ask<Work, WorkResult>>,
result_accumulator: Vec<u64>,
}
impl Manager {
fn new(num_workers: usize) -> Self {
Manager {
ctx: ComponentContext::uninitialised(),
worker_port: RequiredPort::uninitialised(),
num_workers,
workers: Vec::with_capacity(num_workers),
worker_refs: Vec::with_capacity(num_workers),
outstanding_request: None,
result_accumulator: Vec::with_capacity(num_workers + 1),
}
}
}
impl ComponentLifecycle for Manager {
fn on_start(&mut self) -> Handled {
// set up our workers
for _i in 0..self.num_workers {
let worker = self.ctx.system().create(Worker::new);
worker.connect_to_required(self.worker_port.share());
let worker_ref = worker.actor_ref().hold().expect("live");
self.ctx.system().start(&worker);
self.workers.push(worker);
self.worker_refs.push(worker_ref);
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
// clean up after ourselves
self.worker_refs.clear();
let system = self.ctx.system();
self.workers.drain(..).for_each(|worker| {
system.stop(&worker);
});
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
self.on_stop()
}
}
impl Require<WorkerPort> for Manager {
fn handle(&mut self, event: WorkResult) -> Handled {
if self.outstanding_request.is_some() {
self.result_accumulator.push(event.0);
if self.result_accumulator.len() == (self.num_workers + 1) {
let ask = self.outstanding_request.take().expect("ask");
let work: &Work = ask.request();
let res = self
.result_accumulator
.iter()
.fold(work.neutral, work.merger);
self.result_accumulator.clear();
let reply = WorkResult(res);
ask.reply(reply).expect("reply");
}
} else {
error!(
self.log(),
"Got a response without an outstanding promise: {:?}", event
);
}
Handled::Ok
}
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    /// Splits an incoming Work request into one WorkPart per worker and
    /// records the Ask so it can be answered once all partial results are in.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // This manager only supports a single in-flight request.
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 || work.data.is_empty() {
            // No workers to delegate to, or nothing to split up: fold locally
            // and reply at once. Handling the empty-data case here is
            // important — otherwise no WorkParts would be sent below, the
            // accumulator could never reach num_workers + 1 entries, and the
            // Ask would never be replied to (the caller would hang forever).
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
            // stride is 0 when len < num_workers; surplus workers then get
            // empty ranges, which keeps the result count correct in the end.
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
#[derive(ComponentDefinition)]
struct Worker {
ctx: ComponentContext<Self>,
worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
fn new() -> Self {
Worker {
ctx: ComponentContext::uninitialised(),
worker_port: ProvidedPort::uninitialised(),
}
}
}
ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);
impl Actor for Worker {
type Message = WorkPart;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
let my_slice = &msg.data[msg.range];
let res = my_slice.iter().fold(msg.neutral, msg.merger);
self.worker_port.trigger(WorkResult(res));
Handled::Ok
}
fn receive_network(&mut self, _msg: NetMessage) -> Handled {
unimplemented!("Still ignoring networking stuff.");
}
}
pub fn main() {
let args: Vec<String> = env::args().collect();
assert_eq!(
3,
args.len(),
"Invalid arguments! Must give number of workers and size of the data array."
);
let num_workers: usize = args[1].parse().expect("number");
let data_size: usize = args[2].parse().expect("number");
run_task(num_workers, data_size);
}
fn run_task(num_workers: usize, data_size: usize) {
let system = KompactConfig::default().build().expect("system");
let manager = system.create(move || Manager::new(num_workers));
system.start(&manager);
let manager_ref = manager.actor_ref().hold().expect("live");
let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
let work = Work::with(data, overflowing_sum, 0u64);
println!("Sending request...");
let res = manager_ref.ask(work).wait();
println!("*******\nGot result: {}\n*******", res.0);
assert_eq!(triangular_number(data_size as u64), res.0);
system.shutdown().expect("shutdown");
}
fn triangular_number(n: u64) -> u64 {
(n * (n + 1u64)) / 2u64
}
fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
lhs.overflowing_add(*rhs).0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_workers() {
run_task(3, 1000);
}
}