use massa_models::{
block_id::BlockId, denunciation::Denunciation, denunciation::DenunciationPrecursor,
endorsement::EndorsementId, operation::OperationId, slot::Slot,
};
use massa_pool_exports::{PoolConfig, PoolController, PoolError, PoolManager};
use massa_storage::Storage;
use massa_time::MassaTime;
use parking_lot::RwLock;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc::SyncSender, Arc};
use tracing::{info, warn};
use crate::{
denunciation_pool::DenunciationPool, endorsement_pool::EndorsementPool,
operation_pool::OperationPool,
};
/// Messages pushed to the pool worker threads through their `SyncSender<Command>` channels.
///
/// Only the sending side is visible in this file; how each worker reacts to a command is
/// defined by the worker implementations.
// NOTE(review): the lint is silenced instead of boxing the large payload; presumably
// `Storage` is the variant that triggers it — confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum Command {
/// Items handed to a worker as a `Storage`; sent by `add_operations` and `add_endorsements`.
AddItems(Storage),
/// A denunciation precursor; sent only on the denunciations channel.
AddDenunciationPrecursor(DenunciationPrecursor),
/// The latest final periods; sent to all three workers by `notify_final_cs_periods`.
// NOTE(review): presumably one period per thread — confirm against the consensus caller.
NotifyFinalCsPeriods(Vec<u64>),
/// Sent to every worker by `PoolManagerImpl::stop` before the worker threads are joined.
Stop,
}
/// Cloneable handle implementing `PoolController`.
///
/// Read queries lock the shared pools directly; mutations are forwarded to the worker
/// threads as `Command`s through the bounded `SyncSender` channels.
#[derive(Clone)]
pub struct PoolControllerImpl {
/// Pool configuration. Not read by any code visible in this file (hence the underscore).
pub(crate) _config: PoolConfig,
/// Shared operation pool, read-locked by the operation query methods.
pub(crate) operation_pool: Arc<RwLock<OperationPool>>,
/// Shared endorsement pool, read-locked by the endorsement query methods.
pub(crate) endorsement_pool: Arc<RwLock<EndorsementPool>>,
/// Shared denunciation pool, read-locked by the denunciation query methods.
pub(crate) denunciation_pool: Arc<RwLock<DenunciationPool>>,
/// Command channel to the operations worker.
pub(crate) operations_input_sender: SyncSender<Command>,
/// Command channel to the endorsements worker.
pub(crate) endorsements_input_sender: SyncSender<Command>,
/// Command channel to the denunciations worker.
pub(crate) denunciations_input_sender: SyncSender<Command>,
/// Final periods most recently passed to `notify_final_cs_periods` on this instance.
/// This is a plain `Vec`, so every clone of the controller keeps its own copy.
pub last_cs_final_periods: Vec<u64>,
}
impl PoolController for PoolControllerImpl {
fn add_operations(&mut self, ops: Storage) {
match self
.operations_input_sender
.try_send(Command::AddItems(ops))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add operations to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add operations to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn add_endorsements(&mut self, endorsements: Storage) {
match self
.denunciations_input_sender
.try_send(Command::AddItems(endorsements.clone()))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add endorsements to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add endorsements to pool: worker channel is full.");
}
Ok(_) => {}
}
match self
.endorsements_input_sender
.try_send(Command::AddItems(endorsements))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add endorsements to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add endorsements to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn add_denunciation_precursor(&self, denunciation_precursor: DenunciationPrecursor) {
match self
.denunciations_input_sender
.try_send(Command::AddDenunciationPrecursor(denunciation_precursor))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add denunciation precursor to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add denunciation precursor to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) {
self.last_cs_final_periods = final_cs_periods.to_vec();
match self
.operations_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not notify operation pool of new final slots: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify operation pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
match self
.endorsements_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker is unreachable."
);
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
match self
.denunciations_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker is unreachable."
);
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
}
fn get_block_operations(
&self,
slot: &Slot,
timeout: Option<MassaTime>,
) -> Result<(Vec<OperationId>, Storage), PoolError> {
let guard = if let Some(timeout) = timeout {
self.operation_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.operation_pool.read()
};
Ok(guard.get_block_operations(slot))
}
fn get_block_endorsements(
&self,
target_block: &BlockId,
target_slot: &Slot,
timeout: Option<MassaTime>,
) -> Result<(Vec<Option<EndorsementId>>, Storage), PoolError> {
let guard = if let Some(timeout) = timeout {
self.endorsement_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.endorsement_pool.read()
};
Ok(guard.get_block_endorsements(target_slot, target_block))
}
fn get_block_denunciations(
&self,
target_slot: &Slot,
timeout: Option<MassaTime>,
) -> Result<Vec<Denunciation>, PoolError> {
let guard = if let Some(timeout) = timeout {
self.denunciation_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.denunciation_pool.read()
};
Ok(guard.get_block_denunciations(target_slot))
}
fn get_endorsement_count(&self, timeout: Option<MassaTime>) -> Result<usize, PoolError> {
let guard = if let Some(timeout) = timeout {
self.endorsement_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.endorsement_pool.read()
};
Ok(guard.len())
}
fn get_operation_count(&self, timeout: Option<MassaTime>) -> Result<usize, PoolError> {
let guard = if let Some(timeout) = timeout {
self.operation_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.operation_pool.read()
};
Ok(guard.len())
}
fn contains_endorsements(
&self,
endorsements: &[EndorsementId],
timeout: Option<MassaTime>,
) -> Result<Vec<bool>, PoolError> {
let guard = if let Some(timeout) = timeout {
self.endorsement_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.endorsement_pool.read()
};
Ok(endorsements.iter().map(|id| guard.contains(id)).collect())
}
fn contains_operations(
&self,
operations: &[OperationId],
timeout: Option<MassaTime>,
) -> Result<Vec<bool>, PoolError> {
let guard = if let Some(timeout) = timeout {
self.operation_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.operation_pool.read()
};
Ok(operations.iter().map(|id| guard.contains(id)).collect())
}
fn get_denunciation_count(&self, timeout: Option<MassaTime>) -> Result<usize, PoolError> {
let guard = if let Some(timeout) = timeout {
self.denunciation_pool
.try_read_for(timeout.to_duration())
.ok_or(PoolError::LockTimeout)?
} else {
self.denunciation_pool.read()
};
Ok(guard.len())
}
fn clone_box(&self) -> Box<dyn PoolController> {
Box::new(self.clone())
}
fn get_final_cs_periods(&self) -> Vec<u64> {
self.last_cs_final_periods.clone()
}
}
/// Owns the pool worker threads and the channels used to ask them to stop.
pub struct PoolManagerImpl {
/// Join handle of the operations worker; taken (left as `None`) by `stop` before joining.
pub(crate) operations_thread_handle: Option<std::thread::JoinHandle<()>>,
/// Join handle of the endorsements worker; taken (left as `None`) by `stop` before joining.
pub(crate) endorsements_thread_handle: Option<std::thread::JoinHandle<()>>,
/// Join handle of the denunciations worker; taken (left as `None`) by `stop` before joining.
pub(crate) denunciations_thread_handle: Option<std::thread::JoinHandle<()>>,
/// Channel on which `stop` sends `Command::Stop` to the operations worker.
pub(crate) operations_input_sender: SyncSender<Command>,
/// Channel on which `stop` sends `Command::Stop` to the endorsements worker.
pub(crate) endorsements_input_sender: SyncSender<Command>,
/// Channel on which `stop` sends `Command::Stop` to the denunciations worker.
pub(crate) denunciations_input_sender: SyncSender<Command>,
}
impl PoolManager for PoolManagerImpl {
    /// Asks every pool worker to stop, then waits for the worker threads to finish.
    ///
    /// `Command::Stop` is sent with the blocking `send`, and send errors are ignored
    /// (a disconnected channel means the worker is already gone). Handles are joined
    /// in the order operations, endorsements, denunciations; a handle that was already
    /// taken is skipped, so calling `stop` again does not join twice.
    ///
    /// # Panics
    /// Panics if a joined worker thread itself panicked.
    fn stop(&mut self) {
        // Takes the handle out of `slot` (if still present) and waits for that thread,
        // panicking with `panic_message` when the thread terminated by panicking.
        fn join_worker(slot: &mut Option<std::thread::JoinHandle<()>>, panic_message: &str) {
            if let Some(worker) = slot.take() {
                worker.join().expect(panic_message);
            }
        }

        info!("stopping pool workers...");

        // Signal all workers first so they can wind down concurrently.
        for sender in [
            &self.operations_input_sender,
            &self.endorsements_input_sender,
            &self.denunciations_input_sender,
        ] {
            let _ = sender.send(Command::Stop);
        }

        join_worker(
            &mut self.operations_thread_handle,
            "operations pool thread panicked on try to join",
        );
        join_worker(
            &mut self.endorsements_thread_handle,
            "endorsements pool thread panicked on try to join",
        );
        join_worker(
            &mut self.denunciations_thread_handle,
            "denunciations pool thread panicked on try to join",
        );

        info!("pool workers stopped");
    }
}