1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
use std::time::Instant;
use massa_consensus_exports::{error::ConsensusError, events::ConsensusEvent};
use massa_models::{
slot::Slot,
timeslots::{get_block_slot_timestamp, get_closest_slot_to_timestamp},
};
use massa_time::MassaTime;
use tracing::{info, warn};
use crate::commands::ConsensusCommand;
use super::ConsensusWorker;
enum WaitingStatus {
Ended,
Interrupted,
Disconnected,
}
impl ConsensusWorker {
/// Execute a command received from the controller also run an update of the graph after processing the command.
///
/// # Arguments:
/// * `command`: the command to execute
///
/// # Returns:
/// An error if the command failed
fn manage_command(&mut self, command: ConsensusCommand) -> Result<(), ConsensusError> {
let mut write_shared_state = self.shared_state.write();
match command {
ConsensusCommand::RegisterBlockHeader(block_id, header) => {
write_shared_state.register_block_header(block_id, header, self.previous_slot)?;
write_shared_state.block_db_changed()
}
ConsensusCommand::RegisterBlock(block_id, slot, block_storage, created) => {
write_shared_state.register_block(
block_id,
slot,
self.previous_slot,
block_storage,
created,
)?;
write_shared_state.block_db_changed()
}
ConsensusCommand::MarkInvalidBlock(block_id, header) => {
write_shared_state.mark_invalid_block(&block_id, header);
Ok(())
}
}
}
/// Wait and interrupt if we receive a command, a stop signal or we reach the `instant`
///
/// # Return:
/// WaitingStatus::Interrupted => if a command has been executed
/// WaitingStatus::Ended => if we reached the `instant`
/// WaitingStatus::Disconnected => if we received a stop signal
fn wait_slot_or_command(&mut self, deadline: Instant) -> WaitingStatus {
match self.command_receiver.recv_deadline(deadline) {
// message received => manage it
Ok(command) => {
if let Err(err) = self.manage_command(command) {
warn!("Error in consensus: {}", err);
}
WaitingStatus::Interrupted
}
// timeout => continue main loop
Err(crossbeam::channel::RecvTimeoutError::Timeout) => WaitingStatus::Ended,
// channel disconnected (sender dropped) => quit main loop
Err(crossbeam::channel::RecvTimeoutError::Disconnected) => WaitingStatus::Disconnected,
}
}
/// Gets the next slot and the instant when it will happen.
/// Slots can be skipped if we waited too much in-between.
/// Extra safety against double-production caused by clock adjustments (this is the role of the `previous_slot` parameter).
fn get_next_slot(&self, previous_slot: Option<Slot>) -> (Slot, Instant) {
// get current absolute time
let now = MassaTime::now();
// get closest slot according to the current absolute time
let mut next_slot = get_closest_slot_to_timestamp(
self.config.thread_count,
self.config.t0,
self.config.genesis_timestamp,
now,
);
// protection against double-production on unexpected system clock adjustment
if let Some(prev_slot) = previous_slot {
if next_slot <= prev_slot {
next_slot = prev_slot
.get_next_slot(self.config.thread_count)
.expect("could not compute next slot");
}
}
// get the timestamp of the target slot
let next_instant = get_block_slot_timestamp(
self.config.thread_count,
self.config.t0,
self.config.genesis_timestamp,
next_slot,
)
.expect("could not get block slot timestamp")
.estimate_instant()
.expect("could not estimate block slot instant");
(next_slot, next_instant)
}
/// Runs in loop forever. This loop must stop every slot to perform operations on stats and graph
/// but can be stopped anytime by a command received.
pub fn run(&mut self) {
let mut last_prune = Instant::now();
loop {
match self.wait_slot_or_command(self.next_instant) {
// When we reached the instant of the next slot
WaitingStatus::Ended => {
if let Some(end) = self.config.end_timestamp {
// The testnet has ended. Will be removed for mainnet.
if self.next_instant > end.estimate_instant().unwrap() {
info!("This episode has come to an end, please get the latest testnet node version to continue");
let _ = self
.shared_state
.read()
.channels
.controller_event_tx
.send(ConsensusEvent::Stop);
break;
}
}
let previous_cycle = self
.previous_slot
.map(|s| s.get_cycle(self.config.periods_per_cycle));
let observed_cycle = self.next_slot.get_cycle(self.config.periods_per_cycle);
if previous_cycle.is_none() {
// first cycle observed
info!("Massa network has started ! 🎉")
}
if previous_cycle < Some(observed_cycle) {
info!("Started cycle {}", observed_cycle);
}
// Execute all operations and checks that should be performed at each slot
{
let mut write_shared_state = self.shared_state.write();
if let Err(err) = write_shared_state.slot_tick(self.next_slot) {
warn!("Error while processing block tick: {}", err);
}
};
if last_prune.elapsed().as_millis()
> self.config.block_db_prune_interval.as_millis() as u128
{
self.shared_state
.write()
.prune()
.expect("Error while pruning");
last_prune = Instant::now();
}
self.previous_slot = Some(self.next_slot);
(self.next_slot, self.next_instant) = self.get_next_slot(Some(self.next_slot));
}
WaitingStatus::Disconnected => {
break;
}
WaitingStatus::Interrupted => {
continue;
}
};
}
}
}