Posted on

Learning about Distributed systems and their usual patterns. I started to learn patterns from the Uncle Bob web. I started checking the Lamport clock.

Furthermore, It is a good opportunity to use message-io library which easier a lot the network protocols. In fact, It simplifies the creation of a client-server communication and its protocol with these files:

//net.rs
use message_io::network::{NetEvent, Transport};
use message_io::node::{self, NodeEvent};

use crate::adapters::network::message::{FromClientMessage, FromServerMessage, Signal};
use crate::entity::Node;

use std::sync::Arc;
use std::sync::Mutex;

pub trait ServerDispatcher {
    fn dispatch(&mut self, received: FromClientMessage) -> Option<FromServerMessage>;
}
pub trait ClientDispatcher {
    fn dispatch(&mut self, received: FromServerMessage) -> Option<FromClientMessage>;
}

/// Network function for setting up a message-io server.
pub fn run_server<T: ServerDispatcher + Clone>(node: Node, dispatcher: Arc<Mutex<T>>) {
    log::info!("Running server for address {:}", node.address);
    let (handler, listener) = node::split::<()>();
    handler
        .network()
        .listen(Transport::FramedTcp, node.address)
        .unwrap();

    listener.for_each(move |event| match event.network() {
        NetEvent::Connected(_, _) => unreachable!(), // Used for explicit connections.
        NetEvent::Accepted(_endpoint, _listener) => log::info!("Client connected"), // Tcp or Ws
        NetEvent::Message(endpoint, data) => {
            let message: FromClientMessage = bincode::deserialize(&data).unwrap();
            let response_message = dispatcher
                .lock()
                .unwrap()
                .clone()
                .dispatch(message.clone())
                .expect("It could no dispatch message");
            log::info!("response message:{:?}", message);
            let response = bincode::serialize(&response_message).expect("It could not serialize ");
            log::info!("response:{:?}", response);

            handler.network().send(endpoint, &response);
        }
        NetEvent::Disconnected(_endpoint) => log::info!("Client disconnected"), //Tcp or Ws
    });
}

/// Network function for setting up a message-io client.
pub fn run_client<T: ClientDispatcher + Clone>(
    message: FromClientMessage,
    dispatcher: Arc<Mutex<T>>,
) {
    let port = std::env::var("PORT").unwrap_or("3042".to_string());
    let host = std::env::var("HOST").unwrap_or("0.0.0.0".to_string());
    let address = format!("{:}:{:}", host, port);

    let (handler, listener) = node::split();

    let (server, _) = handler
        .network()
        .connect(Transport::FramedTcp, address)
        .unwrap();

    listener.for_each(move |event| match event {
        NodeEvent::Network(net_event) => match net_event {
            NetEvent::Connected(_endpoint, _ok) => handler.signals().send(Signal::Greet),
            NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
            NetEvent::Message(_endpoint, data) => {
                let message: FromServerMessage =
                    bincode::deserialize(&data).expect("It could not serialize the data");
                dispatcher.lock().unwrap().clone().dispatch(message.clone());
            }
            NetEvent::Disconnected(_endpoint) => (),
        },
        NodeEvent::Signal(signal) => match signal {
            Signal::Greet => {
                handler
                    .network()
                    .send(server, &bincode::serialize(&message).unwrap());
            }
        },
    });
}

and the supported messages

//message.rs
/// Set of available messages for message-io communication.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum FromClientMessage {
    Ping,
    Read(String, Option<String>, Option<u64>),
    Write(String, String, u64),
    UnknownPong, // Used for non-connection oriented protocols
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum FromServerMessage {
    WrittenAt(u64),
    Pong(String), // Used for connection oriented protocols
    UnknownPong,  // Used for non-connection oriented protocols
}

pub enum Signal {
    Greet,
    // Any other app event here.
}

It works with callback that they are applied via dispatchers for both Client and Server

Then, we should implement our first module that it is the lamport clock structure

//lamport.rs
use std::cmp;

#[derive(Default, Clone)]
pub struct LamportClock {
    pub latest_time: u64,
}

impl LamportClock {
    pub fn new(latest_time: u64) -> Self {
        LamportClock { latest_time }
    }
    pub fn tick(&mut self, request_time: u64) -> u64 {
        self.latest_time = cmp::max(self.latest_time, request_time);
        self.latest_time += 1;
        return self.latest_time;
    }
}