Configuration

Passing a large number of parameters around while setting up a component system is often inconvenient, so Kompact also offers a configuration system that lets parameters be loaded from a file or supplied as a string at the top level. This system is powered by the Hocon crate and adds very little on top of its APIs.

Configuration options must be set on the KompactConfig instance before the system is started, and the resulting configuration remains immutable for the lifetime of the system. A configuration can be loaded from a file by passing its path to the load_config_file(...) function. Alternatively, configuration values can be loaded directly from a string using load_config_str(...).

Within each component the Hocon configuration instance can be accessed via the context and individual keys via bracket notation, e.g. self.ctx.config()["my-key"]. The configuration can also be accessed outside a component via KompactSystem::config().

In addition to component configuration, many parts of Kompact’s runtime can also be configured via this mechanism. The complete set of available configuration keys and their effects is described in the modules below kompact::config_keys.

Example

We are going to reuse the Buncher from the timers section and pass its two parameters, batch_size and timeout, via configuration instead of the constructor.

We’ll start off by creating a configuration file application.conf in the working directory, so it’s easy to find later. Something like this:

buncher {
	batch-size = 100
	timeout = 100 ms
}
omega {
	initial-period = 10 ms
	delta = 1 ms
}

We can then add this file to the KompactConfig instance using the load_config_file(...) function, as done in the main function of the complete example:

#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use kompact_examples::batching::*;
use std::time::Duration;

#[derive(ComponentDefinition, Actor)]
struct Buncher {
    ctx: ComponentContext<Self>,
    batch_port: ProvidedPort<Batching>,
    batch_size: usize,
    timeout: Duration,
    current_batch: Vec<Ping>,
    outstanding_timeout: Option<ScheduledTimer>,
}

impl Buncher {
    fn new() -> Buncher {
        Buncher {
            ctx: ComponentContext::uninitialised(),
            batch_port: ProvidedPort::uninitialised(),
            batch_size: 0,
            timeout: Duration::from_millis(1),
            current_batch: Vec::new(),
            outstanding_timeout: None,
        }
    }


    fn trigger_batch(&mut self) -> () {
        let mut new_batch = Vec::with_capacity(self.batch_size);
        std::mem::swap(&mut new_batch, &mut self.current_batch);
        self.batch_port.trigger(Batch(new_batch))
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.outstanding_timeout {
            Some(ref timeout) if *timeout == timeout_id => {
                self.trigger_batch();
                let new_timeout = self.schedule_once(self.timeout, Self::handle_timeout);
                self.outstanding_timeout = Some(new_timeout);
                Handled::Ok
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }
}

impl ComponentLifecycle for Buncher {
    fn on_start(&mut self) -> Handled {
        self.batch_size = self.ctx.config()["buncher"]["batch-size"]
            .as_i64()
            .expect("batch size") as usize;
        self.timeout = self.ctx.config()["buncher"]["timeout"]
            .as_duration()
            .expect("timeout");
        self.current_batch.reserve(self.batch_size);
        let timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
        self.outstanding_timeout = Some(timeout);
        Handled::Ok
    }


    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.outstanding_timeout.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

impl Provide<Batching> for Buncher {
    fn handle(&mut self, event: Ping) -> Handled {
        self.current_batch.push(event);
        if self.current_batch.len() >= self.batch_size {
            self.trigger_batch();
            if let Some(timeout) = self.outstanding_timeout.take() {
                self.cancel_timer(timeout);
            }
            let new_timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
            self.outstanding_timeout = Some(new_timeout);
        }
        Handled::Ok
    }
}

pub fn main() {
    let mut conf = KompactConfig::default();
    conf.load_config_file("./application.conf")
        .load_config_str("buncher.batch-size = 50");
    let system = conf.build().expect("system");
    let printer = system.create(BatchPrinter::new);
    let buncher = system.create(Buncher::new);
    biconnect_components::<Batching, _, _>(&buncher, &printer).expect("connection");
    let batching = buncher.on_definition(|cd| cd.batch_port.share());

    system.start(&printer);
    system.start(&buncher);

    // these should usually trigger due to full batches
    let sleep_dur = Duration::from_millis(1);
    for i in 0..500 {
        let ping = Ping(i);
        system.trigger_r(ping, &batching);
        std::thread::sleep(sleep_dur);
    }

    // these should usually trigger due to timeout
    let sleep_dur = Duration::from_millis(2);
    for i in 0..500 {
        let ping = Ping(i);
        system.trigger_r(ping, &batching);
        std::thread::sleep(sleep_dur);
    }

    system.shutdown().expect("shutdown");
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_buncher() {
        main();
    }
}

To show off how multiple configuration sources can be combined, we will override the batch-size value from the main function with a literal string, after the file has been loaded:

    let mut conf = KompactConfig::default();
    conf.load_config_file("./application.conf")
        .load_config_str("buncher.batch-size = 50");
    let system = conf.build().expect("system");
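The override works because sources are applied in load order, with a later value for a key replacing an earlier one. The sketch below illustrates only that precedence rule, using nothing but the standard library. It is not how Kompact or Hocon implement merging, and `FlatConfig`, `parse_flat`, and `merge_in_order` are hypothetical names that handle flat `key = value` lines only (no nesting, comments, or quoting).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a flattened configuration: dotted key -> raw value.
type FlatConfig = HashMap<String, String>;

/// Parse lines of the form `key = value` into a flat map.
/// Lines without an `=` are skipped.
fn parse_flat(source: &str) -> FlatConfig {
    source
        .lines()
        .filter_map(|line| line.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect()
}

/// Merge sources in load order: a key in a later source
/// replaces the same key from an earlier source.
fn merge_in_order(sources: &[&str]) -> FlatConfig {
    let mut merged = FlatConfig::new();
    for source in sources {
        // `extend` overwrites existing keys, which gives "last one wins".
        merged.extend(parse_flat(source));
    }
    merged
}
```

Merging the flat lines `buncher.batch-size = 100` and `buncher.timeout = 100 ms` with the later override `buncher.batch-size = 50` leaves the batch size at 50 and the timeout untouched, which is the behaviour the example relies on.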

Now we change the Buncher constructor so that it no longer takes any arguments. Since we still need to put some values into the struct fields, let’s use defaults, say a batch size of 0 and a timeout of 1ms. We could also go with an Option, if it’s important to know whether the component was initialised properly or not. We also don’t know the required capacity for the vector anymore, so we just create an empty one and extend it later, once we have read the batch size from the config file.

    fn new() -> Buncher {
        Buncher {
            ctx: ComponentContext::uninitialised(),
            batch_port: ProvidedPort::uninitialised(),
            batch_size: 0,
            timeout: Duration::from_millis(1),
            current_batch: Vec::new(),
            outstanding_timeout: None,
        }
    }
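The Option alternative mentioned above could look roughly like the following. This is a minimal sketch rather than part of the example: `BuncherSettings` and its methods are hypothetical names, and in the real component the two fields would simply live in the `Buncher` struct.

```rust
use std::time::Duration;

/// Hypothetical settings holder. `None` means "not read from the config yet".
#[derive(Debug, Default)]
struct BuncherSettings {
    batch_size: Option<usize>,
    timeout: Option<Duration>,
}

impl BuncherSettings {
    /// Store the values once they have been read, e.g. from `on_start`.
    fn configure(&mut self, batch_size: usize, timeout: Duration) {
        self.batch_size = Some(batch_size);
        self.timeout = Some(timeout);
    }

    /// True only after both values have been set.
    fn is_configured(&self) -> bool {
        self.batch_size.is_some() && self.timeout.is_some()
    }

    /// Return the batch size, or an error if the settings were never read.
    fn batch_size(&self) -> Result<usize, &'static str> {
        self.batch_size.ok_or("batch size read before configuration")
    }
}
```

Compared to placeholder defaults such as 0 and 1ms, this makes a forgotten initialisation visible at the point of use, at the cost of unwrapping the value wherever it is needed.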

And, of course, we must also update the matching create(...) call in the main function:

    let buncher = system.create(Buncher::new);

Finally, the actual config access happens in the on_start code. At this point the component is properly initialised and we have access to configuration values. The Hocon type has a bunch of very convenient conversion functions, so we can get a Duration directly from the 100 ms string in the file, for example. Once we have read the values for batch_size and timeout, we can also go ahead and reserve the required additional space in the current_batch vector.

    fn on_start(&mut self) -> Handled {
        self.batch_size = self.ctx.config()["buncher"]["batch-size"]
            .as_i64()
            .expect("batch size") as usize;
        self.timeout = self.ctx.config()["buncher"]["timeout"]
            .as_duration()
            .expect("timeout");
        self.current_batch.reserve(self.batch_size);
        let timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
        self.outstanding_timeout = Some(timeout);
        Handled::Ok
    }
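One caveat with the `as usize` cast in on_start: in Rust, casting a negative i64 with `as` wraps around to a very large usize instead of failing, so a typo such as `batch-size = -1` would go unnoticed. Below is a minimal sketch of a stricter conversion, assuming the lookup yields an `Option<i64>`. `checked_batch_size` is a hypothetical helper, and rejecting 0 is a design choice of this sketch, not something the example requires.

```rust
use std::convert::TryFrom;

/// Convert a raw integer, such as the result of an `as_i64()` lookup,
/// into a usable batch size, reporting problems instead of wrapping.
fn checked_batch_size(raw: Option<i64>) -> Result<usize, String> {
    // A missing key or a non-integer value shows up as `None`.
    let value = raw.ok_or_else(|| String::from("batch-size is missing or not an integer"))?;
    // `try_from` fails for negative values (and for values too large for `usize`).
    let size = usize::try_from(value)
        .map_err(|_| format!("batch-size {} does not fit in usize", value))?;
    // Assumption of this sketch: a batch size of 0 is treated as a mistake.
    if size == 0 {
        return Err(String::from("batch-size must be at least 1"));
    }
    Ok(size)
}
```

In on_start, the result of such a helper could be passed to `expect(...)` in place of the bare cast, so that a bad value fails loudly at startup.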

At this point we can run the example, and we can see from the regular “50 event”-sized batches in the beginning that our overriding of the batch size worked just fine.

Note: As before, if you have checked out the examples folder you can run the concrete binary with:

cargo run --release --bin buncher_config