use crossbeam::channel::tick;
use humantime::format_duration;
use massa_consensus_exports::{bootstrapable_graph::BootstrapableGraph, ConsensusController};
use massa_db_exports::CHANGE_ID_DESER_ERROR;
use massa_final_state::FinalStateController;
use massa_logging::massa_trace;
use massa_metrics::MassaMetrics;
use massa_models::{
block_id::BlockId, prehash::PreHashSet, slot::Slot, streaming_step::StreamingStep,
version::Version,
};
use massa_protocol_exports::ProtocolController;
use massa_signature::KeyPair;
use massa_time::MassaTime;
use parking_lot::RwLock;
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::Arc,
thread,
time::{Duration, Instant},
};
use tracing::{debug, error, info, warn};
#[cfg(not(test))]
use crate::listener::BootstrapTcpListener;
#[cfg(test)]
use crate::listener::MockBootstrapTcpListener as BootstrapTcpListener;
use crate::{
bindings::BootstrapServerBinder,
error::BootstrapError,
listener::{BootstrapListenerStopHandle, PollEvent},
messages::{BootstrapClientMessage, BootstrapServerMessage},
white_black_list::SharedWhiteBlackList,
BootstrapConfig,
};
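
/// Handle to a running bootstrap server.
///
/// Holds the join handles of the white/black-list updater thread and the
/// main-loop thread, together with the stop handles used by
/// [`BootstrapManager::stop`], and exposes the shared white/black list so it
/// can be inspected or updated at runtime.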
pub struct BootstrapManager {
update_handle: thread::JoinHandle<Result<(), BootstrapError>>,
#[allow(clippy::type_complexity)]
main_handle: thread::JoinHandle<Result<(), BootstrapError>>,
listener_stopper: BootstrapListenerStopHandle,
update_stopper_tx: crossbeam::channel::Sender<()>,
pub white_black_list: SharedWhiteBlackList<'static>,
}
impl BootstrapManager {
pub(crate) fn new(
update_handle: thread::JoinHandle<Result<(), BootstrapError>>,
main_handle: thread::JoinHandle<Result<(), BootstrapError>>,
update_stopper_tx: crossbeam::channel::Sender<()>,
listener_stopper: BootstrapListenerStopHandle,
white_black_list: SharedWhiteBlackList<'static>,
) -> Self {
Self {
update_handle,
main_handle,
update_stopper_tx,
listener_stopper,
white_black_list,
}
}
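
/// Stop the bootstrap server: signal the TCP listener and the ip-list
/// updater, then join both threads and propagate any error they returned.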
pub fn stop(self) -> Result<(), BootstrapError> {
massa_trace!("bootstrap.lib.stop", {});
if self.listener_stopper.stop().is_err() {
warn!("bootstrap server already dropped");
}
if self.update_stopper_tx.send(()).is_err() {
warn!("bootstrap ip-list-updater already dropped");
}
self.update_handle
.join()
.expect("in BootstrapManager::stop() joining on updater thread")?;
let res = self
.main_handle
.join()
.expect("in BootstrapManager::stop() joining on bootstrap main-loop thread");
info!("bootstrap server stopped");
res
}
}
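
/// Start the bootstrap server and return the [`BootstrapManager`] controlling it.
///
/// Two threads are spawned:
/// * `wb_list_updater`, which periodically refreshes the white/black lists;
/// * `bs-main-loop`, which polls the TCP listener and serves incoming
///   bootstrap requests.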
#[allow(clippy::too_many_arguments)]
pub fn start_bootstrap_server(
ev_poller: BootstrapTcpListener,
listener_stopper: BootstrapListenerStopHandle,
consensus_controller: Box<dyn ConsensusController>,
protocol_controller: Box<dyn ProtocolController>,
final_state: Arc<RwLock<dyn FinalStateController>>,
config: BootstrapConfig,
keypair: KeyPair,
version: Version,
massa_metrics: MassaMetrics,
) -> Result<BootstrapManager, BootstrapError> {
massa_trace!("bootstrap.lib.start_bootstrap_server", {});
let (update_stopper_tx, update_stopper_rx) = crossbeam::channel::bounded::<()>(1);
let Ok(max_bootstraps) = config.max_simultaneous_bootstraps.try_into() else {
return Err(BootstrapError::GeneralError(
"Fail to convert u32 to usize".to_string(),
));
};
let white_black_list = SharedWhiteBlackList::new(
config.bootstrap_whitelist_path.clone(),
config.bootstrap_blacklist_path.clone(),
)?;
let updater_lists = white_black_list.clone();
let update_handle = thread::Builder::new()
.name("wb_list_updater".to_string())
.spawn(move || {
let res = BootstrapServer::run_updater(
updater_lists,
config.cache_duration.into(),
update_stopper_rx,
);
match res {
Ok(_) => info!("ip white/blacklist updater exited cleanly"),
Err(ref er) => error!("updater exited with error: {}", er),
};
res
})
.expect("in `start_bootstrap_server`, OS failed to spawn list-updater thread");
let w_b_list = white_black_list.clone();
let main_handle = thread::Builder::new()
.name("bs-main-loop".to_string())
.spawn(move || {
BootstrapServer {
consensus_controller,
protocol_controller,
final_state,
ev_poller,
white_black_list: w_b_list,
keypair,
version,
ip_hist_map: HashMap::with_capacity(config.ip_list_max_size),
bootstrap_config: config,
massa_metrics,
}
.event_loop(max_bootstraps)
})
.expect("in `start_bootstrap_server`, OS failed to spawn main-loop thread");
Ok(BootstrapManager::new(
update_handle,
main_handle,
update_stopper_tx,
listener_stopper,
white_black_list,
))
}
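
/// Server-side state used by the bootstrap main loop; `ip_hist_map` keeps the
/// time of the last bootstrap attempt per client IP for rate limiting.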
struct BootstrapServer<'a> {
consensus_controller: Box<dyn ConsensusController>,
protocol_controller: Box<dyn ProtocolController>,
final_state: Arc<RwLock<dyn FinalStateController>>,
ev_poller: BootstrapTcpListener,
white_black_list: SharedWhiteBlackList<'a>,
keypair: KeyPair,
bootstrap_config: BootstrapConfig,
version: Version,
ip_hist_map: HashMap<IpAddr, Instant>,
massa_metrics: MassaMetrics,
}
impl BootstrapServer<'_> {
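/// Periodically refresh the white/black lists (every `interval`) until a stop
/// signal is received on `stopper`; any update error is propagated.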
fn run_updater(
mut list: SharedWhiteBlackList<'_>,
interval: Duration,
stopper: crossbeam::channel::Receiver<()>,
) -> Result<(), BootstrapError> {
let ticker = tick(interval);
loop {
crossbeam::select! {
recv(stopper) -> res => {
match res {
Ok(()) => return Ok(()),
Err(e) => return Err(BootstrapError::GeneralError(format!("update stopper error: {}", e))),
}
},
recv(ticker) -> _ => {list.update()?;},
}
}
}
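/// Main server loop: poll the listener for new connections and, for each
/// accepted peer, enforce the white/black lists, the per-IP rate limit and
/// the maximum number of simultaneous bootstrap sessions before spawning a
/// dedicated session thread. Active sessions are counted through the strong
/// count of `bootstrap_sessions_counter`: each session thread holds a clone
/// of the `Arc` for its whole lifetime.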
fn event_loop(mut self, max_bootstraps: usize) -> Result<(), BootstrapError> {
let bootstrap_sessions_counter: Arc<()> = Arc::new(());
let per_ip_min_interval = self.bootstrap_config.per_ip_min_interval.to_duration();
let limit = self.bootstrap_config.rate_limit;
loop {
let connections = match self.ev_poller.poll() {
Ok(PollEvent::Stop) => return Ok(()),
Ok(PollEvent::NewConnections(connections)) => connections,
Err(e) => {
error!("bootstrap listener error: {}", e);
continue;
}
};
for (dplx, remote_addr) in connections {
let server_binding = BootstrapServerBinder::new(
dplx,
self.keypair.clone(),
(&self.bootstrap_config).into(),
Some(limit),
);
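// `strong_count - 1` = number of sessions currently holding a clone of the
// counter (the original kept by this loop is excluded).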
if Arc::strong_count(&bootstrap_sessions_counter) - 1 < max_bootstraps {
let bootstrap_count_token = bootstrap_sessions_counter.clone();
if let Err(error_msg) = self.white_black_list.is_ip_allowed(&remote_addr) {
server_binding.close_and_send_error(
error_msg.to_string(),
remote_addr,
move || {},
);
self.massa_metrics.inc_bootstrap_peers_failed();
continue;
};
massa_trace!("bootstrap.lib.run.select.accept", {
"remote_addr": remote_addr
});
let now = Instant::now();
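// When the history map is full, drop entries older than the per-IP minimum
// interval; if it is still full after pruning, assume a bootstrap storm and
// reset it entirely.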
if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size {
self.ip_hist_map
.retain(|_k, v| now.duration_since(*v) <= per_ip_min_interval);
if self.ip_hist_map.len() > self.bootstrap_config.ip_list_max_size {
warn!("high bootstrap load: at least {} different IPs attempted bootstrap in the last {}", self.ip_hist_map.len(),format_duration(self.bootstrap_config.per_ip_min_interval.to_duration()).to_string());
self.ip_hist_map.clear();
}
}
if let Err(msg) = BootstrapServer::greedy_client_check(
&mut self.ip_hist_map,
remote_addr,
now,
per_ip_min_interval,
) {
let msg = format!(
"Your last bootstrap on this server was {} ago and you have to wait {} before retrying.",
format_duration(msg),
format_duration(per_ip_min_interval.saturating_sub(msg))
);
let tracer = move || {
massa_trace!("bootstrap.lib.run.select.accept.refuse_limit", {
"remote_addr": remote_addr
})
};
server_binding.close_and_send_error(msg, remote_addr, tracer);
self.massa_metrics.inc_bootstrap_peers_failed();
continue;
};
massa_trace!("bootstrap.lib.run.select.accept.cache_available", {});
let version = self.version;
let data_execution = self.final_state.clone();
let consensus_command_sender = self.consensus_controller.clone();
let protocol_controller = self.protocol_controller.clone();
let config = self.bootstrap_config.clone();
let massa_metrics = self.massa_metrics.clone();
let _ = thread::Builder::new()
.name(format!("bootstrap thread, peer: {}", remote_addr))
.spawn(move || {
run_bootstrap_session(
server_binding,
bootstrap_count_token,
config,
remote_addr,
data_execution,
version,
consensus_command_sender,
protocol_controller,
massa_metrics,
)
});
massa_trace!("bootstrap.session.started", {
"active_count": Arc::strong_count(&bootstrap_sessions_counter) - 1
});
} else {
server_binding.close_and_send_error(
"Bootstrap failed because the bootstrap server currently has no slots available.".to_string(),
remote_addr,
move || debug!("did not bootstrap {}: no available slots", remote_addr),
);
self.massa_metrics.inc_bootstrap_peers_failed();
}
}
}
}
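/// Check whether `remote_addr` bootstrapped too recently.
///
/// Returns `Err(elapsed)` with the time since the previous attempt when the
/// per-IP minimum interval has not elapsed yet; otherwise records `now` as
/// the latest attempt and returns `Ok(())`.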
fn greedy_client_check(
ip_hist_map: &mut HashMap<IpAddr, Instant>,
remote_addr: SocketAddr,
now: Instant,
per_ip_min_interval: Duration,
) -> Result<(), Duration> {
let mut res = Ok(());
ip_hist_map
.entry(remote_addr.ip())
.and_modify(|occ| {
if now.duration_since(*occ) <= per_ip_min_interval {
res = Err(occ.elapsed());
} else {
*occ = now;
}
})
.or_insert(now);
res
}
}
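
/// Serve a single bootstrap session and report its outcome in the metrics.
///
/// `arc_counter` is the session token cloned from the main loop's counter;
/// dropping it at the end of the session frees a bootstrap slot.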
#[allow(clippy::too_many_arguments)]
fn run_bootstrap_session(
mut server: BootstrapServerBinder,
arc_counter: Arc<()>,
config: BootstrapConfig,
remote_addr: SocketAddr,
data_execution: Arc<RwLock<dyn FinalStateController>>,
version: Version,
consensus_command_sender: Box<dyn ConsensusController>,
protocol_controller: Box<dyn ProtocolController>,
massa_metrics: MassaMetrics,
) {
debug!("running bootstrap for peer {}", remote_addr);
let deadline = Instant::now() + config.bootstrap_timeout.to_duration();
let res = manage_bootstrap(
&config,
&mut server,
data_execution,
version,
consensus_command_sender,
protocol_controller,
deadline,
);
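// `strong_count - 2` excludes the counter owned by the main loop and the
// token held by this session, leaving the number of other active sessions.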
massa_trace!("bootstrap.session.finished", {
"sessions_remaining": Arc::strong_count(&arc_counter) - 2
});
drop(arc_counter);
match res {
Err(BootstrapError::TimedOut(_)) => {
debug!("bootstrap timeout for peer {}", remote_addr);
let _ = server.send_error_timeout(format!(
"Bootstrap process timedout ({})",
format_duration(config.bootstrap_timeout.to_duration())
));
massa_metrics.inc_bootstrap_peers_failed();
}
Err(BootstrapError::ReceivedError(error)) => {
debug!(
"bootstrap serving error received from peer {}: {}",
remote_addr, error
);
massa_metrics.inc_bootstrap_peers_failed();
}
Err(err) => {
debug!("bootstrap serving error for peer {}: {}", remote_addr, err);
let _ = server.send_error_timeout(err.to_string());
massa_metrics.inc_bootstrap_peers_failed();
}
Ok(_) => {
info!("bootstrapped peer {}", remote_addr);
massa_metrics.inc_bootstrap_peers_success();
}
}
}
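
/// Stream the final state, versioning data and consensus graph to the client
/// in successive `BootstrapPart` messages, resuming from the provided
/// cursors, until every cursor reports `Finished` and a `BootstrapFinished`
/// message is sent or the bootstrap deadline is reached.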
#[allow(clippy::too_many_arguments)]
pub fn stream_bootstrap_information(
server: &mut BootstrapServerBinder,
final_state: Arc<RwLock<dyn FinalStateController>>,
consensus_controller: Box<dyn ConsensusController>,
mut last_slot: Option<Slot>,
mut last_state_step: StreamingStep<Vec<u8>>,
mut last_versioning_step: StreamingStep<Vec<u8>>,
mut last_consensus_step: StreamingStep<PreHashSet<BlockId>>,
mut send_last_start_period: bool,
bs_deadline: &Instant,
write_timeout: Duration,
) -> Result<(), BootstrapError> {
loop {
let current_slot;
let state_part;
let versioning_part;
let last_start_period;
let last_slot_before_downtime;
let slot_too_old = false;
{
let final_state_read = final_state.read();
last_start_period = if send_last_start_period {
Some(final_state_read.get_last_start_period())
} else {
None
};
last_slot_before_downtime = if send_last_start_period {
Some(*final_state_read.get_last_slot_before_downtime())
} else {
None
};
state_part = final_state_read
.get_database()
.read()
.get_batch_to_stream(&last_state_step, last_slot)
.map_err(|e| {
BootstrapError::GeneralError(format!("Error get_batch_to_stream: {}", e))
})?;
let new_state_step = match (&last_state_step, state_part.is_empty()) {
(StreamingStep::Finished(_), _) => StreamingStep::Finished(None),
(StreamingStep::Ongoing(_), true) => StreamingStep::Finished(None),
(StreamingStep::Started, true) => {
warn!("State bootstrap is finished but nothing has been streamed yet");
StreamingStep::Finished(None)
}
(StreamingStep::Ongoing(last_key), false) => {
    match state_part.new_elements.last_key_value() {
        Some((new_last_key, _)) => StreamingStep::Ongoing(new_last_key.clone()),
        None => StreamingStep::Ongoing(last_key.clone()),
    }
}
(StreamingStep::Started, false) => match state_part.new_elements.last_key_value() {
    Some((new_last_key, _)) => StreamingStep::Ongoing(new_last_key.clone()),
    None => {
return Err(BootstrapError::GeneralError(String::from(
"State bootstrap started but we have no new elements to stream",
)));
}
},
};
versioning_part = final_state_read
.get_database()
.read()
.get_versioning_batch_to_stream(&last_versioning_step, last_slot)
.map_err(|e| {
BootstrapError::GeneralError(format!(
"Error get_versioning_batch_to_stream: {}",
e
))
})?;
let new_versioning_step = match (&last_versioning_step, versioning_part.is_empty()) {
(StreamingStep::Finished(_), _) => StreamingStep::Finished(None),
(StreamingStep::Ongoing(_), true) => StreamingStep::Finished(None),
(StreamingStep::Started, true) => {
warn!("Versioning bootstrap is finished but nothing has been streamed yet");
StreamingStep::Finished(None)
}
(StreamingStep::Ongoing(last_key), false) => {
    match versioning_part.new_elements.last_key_value() {
        Some((new_last_key, _)) => StreamingStep::Ongoing(new_last_key.clone()),
        None => StreamingStep::Ongoing(last_key.clone()),
    }
}
(StreamingStep::Started, false) => {
match versioning_part.new_elements.last_key_value() {
Some((new_last_key, _)) => StreamingStep::Ongoing(new_last_key.clone()),
None => {
return Err(BootstrapError::GeneralError(String::from(
"Versioning bootstrap started but we have no new elements to stream",
)));
}
}
}
};
let db_slot = final_state_read
.get_database()
.read()
.get_change_id()
.expect(CHANGE_ID_DESER_ERROR);
if let Some(slot) = last_slot {
if slot > db_slot {
return Err(BootstrapError::GeneralError(
"Bootstrap cursor set to future slot".to_string(),
));
}
}
last_state_step = new_state_step;
last_versioning_step = new_versioning_step;
last_slot = Some(db_slot);
current_slot = db_slot;
send_last_start_period = false;
}
if slot_too_old {
return server.send_msg(write_timeout, BootstrapServerMessage::SlotTooOld);
}
let final_state_global_step =
if last_state_step.finished() && last_versioning_step.finished() {
StreamingStep::Finished(Some(current_slot))
} else {
StreamingStep::Ongoing(current_slot)
};
let mut consensus_part = BootstrapableGraph {
final_blocks: Default::default(),
};
let mut consensus_outdated_ids: PreHashSet<BlockId> = PreHashSet::default();
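// Consensus blocks are only queried once the final-state and versioning
// streams have finished, i.e. caught up with the current slot.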
if final_state_global_step.finished() {
let (part, outdated_ids, new_consensus_step) = consensus_controller
.get_bootstrap_part(last_consensus_step, final_state_global_step)?;
consensus_part = part;
consensus_outdated_ids = outdated_ids;
last_consensus_step = new_consensus_step;
}
debug!(
"Final state bootstrap cursor: {:?}",
final_state_global_step
);
debug!(
"Consensus blocks bootstrap cursor: {:?}",
last_consensus_step
);
if let StreamingStep::Ongoing(ids) = &last_consensus_step {
debug!("Consensus bootstrap cursor length: {}", ids.len());
}
if final_state_global_step.finished() && last_consensus_step.finished() {
server.send_msg(write_timeout, BootstrapServerMessage::BootstrapFinished)?;
break;
}
let Some(write_timeout) = step_timeout_duration(bs_deadline, &write_timeout) else {
return Err(BootstrapError::Interrupted(
"insufficient time left to provide next bootstrap part".to_string(),
));
};
server.send_msg(
write_timeout,
BootstrapServerMessage::BootstrapPart {
slot: current_slot,
state_part,
versioning_part,
consensus_part,
consensus_outdated_ids,
last_start_period,
last_slot_before_downtime,
},
)?;
}
Ok(())
}
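
/// Clamp a per-step timeout to the time remaining before the whole-bootstrap
/// deadline; returns `None` if the deadline has already passed.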
fn step_timeout_duration(bs_deadline: &Instant, step_timeout: &Duration) -> Option<Duration> {
let now = Instant::now();
if now >= *bs_deadline {
return None;
}
let remaining = *bs_deadline - now;
Some(std::cmp::min(remaining, *step_timeout))
}
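
/// Drive the server side of the bootstrap protocol for one client:
/// handshake, optional early client error, server time announcement, then a
/// request/response loop answering peer-list and bootstrap-part requests
/// until the client reports success, an error occurs, or the read times out.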
#[allow(clippy::too_many_arguments)]
pub(crate) fn manage_bootstrap(
bootstrap_config: &BootstrapConfig,
server: &mut BootstrapServerBinder,
final_state: Arc<RwLock<dyn FinalStateController>>,
version: Version,
consensus_controller: Box<dyn ConsensusController>,
protocol_controller: Box<dyn ProtocolController>,
deadline: Instant,
) -> Result<(), BootstrapError> {
massa_trace!("bootstrap.lib.manage_bootstrap", {});
let read_error_timeout: Duration = bootstrap_config.read_error_timeout.into();
let Some(hs_timeout) =
step_timeout_duration(&deadline, &bootstrap_config.read_timeout.to_duration())
else {
return Err(BootstrapError::Interrupted(
"insufficient time left to begin handshake".to_string(),
));
};
server.handshake_timeout(version, Some(hs_timeout))?;
if Instant::now() + read_error_timeout >= deadline {
return Err(BootstrapError::Interrupted(
"insufficient time to check for error from client".to_string(),
));
};
match server.next_timeout(Some(read_error_timeout)) {
Err(BootstrapError::TimedOut(_)) => {}
Err(e) => return Err(e),
Ok(BootstrapClientMessage::BootstrapError { error }) => {
return Err(BootstrapError::GeneralError(error));
}
Ok(msg) => return Err(BootstrapError::UnexpectedClientMessage(Box::new(msg))),
};
let send_time_timeout =
step_timeout_duration(&deadline, &bootstrap_config.write_timeout.to_duration());
let Some(next_step_timeout) = send_time_timeout else {
return Err(BootstrapError::Interrupted(
"insufficient time left to send server time".to_string(),
));
};
server.send_msg(
next_step_timeout,
BootstrapServerMessage::BootstrapTime {
server_time: MassaTime::now(),
version,
},
)?;
loop {
let Some(read_timeout) =
step_timeout_duration(&deadline, &bootstrap_config.read_timeout.to_duration())
else {
return Err(BootstrapError::Interrupted(
"insufficient time left to process next message".to_string(),
));
};
match server.next_timeout(Some(read_timeout)) {
Err(BootstrapError::TimedOut(_)) => break Ok(()),
Err(e) => break Err(e),
Ok(msg) => match msg {
BootstrapClientMessage::AskBootstrapPeers => {
let Some(write_timeout) = step_timeout_duration(
&deadline,
&bootstrap_config.write_timeout.to_duration(),
) else {
return Err(BootstrapError::Interrupted(
"insufficient time left to respond te request for peers".to_string(),
));
};
server.send_msg(
write_timeout,
BootstrapServerMessage::BootstrapPeers {
peers: protocol_controller.get_bootstrap_peers()?,
},
)?;
}
BootstrapClientMessage::AskBootstrapPart {
last_slot,
last_state_step,
last_versioning_step,
last_consensus_step,
send_last_start_period,
} => {
stream_bootstrap_information(
server,
final_state.clone(),
consensus_controller.clone(),
last_slot,
last_state_step,
last_versioning_step,
last_consensus_step,
send_last_start_period,
&deadline,
bootstrap_config.write_timeout.to_duration(),
)?;
}
BootstrapClientMessage::BootstrapSuccess => break Ok(()),
BootstrapClientMessage::BootstrapError { error } => {
break Err(BootstrapError::ReceivedError(error));
}
},
};
}
}