Schedulers

Kompact allows the core component scheduler to be exchanged in order to support different kinds of workloads. The default crossbeam_workstealing_pool scheduler from the executors crate, for example, is designed for fork-join type workloads, that is, workloads where a small number of (pool) external events spawn a large number of (pool) internal events. But not all workloads are of this type. Sometimes the majority of events are (pool) external, and there is little communication between components running on the thread pool. Our somewhat contrived “counter”-example from the introduction was of this nature: we were sending events and messages from the main thread to the Counter, which was running on Kompact’s thread pool, but we never sent any messages or events to any other component on that pool. In fact, we only had a single component, and running it on a large thread pool seems rather silly. (Kompact’s default thread pool has one thread for each CPU core, as reported by num_cpus.)
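To get a feeling for how large such a one-thread-per-core pool would be on your machine, the following is a minimal sketch using only the standard library. It is an illustration, not Kompact's code: Kompact relies on the num_cpus crate, which may report a different number than std::thread::available_parallelism, and the helper name default_pool_size is made up for this example.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical helper (not part of Kompact): picks a pool size in the
/// spirit of a "one thread per CPU core" default, using only std.
fn default_pool_size() -> usize {
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        // Fall back to a single thread if the platform cannot tell us.
        .unwrap_or(1)
}

fn main() {
    let threads = default_pool_size();
    println!("A one-thread-per-core pool would use {} thread(s).", threads);
}
```

For a single component like our Counter, any value above 1 here buys nothing, which is the motivation for the rest of this section.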

Changing Pool Size

We will first change just the pool size for the “counter”-example, since that is easily done.

The number of threads in Kompact’s thread pool is configured with the config value at system::THREADS using the set_config_value function on a KompactConfig instance. We will simply pass in 1usize there, before constructing our Kompact system.

That is, we change the construction of the system at the start of main in this program

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::time::Duration;

#[derive(Clone, Debug, PartialEq, Eq)]
struct CurrentCount {
    messages: u64,
    events: u64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct CountMe;

struct CounterPort;
impl Port for CounterPort {
    type Indication = CurrentCount;
    type Request = CountMe;
}

#[derive(ComponentDefinition)]
struct Counter {
    ctx: ComponentContext<Self>,
    counter_port: ProvidedPort<CounterPort>,
    msg_count: u64,
    event_count: u64,
}
impl Counter {
    pub fn new() -> Self {
        Counter {
            ctx: ComponentContext::uninitialised(),
            counter_port: ProvidedPort::uninitialised(),
            msg_count: 0u64,
            event_count: 0u64,
        }
    }

    fn current_count(&self) -> CurrentCount {
        CurrentCount {
            messages: self.msg_count,
            events: self.event_count,
        }
    }
}

impl ComponentLifecycle for Counter {
    fn on_start(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a start event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a stop event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a kill event!");
        self.event_count += 1u64;
        Handled::Ok
    }
}

impl Provide<CounterPort> for Counter {
    fn handle(&mut self, _event: CountMe) -> Handled {
        info!(self.ctx.log(), "Got a counter event!");
        self.event_count += 1u64;
        self.counter_port.trigger(self.current_count());
        Handled::Ok
    }
}

impl Actor for Counter {
    type Message = Ask<CountMe, CurrentCount>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        msg.complete(|_request| {
            info!(self.ctx.log(), "Got a message!");
            self.msg_count += 1u64;
            self.current_count()
        })
        .expect("complete");
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("We are still ignoring network messages.");
    }
}

pub fn main() {
    let system = KompactConfig::default().build().expect("system");
    let counter = system.create(Counter::new);
    system.start(&counter);
    let actor_ref = counter.actor_ref();
    let port_ref: ProvidedRef<CounterPort> = counter.provided_ref();
    for _i in 0..100 {
        let current_count = actor_ref.ask(CountMe).wait();
        info!(system.logger(), "The current count is: {:?}", current_count);
    }
    for _i in 0..100 {
        system.trigger_r(CountMe, &port_ref);
        // Where do the answers go?
    }
    std::thread::sleep(Duration::from_millis(1000));
    let current_count = actor_ref.ask(CountMe).wait();
    info!(system.logger(), "The final count is: {:?}", current_count);
    system.shutdown().expect("shutdown");
    // Wait a bit longer, so all output is logged (asynchronously) before shutting down
    std::thread::sleep(Duration::from_millis(10));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counters() {
        main();
    }
}

to this:

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use std::time::Duration;

#[derive(Clone, Debug, PartialEq, Eq)]
struct CurrentCount {
    messages: u64,
    events: u64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct CountMe;

struct CounterPort;
impl Port for CounterPort {
    type Indication = CurrentCount;
    type Request = CountMe;
}

#[derive(ComponentDefinition)]
struct Counter {
    ctx: ComponentContext<Self>,
    counter_port: ProvidedPort<CounterPort>,
    msg_count: u64,
    event_count: u64,
}
impl Counter {
    pub fn new() -> Self {
        Counter {
            ctx: ComponentContext::uninitialised(),
            counter_port: ProvidedPort::uninitialised(),
            msg_count: 0u64,
            event_count: 0u64,
        }
    }

    fn current_count(&self) -> CurrentCount {
        CurrentCount {
            messages: self.msg_count,
            events: self.event_count,
        }
    }
}
impl ComponentLifecycle for Counter {
    fn on_start(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a start event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a stop event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a kill event!");
        self.event_count += 1u64;
        Handled::Ok
    }
}
impl Provide<CounterPort> for Counter {
    fn handle(&mut self, _event: CountMe) -> Handled {
        info!(self.ctx.log(), "Got a counter event!");
        self.event_count += 1u64;
        self.counter_port.trigger(self.current_count());
        Handled::Ok
    }
}

impl Actor for Counter {
    type Message = Ask<CountMe, CurrentCount>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        msg.complete(|_request| {
            info!(self.ctx.log(), "Got a message!");
            self.msg_count += 1u64;
            self.current_count()
        })
        .expect("complete");
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("We are still ignoring network messages.");
    }
}

pub fn main() {
    use kompact::config_keys::system;
    let mut conf = KompactConfig::default();
    conf.set_config_value(&system::THREADS, 1usize);
    let system = conf.build().expect("system");
    let counter = system.create(Counter::new);
    system.start(&counter);
    let actor_ref = counter.actor_ref();
    let port_ref: ProvidedRef<CounterPort> = counter.provided_ref();
    for _i in 0..100 {
        let current_count = actor_ref.ask(CountMe).wait();
        info!(system.logger(), "The current count is: {:?}", current_count);
    }
    for _i in 0..100 {
        system.trigger_r(CountMe, &port_ref);
        // Where do the answers go?
    }
    std::thread::sleep(Duration::from_millis(1000));
    let current_count = actor_ref.ask(CountMe).wait();
    info!(system.logger(), "The final count is: {:?}", current_count);
    system.shutdown().expect("shutdown");
    // Wait a bit longer, so all output is logged (asynchronously) before shutting down
    std::thread::sleep(Duration::from_millis(10));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counters() {
        main();
    }
}

The same effect could be achieved via a configuration file by setting kompact.runtime.threads = 1.

If we run this, we will see exactly (modulo event timing) the same output as when running on the larger pool with Kompact’s default settings.

Note: As before, if you have checked out the examples folder you can run the concrete binary with:

cargo run --release --bin counters_pool

Changing Scheduler Implementation

Ok, so now we’ll switch to a pool that is designed for external events: crossbeam_channel_pool, also from the executors crate.
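To illustrate what "designed for external events" can mean, here is a minimal sketch of a pool in which all workers pull jobs from one shared queue that is fed from outside the pool. It uses only the standard library and is not the implementation found in the executors crate; the names SharedQueuePool and run_external_events are assumptions made for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical minimal pool: every worker takes jobs from one shared channel.
struct SharedQueuePool {
    sender: Option<mpsc::Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl SharedQueuePool {
    fn new(threads: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while taking the next job,
                    // and released before the job itself runs.
                    let next = receiver.lock().expect("queue lock").recv();
                    match next {
                        Ok(job) => job(),
                        Err(_) => break, // all senders dropped: shut down
                    }
                })
            })
            .collect();
        SharedQueuePool {
            sender: Some(sender),
            workers,
        }
    }

    /// Submit a job from outside the pool (a "pool external" event).
    fn execute<F: FnOnce() + Send + 'static>(&self, job: F) {
        self.sender
            .as_ref()
            .expect("pool is running")
            .send(Box::new(job))
            .expect("workers alive");
    }

    /// Close the queue and wait until all queued jobs have been processed.
    fn shutdown(mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().expect("worker thread");
        }
    }
}

/// Sends `events` jobs from the calling thread and returns how many were handled.
fn run_external_events(threads: usize, events: usize) -> usize {
    let handled = Arc::new(AtomicUsize::new(0));
    let pool = SharedQueuePool::new(threads);
    for _ in 0..events {
        let handled = Arc::clone(&handled);
        pool.execute(move || {
            handled.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.shutdown();
    handled.load(Ordering::SeqCst)
}

fn main() {
    let handled = run_external_events(1, 100);
    println!("Handled {} external events on a single worker.", handled);
}
```

When nearly all work arrives from outside, as in our counter example, a single shared queue like this is a natural fit, whereas per-worker queues with stealing mostly pay off when jobs running on the pool spawn further jobs.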

We can set the scheduler implementation to be used by our Kompact system using the executor(...) function on a KompactConfig instance. That function expects a closure from the number of threads (usize) to something that implements the executors::FuturesExecutor trait.

Note: There is actually a more general API for changing the scheduler, in the scheduler(...) function, which expects a function returning a Box<dyn kompact::runtime::Scheduler>. The executor(...) function is simply a shortcut for using schedulers that are compatible with the executors crate.
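The "closure from the number of threads to an executor" shape can be sketched in isolation as follows. Everything in this block (the Executor trait, both pool types, and the Config struct) is a hypothetical stand-in written for illustration, not Kompact's or the executors crate's actual types. It only aims to show why a constructor such as ThreadPool::new can be passed directly, and why the factory can be stored and invoked later with the configured thread count.

```rust
/// Hypothetical stand-in for an executor abstraction.
trait Executor {
    fn threads(&self) -> usize;
    fn name(&self) -> &'static str;
}

struct StealingPool {
    threads: usize,
}
impl StealingPool {
    fn new(threads: usize) -> Self {
        StealingPool { threads }
    }
}
impl Executor for StealingPool {
    fn threads(&self) -> usize {
        self.threads
    }
    fn name(&self) -> &'static str {
        "work-stealing"
    }
}

struct SharedQueuePool {
    threads: usize,
}
impl SharedQueuePool {
    fn new(threads: usize) -> Self {
        SharedQueuePool { threads }
    }
}
impl Executor for SharedQueuePool {
    fn threads(&self) -> usize {
        self.threads
    }
    fn name(&self) -> &'static str {
        "shared-queue"
    }
}

/// Hypothetical config object that stores a factory instead of a pool.
struct Config {
    threads: usize,
    factory: Box<dyn Fn(usize) -> Box<dyn Executor>>,
}

impl Config {
    fn new() -> Self {
        Config {
            threads: 4, // arbitrary default for this sketch
            factory: Box::new(|t: usize| Box::new(StealingPool::new(t)) as Box<dyn Executor>),
        }
    }

    fn set_threads(&mut self, threads: usize) -> &mut Self {
        self.threads = threads;
        self
    }

    /// Accepts any function from a thread count to an executor,
    /// including a plain constructor path like `SharedQueuePool::new`.
    fn executor<E, F>(&mut self, f: F) -> &mut Self
    where
        E: Executor + 'static,
        F: Fn(usize) -> E + 'static,
    {
        self.factory = Box::new(move |t: usize| Box::new(f(t)) as Box<dyn Executor>);
        self
    }

    /// The factory is only invoked here, with the thread count configured so far.
    fn build(&self) -> Box<dyn Executor> {
        (self.factory)(self.threads)
    }
}

fn main() {
    let mut conf = Config::new();
    conf.executor(SharedQueuePool::new);
    conf.set_threads(1);
    let pool = conf.build();
    println!("Built a {} pool with {} thread(s).", pool.name(), pool.threads());
}
```

Because the factory runs only at build time in this sketch, the order in which the thread count and the executor are configured does not matter here; whether the same holds for Kompact's own KompactConfig is not something this example establishes.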

In order to use the crossbeam_channel_pool scheduler, we need to import the kompact::executors module, which is simply a re-export from the executors crate:

use kompact::executors;

With that, all we need to add is a single line of code, the conf.executor(...) call in main below, which selects the ThreadPool implementation from the crossbeam_channel_pool module instead of the default one from the crossbeam_workstealing_pool module.

#![allow(clippy::unused_unit)]
use kompact::{executors, prelude::*};
use std::time::Duration;

#[derive(Clone, Debug, PartialEq, Eq)]
struct CurrentCount {
    messages: u64,
    events: u64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct CountMe;

struct CounterPort;
impl Port for CounterPort {
    type Indication = CurrentCount;
    type Request = CountMe;
}

#[derive(ComponentDefinition)]
struct Counter {
    ctx: ComponentContext<Self>,
    counter_port: ProvidedPort<CounterPort>,
    msg_count: u64,
    event_count: u64,
}
impl Counter {
    pub fn new() -> Self {
        Counter {
            ctx: ComponentContext::uninitialised(),
            counter_port: ProvidedPort::uninitialised(),
            msg_count: 0u64,
            event_count: 0u64,
        }
    }

    fn current_count(&self) -> CurrentCount {
        CurrentCount {
            messages: self.msg_count,
            events: self.event_count,
        }
    }
}
impl ComponentLifecycle for Counter {
    fn on_start(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a start event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a stop event!");
        self.event_count += 1u64;
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        info!(self.ctx.log(), "Got a kill event!");
        self.event_count += 1u64;
        Handled::Ok
    }
}
impl Provide<CounterPort> for Counter {
    fn handle(&mut self, _event: CountMe) -> Handled {
        info!(self.ctx.log(), "Got a counter event!");
        self.event_count += 1u64;
        self.counter_port.trigger(self.current_count());
        Handled::Ok
    }
}

impl Actor for Counter {
    type Message = Ask<CountMe, CurrentCount>;

    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        msg.complete(|_request| {
            info!(self.ctx.log(), "Got a message!");
            self.msg_count += 1u64;
            self.current_count()
        })
        .expect("complete");
        Handled::Ok
    }

    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!("We are still ignoring network messages.");
    }
}

pub fn main() {
    use kompact::config_keys::system;
    let mut conf = KompactConfig::default();
    conf.set_config_value(&system::THREADS, 1usize);
    conf.executor(executors::crossbeam_channel_pool::ThreadPool::new);
    let system = conf.build().expect("system");
    let counter = system.create(Counter::new);
    system.start(&counter);
    let actor_ref = counter.actor_ref();
    let port_ref: ProvidedRef<CounterPort> = counter.provided_ref();
    for _i in 0..100 {
        let current_count = actor_ref.ask(CountMe).wait();
        info!(system.logger(), "The current count is: {:?}", current_count);
    }
    for _i in 0..100 {
        system.trigger_r(CountMe, &port_ref);
        // Where do the answers go?
    }
    std::thread::sleep(Duration::from_millis(1000));
    let current_count = actor_ref.ask(CountMe).wait();
    info!(system.logger(), "The final count is: {:?}", current_count);
    system.shutdown().expect("shutdown");
    // Wait a bit longer, so all output is logged (asynchronously) before shutting down
    std::thread::sleep(Duration::from_millis(10));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counters() {
        main();
    }
}

If we run this, again, we will see exactly (modulo event timing) the same output as when running on the larger pool with Kompact’s default settings.

Note: As before, if you have checked out the examples folder you can run the concrete binary with:

cargo run --release --bin counters_channel_pool