Configuration
Since it is often inconvenient to pass around a large number of parameters when setting up a component system, Kompact also offers a configuration system, which allows parameters to be loaded from a file or provided as a string at the top level, for example. This system is powered by the Hocon crate and uses its APIs with very little additional support on Kompact's side.
Configuration options must be set on the `KompactConfig` instance before the system is started, and the resulting configuration remains immutable for the lifetime of the system. A configuration can be loaded from a file by passing its path to the `load_config_file(...)` function. Alternatively, configuration values can be loaded directly from a string using `load_config_str(...)`.
Within each component, the Hocon configuration instance can be accessed via the context, and individual keys via bracket notation, e.g. `self.ctx.config()["my-key"]`. The configuration can also be accessed outside a component via `KompactSystem::config()`.
In addition to component configuration, many parts of Kompact’s runtime can also be configured via this mechanism. The complete set of available configuration keys and their effects is described in the modules below kompact::config_keys.
Example
We are going to reuse the `Buncher` from the timers section and pass its two parameters, `batch_size` and `timeout`, via configuration instead of the constructor.
We'll start off by creating a configuration file `application.conf` in the working directory, so it's easy to find later. Something like this:
buncher {
    batch-size = 100
    timeout = 100 ms
}
omega {
    initial-period = 10 ms
    delta = 1 ms
}
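For reference, HOCON treats dotted keys as equivalent to nested blocks, which is what the string override used later in this section relies on. The same settings could equally be written as:

```hocon
buncher.batch-size = 100
buncher.timeout = 100 ms
omega.initial-period = 10 ms
omega.delta = 1 ms
```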
We can then add this file to the `KompactConfig` instance using the `load_config_file(...)` function:
#![allow(clippy::unused_unit)]
use kompact::prelude::*;
use kompact_examples::batching::*;
use std::time::Duration;

#[derive(ComponentDefinition, Actor)]
struct Buncher {
    ctx: ComponentContext<Self>,
    batch_port: ProvidedPort<Batching>,
    batch_size: usize,
    timeout: Duration,
    current_batch: Vec<Ping>,
    outstanding_timeout: Option<ScheduledTimer>,
}

impl Buncher {
    fn new() -> Buncher {
        Buncher {
            ctx: ComponentContext::uninitialised(),
            batch_port: ProvidedPort::uninitialised(),
            batch_size: 0,
            timeout: Duration::from_millis(1),
            current_batch: Vec::new(),
            outstanding_timeout: None,
        }
    }

    fn trigger_batch(&mut self) -> () {
        let mut new_batch = Vec::with_capacity(self.batch_size);
        std::mem::swap(&mut new_batch, &mut self.current_batch);
        self.batch_port.trigger(Batch(new_batch))
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.outstanding_timeout {
            Some(ref timeout) if *timeout == timeout_id => {
                self.trigger_batch();
                let new_timeout = self.schedule_once(self.timeout, Self::handle_timeout);
                self.outstanding_timeout = Some(new_timeout);
                Handled::Ok
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }
}

impl ComponentLifecycle for Buncher {
    fn on_start(&mut self) -> Handled {
        self.batch_size = self.ctx.config()["buncher"]["batch-size"]
            .as_i64()
            .expect("batch size") as usize;
        self.timeout = self.ctx.config()["buncher"]["timeout"]
            .as_duration()
            .expect("timeout");
        self.current_batch.reserve(self.batch_size);
        let timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
        self.outstanding_timeout = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.outstanding_timeout.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

impl Provide<Batching> for Buncher {
    fn handle(&mut self, event: Ping) -> Handled {
        self.current_batch.push(event);
        if self.current_batch.len() >= self.batch_size {
            self.trigger_batch();
            if let Some(timeout) = self.outstanding_timeout.take() {
                self.cancel_timer(timeout);
            }
            let new_timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
            self.outstanding_timeout = Some(new_timeout);
        }
        Handled::Ok
    }
}

pub fn main() {
    let mut conf = KompactConfig::default();
    conf.load_config_file("./application.conf")
        .load_config_str("buncher.batch-size = 50");
    let system = conf.build().expect("system");
    let printer = system.create(BatchPrinter::new);
    let buncher = system.create(Buncher::new);
    biconnect_components::<Batching, _, _>(&buncher, &printer).expect("connection");
    let batching = buncher.on_definition(|cd| cd.batch_port.share());
    system.start(&printer);
    system.start(&buncher);
    // these should usually trigger due to full batches
    let sleep_dur = Duration::from_millis(1);
    for i in 0..500 {
        let ping = Ping(i);
        system.trigger_r(ping, &batching);
        std::thread::sleep(sleep_dur);
    }
    // these should usually trigger due to timeout
    let sleep_dur = Duration::from_millis(2);
    for i in 0..500 {
        let ping = Ping(i);
        system.trigger_r(ping, &batching);
        std::thread::sleep(sleep_dur);
    }
    system.shutdown().expect("shutdown");
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_buncher() {
        main();
    }
}
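The `trigger_batch` implementation above uses a common Rust pattern: swap a freshly allocated vector into place and hand the full one off, so the component keeps an empty, pre-sized buffer. The pattern can be seen in isolation in this std-only sketch (independent of Kompact):

```rust
use std::mem;

// Drain the current batch by swapping in a fresh, pre-allocated vector,
// leaving `current` empty and ready for the next batch.
fn take_batch(current: &mut Vec<u64>, capacity: usize) -> Vec<u64> {
    let mut new_batch = Vec::with_capacity(capacity);
    mem::swap(&mut new_batch, current);
    new_batch
}

fn main() {
    let mut current = vec![1, 2, 3];
    // The old contents move out; `current` is now empty with the new capacity.
    let batch = take_batch(&mut current, 8);
    println!("{:?} {}", batch, current.len());
}
```

This avoids cloning the batch contents: only the vector's internal pointers are exchanged, which is cheap regardless of batch size.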
To show how multiple configuration sources can be combined, we will override the `batch-size` value from the main function with a literal string, after the file has been loaded:
let mut conf = KompactConfig::default();
conf.load_config_file("./application.conf")
    .load_config_str("buncher.batch-size = 50");
let system = conf.build().expect("system");
// ... the rest of the example is unchanged ...
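Conceptually, each later configuration source shadows the keys of earlier ones, which is why the string override wins over the file value here. This is not Kompact's actual implementation, just a std-only sketch of the merge semantics, using flattened key strings for illustration:

```rust
use std::collections::HashMap;

// Merge configuration "layers" in load order; later layers override earlier ones.
fn merge_layers(layers: &[HashMap<String, String>]) -> HashMap<String, String> {
    let mut merged = HashMap::new();
    for layer in layers {
        for (k, v) in layer {
            merged.insert(k.clone(), v.clone()); // a later insert replaces an earlier one
        }
    }
    merged
}

fn main() {
    // First "source": the config file's value.
    let file: HashMap<String, String> =
        [("buncher.batch-size".to_string(), "100".to_string())].into_iter().collect();
    // Second "source": the string override, loaded afterwards.
    let overrides: HashMap<String, String> =
        [("buncher.batch-size".to_string(), "50".to_string())].into_iter().collect();
    let merged = merge_layers(&[file, overrides]);
    println!("{}", merged["buncher.batch-size"]); // the override value
}
```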
Now we change the `Buncher` constructor so that it no longer takes any arguments. Since we still need to put some values into the struct fields, we use placeholder defaults: a batch size of 0 and a timeout of 1 ms. We could also use an `Option` here, if it were important to know whether the component has been initialised properly or not. We also no longer know the required capacity for the vector, so we simply create an empty one and extend it later, once we have read the batch size from the configuration.
fn new() -> Buncher {
    Buncher {
        ctx: ComponentContext::uninitialised(),
        batch_port: ProvidedPort::uninitialised(),
        batch_size: 0,
        timeout: Duration::from_millis(1),
        current_batch: Vec::new(),
        outstanding_timeout: None,
    }
}
// ... the rest of the example is unchanged ...
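If we did want to distinguish "not yet configured" from a real value, as mentioned above, the fields could be `Option`s instead of placeholder defaults. A minimal sketch of that alternative (the struct and method names here are hypothetical, not the version used in this chapter):

```rust
use std::time::Duration;

// Alternative field layout: `None` until the values have been read from the config.
struct BuncherState {
    batch_size: Option<usize>,
    timeout: Option<Duration>,
}

impl BuncherState {
    fn uninitialised() -> Self {
        BuncherState { batch_size: None, timeout: None }
    }

    // Would be called from `on_start`, after reading the configuration.
    fn configure(&mut self, batch_size: usize, timeout: Duration) {
        self.batch_size = Some(batch_size);
        self.timeout = Some(timeout);
    }

    fn is_configured(&self) -> bool {
        self.batch_size.is_some() && self.timeout.is_some()
    }
}

fn main() {
    let mut state = BuncherState::uninitialised();
    assert!(!state.is_configured());
    state.configure(100, Duration::from_millis(100));
    assert!(state.is_configured());
}
```

The trade-off is that every later access needs an `unwrap()` or a `match`, which is why the chapter's version simply uses harmless defaults instead.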
And, of course, we must also update the matching `create(...)` call in the main function:
let buncher = system.create(Buncher::new);
// ... the rest of the example is unchanged ...
Finally, the actual config access happens in the `on_start` code. At this point the component is properly initialised and we have access to configuration values. The Hocon type has a number of very convenient conversion functions, so we can, for example, get a `Duration` directly from the `100 ms` string in the file. Once we have read the values for `batch_size` and `timeout`, we can also go ahead and reserve the required additional space in the `current_batch` vector.
fn on_start(&mut self) -> Handled {
    self.batch_size = self.ctx.config()["buncher"]["batch-size"]
        .as_i64()
        .expect("batch size") as usize;
    self.timeout = self.ctx.config()["buncher"]["timeout"]
        .as_duration()
        .expect("timeout");
    self.current_batch.reserve(self.batch_size);
    let timeout = self.schedule_once(self.timeout, Buncher::handle_timeout);
    self.outstanding_timeout = Some(timeout);
    Handled::Ok
}
// ... the rest of the example is unchanged ...
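To give an intuition for what `as_duration` does with the `100 ms` string, here is a simplified std-only sketch of unit-suffix parsing. This is not the Hocon crate's actual parser, which supports more units and formats; it only illustrates the idea:

```rust
use std::time::Duration;

// Parse a small subset of HOCON-style duration strings, e.g. "100 ms" or "2 s".
fn parse_duration(s: &str) -> Option<Duration> {
    let s = s.trim();
    // Split at the first non-digit character into number and unit parts.
    let split = s.find(|c: char| !c.is_ascii_digit())?;
    let (num, unit) = s.split_at(split);
    let value: u64 = num.parse().ok()?;
    match unit.trim() {
        "ms" => Some(Duration::from_millis(value)),
        "s" => Some(Duration::from_secs(value)),
        "m" => Some(Duration::from_secs(value * 60)),
        _ => None, // unknown unit
    }
}

fn main() {
    println!("{:?}", parse_duration("100 ms"));
}
```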
At this point we can run the example, and we can see from the regular “50 event”-sized batches in the beginning that our overriding of the batch size worked just fine.
Note: As before, if you have checked out the examples folder you can run the concrete binary with:
cargo run --release --bin buncher_config