System

In order to run any Kompact component, we need a KompactSystem. The system manages the runtime state, the thread pool, logging, and many other aspects of the Kompact runtime. Such a system is created from a KompactConfig via the build() function. The config instance allows customisation of many runtime parameters, which we will discuss in upcoming sections. For now, the default() instance will do just fine. It creates a thread pool with one thread for each CPU core, as reported by num_cpus, schedules fairly between messages and events, and does some internal message/event batching to improve performance.
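
A minimal sketch of this lifecycle, using the same calls that appear in run_task at the end of this section:

use kompact::prelude::*;

pub fn main() {
    // build a system with the default configuration
    let system = KompactConfig::default().build().expect("KompactSystem");
    // ...create and start components here...
    // shut the system down again once we are done with it
    system.shutdown().expect("shutdown");
}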

Note: As opposed to Kompics, in Kompact it is perfectly viable to have multiple systems running in the same process, for example with different configurations.
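
For illustration, a hedged sketch of two independent systems coexisting in one process; in a real application they would typically differ in their configuration:

use kompact::prelude::*;

pub fn main() {
    let system_a = KompactConfig::default().build().expect("system");
    let system_b = KompactConfig::default().build().expect("system");
    // each system has its own thread pool and schedules its own components
    system_a.shutdown().expect("shutdown");
    system_b.shutdown().expect("shutdown");
}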

When a Kompact system is no longer in use, it should be shut down via the shutdown() function. Sometimes it is a component, rather than the main thread, that must decide when to shut down. In that case, it can call self.ctx.system().shutdown_async(), and the main thread can wait for this to complete with await_termination().
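
The following sketch shows this pattern; the Terminator component and its Shutdown message are hypothetical, introduced here only for illustration:

use kompact::prelude::*;

#[derive(Clone, Debug)]
struct Shutdown;

#[derive(ComponentDefinition)]
struct Terminator {
    ctx: ComponentContext<Self>,
}
impl Terminator {
    fn new() -> Self {
        Terminator {
            ctx: ComponentContext::uninitialised(),
        }
    }
}
ignore_lifecycle!(Terminator);

impl Actor for Terminator {
    type Message = Shutdown;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        // initiate shutdown from within the component, without blocking it
        self.ctx.system().shutdown_async();
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

pub fn main() {
    let system = KompactConfig::default().build().expect("system");
    let terminator = system.create(Terminator::new);
    system.start(&terminator);
    terminator.actor_ref().tell(Shutdown);
    // block the main thread until the shutdown triggered above completes
    system.await_termination();
}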

Note: Neither shutdown_async() nor await_termination() has a particularly efficient implementation, as this should be a relatively rare thing to do in the lifetime of a Kompact system, and thus doesn’t warrant optimisation at this point. That also means, though, that await_termination() should definitely not be used as a timing marker in a benchmark, as some people have done with the equivalent Akka API.

Tying Things Together

For our worker pool example, we will simply use the default configuration and start a Manager component with a configurable number of workers. We then create a data array of configurable size and send a work request with it and an aggregation function to the manager instance. We'll use addition with overflow as our aggregation function, which means our neutral element is 0u64. The data array we generate is simply the integers from 1 to data_size, which means our aggregate will actually calculate a triangular number (modulo overflows, for which we probably don't have enough memory for the data array anyway). Since that particular number has a much simpler closed form, i.e. \( \sum_{k=1}^n k = \frac{n\cdot(n+1)}{2} \), we will also use an assertion to verify we are actually producing the right result (again, this probably won't work if we do overflow during aggregation, but oh well...details ;).

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::{env, fmt, ops::Range, sync::Arc};

struct Work {
    data: Arc<[u64]>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl Work {
    fn with(data: Vec<u64>, merger: fn(u64, &u64) -> u64, neutral: u64) -> Self {
        let moved_data: Arc<[u64]> = data.into_boxed_slice().into();
        Work {
            data: moved_data,
            merger,
            neutral,
        }
    }
}
impl fmt::Debug for Work {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Work{{
            data=<data of length={}>,
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.neutral
        )
    }
}

struct WorkPart {
    data: Arc<[u64]>,
    range: Range<usize>,
    merger: fn(u64, &u64) -> u64,
    neutral: u64,
}
impl WorkPart {
    fn from(work: &Work, range: Range<usize>) -> Self {
        WorkPart {
            data: work.data.clone(),
            range,
            merger: work.merger,
            neutral: work.neutral,
        }
    }
}
impl fmt::Debug for WorkPart {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "WorkPart{{
            data=<data of length={}>,
            range={:?},
            merger=<function>,
            neutral={}
        }}",
            self.data.len(),
            self.range,
            self.neutral
        )
    }
}

#[derive(Clone, Debug)]
struct WorkResult(u64);
struct WorkerPort;
impl Port for WorkerPort {
    type Indication = WorkResult;
    type Request = Never;
}

#[derive(ComponentDefinition)]
struct Manager {
    ctx: ComponentContext<Self>,
    worker_port: RequiredPort<WorkerPort>,
    num_workers: usize,
    workers: Vec<Arc<Component<Worker>>>,
    worker_refs: Vec<ActorRefStrong<WorkPart>>,
    outstanding_request: Option<Ask<Work, WorkResult>>,
    result_accumulator: Vec<u64>,
}
impl Manager {
    fn new(num_workers: usize) -> Self {
        Manager {
            ctx: ComponentContext::uninitialised(),
            worker_port: RequiredPort::uninitialised(),
            num_workers,
            workers: Vec::with_capacity(num_workers),
            worker_refs: Vec::with_capacity(num_workers),
            outstanding_request: None,
            result_accumulator: Vec::with_capacity(num_workers + 1),
        }
    }
}

impl ComponentLifecycle for Manager {
    fn on_start(&mut self) -> Handled {
        // set up our workers
        for _i in 0..self.num_workers {
            let worker = self.ctx.system().create(Worker::new);
            worker.connect_to_required(self.worker_port.share());
            let worker_ref = worker.actor_ref().hold().expect("live");
            self.ctx.system().start(&worker);
            self.workers.push(worker);
            self.worker_refs.push(worker_ref);
        }
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        // clean up after ourselves
        self.worker_refs.clear();
        let system = self.ctx.system();
        self.workers.drain(..).for_each(|worker| {
            system.stop(&worker);
        });
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}
impl Require<WorkerPort> for Manager {
    fn handle(&mut self, event: WorkResult) -> Handled {
        if self.outstanding_request.is_some() {
            self.result_accumulator.push(event.0);
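            // we expect num_workers + 1 partial results: one from each worker
            // plus the manager's own contribution (see receive_local below)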
            if self.result_accumulator.len() == (self.num_workers + 1) {
                let ask = self.outstanding_request.take().expect("ask");
                let work: &Work = ask.request();
                let res = self
                    .result_accumulator
                    .iter()
                    .fold(work.neutral, work.merger);
                self.result_accumulator.clear();
                let reply = WorkResult(res);
                ask.reply(reply).expect("reply");
            }
        } else {
            error!(
                self.log(),
                "Got a response without an outstanding promise: {:?}", event
            );
        }
        Handled::Ok
    }
}
impl Actor for Manager {
    type Message = Ask<Work, WorkResult>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        assert!(
            self.outstanding_request.is_none(),
            "One request at a time, please!"
        );
        let work: &Work = msg.request();
        if self.num_workers == 0 {
            // manager gotta work itself -> very unhappy manager
            let res = work.data.iter().fold(work.neutral, work.merger);
            msg.reply(WorkResult(res)).expect("reply");
        } else {
            let len = work.data.len();
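            // integer division; any remainder is aggregated by the manager itself below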
            let stride = len / self.num_workers;
            let mut start = 0usize;
            let mut index = 0;
            while start < len && index < self.num_workers {
                let end = len.min(start + stride);
                let range = start..end;
                info!(self.log(), "Assigning {:?} to worker #{}", range, index);
                let msg = WorkPart::from(work, range);
                let worker = &self.worker_refs[index];
                worker.tell(msg);
                start += stride;
                index += 1;
            }
            if start < len {
                // manager just does the rest itself
                let res = work.data[start..len].iter().fold(work.neutral, work.merger);
                self.result_accumulator.push(res);
            } else {
                // just put a neutral element in there, so our count is right in the end
                self.result_accumulator.push(work.neutral);
            }
            self.outstanding_request = Some(msg);
        }
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

#[derive(ComponentDefinition)]
struct Worker {
    ctx: ComponentContext<Self>,
    worker_port: ProvidedPort<WorkerPort>,
}
impl Worker {
    fn new() -> Self {
        Worker {
            ctx: ComponentContext::uninitialised(),
            worker_port: ProvidedPort::uninitialised(),
        }
    }
}

ignore_lifecycle!(Worker);
ignore_requests!(WorkerPort, Worker);

impl Actor for Worker {
    type Message = WorkPart;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        let my_slice = &msg.data[msg.range];
        let res = my_slice.iter().fold(msg.neutral, msg.merger);
        self.worker_port.trigger(WorkResult(res));
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("Still ignoring networking stuff.");
    }
}

fn run_task(num_workers: usize, data_size: usize) {
    let system = KompactConfig::default().build().expect("system");
    let manager = system.create(move || Manager::new(num_workers));
    system.start(&manager);
    let manager_ref = manager.actor_ref().hold().expect("live");

    let data: Vec<u64> = (1..=data_size).map(|v| v as u64).collect();
    let work = Work::with(data, overflowing_sum, 0u64);
    println!("Sending request...");
    let res = manager_ref.ask(work).wait();
    println!("*******\nGot result: {}\n*******", res.0);
    assert_eq!(triangular_number(data_size as u64), res.0);
    system.shutdown().expect("shutdown");
}

fn triangular_number(n: u64) -> u64 {
    (n * (n + 1u64)) / 2u64
}

fn overflowing_sum(lhs: u64, rhs: &u64) -> u64 {
    lhs.overflowing_add(*rhs).0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_workers() {
        run_task(3, 1000);
    }
}

Now all we are missing are values for the two parameters: num_workers and data_size. We'll read those from the command line so we can play around with them.

pub fn main() {
    let args: Vec<String> = env::args().collect();
    assert_eq!(
        3,
        args.len(),
        "Invalid arguments! Must give number of workers and size of the data array."
    );
    let num_workers: usize = args[1].parse().expect("number");
    let data_size: usize = args[2].parse().expect("number");
    run_task(num_workers, data_size);
}

Now we can run our example by giving it some parameters, say 4 100000, to run 4 workers and calculate the 100000th triangular number. If you play with larger numbers, you'll see that a) it uses more and more memory, and b) it spends most of its time creating the original array, as our aggregation function is very simple and parallelisable, while data creation is done sequentially. Of course, in a real worker pool we'd probably read the data from disk, or perhaps from an already memory-resident set. But this is good enough for our little example.

Note: As before, if you have checked out the examples folder you can run the concrete binary with:

cargo run --release --bin workers 4 100000