Senders

The one communication-related thing we haven’t touched yet is how to do request-response style communication among Actors. The “ask”-pattern gave us request-response between an Actor and some arbitrary (non-pool) thread, and ports basically give us some form of request-response between request and indication events (with some broadcasting semantic caveats, of course). But for Actor to Actor communication, we have not seen anything of this sort yet. In fact, you may have noticed that receive_local(...) does not actually give us any sender information, such as an ActorRef. Nor is this available via the component context, as would be the case in Akka.

In Kompact, for local messages at least, sender information must be passed explicitly. This is for two reasons:

  1. It avoids creating an ActorRef for every message when it’s not needed, since actor references are not trivially cheap to create.
  2. It allows the sender reference to be typed with the appropriate message type.

This design gives us basically two variants to do request-response. If we know we are always going to respond to the same component instance, the most efficient thing to do is to get a reference to it once and then just keep it around as part of our internal state, which avoids constantly creating actor references. If, however, we must respond to multiple different actors, which is often the case, we must make the sender reference part of the request message. We can do that either by adding a field to our custom message type, or by simply wrapping our custom message type in the Kompact-provided WithSender struct. WithSender is really the same idea as Ask, replacing the KPromise<Response> with an ActorRef<Response> (yes, there is also WithSenderStrong using an ActorRefStrong instead).
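The second variant, carrying the sender inside each request, can be sketched without Kompact at all. The following is only an analogy built on std channels, not Kompact's API: the WithReplyTo struct, spawn_doubler and request are hypothetical names made up for this illustration, and a std Sender stands in for the typed ActorRef.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for the idea behind `WithSender`: a request
/// bundled with a typed handle that the receiver can use to reply.
struct WithReplyTo<Req, Resp> {
    content: Req,
    reply_to: Sender<Resp>,
}

impl<Req, Resp> WithReplyTo<Req, Resp> {
    /// Send `resp` back to whoever issued this request.
    fn reply(&self, resp: Resp) {
        // A requester that already went away is not an error in this sketch.
        let _ = self.reply_to.send(resp);
    }
}

/// Spawns a "server" that doubles every number it receives and replies to
/// the handle carried inside each message. It keeps no per-client state.
fn spawn_doubler() -> Sender<WithReplyTo<u64, u64>> {
    let (tx, rx) = channel::<WithReplyTo<u64, u64>>();
    thread::spawn(move || {
        for msg in rx {
            msg.reply(msg.content * 2);
        }
    });
    tx
}

/// Issues one request on behalf of a fresh client and waits for the answer.
fn request(server: &Sender<WithReplyTo<u64, u64>>, value: u64) -> u64 {
    let (reply_to, inbox) = channel();
    server
        .send(WithReplyTo { content: value, reply_to })
        .expect("server is running");
    inbox.recv().expect("server replied")
}

fn main() {
    let server = spawn_doubler();
    // Two independent clients, each with its own reply handle.
    println!("client A got {}", request(&server, 21));
    println!("client B got {}", request(&server, 4));
}
```

Because the reply handle travels with the request, the server can answer any number of different requesters, and the handle's type parameter fixes what kind of response it may send.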

Workers with Senders

To illustrate this mechanism we are going to rewrite the Workers example from the previous sections to use WithSender instead of the WorkerPort communication. We will use WithSender here, instead of a stored manager actor reference, to illustrate the point, but it should be clear that the latter will be more efficient as we always reply to the manager.

First we remove all mentions of WorkerPort, of course. Then we change the worker’s Message type to WithSender<WorkPart, ManagerMessage>. Why ManagerMessage and not WorkResult? Well, since all communication with the manager now happens via messages, we need to differentiate between messages from the main thread, which are of type Ask<Work, WorkResult>, and messages from the worker, which are of type WorkResult. Since we can only have a single Message type, ManagerMessage is simply an enum of both options.

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};

struct Work {
    data: Arc<[u64]>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl Work {
    fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
        let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
        Work {
            data: moved_data,
            merger,
            neutral,
        }
    }
}
impl fmt::Debug for Work {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Work{{
            data=<data of length={}>,
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.neutral
        )
    }
}

struct WorkPart {
    data: Arc<[u64]>,
    range: Range<usize>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl WorkPart {
    fn from(work: &Work, range: Range<usize>) -> Self {
        WorkPart {
            data: work.data.clone(),
            range,
            merger: work.merger,
            neutral: work.neutral,
        }
    }
}
impl fmt::Debug for WorkPart {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "WorkPart{{
            data=<data of length={}>,
            range={:?},
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.range,
            self.neutral
        )
    }
}

#[derive(Debug)]
struct WorkResult(u64);

#[derive(Debug)]
enum ManagerMessage {
    Work(Ask<Work, WorkResult>),
    Result(WorkResult),
}
#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WithSender<WorkPart, ManagerMessage>>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
impl Manager {
    fn new(num_workers: usize) -> Self {
        Manager {
            ctx: ComponentContext::uninitialised(),
            num_workers,
            workers: Vec::with_capacity(num_workers),
            worker_refs: Vec::with_capacity(num_workers),
            outstanding_request: None,
            result_accumulator: Vec::with_capacity(num_workers + 1),
        }
    }
}

impl ComponentLifecycle for Manager {
    fn on_start(&mut self) -> Handled {
        // set up our workers
        for _i in 0..self.num_workers {
            let worker = self.ctx.system().create(Worker::new);
            let worker_ref = worker.actor_ref().hold().expect("live");
            self.ctx.system().start(&worker);
            self.workers.push(worker);
            self.worker_refs.push(worker_ref);
        }
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        // clean up after ourselves
        self.worker_refs.clear();
        let system = self.ctx.system();
        self.workers.drain(..).for_each(|worker| {
            system.stop(&worker);
        });
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

impl Actor for Manager {
    type Message = ManagerMessage;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        match msg {
            ManagerMessage::Work(msg) => {
                assert!(
                    self.outstanding_request.is_none(),
                    "One request at a time, please!"
                );
                let work: &Work = msg.request();
                if self.num_workers == 0 {
                    // manager gotta work itself -> very unhappy manager
                    let res = work.data.iter().fold(work.neutral, work.merger);
                    msg.reply(WorkResult(res)).expect("reply");
                } else {
                    let len = work.data.len();
                    let stride = len / self.num_workers;
                    let mut start = 0usize;
                    let mut index = 0;
                    while start < len && index < self.num_workers {
                        let end = len.min(start + stride);
                        let range = start..end;
                        info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                        let msg = WorkPart::from(work, range);
                        let worker = &self.worker_refs[index];
                        worker.tell(WithSender::from(msg, self));
                        start += stride;
                        index += 1;
                    }
                    if start < len {
                        // manager just does the rest itself
                        let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                        self.result_accumulator.push(res);
                    } else {
                        // just put a neutral element in there, so our count is right in the end
                        self.result_accumulator.push(work.neutral);
                    }
                    self.outstanding_request = Some(msg);
                }
            }
            ManagerMessage::Result(msg) => {
                if self.outstanding_request.is_some() {
                    self.result_accumulator.push(msg.0);
                    if self.result_accumulator.len() == (self.num_workers + 1) {
                        let ask = self.outstanding_request.take().expect("ask");
                        let work: &Work = ask.request();
                        let res = self
                            .result_accumulator
                            .iter()
                            .fold(work.neutral, work.merger);
                        self.result_accumulator.clear();
                        let reply = WorkResult(res);
                        ask.reply(reply).expect("reply");
                    }
                } else {
                    error!(
                        self.log(),
                        "Got a response without an outstanding promise: {:?}", msg
                    );
                }
            }
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

#[derive(ComponentDefinition)]
struct Worker {
    ctx: ComponentContext<Self>,
}
impl Worker {
    fn new() -> Self {
        Worker {
            ctx: ComponentContext::uninitialised(),
        }
    }
}
ignore_lifecycle!(Worker);

impl Actor for Worker {
    type Message = WithSender<WorkPart, ManagerMessage>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        let my_slice = &msg.data[msg.range.clone()];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        msg.reply(ManagerMessage::Result(WorkResult(res)));
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

pub fn main() {
    let args: Vec<String> = env::args().collect();
    assert_eq!(
        3,
        args.len(),
        "Invalid arguments! Must give number of workers and size of the data array."
    );
    let num_workers: usize = args[1].parse().expect("number");
    let data_size: usize = args[2].parse().expect("number");
    run_task(num_workers, data_size);
}

fn run_task(num_workers: usize, data_size: usize) {
    let system = KompactConfig::default().build().expect("system");
    let manager = system.create(move || Manager::new(num_workers));
    system.start(&manager);
    let manager_ref = manager.actor_ref().hold().expect("live");

    let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
    let work = Work::with(data, overflowing_sum, 0u64);
    println!("Sending request...");
    let res = manager_ref
        .ask_with(|promise| ManagerMessage::Work(Ask::new(promise, work)))
        .wait();
    println!("*******\nGot result: {}\n*******", res.0);
    assert_eq!(triangular_number(data_size as u64), res.0);
    system.shutdown().expect("shutdown");
}

fn triangular_number(n: u64) -> u64 {
    (n * (n + 1u64)) / 2u64
}

fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
    lhs.overflowing_add(*rhs).0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workers() {
        run_task(3, 1000);
    }
}

Thus, when the worker wants to reply(...) with a WorkResult it actually needs to wrap it in a ManagerMessage instance or the compiler is going to reject it.

impl Actor for Worker {
    type Message = WithSender<WorkPart, ManagerMessage>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        let my_slice = &msg.data[msg.range.clone()];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        msg.reply(ManagerMessage::Result(WorkResult(res)));
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}
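Why the wrapping is forced by the type system can be seen in a small std-only sketch. This is an analogy and not Kompact code: a std channel plays the role of the manager's mailbox, ManagerMessage::Work is simplified to carry a plain Vec<u64>, and run_worker and manage are hypothetical helper names.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

#[derive(Debug)]
struct WorkResult(u64);

/// The single message type of the manager's inbox: requests from the
/// outside and results from workers both have to fit in here.
#[derive(Debug)]
enum ManagerMessage {
    Work(Vec<u64>),
    Result(WorkResult),
}

/// A worker only knows the manager as "something that accepts
/// `ManagerMessage`", so that is the type it has to produce.
fn run_worker(data: Vec<u64>, manager: Sender<ManagerMessage>) {
    let sum: u64 = data.iter().sum();
    // `manager.send(WorkResult(sum))` would not type-check here:
    // the channel carries `ManagerMessage`, not `WorkResult`.
    manager
        .send(ManagerMessage::Result(WorkResult(sum)))
        .expect("manager is alive");
}

/// A minimal manager loop: hand the work to one worker, then wait for
/// the result to arrive in the same inbox.
fn manage(data: Vec<u64>) -> u64 {
    let (to_manager, inbox) = channel::<ManagerMessage>();
    to_manager
        .send(ManagerMessage::Work(data))
        .expect("inbox is open");
    for msg in inbox {
        match msg {
            ManagerMessage::Work(data) => {
                let reply_to = to_manager.clone();
                thread::spawn(move || run_worker(data, reply_to));
            }
            ManagerMessage::Result(WorkResult(sum)) => return sum,
        }
    }
    unreachable!("the manager holds a sender, so the inbox never closes")
}

fn main() {
    println!("sum = {}", manage((1..=10).collect()));
}
```

The worker's reply handle is typed by what the manager accepts, not by what the worker would naturally produce, which is the same constraint that WithSender<WorkPart, ManagerMessage> expresses.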

In the manager we must first update our state to reflect the new message (and thus reference) types.

#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WithSender<WorkPart, ManagerMessage>>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
impl Manager {
    fn new(num_workers: usize) -> Self {
        Manager {
            ctx: ComponentContext::uninitialised(),
            num_workers,
            workers: Vec::with_capacity(num_workers),
            worker_refs: Vec::with_capacity(num_workers),
            outstanding_request: None,
            result_accumulator: Vec::with_capacity(num_workers + 1),
        }
    }
}

We also remove the port connection logic from the ComponentLifecycle handler. Then we change the Message type of the manager to ManagerMessage and match on the ManagerMessage variant in the receive_local(...) function. For the ManagerMessage::Work variant, we basically do the same thing as in the old receive_local(...) function, except that we construct a WithSender instance from the WorkPart instead of sending it directly to the worker. We then simply copy the code from the old WorkResult handler into the branch for ManagerMessage::Result.
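The splitting logic that the ManagerMessage::Work branch carries over from the old handler can be looked at in isolation. The sketch below pulls it into a pure function under the hypothetical name split_work, which does not appear in the example itself. It mirrors the stride computation from the listing and returns the worker ranges together with the leftover range that the manager processes on its own.

```rust
use std::ops::Range;

/// Splits `0..len` the way the manager does: each worker gets
/// `len / num_workers` items, and whatever is left over (if anything)
/// stays with the manager.
fn split_work(len: usize, num_workers: usize) -> (Vec<Range<usize>>, Option<Range<usize>>) {
    if num_workers == 0 {
        // No workers: the manager does everything itself.
        return (Vec::new(), Some(0..len));
    }
    let stride = len / num_workers;
    let mut parts = Vec::with_capacity(num_workers);
    let mut start = 0usize;
    while start < len && parts.len() < num_workers {
        let end = len.min(start + stride);
        parts.push(start..end);
        start += stride;
    }
    // Anything past the last full stride is left for the manager.
    let rest = if start < len { Some(start..len) } else { None };
    (parts, rest)
}

fn main() {
    // Ten items over three workers: three items each, one left over.
    println!("{:?}", split_work(10, 3));
    // Fewer items than workers: the stride is zero, so every worker gets
    // an empty range and the manager ends up with all of the data.
    println!("{:?}", split_work(2, 3));
}
```

When there are fewer items than workers, every worker still receives a message, just with an empty range, so the manager still collects exactly one result per worker plus its own entry.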

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};

struct Work {
    data: Arc<[u64]>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl Work {
    fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
        let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
        Work {
            data: moved_data,
            merger,
            neutral,
        }
    }
}
impl fmt::Debug for Work {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Work{{
            data=<data of length={}>,
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.neutral
        )
    }
}

struct WorkPart {
    data: Arc<[u64]>,
    range: Range<usize>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl WorkPart {
    fn from(work: &Work, range: Range<usize>) -> Self {
        WorkPart {
            data: work.data.clone(),
            range,
            merger: work.merger,
            neutral: work.neutral,
        }
    }
}
impl fmt::Debug for WorkPart {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "WorkPart{{
            data=<data of length={}>,
            range={:?},
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.range,
            self.neutral
        )
    }
}

#[derive(Debug)]
struct WorkResult(u64);

#[derive(Debug)]
enum ManagerMessage {
    Work(Ask<Work, WorkResult>),
    Result(WorkResult),
}
#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WithSender<WorkPart, ManagerMessage>>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
impl Manager {
    fn new(num_workers: usize) -> Self {
        Manager {
            ctx: ComponentContext::uninitialised(),
            num_workers,
            workers: Vec::with_capacity(num_workers),
            worker_refs: Vec::with_capacity(num_workers),
            outstanding_request: None,
            result_accumulator: Vec::with_capacity(num_workers + 1),
        }
    }
}

impl ComponentLifecycle for Manager {
    fn on_start(&mut self) -> Handled {
        // set up our workers
        for _i in 0..self.num_workers {
            let worker = self.ctx.system().create(Worker::new);
            let worker_ref = worker.actor_ref().hold().expect("live");
            self.ctx.system().start(&worker);
            self.workers.push(worker);
            self.worker_refs.push(worker_ref);
        }
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        // clean up after ourselves
        self.worker_refs.clear();
        let system = self.ctx.system();
        self.workers.drain(..).for_each(|worker| {
            system.stop(&worker);
        });
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

impl Actor for Manager {
    type Message = ManagerMessage;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        match msg {
            ManagerMessage::Work(msg) => {
                assert!(
                    self.outstanding_request.is_none(),
                    "One request at a time, please!"
                );
                let work: &Work = msg.request();
                if self.num_workers == 0 {
                    // manager gotta work itself -> very unhappy manager
                    let res = work.data.iter().fold(work.neutral, work.merger);
                    msg.reply(WorkResult(res)).expect("reply");
                } else {
                    let len = work.data.len();
                    let stride = len / self.num_workers;
                    let mut start = 0usize;
                    let mut index = 0;
                    while start < len && index < self.num_workers {
                        let end = len.min(start + stride);
                        let range = start..end;
                        info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                        let msg = WorkPart::from(work, range);
                        let worker = &self.worker_refs[index];
                        worker.tell(WithSender::from(msg, self));
                        start += stride;
                        index += 1;
                    }
                    if start < len {
                        // manager just does the rest itself
                        let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                        self.result_accumulator.push(res);
                    } else {
                        // just put a neutral element in there, so our count is right in the end
                        self.result_accumulator.push(work.neutral);
                    }
                    self.outstanding_request = Some(msg);
                }
            }
            ManagerMessage::Result(msg) => {
                if self.outstanding_request.is_some() {
                    self.result_accumulator.push(msg.0);
                    if self.result_accumulator.len() == (self.num_workers + 1) {
                        let ask = self.outstanding_request.take().expect("ask");
                        let work: &Work = ask.request();
                        let res = self
                            .result_accumulator
                            .iter()
                            .fold(work.neutral, work.merger);
                        self.result_accumulator.clear();
                        let reply = WorkResult(res);
                        ask.reply(reply).expect("reply");
                    }
                } else {
                    error!(
                        self.log(),
                        "Got a response without an outstanding promise: {:?}", msg
                    );
                }
            }
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

#[derive(ComponentDefinition)]
struct Worker {
    ctx: ComponentContext<Self>,
}
impl Worker {
    fn new() -> Self {
        Worker {
            ctx: ComponentContext::uninitialised(),
        }
    }
}
ignore_lifecycle!(Worker);

impl Actor for Worker {
    type Message = WithSender<WorkPart, ManagerMessage>;

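    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // the fields of the wrapped WorkPart are accessed directly through the WithSender
        let my_slice = &msg.data[msg.range.clone()];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        // answer whoever sent this part, using the sender reference carried by the message
        msg.reply(ManagerMessage::Result(WorkResult(res)));
        Handled::Ok
    }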

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

pub fn main() {
    let args: Vec<String> = env::args().collect();
    assert_eq!(
        3,
        args.len(),
        "Invalid arguments! Must give number of workers and size of the data array."
    );
    let num_workers: usize = args[1].parse().expect("number");
    let data_size: usize = args[2].parse().expect("number");
    run_task(num_workers, data_size);
}

fn run_task(num_workers: usize, data_size: usize) {
    let system = KompactConfig::default().build().expect("system");
    let manager = system.create(move || Manager::new(num_workers));
    system.start(&manager);
    let manager_ref = manager.actor_ref().hold().expect("live");

    let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
    let work = Work::with(data, overflowing_sum, 0u64);
    println!("Sending request...");
    let res = manager_ref
        .ask_with(|promise| ManagerMessage::Work(Ask::new(promise, work)))
        .wait();
    println!("*******\nGot result: {}\n*******", res.0);
    assert_eq!(triangular_number(data_size as u64), res.0);
    system.shutdown().expect("shutdown");
}

fn triangular_number(n: u64) -> u64 {
    (n * (n + 1u64)) / 2u64
}

fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
    lhs.overflowing_add(*rhs).0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workers() {
        run_task(3, 1000);
    }
}

The receive_local(...) function is getting pretty long; if we actually wanted to maintain this code, we should decompose it into smaller private functions.
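One possible first step in such a decomposition is to pull the range arithmetic out of the handler. The following is only a minimal sketch and not part of the Kompact example: `split_work` is a hypothetical helper name, and it merely reproduces the striding logic of the loop in receive_local(...) using the standard library, returning one range per worker plus the remainder that the manager handles itself.

```rust
use std::ops::Range;

/// Hypothetical helper (not in the original example): splits `0..len` into
/// one range per worker plus a trailing remainder for the manager.
/// Each worker gets `len / num_workers` items, exactly as in the handler loop.
fn split_work(len: usize, num_workers: usize) -> (Vec<Range<usize>>, Range<usize>) {
    // the handler treats the zero-worker case separately, so we require at least one
    assert!(num_workers > 0, "need at least one worker to split work");
    let stride = len / num_workers;
    let mut ranges = Vec::with_capacity(num_workers);
    let mut start = 0usize;
    while start < len && ranges.len() < num_workers {
        let end = len.min(start + stride);
        ranges.push(start..end);
        start += stride;
    }
    // whatever is left (possibly nothing) stays with the manager
    (ranges, start..len)
}
```

Inside the handler, the loop could then iterate over the returned ranges together with `self.worker_refs`, and fold over the remainder range for the manager's own share. An empty remainder folds to the neutral element, so the accumulator still ends up with `num_workers + 1` entries.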

Finally, when we want to send the Ask from the main thread, we also need to wrap it in ManagerMessage::Work. This prevents us from simply using ActorRef::ask, as it only produces an Ask instance, not our ManagerMessage wrapper. This brings us back to the previously mentioned ActorRef::ask_with function, which allows us to construct our Ask instance and put it into our wrapper ourselves. If we were to use this construction in many places throughout our code, it would likely be a good idea to add a constructor function on ManagerMessage that maps the promise and the work values to the proper structure.

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};

struct Work {
    data: Arc<[u64]>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl Work {
    fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
        let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
        Work {
            data: moved_data,
            merger,
            neutral,
        }
    }
}
impl fmt::Debug for Work {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Work{{
            data=<data of length={}>,
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.neutral
        )
    }
}

struct WorkPart {
    data: Arc<[u64]>,
    range: Range<usize>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl WorkPart {
    fn from(work: &Work, range: Range<usize>) -> Self {
        WorkPart {
            data: work.data.clone(),
            range,
            merger: work.merger,
            neutral: work.neutral,
        }
    }
}
impl fmt::Debug for WorkPart {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "WorkPart{{
            data=<data of length={}>,
            range={:?},
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.range,
            self.neutral
        )
    }
}

#[derive(Debug)]
struct WorkResult(u64);

#[derive(Debug)]
enum ManagerMessage {
    Work(Ask<Work, WorkResult>),
    Result(WorkResult),
}
#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WithSender<WorkPart, ManagerMessage>>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
impl Manager {
    fn new(num_workers: usize) -> Self {
        Manager {
            ctx: ComponentContext::uninitialised(),
            num_workers,
            workers: Vec::with_capacity(num_workers),
            worker_refs: Vec::with_capacity(num_workers),
            outstanding_request: None,
            result_accumulator: Vec::with_capacity(num_workers + 1),
        }
    }
}

impl ComponentLifecycle for Manager {
    fn on_start(&mut self) -> Handled {
        // set up our workers
        for _i in 0..self.num_workers {
            let worker = self.ctx.system().create(Worker::new);
            let worker_ref = worker.actor_ref().hold().expect("live");
            self.ctx.system().start(&worker);
            self.workers.push(worker);
            self.worker_refs.push(worker_ref);
        }
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        // clean up after ourselves
        self.worker_refs.clear();
        let system = self.ctx.system();
        self.workers.drain(..).for_each(|worker| {
            system.stop(&worker);
        });
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

impl Actor for Manager {
    type Message = ManagerMessage;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        match msg {
            ManagerMessage::Work(msg) => {
                assert!(
                    self.outstanding_request.is_none(),
                    "One request at a time, please!"
                );
                let work: &Work = msg.request();
                if self.num_workers == 0 {
                    // manager gotta work itself -> very unhappy manager
                    let res = work.data.iter().fold(work.neutral, work.merger);
                    msg.reply(WorkResult(res)).expect("reply");
                } else {
                    let len = work.data.len();
                    let stride = len / self.num_workers;
                    let mut start = 0usize;
                    let mut index = 0;
                    while start < len && index < self.num_workers {
                        let end = len.min(start + stride);
                        let range = start..end;
                        info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                        let msg = WorkPart::from(work, range);
                        let worker = &self.worker_refs[index];
                        worker.tell(WithSender::from(msg, self));
                        start += stride;
                        index += 1;
                    }
                    if start < len {
                        // manager just does the rest itself
                        let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                        self.result_accumulator.push(res);
                    } else {
                        // just put a neutral element in there, so our count is right in the end
                        self.result_accumulator.push(work.neutral);
                    }
                    self.outstanding_request = Some(msg);
                }
            }
            ManagerMessage::Result(msg) => {
                if self.outstanding_request.is_some() {
                    self.result_accumulator.push(msg.0);
                    if self.result_accumulator.len() == (self.num_workers + 1) {
                        let ask = self.outstanding_request.take().expect("ask");
                        let work: &Work = ask.request();
                        let res = self
                            .result_accumulator
                            .iter()
                            .fold(work.neutral, work.merger);
                        self.result_accumulator.clear();
                        let reply = WorkResult(res);
                        ask.reply(reply).expect("reply");
                    }
                } else {
                    error!(
                        self.log(),
                        "Got a response without an outstanding promise: {:?}", msg
                    );
                }
            }
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

#[derive(ComponentDefinition)]
struct Worker {
    ctx: ComponentContext<Self>,
}
impl Worker {
    fn new() -> Self {
        Worker {
            ctx: ComponentContext::uninitialised(),
        }
    }
}
ignore_lifecycle!(Worker);

impl Actor for Worker {
    type Message = WithSender<WorkPart, ManagerMessage>;

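    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        // the fields of the wrapped WorkPart are accessed directly through the WithSender
        let my_slice = &msg.data[msg.range.clone()];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        // answer whoever sent this part, using the sender reference carried by the message
        msg.reply(ManagerMessage::Result(WorkResult(res)));
        Handled::Ok
    }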

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

pub fn main() {
    let args: Vec<String> = env::args().collect();
    assert_eq!(
        3,
        args.len(),
        "Invalid arguments! Must give number of workers and size of the data array."
    );
    let num_workers: usize = args[1].parse().expect("number");
    let data_size: usize = args[2].parse().expect("number");
    run_task(num_workers, data_size);
}

fn run_task(num_workers: usize, data_size: usize) {
    let system = KompactConfig::default().build().expect("system");
    let manager = system.create(move || Manager::new(num_workers));
    system.start(&manager);
    let manager_ref = manager.actor_ref().hold().expect("live");

    let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
    let work = Work::with(data, overflowing_sum, 0u64);
    println!("Sending request...");
    let res = manager_ref
        .ask_with(|promise| ManagerMessage::Work(Ask::new(promise, work)))
        .wait();
    println!("*******\nGot result: {}\n*******", res.0);
    assert_eq!(triangular_number(data_size as u64), res.0);
    system.shutdown().expect("shutdown");
}

fn triangular_number(n: u64) -> u64 {
    (n * (n + 1u64)) / 2u64
}

fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
    lhs.overflowing_add(*rhs).0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workers() {
        run_task(3, 1000);
    }
}
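The constructor function on ManagerMessage suggested earlier could look roughly like the sketch below. To keep it compilable without Kompact, `Promise` and `Request` are simplified stand-ins (assumptions made for illustration) for the roles played by KPromise and Ask in the real example, `Work` and `WorkResult` are reduced to bare wrappers, and the `work` constructor name is hypothetical.

```rust
use std::sync::mpsc::Sender;

// Stand-ins (assumptions) so this sketch compiles on its own; in the real
// example these roles are played by Kompact's KPromise and Ask types.
struct Promise<T>(Sender<T>);
struct Request<Req, Res> {
    promise: Promise<Res>,
    content: Req,
}
impl<Req, Res> Request<Req, Res> {
    fn new(promise: Promise<Res>, content: Req) -> Self {
        Request { promise, content }
    }
}

// Reduced versions of the example's payload types.
struct Work(Vec<u64>);
struct WorkResult(u64);

#[allow(dead_code)]
enum ManagerMessage {
    Work(Request<Work, WorkResult>),
    Result(WorkResult),
}

impl ManagerMessage {
    /// Hypothetical constructor: hides the nesting of the request inside the wrapper,
    /// so call sites only pass the promise and the work.
    fn work(promise: Promise<WorkResult>, work: Work) -> Self {
        ManagerMessage::Work(Request::new(promise, work))
    }
}
```

With an equivalent constructor defined on the real ManagerMessage (taking a `KPromise<WorkResult>` and a `Work`), the closure passed to ask_with in run_task would shrink to `|promise| ManagerMessage::work(promise, work)`.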

At this point we should be able to run the example again, and see the same behaviour as before.

Note: As before, if you have checked out the examples folder you can run the concrete binary with:

cargo run --release --bin workers_sender 4 100000