Dynamic Components

Kompact is a very strictly and statically typed framework. Sometimes, however, it is beneficial to be a little more dynamic. There are many reasons you might want to introduce some dynamism into your component system: modularity, ease of modeling, or sometimes even performance. Static dispatch in Rust often involves monomorphising substantial amounts of generic code, which leads to code bloat. The more instructions the CPU has to load, the more likely it is that something won’t fit in the cache, which can incur performance penalties.

Because of this, we introduced a way to deal with components with a little bit of dynamic typing. Namely, you’re able to create components from type-erased definitions with {System,SystemHandle}::create_erased (nightly only), and query type-erased components for ports they may provide and/or require with on_dyn_definition and get_{provided,required}_port.

Note: While creating type-erased components from type-erased definitions is nightly-only, you can create a component normally and then cast it to a type-erased component on stable.

Let’s create a dynamic interactive system showcasing these features. We’ll build a little REPL which the user can use to spawn some components, set their settings, and send them some data to process.

First some basic components:

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Arithmetic unit that adds a configurable offset to each `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Value added to every incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Adder);

// Actor interface of `Adder`: local `f32` messages are offset and the sum is logged.
impl Actor for Adder {
    type Message = f32;

    /// Adds the current offset to `input` and logs the sum at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let sum = input + self.offset;
        info!(self.log(), "Adder result = {}", sum);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the offset of a component.
struct SetOffset;

impl Port for SetOffset {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new offset value.
    type Request = f32;
}

// Request handler for the `SetOffset` port of `Adder`.
impl Provide<SetOffset> for Adder {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

impl Adder {
    /// Creates an `Adder` with a zero offset; its context and port start out uninitialised.
    pub fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            offset: 0.0,
            set_offset,
        }
    }
}

/// Arithmetic unit that multiplies each `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Multiplier);

// Actor interface of `Multiplier`: local `f32` messages are scaled and the product is logged.
impl Actor for Multiplier {
    type Message = f32;

    /// Multiplies `input` by the current scale and logs the product at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let product = input * self.scale;
        info!(self.log(), "Multiplier result = {}", product);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the scale of a component.
struct SetScale;

impl Port for SetScale {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new scale value.
    type Request = f32;
}

// Request handler for the `SetScale` port of `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Multiplier {
    /// Creates a `Multiplier` with a scale of one; its context and port start out uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            set_scale,
        }
    }
}

/// Arithmetic unit that computes `a * scale + offset` for each `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Linear);

// Actor interface of `Linear`: local `f32` messages are scaled, offset, and the result is logged.
impl Actor for Linear {
    type Message = f32;

    /// Computes `input * scale + offset` and logs the result at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let scaled = input * self.scale;
        let result = scaled + self.offset;
        info!(self.log(), "Linear result = {}", result);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// Request handler for the `SetOffset` port of `Linear`.
impl Provide<SetOffset> for Linear {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

// Request handler for the `SetScale` port of `Linear`.
impl Provide<SetScale> for Linear {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Linear {
    /// Creates a `Linear` unit with scale one and offset zero; context and ports start out
    /// uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            offset: 0.0,
            set_scale,
            set_offset,
        }
    }
}

/// Manager component that keeps the spawned arithmetic units and forwards settings and data
/// to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Type-erased handles to all currently managed units; each accepts `f32` messages.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that is connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that is connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// The manager ignores indications on both ports as well as lifecycle events.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a type-erased definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every managed unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill all managed units.
    KillAll,
    /// Kill all managed units and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

// Message handling for the manager: each `ManagerMessage` variant maps to one action below.
impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single manager command and reports it as handled.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(definition) => {
                let system = self.ctx.system();
                // Build the component from its type-erased definition.
                let component = system.create_erased(definition);
                // Connect whichever of the two settings ports the new component
                // provides to the manager's matching required port.
                component.on_dyn_definition(|def| {
                    if let Some(set_scale) = def.get_provided_port::<SetScale>() {
                        biconnect_ports(set_scale, &mut self.set_scales);
                    }
                    if let Some(set_offset) = def.get_provided_port::<SetOffset>() {
                        biconnect_ports(set_offset, &mut self.set_offsets);
                    }
                });
                // The ports are connected before the component is started.
                system.start(&component);

                // Keep the handle so the unit can be messaged and killed later.
                self.arithmetic_units.push(component);
            }
            ManagerMessage::Compute(val) => {
                // Send the value to every managed unit.
                for unit in &self.arithmetic_units {
                    unit.actor_ref().tell(val);
                }
            }
            // Settings are sent as requests on the required ports (presumably reaching
            // every connected unit — confirm against Kompact's port semantics).
            ManagerMessage::SetScales(scale) => self.set_scales.trigger(scale),
            ManagerMessage::SetOffsets(offset) => self.set_offsets.trigger(offset),
            ManagerMessage::KillAll => {
                self.kill_all();
            }
            ManagerMessage::Quit => {
                // Kill the units first, then request shutdown of the system.
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Empties the list of managed units, asking the system to kill each one in order.
    fn kill_all(&mut self) {
        let system = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            system.kill(unit);
        });
    }
}

fn main() {
    let system = KompactConfig::default().build().expect("system");
    let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
        ctx: ComponentContext::uninitialised(),
        arithmetic_units: vec![],
        set_offsets: RequiredPort::uninitialised(),
        set_scales: RequiredPort::uninitialised(),
    });
    system.start(&manager);
    let manager_ref = manager.actor_ref();

    std::thread::spawn(move || {
        for line in stdin().lock().lines() {
            let res = (|| -> Result<(), Box<dyn Error>> {
                let line = line?;

                let message = match line.trim() {
                    "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                    "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                    "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                    "kill all" => ManagerMessage::KillAll,
                    "quit" => ManagerMessage::Quit,
                    other => {
                        if let Some(offset) = other.strip_prefix("set offset ") {
                            ManagerMessage::SetOffsets(offset.parse()?)
                        } else if let Some(scale) = other.strip_prefix("set scale ") {
                            ManagerMessage::SetScales(scale.parse()?)
                        } else if let Some(val) = other.strip_prefix("compute ") {
                            ManagerMessage::Compute(val.parse()?)
                        } else {
                            Err("unknown command!")?
                        }
                    }
                };

                manager_ref.tell(message);

                Ok(())
            })();

            if let Err(e) = res {
                println!("{}", e);
            }
        }
    });

    system.await_termination();
}

Our components perform simple arithmetic operations on the incoming message and log the results (as well as their lifecycle). The internal state of the components can be set via Set{Offset,Scale} ports. So far we just have components with either a scale or an offset. Let’s add something slightly more interesting, which uses both.

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Arithmetic unit that adds a configurable offset to each `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Value added to every incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Adder);

// Actor interface of `Adder`: local `f32` messages are offset and the sum is logged.
impl Actor for Adder {
    type Message = f32;

    /// Adds the current offset to `input` and logs the sum at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let sum = input + self.offset;
        info!(self.log(), "Adder result = {}", sum);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the offset of a component.
struct SetOffset;

impl Port for SetOffset {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new offset value.
    type Request = f32;
}

// Request handler for the `SetOffset` port of `Adder`.
impl Provide<SetOffset> for Adder {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

impl Adder {
    /// Creates an `Adder` with a zero offset; its context and port start out uninitialised.
    pub fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            offset: 0.0,
            set_offset,
        }
    }
}

/// Arithmetic unit that multiplies each `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Multiplier);

// Actor interface of `Multiplier`: local `f32` messages are scaled and the product is logged.
impl Actor for Multiplier {
    type Message = f32;

    /// Multiplies `input` by the current scale and logs the product at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let product = input * self.scale;
        info!(self.log(), "Multiplier result = {}", product);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the scale of a component.
struct SetScale;

impl Port for SetScale {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new scale value.
    type Request = f32;
}

// Request handler for the `SetScale` port of `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Multiplier {
    /// Creates a `Multiplier` with a scale of one; its context and port start out uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            set_scale,
        }
    }
}

/// Arithmetic unit that computes `a * scale + offset` for each `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Linear);

// Actor interface of `Linear`: local `f32` messages are scaled, offset, and the result is logged.
impl Actor for Linear {
    type Message = f32;

    /// Computes `input * scale + offset` and logs the result at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let scaled = input * self.scale;
        let result = scaled + self.offset;
        info!(self.log(), "Linear result = {}", result);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// Request handler for the `SetOffset` port of `Linear`.
impl Provide<SetOffset> for Linear {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

// Request handler for the `SetScale` port of `Linear`.
impl Provide<SetScale> for Linear {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Linear {
    /// Creates a `Linear` unit with scale one and offset zero; context and ports start out
    /// uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            offset: 0.0,
            set_scale,
            set_offset,
        }
    }
}

/// Manager component that keeps the spawned arithmetic units and forwards settings and data
/// to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Type-erased handles to all currently managed units; each accepts `f32` messages.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that is connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that is connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// The manager ignores indications on both ports as well as lifecycle events.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a type-erased definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every managed unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill all managed units.
    KillAll,
    /// Kill all managed units and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

// Message handling for the manager: each `ManagerMessage` variant maps to one action below.
impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single manager command and reports it as handled.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(definition) => {
                let system = self.ctx.system();
                // Build the component from its type-erased definition.
                let component = system.create_erased(definition);
                // Connect whichever of the two settings ports the new component
                // provides to the manager's matching required port.
                component.on_dyn_definition(|def| {
                    if let Some(set_scale) = def.get_provided_port::<SetScale>() {
                        biconnect_ports(set_scale, &mut self.set_scales);
                    }
                    if let Some(set_offset) = def.get_provided_port::<SetOffset>() {
                        biconnect_ports(set_offset, &mut self.set_offsets);
                    }
                });
                // The ports are connected before the component is started.
                system.start(&component);

                // Keep the handle so the unit can be messaged and killed later.
                self.arithmetic_units.push(component);
            }
            ManagerMessage::Compute(val) => {
                // Send the value to every managed unit.
                for unit in &self.arithmetic_units {
                    unit.actor_ref().tell(val);
                }
            }
            // Settings are sent as requests on the required ports (presumably reaching
            // every connected unit — confirm against Kompact's port semantics).
            ManagerMessage::SetScales(scale) => self.set_scales.trigger(scale),
            ManagerMessage::SetOffsets(offset) => self.set_offsets.trigger(offset),
            ManagerMessage::KillAll => {
                self.kill_all();
            }
            ManagerMessage::Quit => {
                // Kill the units first, then request shutdown of the system.
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Empties the list of managed units, asking the system to kill each one in order.
    fn kill_all(&mut self) {
        let system = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            system.kill(unit);
        });
    }
}

fn main() {
    let system = KompactConfig::default().build().expect("system");
    let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
        ctx: ComponentContext::uninitialised(),
        arithmetic_units: vec![],
        set_offsets: RequiredPort::uninitialised(),
        set_scales: RequiredPort::uninitialised(),
    });
    system.start(&manager);
    let manager_ref = manager.actor_ref();

    std::thread::spawn(move || {
        for line in stdin().lock().lines() {
            let res = (|| -> Result<(), Box<dyn Error>> {
                let line = line?;

                let message = match line.trim() {
                    "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                    "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                    "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                    "kill all" => ManagerMessage::KillAll,
                    "quit" => ManagerMessage::Quit,
                    other => {
                        if let Some(offset) = other.strip_prefix("set offset ") {
                            ManagerMessage::SetOffsets(offset.parse()?)
                        } else if let Some(scale) = other.strip_prefix("set scale ") {
                            ManagerMessage::SetScales(scale.parse()?)
                        } else if let Some(val) = other.strip_prefix("compute ") {
                            ManagerMessage::Compute(val.parse()?)
                        } else {
                            Err("unknown command!")?
                        }
                    }
                };

                manager_ref.tell(message);

                Ok(())
            })();

            if let Err(e) = res {
                println!("{}", e);
            }
        }
    });

    system.await_termination();
}

Now let’s write a manager component, which will take care of creating the components described above, killing them, modifying their settings, and sending them data to process. In this case we have just three different types of worker components, but imagine we had tens (still sharing the same message type and some subsets of “settings”). In that case it would be very tedious to manage all these component types explicitly.

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Arithmetic unit that adds a configurable offset to each `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Value added to every incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Adder);

// Actor interface of `Adder`: local `f32` messages are offset and the sum is logged.
impl Actor for Adder {
    type Message = f32;

    /// Adds the current offset to `input` and logs the sum at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let sum = input + self.offset;
        info!(self.log(), "Adder result = {}", sum);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the offset of a component.
struct SetOffset;

impl Port for SetOffset {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new offset value.
    type Request = f32;
}

// Request handler for the `SetOffset` port of `Adder`.
impl Provide<SetOffset> for Adder {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

impl Adder {
    /// Creates an `Adder` with a zero offset; its context and port start out uninitialised.
    pub fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            offset: 0.0,
            set_offset,
        }
    }
}

/// Arithmetic unit that multiplies each `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Multiplier);

// Actor interface of `Multiplier`: local `f32` messages are scaled and the product is logged.
impl Actor for Multiplier {
    type Message = f32;

    /// Multiplies `input` by the current scale and logs the product at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let product = input * self.scale;
        info!(self.log(), "Multiplier result = {}", product);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the scale of a component.
struct SetScale;

impl Port for SetScale {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new scale value.
    type Request = f32;
}

// Request handler for the `SetScale` port of `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Multiplier {
    /// Creates a `Multiplier` with a scale of one; its context and port start out uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            set_scale,
        }
    }
}

/// Arithmetic unit that computes `a * scale + offset` for each `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Linear);

// Actor interface of `Linear`: local `f32` messages are scaled, offset, and the result is logged.
impl Actor for Linear {
    type Message = f32;

    /// Computes `input * scale + offset` and logs the result at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let scaled = input * self.scale;
        let result = scaled + self.offset;
        info!(self.log(), "Linear result = {}", result);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// Request handler for the `SetOffset` port of `Linear`.
impl Provide<SetOffset> for Linear {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

// Request handler for the `SetScale` port of `Linear`.
impl Provide<SetScale> for Linear {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Linear {
    /// Creates a `Linear` unit with scale one and offset zero; context and ports start out
    /// uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            offset: 0.0,
            set_scale,
            set_offset,
        }
    }
}

/// Manager component that keeps the spawned arithmetic units and forwards settings and data
/// to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Type-erased handles to all currently managed units; each accepts `f32` messages.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that is connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that is connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// The manager ignores indications on both ports as well as lifecycle events.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a type-erased definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every managed unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill all managed units.
    KillAll,
    /// Kill all managed units and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

// Message handling for the manager: each `ManagerMessage` variant maps to one action below.
impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single manager command and reports it as handled.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(definition) => {
                let system = self.ctx.system();
                // Build the component from its type-erased definition.
                let component = system.create_erased(definition);
                // Connect whichever of the two settings ports the new component
                // provides to the manager's matching required port.
                component.on_dyn_definition(|def| {
                    if let Some(set_scale) = def.get_provided_port::<SetScale>() {
                        biconnect_ports(set_scale, &mut self.set_scales);
                    }
                    if let Some(set_offset) = def.get_provided_port::<SetOffset>() {
                        biconnect_ports(set_offset, &mut self.set_offsets);
                    }
                });
                // The ports are connected before the component is started.
                system.start(&component);

                // Keep the handle so the unit can be messaged and killed later.
                self.arithmetic_units.push(component);
            }
            ManagerMessage::Compute(val) => {
                // Send the value to every managed unit.
                for unit in &self.arithmetic_units {
                    unit.actor_ref().tell(val);
                }
            }
            // Settings are sent as requests on the required ports (presumably reaching
            // every connected unit — confirm against Kompact's port semantics).
            ManagerMessage::SetScales(scale) => self.set_scales.trigger(scale),
            ManagerMessage::SetOffsets(offset) => self.set_offsets.trigger(offset),
            ManagerMessage::KillAll => {
                self.kill_all();
            }
            ManagerMessage::Quit => {
                // Kill the units first, then request shutdown of the system.
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Empties the list of managed units, asking the system to kill each one in order.
    fn kill_all(&mut self) {
        let system = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            system.kill(unit);
        });
    }
}

fn main() {
    let system = KompactConfig::default().build().expect("system");
    let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
        ctx: ComponentContext::uninitialised(),
        arithmetic_units: vec![],
        set_offsets: RequiredPort::uninitialised(),
        set_scales: RequiredPort::uninitialised(),
    });
    system.start(&manager);
    let manager_ref = manager.actor_ref();

    std::thread::spawn(move || {
        for line in stdin().lock().lines() {
            let res = (|| -> Result<(), Box<dyn Error>> {
                let line = line?;

                let message = match line.trim() {
                    "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                    "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                    "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                    "kill all" => ManagerMessage::KillAll,
                    "quit" => ManagerMessage::Quit,
                    other => {
                        if let Some(offset) = other.strip_prefix("set offset ") {
                            ManagerMessage::SetOffsets(offset.parse()?)
                        } else if let Some(scale) = other.strip_prefix("set scale ") {
                            ManagerMessage::SetScales(scale.parse()?)
                        } else if let Some(val) = other.strip_prefix("compute ") {
                            ManagerMessage::Compute(val.parse()?)
                        } else {
                            Err("unknown command!")?
                        }
                    }
                };

                manager_ref.tell(message);

                Ok(())
            })();

            if let Err(e) = res {
                println!("{}", e);
            }
        }
    });

    system.await_termination();
}

Using Arc<dyn AbstractComponent<Message=M>> we can mix different components that take the same type of message in one collection. Now to fill that Vec with something useful. We’ll define some messages for the manager and start creating some components.

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Arithmetic unit that adds a configurable offset to each `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Value added to every incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Adder);

// Actor interface of `Adder`: local `f32` messages are offset and the sum is logged.
impl Actor for Adder {
    type Message = f32;

    /// Adds the current offset to `input` and logs the sum at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let sum = input + self.offset;
        info!(self.log(), "Adder result = {}", sum);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the offset of a component.
struct SetOffset;

impl Port for SetOffset {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new offset value.
    type Request = f32;
}

// Request handler for the `SetOffset` port of `Adder`.
impl Provide<SetOffset> for Adder {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

impl Adder {
    /// Creates an `Adder` with a zero offset; its context and port start out uninitialised.
    pub fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            offset: 0.0,
            set_offset,
        }
    }
}

/// Arithmetic unit that multiplies each `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Multiplier);

// Actor interface of `Multiplier`: local `f32` messages are scaled and the product is logged.
impl Actor for Multiplier {
    type Message = f32;

    /// Multiplies `input` by the current scale and logs the product at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let product = input * self.scale;
        info!(self.log(), "Multiplier result = {}", product);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port type used to change the scale of a component.
struct SetScale;

impl Port for SetScale {
    /// `Never`: this port defines no indication events.
    type Indication = Never;
    /// A request carries the new scale value.
    type Request = f32;
}

// Request handler for the `SetScale` port of `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Multiplier {
    /// Creates a `Multiplier` with a scale of one; its context and port start out uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            set_scale,
        }
    }
}

/// Arithmetic unit that computes `a * scale + offset` for each `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Factor applied to every incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// Lifecycle handling is generated by Kompact's `info_lifecycle!` macro
// (presumably logging lifecycle events at info level — see the Kompact docs).
info_lifecycle!(Linear);

// Actor interface of `Linear`: local `f32` messages are scaled, offset, and the result is logged.
impl Actor for Linear {
    type Message = f32;

    /// Computes `input * scale + offset` and logs the result at info level.
    fn receive_local(&mut self, input: Self::Message) -> Handled {
        let scaled = input * self.scale;
        let result = scaled + self.offset;
        info!(self.log(), "Linear result = {}", result);
        Handled::Ok
    }

    /// Network messages are not supported by this example; receiving one panics.
    fn receive_network(&mut self, _net_msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// Request handler for the `SetOffset` port of `Linear`.
impl Provide<SetOffset> for Linear {
    /// Replaces the stored offset with the requested value.
    fn handle(&mut self, new_offset: f32) -> Handled {
        self.offset = new_offset;
        Handled::Ok
    }
}

// Request handler for the `SetScale` port of `Linear`.
impl Provide<SetScale> for Linear {
    /// Replaces the stored scale with the requested value.
    fn handle(&mut self, new_scale: f32) -> Handled {
        self.scale = new_scale;
        Handled::Ok
    }
}

impl Linear {
    /// Creates a `Linear` unit with scale one and offset zero; context and ports start out
    /// uninitialised.
    fn new() -> Self {
        let ctx = ComponentContext::uninitialised();
        let set_scale = ProvidedPort::uninitialised();
        let set_offset = ProvidedPort::uninitialised();
        Self {
            ctx,
            scale: 1.0,
            offset: 0.0,
            set_scale,
            set_offset,
        }
    }
}

/// Manager component that keeps the spawned arithmetic units and forwards settings and data
/// to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact context of this component.
    ctx: ComponentContext<Self>,
    /// Type-erased handles to all currently managed units; each accepts `f32` messages.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that is connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that is connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// The manager ignores indications on both ports as well as lifecycle events.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a type-erased definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every managed unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill all managed units.
    KillAll,
    /// Kill all managed units and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single command.
    ///
    /// * `Spawn` creates a component from the erased definition, connects whichever of the
    ///   `SetScale`/`SetOffset` ports it provides to the manager's required ports, starts
    ///   it, and records it in `arithmetic_units`.
    /// * `Compute` sends the value to every recorded unit.
    /// * `SetScales`/`SetOffsets` trigger the value on the matching required port.
    /// * `KillAll` kills all recorded units; `Quit` additionally shuts the system down.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(erased) => {
                let handle = self.ctx.system();
                let unit = handle.create_erased(erased);
                // The concrete type is unknown here, so ports are looked up dynamically
                // and connected only when the unit actually provides them.
                unit.on_dyn_definition(|dyn_def| {
                    if let Some(provided) = dyn_def.get_provided_port::<SetScale>() {
                        biconnect_ports(provided, &mut self.set_scales);
                    }
                    if let Some(provided) = dyn_def.get_provided_port::<SetOffset>() {
                        biconnect_ports(provided, &mut self.set_offsets);
                    }
                });
                handle.start(&unit);

                self.arithmetic_units.push(unit);
            }
            ManagerMessage::Compute(input) => {
                self.arithmetic_units.iter().for_each(|unit| {
                    unit.actor_ref().tell(input);
                });
            }
            ManagerMessage::SetScales(factor) => {
                self.set_scales.trigger(factor);
            }
            ManagerMessage::SetOffsets(shift) => {
                self.set_offsets.trigger(shift);
            }
            ManagerMessage::KillAll => self.kill_all(),
            ManagerMessage::Quit => {
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Kills every recorded arithmetic unit, leaving `arithmetic_units` empty.
    fn kill_all(&mut self) {
        let handle = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            handle.kill(unit);
        });
    }
}

fn main() {
    let system = KompactConfig::default().build().expect("system");
    let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
        ctx: ComponentContext::uninitialised(),
        arithmetic_units: vec![],
        set_offsets: RequiredPort::uninitialised(),
        set_scales: RequiredPort::uninitialised(),
    });
    system.start(&manager);
    let manager_ref = manager.actor_ref();

    std::thread::spawn(move || {
        for line in stdin().lock().lines() {
            let res = (|| -> Result<(), Box<dyn Error>> {
                let line = line?;

                let message = match line.trim() {
                    "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                    "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                    "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                    "kill all" => ManagerMessage::KillAll,
                    "quit" => ManagerMessage::Quit,
                    other => {
                        if let Some(offset) = other.strip_prefix("set offset ") {
                            ManagerMessage::SetOffsets(offset.parse()?)
                        } else if let Some(scale) = other.strip_prefix("set scale ") {
                            ManagerMessage::SetScales(scale.parse()?)
                        } else if let Some(val) = other.strip_prefix("compute ") {
                            ManagerMessage::Compute(val.parse()?)
                        } else {
                            Err("unknown command!")?
                        }
                    }
                };

                manager_ref.tell(message);

                Ok(())
            })();

            if let Err(e) = res {
                println!("{}", e);
            }
        }
    });

    system.await_termination();
}

As we don’t want the manager type to know about the concrete component types at all, the Spawn message above contains a boxed, type-erased component definition, which we then turn into a component using create_erased.

Normally, after creating the components we would connect the ports to each other using connect_to_required, or maybe on_definition and direct port access. However, all of those require concrete types, like Arc<Component<Adder>> or Arc<Component<Linear>>, which is not what we get here (Arc<dyn AbstractComponent<Message=f32>>). Instead we can use on_dyn_definition together with the Option-returning get_{provided,required}_port to dynamically check if a given port exists on the abstract component and, if so, fetch it.

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Component that adds a configurable offset to every `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Value added to each incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Adder);

impl Actor for Adder {
    type Message = f32;

    /// Logs the incoming value shifted by the current offset.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let shifted = a + self.offset;
        info!(self.log(), "Adder result = {}", shifted);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port through which a component's offset is changed.
struct SetOffset;

impl Port for SetOffset {
    // `Never`: this port declares no indications.
    type Indication = Never;
    // A request carries the new offset value.
    type Request = f32;
}

// `SetOffset` request handling for `Adder`.
impl Provide<SetOffset> for Adder {
    /// Stores the requested `value` as the component's new offset.
    fn handle(&mut self, value: f32) -> Handled {
        self.offset = value;
        Handled::Ok
    }
}

impl Adder {
    /// Builds an `Adder` with an offset of zero.
    ///
    /// The component context and the provided port are created uninitialised.
    pub fn new() -> Self {
        Self {
            ctx: ComponentContext::uninitialised(),
            offset: 0.0_f32,
            set_offset: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that multiplies every `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Factor applied to each incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Multiplier);

impl Actor for Multiplier {
    type Message = f32;

    /// Logs the incoming value multiplied by the current scale.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let scaled = a * self.scale;
        info!(self.log(), "Multiplier result = {}", scaled);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port through which a component's scale is changed.
struct SetScale;

impl Port for SetScale {
    // `Never`: this port declares no indications.
    type Indication = Never;
    // A request carries the new scale value.
    type Request = f32;
}

// `SetScale` request handling for `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Stores the requested `value` as the component's new scale.
    fn handle(&mut self, value: f32) -> Handled {
        self.scale = value;
        Handled::Ok
    }
}

impl Multiplier {
    /// Builds a `Multiplier` with a scale of 1.0.
    ///
    /// The component context and the provided port are created uninitialised.
    fn new() -> Multiplier {
        Self {
            ctx: ComponentContext::uninitialised(),
            scale: 1.0_f32,
            set_scale: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that applies `a * scale + offset` to every `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Factor applied to each incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Linear);

impl Actor for Linear {
    type Message = f32;

    /// Logs the incoming value after scaling it and then adding the offset.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let scaled = a * self.scale;
        let transformed = scaled + self.offset;
        info!(self.log(), "Linear result = {}", transformed);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// `SetOffset` request handling for `Linear`.
impl Provide<SetOffset> for Linear {
    /// Stores the requested `value` as the component's new offset.
    fn handle(&mut self, value: f32) -> Handled {
        self.offset = value;
        Handled::Ok
    }
}

// `SetScale` request handling for `Linear`.
impl Provide<SetScale> for Linear {
    /// Stores the requested `value` as the component's new scale.
    fn handle(&mut self, value: f32) -> Handled {
        self.scale = value;
        Handled::Ok
    }
}

impl Linear {
    /// Builds a `Linear` with a scale of 1.0 and an offset of 0.0.
    ///
    /// The component context and both provided ports are created uninitialised.
    fn new() -> Linear {
        Self {
            ctx: ComponentContext::uninitialised(),
            scale: 1.0_f32,
            offset: 0.0_f32,
            set_scale: ProvidedPort::uninitialised(),
            set_offset: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that spawns type-erased arithmetic units and forwards commands to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Units spawned so far and not yet killed, held without their concrete type.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that gets connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that gets connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// Both ports declare `Never` as their indication type, so there is nothing to handle here.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
// NOTE(review): presumably generates a lifecycle handler that does nothing — confirm in the
// Kompact macro documentation.
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a boxed, type-erased component definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every spawned unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill every spawned unit.
    KillAll,
    /// Kill every spawned unit and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single command.
    ///
    /// * `Spawn` creates a component from the erased definition, connects whichever of the
    ///   `SetScale`/`SetOffset` ports it provides to the manager's required ports, starts
    ///   it, and records it in `arithmetic_units`.
    /// * `Compute` sends the value to every recorded unit.
    /// * `SetScales`/`SetOffsets` trigger the value on the matching required port.
    /// * `KillAll` kills all recorded units; `Quit` additionally shuts the system down.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(erased) => {
                let handle = self.ctx.system();
                let unit = handle.create_erased(erased);
                // The concrete type is unknown here, so ports are looked up dynamically
                // and connected only when the unit actually provides them.
                unit.on_dyn_definition(|dyn_def| {
                    if let Some(provided) = dyn_def.get_provided_port::<SetScale>() {
                        biconnect_ports(provided, &mut self.set_scales);
                    }
                    if let Some(provided) = dyn_def.get_provided_port::<SetOffset>() {
                        biconnect_ports(provided, &mut self.set_offsets);
                    }
                });
                handle.start(&unit);

                self.arithmetic_units.push(unit);
            }
            ManagerMessage::Compute(input) => {
                self.arithmetic_units.iter().for_each(|unit| {
                    unit.actor_ref().tell(input);
                });
            }
            ManagerMessage::SetScales(factor) => {
                self.set_scales.trigger(factor);
            }
            ManagerMessage::SetOffsets(shift) => {
                self.set_offsets.trigger(shift);
            }
            ManagerMessage::KillAll => self.kill_all(),
            ManagerMessage::Quit => {
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Kills every recorded arithmetic unit, leaving `arithmetic_units` empty.
    fn kill_all(&mut self) {
        let handle = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            handle.kill(unit);
        });
    }
}

fn main() {
    let system = KompactConfig::default().build().expect("system");
    let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
        ctx: ComponentContext::uninitialised(),
        arithmetic_units: vec![],
        set_offsets: RequiredPort::uninitialised(),
        set_scales: RequiredPort::uninitialised(),
    });
    system.start(&manager);
    let manager_ref = manager.actor_ref();

    std::thread::spawn(move || {
        for line in stdin().lock().lines() {
            let res = (|| -> Result<(), Box<dyn Error>> {
                let line = line?;

                let message = match line.trim() {
                    "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                    "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                    "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                    "kill all" => ManagerMessage::KillAll,
                    "quit" => ManagerMessage::Quit,
                    other => {
                        if let Some(offset) = other.strip_prefix("set offset ") {
                            ManagerMessage::SetOffsets(offset.parse()?)
                        } else if let Some(scale) = other.strip_prefix("set scale ") {
                            ManagerMessage::SetScales(scale.parse()?)
                        } else if let Some(val) = other.strip_prefix("compute ") {
                            ManagerMessage::Compute(val.parse()?)
                        } else {
                            Err("unknown command!")?
                        }
                    }
                };

                manager_ref.tell(message);

                Ok(())
            })();

            if let Err(e) = res {
                println!("{}", e);
            }
        }
    });

    system.await_termination();
}

Now that we have the dynamic component part done, we can write a very simple REPL. We’ll start the Kompact system in the main thread, create the manager there, and await system termination. In a separate thread we’ll continuously read stdin and interpret the lines as commands to send to the manager.

#![allow(clippy::unused_unit)]

use kompact::{component::AbstractComponent, prelude::*};
use std::{
    error::Error,
    fmt,
    io::{stdin, BufRead},
    sync::Arc,
};

/// Component that adds a configurable offset to every `f32` message it receives.
#[derive(ComponentDefinition)]
struct Adder {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Value added to each incoming message; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Adder);

impl Actor for Adder {
    type Message = f32;

    /// Logs the incoming value shifted by the current offset.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let shifted = a + self.offset;
        info!(self.log(), "Adder result = {}", shifted);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port through which a component's offset is changed.
struct SetOffset;

impl Port for SetOffset {
    // `Never`: this port declares no indications.
    type Indication = Never;
    // A request carries the new offset value.
    type Request = f32;
}

// `SetOffset` request handling for `Adder`.
impl Provide<SetOffset> for Adder {
    /// Stores the requested `value` as the component's new offset.
    fn handle(&mut self, value: f32) -> Handled {
        self.offset = value;
        Handled::Ok
    }
}

impl Adder {
    /// Builds an `Adder` with an offset of zero.
    ///
    /// The component context and the provided port are created uninitialised.
    pub fn new() -> Self {
        Self {
            ctx: ComponentContext::uninitialised(),
            offset: 0.0_f32,
            set_offset: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that multiplies every `f32` message it receives by a configurable scale.
#[derive(ComponentDefinition)]
struct Multiplier {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Factor applied to each incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Multiplier);

impl Actor for Multiplier {
    type Message = f32;

    /// Logs the incoming value multiplied by the current scale.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let scaled = a * self.scale;
        info!(self.log(), "Multiplier result = {}", scaled);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

/// Port through which a component's scale is changed.
struct SetScale;

impl Port for SetScale {
    // `Never`: this port declares no indications.
    type Indication = Never;
    // A request carries the new scale value.
    type Request = f32;
}

// `SetScale` request handling for `Multiplier`.
impl Provide<SetScale> for Multiplier {
    /// Stores the requested `value` as the component's new scale.
    fn handle(&mut self, value: f32) -> Handled {
        self.scale = value;
        Handled::Ok
    }
}

impl Multiplier {
    /// Builds a `Multiplier` with a scale of 1.0.
    ///
    /// The component context and the provided port are created uninitialised.
    fn new() -> Multiplier {
        Self {
            ctx: ComponentContext::uninitialised(),
            scale: 1.0_f32,
            set_scale: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that applies `a * scale + offset` to every `f32` message `a` it receives.
#[derive(ComponentDefinition)]
struct Linear {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Factor applied to each incoming message; replaced by `SetScale` requests.
    scale: f32,
    /// Value added after scaling; replaced by `SetOffset` requests.
    offset: f32,
    /// Provided port on which `SetScale` requests arrive.
    set_scale: ProvidedPort<SetScale>,
    /// Provided port on which `SetOffset` requests arrive.
    set_offset: ProvidedPort<SetOffset>,
}
// NOTE(review): presumably generates lifecycle handling that logs at info level (cf. the
// `Starting...`/`Killing...` lines in the sample session) — confirm in the Kompact docs.
info_lifecycle!(Linear);

impl Actor for Linear {
    type Message = f32;

    /// Logs the incoming value after scaling it and then adding the offset.
    fn receive_local(&mut self, a: Self::Message) -> Handled {
        let scaled = a * self.scale;
        let transformed = scaled + self.offset;
        info!(self.log(), "Linear result = {}", transformed);
        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unimplemented!()
    }
}

// `SetOffset` request handling for `Linear`.
impl Provide<SetOffset> for Linear {
    /// Stores the requested `value` as the component's new offset.
    fn handle(&mut self, value: f32) -> Handled {
        self.offset = value;
        Handled::Ok
    }
}

// `SetScale` request handling for `Linear`.
impl Provide<SetScale> for Linear {
    /// Stores the requested `value` as the component's new scale.
    fn handle(&mut self, value: f32) -> Handled {
        self.scale = value;
        Handled::Ok
    }
}

impl Linear {
    /// Builds a `Linear` with a scale of 1.0 and an offset of 0.0.
    ///
    /// The component context and both provided ports are created uninitialised.
    fn new() -> Linear {
        Self {
            ctx: ComponentContext::uninitialised(),
            scale: 1.0_f32,
            offset: 0.0_f32,
            set_scale: ProvidedPort::uninitialised(),
            set_offset: ProvidedPort::uninitialised(),
        }
    }
}

/// Component that spawns type-erased arithmetic units and forwards commands to them.
#[derive(ComponentDefinition)]
struct DynamicManager {
    /// Kompact component context.
    ctx: ComponentContext<Self>,
    /// Units spawned so far and not yet killed, held without their concrete type.
    arithmetic_units: Vec<Arc<dyn AbstractComponent<Message = f32>>>,
    /// Required port that gets connected to every spawned unit providing `SetOffset`.
    set_offsets: RequiredPort<SetOffset>,
    /// Required port that gets connected to every spawned unit providing `SetScale`.
    set_scales: RequiredPort<SetScale>,
}

// Both ports declare `Never` as their indication type, so there is nothing to handle here.
ignore_indications!(SetOffset, DynamicManager);
ignore_indications!(SetScale, DynamicManager);
// NOTE(review): presumably generates a lifecycle handler that does nothing — confirm in the
// Kompact macro documentation.
ignore_lifecycle!(DynamicManager);

/// Commands understood by `DynamicManager`.
enum ManagerMessage {
    /// Create and start a new unit from a boxed, type-erased component definition.
    Spawn(Box<dyn CreateErased<f32> + Send>),
    /// Send the value to every spawned unit.
    Compute(f32),
    /// Trigger the value as a request on the manager's `SetScale` port.
    SetScales(f32),
    /// Trigger the value as a request on the manager's `SetOffset` port.
    SetOffsets(f32),
    /// Kill every spawned unit.
    KillAll,
    /// Kill every spawned unit and shut the system down.
    Quit,
}

impl fmt::Debug for ManagerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ManagerMessage::Spawn(_) => {
                write!(f, "Spawn(_)")
            }
            ManagerMessage::Compute(val) => {
                write!(f, "Compute({})", *val)
            }
            ManagerMessage::SetScales(scale) => {
                write!(f, "SetScales({})", *scale)
            }
            ManagerMessage::SetOffsets(offset) => {
                write!(f, "SetOffsets({})", *offset)
            }
            ManagerMessage::KillAll => {
                write!(f, "KillAll")
            }
            ManagerMessage::Quit => {
                write!(f, "Quit")
            }
        }
    }
}

impl Actor for DynamicManager {
    type Message = ManagerMessage;

    /// Executes a single command.
    ///
    /// * `Spawn` creates a component from the erased definition, connects whichever of the
    ///   `SetScale`/`SetOffset` ports it provides to the manager's required ports, starts
    ///   it, and records it in `arithmetic_units`.
    /// * `Compute` sends the value to every recorded unit.
    /// * `SetScales`/`SetOffsets` trigger the value on the matching required port.
    /// * `KillAll` kills all recorded units; `Quit` additionally shuts the system down.
    fn receive_local(&mut self, msg: ManagerMessage) -> Handled {
        match msg {
            ManagerMessage::Spawn(erased) => {
                let handle = self.ctx.system();
                let unit = handle.create_erased(erased);
                // The concrete type is unknown here, so ports are looked up dynamically
                // and connected only when the unit actually provides them.
                unit.on_dyn_definition(|dyn_def| {
                    if let Some(provided) = dyn_def.get_provided_port::<SetScale>() {
                        biconnect_ports(provided, &mut self.set_scales);
                    }
                    if let Some(provided) = dyn_def.get_provided_port::<SetOffset>() {
                        biconnect_ports(provided, &mut self.set_offsets);
                    }
                });
                handle.start(&unit);

                self.arithmetic_units.push(unit);
            }
            ManagerMessage::Compute(input) => {
                self.arithmetic_units.iter().for_each(|unit| {
                    unit.actor_ref().tell(input);
                });
            }
            ManagerMessage::SetScales(factor) => {
                self.set_scales.trigger(factor);
            }
            ManagerMessage::SetOffsets(shift) => {
                self.set_offsets.trigger(shift);
            }
            ManagerMessage::KillAll => self.kill_all(),
            ManagerMessage::Quit => {
                self.kill_all();
                self.ctx.system().shutdown_async();
            }
        }

        Handled::Ok
    }

    /// Network messages are not handled in this example; receiving one panics.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!()
    }
}

impl DynamicManager {
    /// Kills every recorded arithmetic unit, leaving `arithmetic_units` empty.
    fn kill_all(&mut self) {
        let handle = self.ctx.system();
        self.arithmetic_units.drain(..).for_each(|unit| {
            handle.kill(unit);
        });
    }
}

fn main() {
   let system = KompactConfig::default().build().expect("system");
   let manager: Arc<Component<DynamicManager>> = system.create(|| DynamicManager {
       ctx: ComponentContext::uninitialised(),
       arithmetic_units: vec![],
       set_offsets: RequiredPort::uninitialised(),
       set_scales: RequiredPort::uninitialised(),
   });
   system.start(&manager);
   let manager_ref = manager.actor_ref();

   std::thread::spawn(move || {
       for line in stdin().lock().lines() {
           let res = (|| -> Result<(), Box<dyn Error>> {
               let line = line?;

               let message = match line.trim() {
                   "spawn adder" => ManagerMessage::Spawn(Box::new(Adder::new())),
                   "spawn multiplier" => ManagerMessage::Spawn(Box::new(Multiplier::new())),
                   "spawn linear" => ManagerMessage::Spawn(Box::new(Linear::new())),
                   "kill all" => ManagerMessage::KillAll,
                   "quit" => ManagerMessage::Quit,
                   other => {
                       if let Some(offset) = other.strip_prefix("set offset ") {
                           ManagerMessage::SetOffsets(offset.parse()?)
                       } else if let Some(scale) = other.strip_prefix("set scale ") {
                           ManagerMessage::SetScales(scale.parse()?)
                       } else if let Some(val) = other.strip_prefix("compute ") {
                           ManagerMessage::Compute(val.parse()?)
                       } else {
                           Err("unknown command!")?
                       }
                   }
               };

               manager_ref.tell(message);

               Ok(())
           })();

           if let Err(e) = res {
               println!("{}", e);
           }
       }
   });

   system.await_termination();
}

When run, it looks something like this:

❯ cargo run --features=type_erasure,silent_logging --bin dynamic_components
   Compiling kompact-examples v0.10.0 (/home/mrobakowski/projects/kompact/docs/examples)
    Finished dev [unoptimized + debuginfo] target(s) in 2.56s
     Running `/home/mrobakowski/projects/kompact/target/debug/dynamic_components`
compute 1
spawn adder
Nov 09 00:55:59.917 INFO Starting..., ctype: Adder, cid: 79bd396b-de75-4284-bc57-e0cf8193f72f, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:39
set offset 5
compute 1
Nov 09 00:56:43.465 INFO Adder result = 6, ctype: Adder, cid: 79bd396b-de75-4284-bc57-e0cf8193f72f, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:46
spawn multiplier
Nov 09 00:56:55.518 INFO Starting..., ctype: Multiplier, cid: 47dd4827-8d35-4351-a717-344ec7fe70fe, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:85
set scale 2
compute 2
Nov 09 00:57:09.684 INFO Adder result = 7, ctype: Adder, cid: 79bd396b-de75-4284-bc57-e0cf8193f72f, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:46
Nov 09 00:57:09.684 INFO Multiplier result = 4, ctype: Multiplier, cid: 47dd4827-8d35-4351-a717-344ec7fe70fe, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:92
kill all
Nov 09 00:57:17.769 INFO Killing..., ctype: Adder, cid: 79bd396b-de75-4284-bc57-e0cf8193f72f, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:39
Nov 09 00:57:17.769 INFO Killing..., ctype: Multiplier, cid: 47dd4827-8d35-4351-a717-344ec7fe70fe, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:85
spawn linear
Nov 09 00:57:24.840 INFO Starting..., ctype: Linear, cid: d0f01d1a-b448-4b5f-bddd-701d764992ea, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:135
spawn adder
Nov 09 00:57:32.136 INFO Starting..., ctype: Adder, cid: c3b9a0c5-875d-4e1d-8c70-3b414fe2a7bb, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:39
set offset 2
set scale 3
compute 4
Nov 09 00:57:41.558 INFO Linear result = 14, ctype: Linear, cid: d0f01d1a-b448-4b5f-bddd-701d764992ea, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:142
Nov 09 00:57:41.558 INFO Adder result = 6, ctype: Adder, cid: c3b9a0c5-875d-4e1d-8c70-3b414fe2a7bb, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:46
quit
Nov 09 00:57:51.351 INFO Killing..., ctype: Linear, cid: d0f01d1a-b448-4b5f-bddd-701d764992ea, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:135
Nov 09 00:57:51.352 INFO Killing..., ctype: Adder, cid: c3b9a0c5-875d-4e1d-8c70-3b414fe2a7bb, system: kompact-runtime-1, location: docs/examples/src/bin/dynamic_components.rs:39