Serialisation

In this section we are going to take a closer look at the various serialisation options offered by Kompact. In particular, we will look at how to write different kinds of custom serialiser implementations, as well as how to handle messages that are all but guaranteed to actually go over the network more efficiently.

Custom Serialisation

At the centre of Kompact’s serialisation mechanisms are the Serialisable and Deserialiser traits, the signatures of which look roughly like this:

pub trait Serialisable: Send + Debug {
    /// The serialisation id for this serialisable
    fn ser_id(&self) -> SerId;

    /// An indicator of how many bytes must be reserved in a buffer for a value to be
    /// serialised into it with this serialiser
    fn size_hint(&self) -> Option<usize>;

    /// Serialises this object (`self`) into `buf`
    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError>;

    /// Try move this object onto the heap for reflection, instead of serialising
    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>>;
}
pub trait Deserialiser<T>: Send {
    /// The serialisation id for which this deserialiser is to be invoked
    const SER_ID: SerId;

    /// Try to deserialise a `T` from the given `buf`
    fn deserialise(buf: &mut dyn Buf) -> Result<T, SerError>;
}

Outgoing Path

When ActorPath::tell(...) is invoked with a type that is Serialisable, it will create a boxed trait object from the given instance and send it to the network layer. Only when the network layer has determined that the destination must be accessed via a network channel will the runtime serialise the instance into the network channel’s buffer. If it turns out the destination is on the same actor system as the source, it will simply call Serialisable::local(...) to get a boxed instance of the Any trait and then send it directly to the target component, without ever serialising. This approach is called lazy serialisation. For the vast majority of Serialisable implementations, Serialisable::local(...) is implemented simply as Ok(self). However, for some more advanced usages (e.g., serialisation proxies) the implementation may have to call some additional code.
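The dispatch decision described above can be sketched with standard-library types only. Everything here is a simplified stand-in rather than Kompact's actual API: the trait is cut down to two methods, Vec<u8> plays the role of the network buffer, and Ping, Delivery, and dispatch are hypothetical names. Falling back to serialisation when local(...) returns Err is an assumption of this sketch, not documented behaviour.

```rust
use std::any::Any;

/// Hypothetical, reduced stand-in for Kompact's `Serialisable`.
trait Serialisable: Send {
    /// Write this value into `buf` (a `Vec<u8>` stands in for `BufMut`).
    fn serialise(&self, buf: &mut Vec<u8>);
    /// Try to hand the value over as-is, without serialising it.
    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>>;
}

#[derive(Debug, PartialEq)]
struct Ping(u32);

impl Serialisable for Ping {
    fn serialise(&self, buf: &mut Vec<u8>) {
        buf.extend_from_slice(&self.0.to_be_bytes());
    }

    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        Ok(self) // the common case: just re-box as `Any`
    }
}

/// What reaches the destination in this sketch.
enum Delivery {
    Local(Box<dyn Any + Send>),
    Remote(Vec<u8>),
}

/// Serialise only if the destination is not on the same system.
fn dispatch(msg: Box<dyn Serialisable>, destination_is_local: bool) -> Delivery {
    if destination_is_local {
        match msg.local() {
            Ok(any) => Delivery::Local(any),
            // Assumption of this sketch: fall back to bytes if `local` declines.
            Err(msg) => {
                let mut buf = Vec::new();
                msg.serialise(&mut buf);
                Delivery::Remote(buf)
            }
        }
    } else {
        let mut buf = Vec::new();
        msg.serialise(&mut buf);
        Delivery::Remote(buf)
    }
}
```

Calling dispatch(Box::new(Ping(7)), true) yields a Delivery::Local that can be downcast back to Ping, while passing false produces the four big-endian bytes of the counter.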

Once it is determined that an instance does indeed need to be serialised, the runtime will reserve some buffer memory for it to be serialised into. It does so by querying the Serialisable::size_hint(...) function for an estimate of how much space the type is likely going to take. For some types this is easy to know statically, but for others it is not so clear. In any case, this is just an optimisation. Serialisation will proceed correctly even if the estimate is terribly wrong or no estimate is given at all.
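To illustrate why the hint only affects efficiency and never correctness, here is a small sketch that uses a growable Vec<u8> as a stand-in for the network buffer. The function name serialise_with_hint is hypothetical, and Kompact's real buffer management differs from a plain Vec.

```rust
/// Reserve space according to an optional hint, then write the payload.
/// A wrong or missing hint only changes how often the buffer has to grow.
fn serialise_with_hint(hint: Option<usize>, payload: &[u8]) -> Vec<u8> {
    let mut buf = match hint {
        Some(n) => Vec::with_capacity(n), // pre-reserve the estimated size
        None => Vec::new(),               // no estimate: grow on demand
    };
    buf.extend_from_slice(payload);
    buf
}
```

An exact hint, a hint that is far too small, and no hint at all produce identical bytes in this sketch.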

The first thing in the new serialisation buffer is typically the serialisation id obtained via Serialisable::ser_id(...). Typically, Kompact will only require a single serialisation id for the message to be written into the buffer, even if the message uses other serialisers internally, as long as all the internal types are statically known. This top-level serialisation id must match the Deserialiser::SER_ID for the deserialiser to be used for this instance. For types that implement both Serialisable and Deserialiser, as most do, it is recommended to simply use Self::SER_ID as the implementation for Serialisable::ser_id(...) to make sure the ids match later.

The actual serialisation of the instance is handled by Serialisable::serialise(...), which should use the functions provided by BufMut to serialise the individual parts of the instance into the buffer.
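The outgoing steps (reserve space, write the serialisation id, then write the fields) might look roughly as follows. This is a sketch under assumptions: SerId is taken to be a u64, Vec<u8> replaces BufMut, the error type is a plain String, and Point, its id 4242, and the frame helper are invented for illustration. The exact wire layout Kompact uses is not shown here.

```rust
type SerId = u64; // assumed width for this sketch

/// Reduced stand-in for Kompact's `Serialisable`.
trait Serialisable {
    fn ser_id(&self) -> SerId;
    fn size_hint(&self) -> Option<usize>;
    fn serialise(&self, buf: &mut Vec<u8>) -> Result<(), String>;
}

struct Point {
    x: i32,
    y: i32,
}

impl Point {
    /// Hypothetical id; in Kompact this would typically be `Deserialiser::SER_ID`.
    const SER_ID: SerId = 4242;
}

impl Serialisable for Point {
    fn ser_id(&self) -> SerId {
        Self::SER_ID // reuse the constant so both sides agree
    }

    fn size_hint(&self) -> Option<usize> {
        Some(8) // two 4-byte integers
    }

    fn serialise(&self, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(&self.x.to_be_bytes());
        buf.extend_from_slice(&self.y.to_be_bytes());
        Ok(())
    }
}

/// Write the id first, then let the value serialise its own fields.
fn frame(msg: &dyn Serialisable) -> Result<Vec<u8>, String> {
    let mut buf = Vec::with_capacity(8 + msg.size_hint().unwrap_or(0));
    buf.extend_from_slice(&msg.ser_id().to_be_bytes());
    msg.serialise(&mut buf)?;
    Ok(buf)
}
```

Framing Point { x: 1, y: -1 } gives 16 bytes: 8 for the id followed by 4 for each coordinate.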

Serialiser

Instead of implementing Serialisable we can also implement the Serialiser trait:

pub trait Serialiser<T>: Send {
    /// The serialisation id for this serialiser
    fn ser_id(&self) -> SerId;

    /// An indicator of how many bytes must be reserved in a buffer for a value to be
    /// serialised into it with this serialiser
    fn size_hint(&self) -> Option<usize>;

    /// Serialise `v` into `buf`.
    fn serialise(&self, v: &T, buf: &mut dyn BufMut) -> Result<(), SerError>;
}

This behaves essentially the same, except that it doesn’t serialise itself, but an instance of another type T. In order to use an instance t: T with a Serialiser<T> we can simply pass a pair of the two to the ActorPath::tell(...) function, as we have already seen in the previous section, for example with Serde (see the call process.tell((Heartbeat, Serde), self) in send_heartbeats in the listing below):

#![allow(clippy::unused_unit)]
use kompact::{prelude::*, serde_serialisers::*};
use kompact_examples::trusting::*;
use std::{
    collections::HashSet,
    convert::TryInto,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::Duration,
};

struct ZstSerialiser<T>(T)
where
    T: Send + Sync + Default + Copy + SerialisationId;

impl<T> Serialiser<T> for &ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    fn ser_id(&self) -> SerId {
        T::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        Some(0)
    }

    fn serialise(&self, _v: &T, _buf: &mut dyn BufMut) -> Result<(), SerError> {
        Ok(())
    }
}

impl<T> Deserialiser<T> for ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    const SER_ID: SerId = T::SER_ID;

    fn deserialise(_buf: &mut dyn Buf) -> Result<T, SerError> {
        Ok(T::default())
    }
}

#[derive(Debug, Clone, Copy, Default)]
struct CheckIn;

impl SerialisationId for CheckIn {
    const SER_ID: SerId = 2345;
}

static CHECK_IN_SER: ZstSerialiser<CheckIn> = ZstSerialiser(CheckIn);

#[derive(Debug, Clone)]
struct UpdateProcesses(Vec<ActorPath>);

impl Serialisable for UpdateProcesses {
    fn ser_id(&self) -> SerId {
        Self::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        let procs_size = self.0.len() * 23; // 23 bytes is the size of a unique actor path
        Some(8 + procs_size)
    }

    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
        let len = self.0.len() as u64;
        buf.put_u64(len);
        for path in self.0.iter() {
            path.serialise(buf)?;
        }
        Ok(())
    }

    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        Ok(self)
    }
}

impl Deserialiser<UpdateProcesses> for UpdateProcesses {
    const SER_ID: SerId = 3456;

    fn deserialise(buf: &mut dyn Buf) -> Result<UpdateProcesses, SerError> {
        let len_u64 = buf.get_u64();
        let len: usize = len_u64.try_into().map_err(SerError::from_debug)?;
        let mut data: Vec<ActorPath> = Vec::with_capacity(len);
        for _i in 0..len {
            let path = ActorPath::deserialise(buf)?;
            data.push(path);
        }
        Ok(UpdateProcesses(data))
    }
}

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}

impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }

    fn broadcast_processess(&self) -> Handled {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);

        self.processes.iter().for_each(|process| {
            process
                .tell_serialised(msg.clone(), self)
                .unwrap_or_else(|e| warn!(self.log(), "Error during serialisation: {}", e));
        });
        Handled::Ok
    }
}

ignore_lifecycle!(BootstrapServer);

impl NetworkActor for BootstrapServer {
    type Deserialiser = ZstSerialiser<CheckIn>;
    type Message = CheckIn;

    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        if let Some(process) = source {
            if self.processes.insert(process) {
                self.broadcast_processess()
            } else {
                Handled::Ok
            }
        } else {
            Handled::Ok
        }
    }
}

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}

impl EventualLeaderElector {
    fn new(bootstrap_server: ActorPath) -> Self {
        let minimal_period = Duration::from_millis(1);
        EventualLeaderElector {
            ctx: ComponentContext::uninitialised(),
            omega_port: ProvidedPort::uninitialised(),
            bootstrap_server,
            processes: Vec::new().into_boxed_slice(),
            candidates: HashSet::new(),
            period: minimal_period,
            delta: minimal_period,
            timer_handle: None,
            leader: None,
        }
    }

    fn select_leader(&mut self) -> Option<ActorPath> {
        let mut candidates: Vec<ActorPath> = self.candidates.drain().collect();
        candidates.sort_unstable();
        candidates.reverse(); // pick smallest instead of largest
        candidates.pop()
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.timer_handle.take() {
            Some(timeout) if timeout == timeout_id => {
                let new_leader = self.select_leader();
                if new_leader != self.leader {
                    self.period += self.delta;
                    self.leader = new_leader;
                    if let Some(ref leader) = self.leader {
                        self.omega_port.trigger(Trust(leader.clone()));
                    }
                    self.cancel_timer(timeout);
                    let new_timer = self.schedule_periodic(
                        self.period,
                        self.period,
                        EventualLeaderElector::handle_timeout,
                    );
                    self.timer_handle = Some(new_timer);
                } else {
                    // just put it back
                    self.timer_handle = Some(timeout);
                }
                self.send_heartbeats()
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }

    fn send_heartbeats(&self) -> Handled {
        self.processes.iter().for_each(|process| {
            process.tell((Heartbeat, Serde), self);
        });
        Handled::Ok
    }
}

impl ComponentLifecycle for EventualLeaderElector {
    fn on_start(&mut self) -> Handled {
        self.bootstrap_server.tell((CheckIn, &CHECK_IN_SER), self);

        self.period = self.ctx.config()["omega"]["initial-period"]
            .as_duration()
            .expect("initial period");
        self.delta = self.ctx.config()["omega"]["delta"]
            .as_duration()
            .expect("delta");
        let timeout = self.schedule_periodic(
            self.period,
            self.period,
            EventualLeaderElector::handle_timeout,
        );
        self.timer_handle = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.timer_handle.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}
// Doesn't have any requests
ignore_requests!(EventualLeaderDetection, EventualLeaderElector);

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(update): UpdateProcesses => {
                    let UpdateProcesses(processes) = update;
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        }
        Handled::Ok
    }
}

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (bootstrap server and client port), but got {} instead!", x - 1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping_serialisation() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

Incoming Path

For any incoming network message the Kompact framework will buffer all data, and once it is complete, it will read out the serialisation id and create a NetMessage from it and the remaining buffer. It will then send the NetMessage directly to the destination component without any further processing. This approach is called lazy deserialisation and is quite different from most other actor/component frameworks, which tend to deserialise eagerly and then type match later at the destination component. However, in Rust the lazy approach is more efficient as it avoids unnecessary heap allocations for the deserialised instance.

When the NetMessage::try_deserialise function is called on the destination component, the serialisation ids of the message and the given Deserialiser will be checked and if they match up the Deserialiser::deserialise(...) function is called with the message’s data. For custom deserialisers, this method must use the Buf API to implement essentially the inverse path of what the serialisable did before.
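The id check followed by deserialisation can be sketched as below. The types are deliberately simplified stand-ins and not Kompact's real definitions: this NetMessage holds only an id and raw bytes, try_deserialise borrows the message and returns a String error, a byte slice replaces Buf, and Counter with id 7 is a made-up example type.

```rust
type SerId = u64; // assumed width for this sketch

/// Stand-in for a received message: the id read from the wire plus the
/// still-serialised payload.
struct NetMessage {
    ser_id: SerId,
    data: Vec<u8>,
}

/// Simplified `Deserialiser` reading from a byte slice instead of `Buf`.
trait Deserialiser<T> {
    const SER_ID: SerId;
    fn deserialise(buf: &[u8]) -> Result<T, String>;
}

#[derive(Debug, PartialEq)]
struct Counter(u32);

impl Deserialiser<Counter> for Counter {
    const SER_ID: SerId = 7;

    fn deserialise(buf: &[u8]) -> Result<Counter, String> {
        if buf.len() < 4 {
            return Err(format!("need 4 bytes, got {}", buf.len()));
        }
        let mut bytes = [0u8; 4];
        bytes.copy_from_slice(&buf[..4]);
        Ok(Counter(u32::from_be_bytes(bytes)))
    }
}

impl NetMessage {
    /// Deserialise only if the stored id matches the deserialiser's id.
    fn try_deserialise<T, D: Deserialiser<T>>(&self) -> Result<T, String> {
        if self.ser_id != D::SER_ID {
            return Err(format!("expected ser id {}, got {}", D::SER_ID, self.ser_id));
        }
        D::deserialise(&self.data)
    }
}
```

Nothing is decoded until the destination asks for a concrete type, and a mismatching id is rejected before any bytes are interpreted.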

Example

To show how custom serialisers can be implemented, we will show two examples re-using the bootstrapping leader election from the previous sections.

Serialiser

In our example, CheckIn is a zero-sized type (ZST), since we don’t really care about the message, only about the sender. Since ZSTs have no content, we can uniquely identify them by their serialisation id alone and all the serialisers for them are basically identical, in that their serialise(...) function consists simply of Ok(()). For this example, instead of using Serde for CheckIn, we will write our own Serialiser implementation for ZSTs and then use it for CheckIn. We could also use it for Heartbeat, but we won’t, so as to leave it as a reference for the other approach.

struct ZstSerialiser<T>(T)
where
    T: Send + Sync + Default + Copy + SerialisationId;

impl<T> Serialiser<T> for &ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    fn ser_id(&self) -> SerId {
        T::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        Some(0)
    }

    fn serialise(&self, _v: &T, _buf: &mut dyn BufMut) -> Result<(), SerError> {
        Ok(())
    }
}

impl<T> Deserialiser<T> for ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    const SER_ID: SerId = T::SER_ID;

    fn deserialise(_buf: &mut dyn Buf) -> Result<T, SerError> {
        Ok(T::default())
    }
}

#[derive(Debug, Clone, Copy, Default)]
struct CheckIn;

impl SerialisationId for CheckIn {
    const SER_ID: SerId = 2345;
}

static CHECK_IN_SER: ZstSerialiser<CheckIn> = ZstSerialiser(CheckIn);

We continue using the SerialisationId trait like we did for Serde, because we need to write the id of the ZST, not that of the ZstSerialiser, which can serialise and deserialise many different ZSTs.

In order to create the correct type instance during deserialisation, we use the Default trait, which can be trivially derived for ZSTs.
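As a small standalone illustration of that point, the following sketch shows that a unit struct occupies zero bytes and that Default is enough to recreate it without looking at the payload. The helper names rebuild and is_zero_sized are hypothetical, and this CheckIn additionally derives PartialEq purely so that it can be compared.

```rust
use std::mem::size_of;

#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct CheckIn;

/// Recreate any `Default` type while ignoring the payload entirely,
/// which is all a ZST deserialiser has to do.
fn rebuild<T: Default>(_payload: &[u8]) -> T {
    T::default()
}

/// True if `T` occupies no memory at all.
fn is_zero_sized<T>() -> bool {
    size_of::<T>() == 0
}
```

Because there is exactly one possible value of a ZST, the serialisation id alone is enough to tell the receiver which value to rebuild.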

The ZstSerialiser is clearly trivial. We can use it by creating a pair of CheckIn with a reference to our static instance CHECK_IN_SER, which simply specialises the ZstSerialiser for CheckIn, as we did before:

#![allow(clippy::unused_unit)]
use kompact::{prelude::*, serde_serialisers::*};
use kompact_examples::trusting::*;
use std::{
    collections::HashSet,
    convert::TryInto,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::Duration,
};

struct ZstSerialiser<T>(T)
where
    T: Send + Sync + Default + Copy + SerialisationId;

impl<T> Serialiser<T> for &ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    fn ser_id(&self) -> SerId {
        T::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        Some(0)
    }

    fn serialise(&self, _v: &T, _buf: &mut dyn BufMut) -> Result<(), SerError> {
        Ok(())
    }
}

impl<T> Deserialiser<T> for ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    const SER_ID: SerId = T::SER_ID;

    fn deserialise(_buf: &mut dyn Buf) -> Result<T, SerError> {
        Ok(T::default())
    }
}

#[derive(Debug, Clone, Copy, Default)]
struct CheckIn;

impl SerialisationId for CheckIn {
    const SER_ID: SerId = 2345;
}

static CHECK_IN_SER: ZstSerialiser<CheckIn> = ZstSerialiser(CheckIn);

#[derive(Debug, Clone)]
struct UpdateProcesses(Vec<ActorPath>);

impl Serialisable for UpdateProcesses {
    fn ser_id(&self) -> SerId {
        Self::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        let procs_size = self.0.len() * 23; // 23 bytes is the size of a unique actor path
        Some(8 + procs_size)
    }

    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
        let len = self.0.len() as u64;
        buf.put_u64(len);
        for path in self.0.iter() {
            path.serialise(buf)?;
        }
        Ok(())
    }

    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        Ok(self)
    }
}

impl Deserialiser<UpdateProcesses> for UpdateProcesses {
    const SER_ID: SerId = 3456;

    fn deserialise(buf: &mut dyn Buf) -> Result<UpdateProcesses, SerError> {
        let len_u64 = buf.get_u64();
        let len: usize = len_u64.try_into().map_err(SerError::from_debug)?;
        let mut data: Vec<ActorPath> = Vec::with_capacity(len);
        for _i in 0..len {
            let path = ActorPath::deserialise(buf)?;
            data.push(path);
        }
        Ok(UpdateProcesses(data))
    }
}

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}

impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }

    fn broadcast_processess(&self) -> Handled {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);

        self.processes.iter().for_each(|process| {
            process
                .tell_serialised(msg.clone(), self)
                .unwrap_or_else(|e| warn!(self.log(), "Error during serialisation: {}", e));
        });
        Handled::Ok
    }
}

ignore_lifecycle!(BootstrapServer);

impl NetworkActor for BootstrapServer {
    type Deserialiser = ZstSerialiser<CheckIn>;
    type Message = CheckIn;

    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        if let Some(process) = source {
            if self.processes.insert(process) {
                self.broadcast_processess()
            } else {
                Handled::Ok
            }
        } else {
            Handled::Ok
        }
    }
}

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}

impl EventualLeaderElector {
    fn new(bootstrap_server: ActorPath) -> Self {
        let minimal_period = Duration::from_millis(1);
        EventualLeaderElector {
            ctx: ComponentContext::uninitialised(),
            omega_port: ProvidedPort::uninitialised(),
            bootstrap_server,
            processes: Vec::new().into_boxed_slice(),
            candidates: HashSet::new(),
            period: minimal_period,
            delta: minimal_period,
            timer_handle: None,
            leader: None,
        }
    }

    fn select_leader(&mut self) -> Option<ActorPath> {
        let mut candidates: Vec<ActorPath> = self.candidates.drain().collect();
        candidates.sort_unstable();
        candidates.reverse(); // pick smallest instead of largest
        candidates.pop()
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.timer_handle.take() {
            Some(timeout) if timeout == timeout_id => {
                let new_leader = self.select_leader();
                if new_leader != self.leader {
                    self.period += self.delta;
                    self.leader = new_leader;
                    if let Some(ref leader) = self.leader {
                        self.omega_port.trigger(Trust(leader.clone()));
                    }
                    self.cancel_timer(timeout);
                    let new_timer = self.schedule_periodic(
                        self.period,
                        self.period,
                        EventualLeaderElector::handle_timeout,
                    );
                    self.timer_handle = Some(new_timer);
                } else {
                    // just put it back
                    self.timer_handle = Some(timeout);
                }
                self.send_heartbeats()
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }

    fn send_heartbeats(&self) -> Handled {
        self.processes.iter().for_each(|process| {
            process.tell((Heartbeat, Serde), self);
        });
        Handled::Ok
    }
}

impl ComponentLifecycle for EventualLeaderElector {
    fn on_start(&mut self) -> Handled {
        self.bootstrap_server.tell((CheckIn, &CHECK_IN_SER), self);

        self.period = self.ctx.config()["omega"]["initial-period"]
            .as_duration()
            .expect("initial period");
        self.delta = self.ctx.config()["omega"]["delta"]
            .as_duration()
            .expect("delta");
        let timeout = self.schedule_periodic(
            self.period,
            self.period,
            EventualLeaderElector::handle_timeout,
        );
        self.timer_handle = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.timer_handle.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}
// Doesn't have any requests
ignore_requests!(EventualLeaderDetection, EventualLeaderElector);

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(update): UpdateProcesses => {
                    let UpdateProcesses(processes) = update;
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        }
        Handled::Ok
    }
}

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (bootstrap server and client port), but got {} instead!", x - 1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping_serialisation() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}
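The idea behind ZstSerialiser can also be looked at in isolation from Kompact. The following is a minimal sketch under stated assumptions: `HasSerId` and `ZstSer` are hypothetical stand-ins for Kompact's `SerialisationId` and the `ZstSerialiser` above, plain `Vec<u8>` and `&[u8]` replace the `BufMut` and `Buf` trait objects, and `Goodbye` is an invented second ZST. It only illustrates that nothing is written to the buffer, that the id comes from the message type, and that `Default` recreates the value.

```rust
/// Hypothetical stand-in for Kompact's `SerialisationId` trait.
trait HasSerId {
    const SER_ID: u64;
}

/// Stand-in for `ZstSerialiser`: one generic serialiser for any zero-sized type.
struct ZstSer<T>(T);

impl<T: Default + HasSerId> ZstSer<T> {
    /// The id on the wire is that of the message type `T`, not of the serialiser.
    fn ser_id(&self) -> u64 {
        T::SER_ID
    }

    /// A ZST has no fields, so there is nothing to write.
    fn serialise(&self, _value: &T, _buf: &mut Vec<u8>) {}

    /// Nothing is read either; the instance is recreated via `Default`.
    fn deserialise(_buf: &[u8]) -> T {
        T::default()
    }
}

#[derive(Debug, Default, PartialEq, Clone, Copy)]
struct CheckIn;

impl HasSerId for CheckIn {
    const SER_ID: u64 = 2345;
}

/// Hypothetical second ZST, to show that the same serialiser type is reused.
#[derive(Debug, Default, PartialEq, Clone, Copy)]
struct Goodbye;

impl HasSerId for Goodbye {
    const SER_ID: u64 = 2346;
}

/// Round-trips `value` and returns (serialisation id, bytes written, decoded value).
fn round_trip<T: Default + HasSerId>(value: T) -> (u64, usize, T) {
    let ser = ZstSer(T::default());
    let mut buf = Vec::new();
    ser.serialise(&value, &mut buf);
    (ser.ser_id(), buf.len(), ZstSer::<T>::deserialise(&buf))
}
```

Calling `round_trip(CheckIn)` and `round_trip(Goodbye)` goes through the same generic code, but each call reports the id of its own message type and an empty buffer.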

Serialisable

Since the previous example was somewhat trivial, we will do a slightly trickier one for the Serialisable example. We will make the UpdateProcesses type both Serialisable and Deserialiser<UpdateProcesses>. This type contains a vector of ActorPath instances, which we must handle correctly. We will reuse the Serialisable and Deserialiser<ActorPath> implementations that are already provided for the ActorPath type.

#[derive(Debug, Clone)]
struct UpdateProcesses(Vec<ActorPath>);

impl Serialisable for UpdateProcesses {
    fn ser_id(&self) -> SerId {
        Self::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        let procs_size = self.0.len() * 23; // 23 bytes is the size of a unique actor path
        Some(8 + procs_size)
    }

    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
        let len = self.0.len() as u64;
        buf.put_u64(len);
        for path in self.0.iter() {
            path.serialise(buf)?;
        }
        Ok(())
    }

    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        Ok(self)
    }
}

impl Deserialiser<UpdateProcesses> for UpdateProcesses {
    const SER_ID: SerId = 3456;

    fn deserialise(buf: &mut dyn Buf) -> Result<UpdateProcesses, SerError> {
        let len_u64 = buf.get_u64();
        let len: usize = len_u64.try_into().map_err(SerError::from_debug)?;
        let mut data: Vec<ActorPath> = Vec::with_capacity(len);
        for _i in 0..len {
            let path = ActorPath::deserialise(buf)?;
            data.push(path);
        }
        Ok(UpdateProcesses(data))
    }
}

It would be easy to just iterate through the vector during serialisation and write one path at a time using its own serialise(...) implementation. But during deserialisation we need to know how many paths we have to take out of the buffer. We could simply try taking until the buffer refuses us, but this kind of approach often makes it difficult to detect bugs in one’s serialiser implementations. We will instead write the length of the vector before we serialise the actor paths, and during deserialisation we will read it first and allocate a vector of appropriate size. If we are concerned about the space the length wastes, we could use a more compact integer encoding, such as the variable-length integers used by Protocol Buffers. But for now we don’t care so much and simply write a full u64. Those extra 8 bytes make little difference compared to the size of a bunch of actor paths.
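The length-prefix scheme can be tried out without Kompact. The sketch below is an illustration under assumptions, not the library's wire format: strings stand in for actor paths, a `Vec<u8>` and a byte slice stand in for the buffers, and the helper `take`, the `DecodeError` type and the per-element u32 length are invented for this example (real actor paths delimit themselves).

```rust
use std::convert::{TryFrom, TryInto};

#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedEnd,
    LengthOverflow,
    InvalidUtf8,
}

/// Splits `n` bytes off the front of `buf`, or fails if fewer remain.
fn take<'a>(buf: &mut &'a [u8], n: usize) -> Result<&'a [u8], DecodeError> {
    let whole: &'a [u8] = *buf;
    if whole.len() < n {
        return Err(DecodeError::UnexpectedEnd);
    }
    let (head, tail) = whole.split_at(n);
    *buf = tail;
    Ok(head)
}

/// Writes the element count as a big-endian u64, followed by the elements.
fn encode(items: &[String]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&(items.len() as u64).to_be_bytes());
    for item in items {
        // Assumption: every element is shorter than 4 GiB, so its length fits a u32.
        buf.extend_from_slice(&(item.len() as u32).to_be_bytes());
        buf.extend_from_slice(item.as_bytes());
    }
    buf
}

/// Reads the count first, then exactly that many elements.
fn decode(mut buf: &[u8]) -> Result<Vec<String>, DecodeError> {
    let count_bytes: [u8; 8] = take(&mut buf, 8)?
        .try_into()
        .expect("take returned exactly 8 bytes");
    let count = usize::try_from(u64::from_be_bytes(count_bytes))
        .map_err(|_| DecodeError::LengthOverflow)?;
    // The count comes from the wire, so cap the pre-allocation by what is left to read.
    let mut items = Vec::with_capacity(count.min(buf.len()));
    for _ in 0..count {
        let len_bytes: [u8; 4] = take(&mut buf, 4)?
            .try_into()
            .expect("take returned exactly 4 bytes");
        let len = u32::from_be_bytes(len_bytes) as usize;
        let raw = take(&mut buf, len)?;
        let text = std::str::from_utf8(raw).map_err(|_| DecodeError::InvalidUtf8)?;
        items.push(text.to_owned());
    }
    Ok(items)
}
```

Because the decoder knows up front how many elements to expect, a truncated buffer surfaces as an explicit `UnexpectedEnd` error instead of a silently shorter list, which is the bug-detection benefit described above.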

We don’t really have a good idea for size_hint(...) here. It’s basically 8 plus the sum of the size hints for each actor path. In this case, we actually know we are pretty much just going to send unique actor paths in this set, so we can assume each one is 23 bytes long. If that assumption turns out to be wrong in practice, it will simply cause some additional allocations during serialisation. In general, as developers we have to decide on a trade-off between how much time we want to spend calculating accurate size hints and how much time we want to spend on potential reallocations. We could also simply return a large number such as 1024 and accept that we may often waste much of that allocated space. Application requirements (read: benchmarking) will determine which is the best choice in a particular scenario.
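The effect of a good or bad size hint can be observed with an ordinary `Vec<u8>`. This is only a sketch of the trade-off, assuming that the buffer is pre-allocated from the hint; Kompact's actual buffer management is not modelled here, and the function names are invented for this example. The output is not decodable (there are no per-element lengths), since only allocation behaviour is of interest.

```rust
/// Appends `bytes` and counts the write as a growth if the capacity changed.
fn write_tracked(buf: &mut Vec<u8>, bytes: &[u8], growths: &mut usize) {
    let before = buf.capacity();
    buf.extend_from_slice(bytes);
    if buf.capacity() != before {
        *growths += 1;
    }
}

/// Cheap hint: 8 bytes for the length plus an assumed fixed size per element.
fn estimated_hint(count: usize, per_item: usize) -> usize {
    8 + count * per_item
}

/// Accurate hint: costs a pass over all elements before serialising.
fn exact_hint(paths: &[&str]) -> usize {
    8 + paths.iter().map(|p| p.len()).sum::<usize>()
}

/// Serialises into a buffer pre-allocated from `hint` and returns the
/// buffer together with the number of times it had to grow.
fn serialise_with_hint(paths: &[&str], hint: usize) -> (Vec<u8>, usize) {
    let mut buf = Vec::with_capacity(hint);
    let mut growths = 0;
    write_tracked(&mut buf, &(paths.len() as u64).to_be_bytes(), &mut growths);
    for path in paths {
        write_tracked(&mut buf, path.as_bytes(), &mut growths);
    }
    (buf, growths)
}
```

With `exact_hint(...)` or a generous constant such as 1024 the buffer never grows, while an underestimate such as `estimated_hint(paths.len(), 2)` produces the same bytes at the cost of at least one reallocation.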

Eager Serialisation

As mentioned above, using ActorPath::tell(...) may cause a stack-to-heap move of the data, as it is converted into a boxed trait object for lazy serialisation. This approach avoids expensive serialisation in the case where the ActorPath turns out to be local. It is not always the right choice, however, in particular if serialisation is quick compared to allocation, or if most actor paths are not going to be local anyway. For these cases, Kompact also allows eager serialisation. To force an instance to be serialised eagerly, on the sending component’s thread, you can use ActorPath::tell_serialised(...). It works essentially the same as ActorPath::tell(...), but serialises the data into a buffer from a pool local to the sending component before sending it off to the dispatcher. If the dispatcher then finds that the actor path was actually local, the data simply has to be deserialised again, as if it had arrived remotely. If the target is remote, however, the data can be written directly into the appropriate channel.
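The difference between the two paths can be modelled in a few lines. The following is a toy model and not Kompact's implementation: the `Message` trait, the `Outgoing` and `Delivered` enums, the `Ping` type and the free functions `tell`, `tell_serialised`, `dispatch` and `receive` are all hypothetical stand-ins that only mirror the decisions described above.

```rust
use std::any::Any;
use std::convert::TryInto;

/// Hypothetical stand-in for a serialisable message.
trait Message: Send {
    fn serialise(&self, buf: &mut Vec<u8>);
    fn local(self: Box<Self>) -> Box<dyn Any + Send>;
}

#[derive(Debug, PartialEq, Clone)]
struct Ping(u32);

impl Message for Ping {
    fn serialise(&self, buf: &mut Vec<u8>) {
        buf.extend_from_slice(&self.0.to_be_bytes());
    }

    fn local(self: Box<Self>) -> Box<dyn Any + Send> {
        self
    }
}

impl Ping {
    fn deserialise(buf: &[u8]) -> Option<Ping> {
        let bytes: [u8; 4] = buf.try_into().ok()?;
        Some(Ping(u32::from_be_bytes(bytes)))
    }
}

/// What the sender hands to the dispatcher.
enum Outgoing {
    Lazy(Box<dyn Message>), // boxed now, serialised later only if needed
    Eager(Vec<u8>),         // already serialised on the sender's thread
}

/// What arrives at the target.
enum Delivered {
    Object(Box<dyn Any + Send>),
    Bytes(Vec<u8>),
}

/// Lazy path: move the message to the heap and defer serialisation.
fn tell(msg: Ping) -> Outgoing {
    Outgoing::Lazy(Box::new(msg))
}

/// Eager path: serialise immediately, no boxed trait object.
fn tell_serialised(msg: Ping) -> Outgoing {
    let mut buf = Vec::new();
    msg.serialise(&mut buf);
    Outgoing::Eager(buf)
}

fn dispatch(out: Outgoing, target_is_local: bool) -> Delivered {
    match (out, target_is_local) {
        // Local target: hand over the object without ever serialising it.
        (Outgoing::Lazy(msg), true) => Delivered::Object(msg.local()),
        // Remote target: serialise only now.
        (Outgoing::Lazy(msg), false) => {
            let mut buf = Vec::new();
            msg.serialise(&mut buf);
            Delivered::Bytes(buf)
        }
        // Eager data is forwarded as bytes, even to a local target.
        (Outgoing::Eager(bytes), _) => Delivered::Bytes(bytes),
    }
}

fn receive(delivered: Delivered) -> Option<Ping> {
    match delivered {
        Delivered::Object(obj) => obj.downcast::<Ping>().ok().map(|boxed| *boxed),
        Delivered::Bytes(bytes) => Ping::deserialise(&bytes),
    }
}
```

In this model, `receive(dispatch(tell_serialised(Ping(7)), true))` still yields `Ping(7)`, but only after a deserialisation step that the lazy path avoids for local targets. That extra step is the price paid for skipping the heap allocation of the boxed message.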

Example

To show a simple use of this approach, we use eager serialisation in the BootstrapServer::broadcast_processess function:

#![allow(clippy::unused_unit)]
use kompact::{prelude::*, serde_serialisers::*};
use kompact_examples::trusting::*;
use std::{
    collections::HashSet,
    convert::TryInto,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::Duration,
};

struct ZstSerialiser<T>(T)
where
    T: Send + Sync + Default + Copy + SerialisationId;

impl<T> Serialiser<T> for &ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    fn ser_id(&self) -> SerId {
        T::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        Some(0)
    }

    fn serialise(&self, _v: &T, _buf: &mut dyn BufMut) -> Result<(), SerError> {
        Ok(())
    }
}

impl<T> Deserialiser<T> for ZstSerialiser<T>
where
    T: Send + Sync + Default + Copy + SerialisationId,
{
    const SER_ID: SerId = T::SER_ID;

    fn deserialise(_buf: &mut dyn Buf) -> Result<T, SerError> {
        Ok(T::default())
    }
}

#[derive(Debug, Clone, Copy, Default)]
struct CheckIn;

impl SerialisationId for CheckIn {
    const SER_ID: SerId = 2345;
}

static CHECK_IN_SER: ZstSerialiser<CheckIn> = ZstSerialiser(CheckIn);

#[derive(Debug, Clone)]
struct UpdateProcesses(Vec<ActorPath>);

impl Serialisable for UpdateProcesses {
    fn ser_id(&self) -> SerId {
        Self::SER_ID
    }

    fn size_hint(&self) -> Option<usize> {
        let procs_size = self.0.len() * 23; // 23 bytes is the size of a unique actor path
        Some(8 + procs_size)
    }

    fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
        let len = self.0.len() as u64;
        buf.put_u64(len);
        for path in self.0.iter() {
            path.serialise(buf)?;
        }
        Ok(())
    }

    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        Ok(self)
    }
}

impl Deserialiser<UpdateProcesses> for UpdateProcesses {
    const SER_ID: SerId = 3456;

    fn deserialise(buf: &mut dyn Buf) -> Result<UpdateProcesses, SerError> {
        let len_u64 = buf.get_u64();
        let len: usize = len_u64.try_into().map_err(SerError::from_debug)?;
        let mut data: Vec<ActorPath> = Vec::with_capacity(len);
        for _i in 0..len {
            let path = ActorPath::deserialise(buf)?;
            data.push(path);
        }
        Ok(UpdateProcesses(data))
    }
}

#[derive(ComponentDefinition)]
struct BootstrapServer {
    ctx: ComponentContext<Self>,
    processes: HashSet<ActorPath>,
}

impl BootstrapServer {
    fn new() -> Self {
        BootstrapServer {
            ctx: ComponentContext::uninitialised(),
            processes: HashSet::new(),
        }
    }

    fn broadcast_processess(&self) -> Handled {
        let procs: Vec<ActorPath> = self.processes.iter().cloned().collect();
        let msg = UpdateProcesses(procs);

        self.processes.iter().for_each(|process| {
            process
                .tell_serialised(msg.clone(), self)
                .unwrap_or_else(|e| warn!(self.log(), "Error during serialisation: {}", e));
        });
        Handled::Ok
    }
}

ignore_lifecycle!(BootstrapServer);

impl NetworkActor for BootstrapServer {
    type Deserialiser = ZstSerialiser<CheckIn>;
    type Message = CheckIn;

    fn receive(&mut self, source: Option<ActorPath>, _msg: Self::Message) -> Handled {
        if let Some(process) = source {
            if self.processes.insert(process) {
                self.broadcast_processess()
            } else {
                Handled::Ok
            }
        } else {
            Handled::Ok
        }
    }
}

#[derive(ComponentDefinition)]
struct EventualLeaderElector {
    ctx: ComponentContext<Self>,
    omega_port: ProvidedPort<EventualLeaderDetection>,
    bootstrap_server: ActorPath,
    processes: Box<[ActorPath]>,
    candidates: HashSet<ActorPath>,
    period: Duration,
    delta: Duration,
    timer_handle: Option<ScheduledTimer>,
    leader: Option<ActorPath>,
}

impl EventualLeaderElector {
    fn new(bootstrap_server: ActorPath) -> Self {
        let minimal_period = Duration::from_millis(1);
        EventualLeaderElector {
            ctx: ComponentContext::uninitialised(),
            omega_port: ProvidedPort::uninitialised(),
            bootstrap_server,
            processes: Vec::new().into_boxed_slice(),
            candidates: HashSet::new(),
            period: minimal_period,
            delta: minimal_period,
            timer_handle: None,
            leader: None,
        }
    }

    fn select_leader(&mut self) -> Option<ActorPath> {
        // Drain all candidates collected during this round and pick the smallest path
        self.candidates.drain().min()
    }

    fn handle_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.timer_handle.take() {
            Some(timeout) if timeout == timeout_id => {
                let new_leader = self.select_leader();
                if new_leader != self.leader {
                    self.period += self.delta;
                    self.leader = new_leader;
                    if let Some(ref leader) = self.leader {
                        self.omega_port.trigger(Trust(leader.clone()));
                    }
                    self.cancel_timer(timeout);
                    let new_timer = self.schedule_periodic(
                        self.period,
                        self.period,
                        EventualLeaderElector::handle_timeout,
                    );
                    self.timer_handle = Some(new_timer);
                } else {
                    // just put it back
                    self.timer_handle = Some(timeout);
                }
                self.send_heartbeats()
            }
            Some(_) => Handled::Ok, // just ignore outdated timeouts
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
                Handled::Ok
            } // can happen during restart or teardown
        }
    }

    fn send_heartbeats(&self) -> Handled {
        self.processes.iter().for_each(|process| {
            process.tell((Heartbeat, Serde), self);
        });
        Handled::Ok
    }
}

impl ComponentLifecycle for EventualLeaderElector {
    fn on_start(&mut self) -> Handled {
        self.bootstrap_server.tell((CheckIn, &CHECK_IN_SER), self);

        self.period = self.ctx.config()["omega"]["initial-period"]
            .as_duration()
            .expect("initial period");
        self.delta = self.ctx.config()["omega"]["delta"]
            .as_duration()
            .expect("delta");
        let timeout = self.schedule_periodic(
            self.period,
            self.period,
            EventualLeaderElector::handle_timeout,
        );
        self.timer_handle = Some(timeout);
        Handled::Ok
    }

    fn on_stop(&mut self) -> Handled {
        if let Some(timeout) = self.timer_handle.take() {
            self.cancel_timer(timeout);
        }
        Handled::Ok
    }

    fn on_kill(&mut self) -> Handled {
        self.on_stop()
    }
}
// Doesn't have any requests
ignore_requests!(EventualLeaderDetection, EventualLeaderElector);

impl Actor for EventualLeaderElector {
    type Message = Never;

    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        unreachable!();
    }

    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        let sender = msg.sender;

        match_deser! {
            (msg.data) {
                msg(_heartbeat): Heartbeat [using Serde] => {
                    self.candidates.insert(sender);
                },
                msg(update): UpdateProcesses => {
                    let UpdateProcesses(processes) = update;
                    info!(
                        self.log(),
                        "Received new process set with {} processes",
                        processes.len()
                    );
                    self.processes = processes.into_boxed_slice();
                },
            }
        }
        Handled::Ok
    }
}

pub fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.len() {
        2 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let system = run_server(bootstrap_socket);
            system.await_termination(); // gotta quit it from command line
        }
        3 => {
            let bootstrap_port: u16 = args[1].parse().expect("port number");
            let bootstrap_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), bootstrap_port);
            let client_port: u16 = args[2].parse().expect("port number");
            let client_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), client_port);
            let system = run_client(bootstrap_socket, client_socket);
            system.await_termination(); // gotta quit it from command line
        }
        x => panic!("Expected either 1 argument (the port for the bootstrap server to bind on) or 2 arguments (bootstrap server and client port), but got {} instead!", x - 1),
    }
}

const BOOTSTRAP_PATH: &str = "bootstrap";

pub fn run_server(socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(DeadletterBox::new, NetworkConfig::new(socket).build());

    let system = cfg.build().expect("KompactSystem");

    let (bootstrap, bootstrap_registration) = system.create_and_register(BootstrapServer::new);
    let bootstrap_service_registration = system.register_by_alias(&bootstrap, BOOTSTRAP_PATH);

    let _bootstrap_unique = bootstrap_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    let bootstrap_service = bootstrap_service_registration
        .wait_expect(Duration::from_millis(1000), "bootstrap never registered");
    system.start(&bootstrap);

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

pub fn run_client(bootstrap_socket: SocketAddr, client_socket: SocketAddr) -> KompactSystem {
    let mut cfg = KompactConfig::default();
    cfg.load_config_file("./application.conf");
    cfg.system_components(
        DeadletterBox::new,
        NetworkConfig::new(client_socket).build(),
    );

    let system = cfg.build().expect("KompactSystem");

    let bootstrap_service: ActorPath = NamedPath::with_socket(
        Transport::Tcp,
        bootstrap_socket,
        vec![BOOTSTRAP_PATH.into()],
    )
    .into();

    let printer = system.create(TrustPrinter::new);
    let (detector, registration) =
        system.create_and_register(|| EventualLeaderElector::new(bootstrap_service));
    biconnect_components::<EventualLeaderDetection, _, _>(&detector, &printer).expect("connection");
    let _path = registration.wait_expect(Duration::from_millis(1000), "detector never registered");
    system.start(&printer);
    system.start(&detector);

    system
}

#[cfg(test)]
mod tests {
    use super::*;

    const SERVER_SOCKET: &str = "127.0.0.1:12345";
    const CLIENT_SOCKET: &str = "127.0.0.1:0";

    #[test]
    fn test_bootstrapping_serialisation() {
        let server_socket: SocketAddr = SERVER_SOCKET.parse().unwrap();
        let server_system = run_server(server_socket);
        let client_socket: SocketAddr = CLIENT_SOCKET.parse().unwrap();
        let mut clients_systems: Vec<KompactSystem> = (0..3)
            .map(|_i| run_client(server_socket, client_socket))
            .collect();
        // let them settle
        std::thread::sleep(Duration::from_millis(1000));
        // shut down systems one by one
        for sys in clients_systems.drain(..) {
            std::thread::sleep(Duration::from_millis(1000));
            sys.shutdown().expect("shutdown");
        }
        std::thread::sleep(Duration::from_millis(1000));
        server_system.shutdown().expect("shutdown");
    }
}

As you can see above, another feature of eager serialisation is that you can (and must) deal with serialisation errors, which you have no control over when using lazy serialisation. In particular, your memory allocation may prevent your local buffer pool from allocating a buffer large enough to fit your data at the time of serialisation. In this case you will get a SerError::BufferError and must decide how to handle it. You could either retry at a later time, or switch to lazy serialisation and hope the network’s buffers still have capacity (assuming they likely have priority over component-local buffer pools).
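The second option, falling back to lazy serialisation when the local pool is exhausted, can be sketched in isolation. The following is a minimal, self-contained model and not Kompact code: SerError here is a stand-in enum with only the variant discussed above, and LocalPool, Delivery, and send are hypothetical names introduced purely for illustration. In a real component the eager attempt would be the call to tell_serialised(...) and the fallback would be a plain tell(...).

```rust
/// Stand-in for Kompact's `SerError`; only the variant discussed above is modelled.
#[derive(Debug, PartialEq)]
enum SerError {
    BufferError(String),
}

/// How a message ended up being handed over (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
enum Delivery {
    /// The payload was serialised eagerly into the component-local pool.
    Eager,
    /// The local pool was exhausted, so serialisation is left to the network layer.
    LazyFallback,
}

/// Hypothetical model of a component-local buffer pool with a fixed byte budget.
struct LocalPool {
    free_bytes: usize,
}

impl LocalPool {
    /// Models eager serialisation: succeeds only if the payload fits into the pool.
    fn serialise_eagerly(&mut self, payload: &[u8]) -> Result<(), SerError> {
        if payload.len() > self.free_bytes {
            return Err(SerError::BufferError(format!(
                "need {} bytes, only {} free",
                payload.len(),
                self.free_bytes
            )));
        }
        self.free_bytes -= payload.len();
        Ok(())
    }
}

/// Try eager serialisation first; on a buffer error, fall back to the lazy path.
fn send(pool: &mut LocalPool, payload: &[u8]) -> Delivery {
    match pool.serialise_eagerly(payload) {
        Ok(()) => Delivery::Eager,
        Err(SerError::BufferError(reason)) => {
            eprintln!("Eager serialisation failed ({}), falling back to lazy", reason);
            Delivery::LazyFallback
        }
    }
}

fn main() {
    let mut pool = LocalPool { free_bytes: 8 };
    // The first message fits into the local pool.
    println!("{:?}", send(&mut pool, &[0u8; 6]));
    // The second one does not, so the sender falls back to lazy serialisation.
    println!("{:?}", send(&mut pool, &[0u8; 6]));
}
```

Whether falling back is preferable to retrying later depends on the message: a periodic broadcast such as the process list above could arguably just wait for the next round, while a one-off message may be worth handing to the network layer immediately.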