1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
use std::time::Instant;

use massa_consensus_exports::{error::ConsensusError, events::ConsensusEvent};
use massa_models::{
    slot::Slot,
    timeslots::{get_block_slot_timestamp, get_closest_slot_to_timestamp},
};
use massa_time::MassaTime;
use tracing::{info, warn};

use crate::commands::ConsensusCommand;

use super::ConsensusWorker;

enum WaitingStatus {
    Ended,
    Interrupted,
    Disconnected,
}

impl ConsensusWorker {
    /// Execute a command received from the controller also run an update of the graph after processing the command.
    ///
    /// # Arguments:
    /// * `command`: the command to execute
    ///
    /// # Returns:
    /// An error if the command failed
    fn manage_command(&mut self, command: ConsensusCommand) -> Result<(), ConsensusError> {
        let mut write_shared_state = self.shared_state.write();
        match command {
            ConsensusCommand::RegisterBlockHeader(block_id, header) => {
                write_shared_state.register_block_header(block_id, header, self.previous_slot)?;
                write_shared_state.block_db_changed()
            }
            ConsensusCommand::RegisterBlock(block_id, slot, block_storage, created) => {
                write_shared_state.register_block(
                    block_id,
                    slot,
                    self.previous_slot,
                    block_storage,
                    created,
                )?;
                write_shared_state.block_db_changed()
            }
            ConsensusCommand::MarkInvalidBlock(block_id, header) => {
                write_shared_state.mark_invalid_block(&block_id, header);
                Ok(())
            }
        }
    }

    /// Wait and interrupt if we receive a command, a stop signal or we reach the `instant`
    ///
    /// # Return:
    /// WaitingStatus::Interrupted => if a command has been executed
    /// WaitingStatus::Ended => if we reached the `instant`
    /// WaitingStatus::Disconnected => if we received a stop signal
    fn wait_slot_or_command(&mut self, deadline: Instant) -> WaitingStatus {
        match self.command_receiver.recv_deadline(deadline) {
            // message received => manage it
            Ok(command) => {
                if let Err(err) = self.manage_command(command) {
                    warn!("Error in consensus: {}", err);
                }
                WaitingStatus::Interrupted
            }
            // timeout => continue main loop
            Err(crossbeam::channel::RecvTimeoutError::Timeout) => WaitingStatus::Ended,
            // channel disconnected (sender dropped) => quit main loop
            Err(crossbeam::channel::RecvTimeoutError::Disconnected) => WaitingStatus::Disconnected,
        }
    }

    /// Gets the next slot and the instant when it will happen.
    /// Slots can be skipped if we waited too much in-between.
    /// Extra safety against double-production caused by clock adjustments (this is the role of the `previous_slot` parameter).
    fn get_next_slot(&self, previous_slot: Option<Slot>) -> (Slot, Instant) {
        // get current absolute time
        let now = MassaTime::now();

        // get closest slot according to the current absolute time
        let mut next_slot = get_closest_slot_to_timestamp(
            self.config.thread_count,
            self.config.t0,
            self.config.genesis_timestamp,
            now,
        );

        // protection against double-production on unexpected system clock adjustment
        if let Some(prev_slot) = previous_slot {
            if next_slot <= prev_slot {
                next_slot = prev_slot
                    .get_next_slot(self.config.thread_count)
                    .expect("could not compute next slot");
            }
        }

        // get the timestamp of the target slot
        let next_instant = get_block_slot_timestamp(
            self.config.thread_count,
            self.config.t0,
            self.config.genesis_timestamp,
            next_slot,
        )
        .expect("could not get block slot timestamp")
        .estimate_instant()
        .expect("could not estimate block slot instant");

        (next_slot, next_instant)
    }

    /// Runs in loop forever. This loop must stop every slot to perform operations on stats and graph
    /// but can be stopped anytime by a command received.
    pub fn run(&mut self) {
        let mut last_prune = Instant::now();
        loop {
            match self.wait_slot_or_command(self.next_instant) {
                // When we reached the instant of the next slot
                WaitingStatus::Ended => {
                    if let Some(end) = self.config.end_timestamp {
                        // The testnet has ended. Will be removed for mainnet.
                        if self.next_instant > end.estimate_instant().unwrap() {
                            info!("This episode has come to an end, please get the latest testnet node version to continue");
                            let _ = self
                                .shared_state
                                .read()
                                .channels
                                .controller_event_tx
                                .send(ConsensusEvent::Stop);
                            break;
                        }
                    }
                    let previous_cycle = self
                        .previous_slot
                        .map(|s| s.get_cycle(self.config.periods_per_cycle));
                    let observed_cycle = self.next_slot.get_cycle(self.config.periods_per_cycle);
                    if previous_cycle.is_none() {
                        // first cycle observed
                        info!("Massa network has started ! 🎉")
                    }
                    if previous_cycle < Some(observed_cycle) {
                        info!("Started cycle {}", observed_cycle);
                    }
                    // Execute all operations and checks that should be performed at each slot
                    {
                        let mut write_shared_state = self.shared_state.write();
                        if let Err(err) = write_shared_state.slot_tick(self.next_slot) {
                            warn!("Error while processing block tick: {}", err);
                        }
                    };
                    if last_prune.elapsed().as_millis()
                        > self.config.block_db_prune_interval.as_millis() as u128
                    {
                        self.shared_state
                            .write()
                            .prune()
                            .expect("Error while pruning");
                        last_prune = Instant::now();
                    }
                    self.previous_slot = Some(self.next_slot);
                    (self.next_slot, self.next_instant) = self.get_next_slot(Some(self.next_slot));
                }
                WaitingStatus::Disconnected => {
                    break;
                }
                WaitingStatus::Interrupted => {
                    continue;
                }
            };
        }
    }
}