Named Services

In the last section we discussed how to build a leader election mechanism with a bunch of networked Kompact systems. But we couldn’t actually run it in deployment, because we couldn’t really figure out how to collect a list of actor paths for all the processes and then distribute that list to every process. This happens because we can only know the actor path of an actor after we have created it. We could have manually distributed the actor paths, by writing the assigned path to a file, then collecting it externally, and finally parsing paths from said collected file and passing them to each elector component. But that wouldn’t be a very nice system now, would it?

What we are missing here is a way to predict an ActorPath for a particular actor on a particular system. If we know even a single path on a single host in the distributed actor system, we can have everyone send a message there. That special component then learns the unique path of every sender, and can in turn distribute the collected paths back to everyone who has “checked in” in this manner. This process is often referred to as “bootstrapping”. In this section we are going to use named actor paths, which we can predict given some information about the system, to build a bootstrapping “service” for our leader election group.
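To make “predictable” a bit more concrete, here is a minimal, dependency-free sketch. It assumes that a named path is rendered as the transport, the socket address, and the alias segments separated by slashes; the helper `predicted_named_path` is hypothetical and not part of Kompact, where you would construct a `NamedPath` instead.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Hypothetical helper (not part of Kompact): renders the textual form that a
/// named path is assumed to take, i.e. `tcp://<ip>:<port>/<segment>/...`.
fn predicted_named_path(socket: SocketAddr, segments: &[&str]) -> String {
    format!("tcp://{}/{}", socket, segments.join("/"))
}

fn main() {
    // Every process only needs the server's address and the agreed-upon alias;
    // nothing here depends on state created at runtime on the server.
    let server = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345);
    let path = predicted_named_path(server, &["bootstrap"]);
    println!("{}", path);
}
```

The point of the sketch is only that both inputs, the address and the alias, can be agreed upon before any system is started, which is exactly what a unique path does not allow.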

Messages

For the bootstrapping communication we require a new CheckIn message. It doesn’t actually need any content, since we really only care about the ActorPath of the sender. We will reply to this message with our UpdateProcesses message from the previous section. However, since that has to go over the network now, we need to make it serialisable. We also aren’t locally sharing the process set anymore, so we turn the Arc<[ActorPath]> into a simple Vec<ActorPath>.

#![allow(clippy::unused_unit)]
use kompact::{prelude::*, serde_serialisers::*};
use kompact_examples::trusting::*;
use serde::{Deserialize, Serialize};
use std::{
    collections::HashSet,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::Duration,
};

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
struct CheckIn;
impl SerialisationId for CheckIn {
    const SER_ID: SerId = 2345;
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct UpdateProcesses(Vec<ActorPath>);
impl SerialisationId for UpdateProcesses {
    const SER_ID: SerId = 3456;
}

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}
impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }


    fn broadcast_processess(&self) -> () {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);
        self.processes.iter().for_each(|process| {
            process.tell((msg.clone(), Serde), self);
        });
    }
}
ignore_lifecycle!(BootstrapServer);
impl NetworkActor for BootstrapServer {
    type Deserialiser = Serde;
    type Message = CheckIn;

    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        if let Some(process) = source {
            if self.processes.insert(process) {
                self.broadcast_processess();
            }
        }
        Handled::Ok
    }
}

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}
impl EventualLeaderElector {
    fn new(bootstrap_server: ActorPath) -> Self {
        let minimal_period = Duration::from_millis(1);
        EventualLeaderElector {
            ctx: ComponentContext::uninitialised(),
            omega_port: ProvidedPort::uninitialised(),
            bootstrap_server,
            processes: Vec::new().into_boxed_slice(),
            candidates: HashSet::new(),
            period: minimal_period,
            delta: minimal_period,
            timer_handle: None,
            leader: None,
        }
    }


    fn select_leader(&mut self) -> Option<ActorPath> {
        let mut candidates: Vec<ActorPath> = self.candidates.drain().collect();
        candidates.sort_unstable();
        candidates.reverse(); // pick smallest instead of largest
        candidates.pop()
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.timer_handle.take() {
            Some(timeout) if timeout == timeout_id => {
                let new_leader = self.select_leader();
                if new_leader != self.leader {
                    self.period += self.delta;
                    self.leader = new_leader;
                    if let Some(ref leader) = self.leader {
                        self.omega_port.trigger(Trust(leader.clone()));
                    }
                    self.cancel_timer(timeout);
                    let new_timer =
                        self.schedule_periodic(self.period, self.period, Self::handle_timeout);
                    self.timer_handle = Some(new_timer);
                } else {
                    // just put it back
                    self.timer_handle = Some(timeout);
                }
                self.send_heartbeats();
                Handled::Ok
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }

    fn send_heartbeats(&self) {
        self.processes.iter().for_each(|process| {
            process.tell((Heartbeat, Serde), self);
        });
    }
}

impl ComponentLifecycle for EventualLeaderElector {
    fn on_start(&mut self) -> Handled {
        self.bootstrap_server.tell((CheckIn, Serde), self);

        self.period = self.ctx.config()["omega"]["initial-period"]
            .as_duration()
            .expect("initial period");
        self.delta = self.ctx.config()["omega"]["delta"]
            .as_duration()
            .expect("delta");
        let timeout = self.schedule_periodic(self.period, self.period, Self::handle_timeout);
        self.timer_handle = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.timer_handle.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

// Doesn't have any requests
ignore_requests!(EventualLeaderDetection, EventualLeaderElector);

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(UpdateProcesses(processes)): UpdateProcesses [using Serde] => {
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        };
        Handled::Ok
    }
}

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (bootstrap server and client port), but got {} instead!", x-1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

State

Our bootstrap server’s state is almost trivial. All it needs to keep track of is the current process set.

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}
impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }


    fn broadcast_processess(&self) -> () {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);
        self.processes.iter().for_each(|process| {
            process.tell((msg.clone(), Serde), self);
        });
    }
}
ignore_lifecycle!(BootstrapServer);
impl NetworkActor for BootstrapServer {
    type Deserialiser = Serde;
    type Message = CheckIn;

    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        if let Some(process) = source {
            if self.processes.insert(process) {
                self.broadcast_processess();
            }
        }
        Handled::Ok
    }
}

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}
impl EventualLeaderElector {
    fn new(bootstrap_server: ActorPath) -> Self {
        let minimal_period = Duration::from_millis(1);
        EventualLeaderElector {
            ctx: ComponentContext::uninitialised(),
            omega_port: ProvidedPort::uninitialised(),
            bootstrap_server,
            processes: Vec::new().into_boxed_slice(),
            candidates: HashSet::new(),
            period: minimal_period,
            delta: minimal_period,
            timer_handle: None,
            leader: None,
        }
    }


    fn select_leader(&mut self) -> Option<ActorPath> {
        let mut candidates: Vec<ActorPath> = self.candidates.drain().collect();
        candidates.sort_unstable();
        candidates.reverse(); // pick smallest instead of largest
        candidates.pop()
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.timer_handle.take() {
            Some(timeout) if timeout == timeout_id => {
                let new_leader = self.select_leader();
                if new_leader != self.leader {
                    self.period += self.delta;
                    self.leader = new_leader;
                    if let Some(ref leader) = self.leader {
                        self.omega_port.trigger(Trust(leader.clone()));
                    }
                    self.cancel_timer(timeout);
                    let new_timer =
                        self.schedule_periodic(self.period, self.period, Self::handle_timeout);
                    self.timer_handle = Some(new_timer);
                } else {
                    // just put it back
                    self.timer_handle = Some(timeout);
                }
                self.send_heartbeats();
                Handled::Ok
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }

    fn send_heartbeats(&self) {
        self.processes.iter().for_each(|process| {
            process.tell((Heartbeat, Serde), self);
        });
    }
}

impl ComponentLifecycle for EventualLeaderElector {
    fn on_start(&mut self) -> Handled {
        self.bootstrap_server.tell((CheckIn, Serde), self);

        self.period = self.ctx.config()["omega"]["initial-period"]
            .as_duration()
            .expect("initial period");
        self.delta = self.ctx.config()["omega"]["delta"]
            .as_duration()
            .expect("delta");
        let timeout = self.schedule_periodic(self.period, self.period, Self::handle_timeout);
        self.timer_handle = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.timer_handle.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}

// Doesn't have any requests
ignore_requests!(EventualLeaderDetection, EventualLeaderElector);

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(UpdateProcesses(processes)): UpdateProcesses [using Serde] => {
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        };
        Handled::Ok
    }
}

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (boostrap server and client port), but got {} instead!", x-1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

We also need to alter our leader elector a bit. First it needs to know the actor path of the bootstrap server, so it can actually check in. And second, we need to adapt the type of processes to be in line with our changes to UpdateProcesses. We’ll make it a Box<[ActorPath]> instead of Arc<[ActorPath]> and do the conversion from Vec<ActorPath> whenever we receive an update.
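The conversion itself is a one-liner from the standard library. The following sketch uses plain `String`s as stand-ins for `ActorPath`, and `store_update` is a hypothetical name for what the elector does when an update arrives; it is only meant to show the `Vec` to `Box<[T]>` step in isolation.

```rust
/// Hypothetical stand-in for the elector's handling of a process-set update.
/// `String` replaces `ActorPath` to keep the sketch dependency-free.
fn store_update(processes: Vec<String>) -> Box<[String]> {
    // Consumes the vector and yields an owned, fixed-size slice.
    processes.into_boxed_slice()
}

fn main() {
    // The deserialised message hands us an owned Vec...
    let received = vec!["process-a".to_string(), "process-b".to_string()];
    // ...which we store as a boxed slice, since we never grow it in place.
    let stored = store_update(received);
    for process in stored.iter() {
        println!("would send a heartbeat to {}", process);
    }
}
```

Since every update replaces the whole set and nothing is shared between components anymore, an owned slice is sufficient here. The corresponding state from the example code follows.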

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}

Behaviours

The behaviour of the bootstrap server is very simple. Whenever it gets a CheckIn, it adds the source of the message to its process set and, if that source was not already known, broadcasts the new process set to every process in the set. We use the NetworkActor trait instead of Actor to implement the actor part here. NetworkActor is a convenience trait for actors that handle the same set of messages locally and remotely and ignore all other remote messages. It handles the deserialisation part for us, but we must tell it both the Message type and the Deserialiser type to use. In this case we don’t actually do anything for local messages, since we only need the sender, and local messages simply don’t have a sender attached.
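
Stripped of everything Kompact-specific, the check-in logic described above can be sketched with the standard library alone. This is only an illustration under stated assumptions: the Membership type and its check_in method are hypothetical names, and plain strings stand in for ActorPath values.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the bootstrap server's state.
/// Plain strings take the place of `ActorPath` values.
struct Membership {
    processes: HashSet<String>,
}

impl Membership {
    fn new() -> Self {
        Membership {
            processes: HashSet::new(),
        }
    }

    /// Records a check-in. Returns the full process list that should be
    /// broadcast if the sender was new, or `None` if it was already known.
    fn check_in(&mut self, sender: &str) -> Option<Vec<String>> {
        if self.processes.insert(sender.to_string()) {
            let mut snapshot: Vec<String> = self.processes.iter().cloned().collect();
            snapshot.sort(); // sorted only to make the result deterministic
            Some(snapshot)
        } else {
            None
        }
    }
}

fn main() {
    let mut membership = Membership::new();
    let senders = ["tcp://127.0.0.1:5001", "tcp://127.0.0.1:5002", "tcp://127.0.0.1:5001"];
    for sender in senders.iter() {
        match membership.check_in(sender) {
            Some(snapshot) => println!("{} is new, broadcasting {:?}", sender, snapshot),
            None => println!("{} is already known, nothing to send", sender),
        }
    }
}
```

The third check-in in main repeats the first address, so it produces no broadcast. The Kompact implementation of the same behaviour follows.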

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}
impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }


    /// Sends the complete, current process set to every checked-in process.
    fn broadcast_processess(&self) {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);
        self.processes.iter().for_each(|process| {
            process.tell((msg.clone(), Serde), self);
        });
    }
}
ignore_lifecycle!(BootstrapServer);
impl NetworkActor for BootstrapServer {
    type Deserialiser = Serde;
    type Message = CheckIn;

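    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        // Local messages carry no sender path, so only remote check-ins are recorded.
        if let Some(process) = source {
            // `insert` returns true only for a previously unknown process,
            // so repeated check-ins do not trigger another broadcast.
            if self.processes.insert(process) {
                self.broadcast_processess();
            }
        }
        Handled::Ok
    }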
}


We must also make some small changes to the behaviour of the leader elector itself. First of all, it must now send the CheckIn when it is started. As before, we are using Serde as the serialisation mechanism, so we really only have to add the following line to the on_start function:

        self.bootstrap_server.tell((CheckIn, Serde), self);


We also have to change how we handle UpdateProcesses slightly, since they are now coming in over the network. We thus have to move the code from receive_local to receive_network. But now there are two different network messages we could deserialise whenever we get a NetMessage: it could be either a Heartbeat or an UpdateProcesses. Since trying each deserialiser in turn until one succeeds is somewhat inefficient, what we really want is something like this:

match msg.ser_id() {
    Heartbeat::SER_ID => { /* deserialise and handle Heartbeat */ }
    UpdateProcesses::SER_ID => { /* deserialise and handle UpdateProcesses */ }
    _ => { /* unexpected message type */ }
}
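
To make the idea concrete without depending on Kompact, here is a small self-contained sketch of dispatching on a serialisation id before deserialising. Everything in it is an assumption made for illustration: the deserialise functions, the Incoming enum, the comma-separated wire format, and the Heartbeat id value are hypothetical and do not reflect Kompact's actual API.

```rust
type SerId = u64;

#[derive(Debug, PartialEq)]
struct Heartbeat;
impl Heartbeat {
    const SER_ID: SerId = 1234; // assumed value, for illustration only

    // A heartbeat has no payload, so there is nothing to parse.
    fn deserialise(_bytes: &[u8]) -> Result<Self, String> {
        Ok(Heartbeat)
    }
}

#[derive(Debug, PartialEq)]
struct UpdateProcesses(Vec<String>);
impl UpdateProcesses {
    const SER_ID: SerId = 3456;

    // Hypothetical wire format: a comma-separated list of UTF-8 addresses.
    fn deserialise(bytes: &[u8]) -> Result<Self, String> {
        let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
        let processes = text
            .split(',')
            .filter(|s| !s.is_empty())
            .map(String::from)
            .collect();
        Ok(UpdateProcesses(processes))
    }
}

#[derive(Debug, PartialEq)]
enum Incoming {
    Heartbeat,
    Update(Vec<String>),
}

/// Picks the deserialiser from the id, so only one deserialisation is attempted.
fn dispatch(ser_id: SerId, bytes: &[u8]) -> Result<Incoming, String> {
    match ser_id {
        Heartbeat::SER_ID => Heartbeat::deserialise(bytes).map(|_| Incoming::Heartbeat),
        UpdateProcesses::SER_ID => {
            UpdateProcesses::deserialise(bytes).map(|UpdateProcesses(p)| Incoming::Update(p))
        }
        other => Err(format!("unexpected serialisation id {}", other)),
    }
}

fn main() {
    println!("{:?}", dispatch(Heartbeat::SER_ID, b""));
    println!("{:?}", dispatch(UpdateProcesses::SER_ID, b"a,b"));
    println!("{:?}", dispatch(42, b""));
}
```

Each additional message type needs another arm of the same shape, which is the repetitive part worth generating.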

Kompact provides the match_deser! macro to generate code like the above, since this is very common behaviour and writing it manually gets somewhat tedious eventually. The overall syntax for the macro is:

match_deser! {
	(<message expression>) {
		<message case 1>,
		<message case 2>,
		...
	}
}

Here <message expression> is an expression that yields the message (data) to be deserialised. If the expression is simply an identifier like msg, the parentheses may be omitted. The syntax for each message case in the macro is:

msg(variable_name): MessageType [using DeserialiserType] => <body>

For cases where MessageType = DeserialiserType, the [using DeserialiserType] block can be omitted. There are also default and error branches available for the macro, an example of which can be seen in the API docs. It is also possible to immediately destructure the deserialised message by replacing variable_name with a pattern, as can be seen in the case of UpdateProcesses below.
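
The difference between binding the whole message to a variable and destructuring it with a pattern can be seen in plain Rust. The sketch below uses an ordinary match rather than the macro, on the assumption that the macro accepts ordinary Rust patterns in that position, as the text suggests. The function names are hypothetical, and strings stand in for ActorPath values.

```rust
#[derive(Debug, Clone)]
struct UpdateProcesses(Vec<String>);

/// Binds the whole message to a variable, then reaches into it.
fn count_bound(msg: UpdateProcesses) -> usize {
    match msg {
        update => update.0.len(),
    }
}

/// Destructures the message directly in the pattern.
fn count_destructured(msg: UpdateProcesses) -> usize {
    match msg {
        UpdateProcesses(processes) => processes.len(),
    }
}

fn main() {
    let msg = UpdateProcesses(vec!["a".to_string(), "b".to_string()]);
    println!("bound: {}", count_bound(msg.clone()));
    println!("destructured: {}", count_destructured(msg));
}
```

Both functions compute the same result; the destructuring form just avoids the extra field access in the body.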

Using this macro, our new actor implementation becomes the following:

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(UpdateProcesses(processes)): UpdateProcesses [using Serde] => {
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        };
        Handled::Ok
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

System

The real difference lies in how we set up the Kompact systems. In the last section we ran a configurable number of identical systems within a single process. Now we run only a single system per process, and there are two different setups: most processes will be “clients” that run only the leader elector and the trust printer, while one process will additionally run the BootstrapServer.

Server

The one thing that sets our bootstrap server apart from every other actor we have created so far is that we want a named actor path for it. Basically, we want any other process to be able to construct a valid ActorPath instance for the bootstrap server, such as tcp://127.0.0.1:<port>/bootstrap, given only its port. For Kompact to resolve that path to the correct component, we must do two things:

  1. make sure that the Kompact system actually runs on localhost at the given port, and
  2. register a named path alias for the BootstrapServer with the name "bootstrap".

To achieve the first part, we create the NetworkDispatcher from a SocketAddr instance that contains the correct IP and port instead of using the default value as we did before. To register a component with a named path, we must call KompactSystem::register_by_alias(...) with the target component and the path to register. The rest is more or less as before.
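
As a rough mental model of the two registrations, consider the following toy registry. It is only a sketch under our own assumptions and not how Kompact implements path resolution: ComponentId, ToyRegistry, and its methods are hypothetical names, and the counter-based unique names merely stand in for the identifiers a real system generates. What it illustrates is that the unique name is only known after registration, while the alias is chosen by us up front, and that both resolve to the same component.

```rust
use std::collections::HashMap;

/// Toy stand-in for a component handle (hypothetical, purely illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ComponentId(u32);

/// Toy registry mapping path names to components.
/// This is NOT Kompact's real data structure, just a mental model.
#[derive(Default)]
struct ToyRegistry {
    next_unique: u32,
    by_path: HashMap<String, ComponentId>,
}

impl ToyRegistry {
    /// Registers under a generated name that the caller cannot know in advance.
    fn register_unique(&mut self, component: ComponentId) -> String {
        let path = format!("unique-{}", self.next_unique);
        self.next_unique += 1;
        self.by_path.insert(path.clone(), component);
        path
    }

    /// Registers an additional, caller-chosen alias.
    /// Assumption of this sketch: an alias that is already taken is rejected.
    fn register_alias(&mut self, component: ComponentId, alias: &str) -> Result<(), String> {
        if self.by_path.contains_key(alias) {
            return Err(format!("alias '{}' is already taken", alias));
        }
        self.by_path.insert(alias.to_string(), component);
        Ok(())
    }

    /// Looks up the component registered under the given name, if any.
    fn resolve(&self, path: &str) -> Option<ComponentId> {
        self.by_path.get(path).copied()
    }
}

fn main() {
    let mut registry = ToyRegistry::default();
    let bootstrap = ComponentId(7);

    // The unique name only becomes known once registration has happened...
    let unique = registry.register_unique(bootstrap);
    // ...whereas the alias is fixed before the system even starts.
    registry
        .register_alias(bootstrap, "bootstrap")
        .expect("alias is free");

    // Both names lead to the same component.
    assert_eq!(registry.resolve(&unique), Some(bootstrap));
    assert_eq!(registry.resolve("bootstrap"), Some(bootstrap));
}
```
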

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

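    // Create the bootstrap server and register it under its unique (generated) path...
    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    // ...and additionally under the predictable alias that clients will use.
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    // Wait for both registrations to complete before starting the component.
    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap alias never registered");
    system.start(&bootstrap);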

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

Client

The client setup works almost the same as in the previous section, except that we now need to construct the required ActorPath instance for the bootstrap server from its SocketAddr. We can do so using NamedPath::with_socket(...), which constructs a NamedPath instance that can easily be converted into an ActorPath. We pass this instance to the leader elector component during construction.
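
To see why such a path is predictable, it helps to look at its textual form. The following sketch uses only the standard library; named_path_string is a hypothetical helper, not part of Kompact, and simply mirrors the tcp://<ip>:<port>/<alias> shape shown in the Server section. Two clients that know nothing but the server's socket and the agreed alias arrive at the same path.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Hypothetical helper (not a Kompact API): renders the textual form of a
/// named path from a socket address and an alias.
fn named_path_string(socket: SocketAddr, alias: &str) -> String {
    // `SocketAddr` displays as `<ip>:<port>` for IPv4 addresses.
    format!("tcp://{}/{}", socket, alias)
}

fn main() {
    let server = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345);

    // Two independent clients derive the path without talking to each other.
    let seen_by_a = named_path_string(server, "bootstrap");
    let seen_by_b = named_path_string(server, "bootstrap");
    assert_eq!(seen_by_a, seen_by_b);

    println!("{}", seen_by_a); // prints "tcp://127.0.0.1:12345/bootstrap"
}
```
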

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    // Predict the bootstrap server's named path from its socket and the shared alias.
    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

Running with Commandline Arguments

All that is left to do is to convert the port numbers given on the command line into the required SocketAddr instances and call the correct function. When we are given 1 argument (the bootstrap port), we start a bootstrap server; when we are given 2 arguments (server port and client port), we start a client instead.
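
The dispatch rule itself can be sketched as a pure function that is easy to test in isolation. This is not the tutorial's main function: Mode, localhost, and parse_mode are hypothetical names introduced for illustration, and unlike the original the sketch reports problems as a Result instead of panicking.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Which kind of process to start (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Mode {
    Server { bootstrap: SocketAddr },
    Client { bootstrap: SocketAddr, client: SocketAddr },
}

/// Parses a port number and pairs it with the IPv4 loopback address.
fn localhost(port: &str) -> Result<SocketAddr, String> {
    let port_number = port
        .parse::<u16>()
        .map_err(|e| format!("invalid port '{}': {}", port, e))?;
    Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port_number))
}

/// Decides the mode from the arguments, excluding the program name.
fn parse_mode(args: &[String]) -> Result<Mode, String> {
    match args {
        // One argument: the port for the bootstrap server to bind on.
        [bootstrap] => Ok(Mode::Server {
            bootstrap: localhost(bootstrap)?,
        }),
        // Two arguments: the bootstrap server's port and the client's own port.
        [bootstrap, client] => Ok(Mode::Client {
            bootstrap: localhost(bootstrap)?,
            client: localhost(client)?,
        }),
        other => Err(format!(
            "expected 1 or 2 arguments, but got {}",
            other.len()
        )),
    }
}

fn main() {
    // Skip the program name so that only the actual arguments are counted.
    let args: Vec<String> = std::env::args().skip(1).collect();
    match parse_mode(&args) {
        Ok(mode) => println!("{:?}", mode),
        Err(msg) => eprintln!("{}", msg),
    }
}
```
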

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (bootstrap server and client port), but got {} instead!", x-1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

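    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    // Additionally register the component under a fixed alias, so that clients can predict its named path.
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);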

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap alias never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    // Construct the server's named path from its socket and the shared alias; no lookup is required.
    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

Now we can run this by first starting a server in one shell and then a few clients in other shells. As we kill and add client processes, we can watch the reported trust events change accordingly.
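
The binary picks its role purely from the number of command line arguments: one port means "bootstrap server", two ports mean "client". As a rough sketch of that dispatch, separated from any Kompact code so it can be tested on its own, one could write something like the following. Note that Mode, parse_port, and parse_mode are hypothetical names that do not appear in the example, and that the slice passed in is assumed to contain only the arguments after the program name.

```rust
/// Role selected from the command line (hypothetical helper type,
/// not part of the example code above).
#[derive(Debug, PartialEq, Eq)]
enum Mode {
    Server { bootstrap_port: u16 },
    Client { bootstrap_port: u16, client_port: u16 },
}

/// Parses a single port number, turning the parse error into a message.
fn parse_port(s: &str) -> Result<u16, String> {
    s.parse::<u16>()
        .map_err(|e| format!("invalid port {:?}: {}", s, e))
}

/// Interprets the arguments that follow the program name.
/// One argument selects the server role, two select the client role.
fn parse_mode(args: &[&str]) -> Result<Mode, String> {
    match args {
        [bootstrap] => Ok(Mode::Server {
            bootstrap_port: parse_port(bootstrap)?,
        }),
        [bootstrap, client] => Ok(Mode::Client {
            bootstrap_port: parse_port(bootstrap)?,
            client_port: parse_port(client)?,
        }),
        other => Err(format!(
            "expected 1 or 2 arguments, but got {}",
            other.len()
        )),
    }
}
```

Returning a Result instead of panicking is a design choice of this sketch only; the example's main panics on a bad argument count, which is perfectly adequate for a small demo binary.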

Note: As before, if you have checked out the examples folder, you can build a binary with:

cargo build --release

You can run the bootstrap server on port 12345 with:

../../target/release/bootstrapping 12345

Similarly, you can run a matching client on some free port with:

../../target/release/bootstrapping 12345 0
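
Passing 0 as the client port works because binding a socket to port 0 asks the operating system to pick any currently free port. The following sketch illustrates that behaviour with a plain standard library listener instead of a Kompact system; bind_ephemeral is a hypothetical helper name used only for this illustration.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};

/// Binds a TCP listener to port 0 on localhost and returns the address
/// that the operating system actually assigned to it.
/// The listener is dropped (and the port released) when this function returns.
fn bind_ephemeral() -> std::io::Result<SocketAddr> {
    let requested = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
    let listener = TcpListener::bind(requested)?;
    // The reported local address carries the real port, not 0.
    listener.local_addr()
}
```

Calling bind_ephemeral() yields a loopback address with a non-zero port that differs from run to run. This is also why only the bootstrap server needs a fixed, known port: its named path has to be predictable, while each client's actual address is learned when it checks in.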