use massa_models::{
block_id::BlockId, denunciation::Denunciation, denunciation::DenunciationPrecursor,
endorsement::EndorsementId, operation::OperationId, slot::Slot,
};
use massa_pool_exports::{PoolConfig, PoolController, PoolManager};
use massa_storage::Storage;
use parking_lot::RwLock;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc::SyncSender, Arc};
use tracing::{info, warn};
use crate::{
denunciation_pool::DenunciationPool, endorsement_pool::EndorsementPool,
operation_pool::OperationPool,
};
#[allow(clippy::large_enum_variant)]
pub enum Command {
AddItems(Storage),
AddDenunciationPrecursor(DenunciationPrecursor),
NotifyFinalCsPeriods(Vec<u64>),
Stop,
}
#[derive(Clone)]
pub struct PoolControllerImpl {
pub(crate) _config: PoolConfig,
pub(crate) operation_pool: Arc<RwLock<OperationPool>>,
pub(crate) endorsement_pool: Arc<RwLock<EndorsementPool>>,
pub(crate) denunciation_pool: Arc<RwLock<DenunciationPool>>,
pub(crate) operations_input_sender: SyncSender<Command>,
pub(crate) endorsements_input_sender: SyncSender<Command>,
pub(crate) denunciations_input_sender: SyncSender<Command>,
pub last_cs_final_periods: Vec<u64>,
}
impl PoolController for PoolControllerImpl {
fn add_operations(&mut self, ops: Storage) {
match self
.operations_input_sender
.try_send(Command::AddItems(ops))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add operations to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add operations to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn add_endorsements(&mut self, endorsements: Storage) {
match self
.denunciations_input_sender
.try_send(Command::AddItems(endorsements.clone()))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add endorsements to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add endorsements to pool: worker channel is full.");
}
Ok(_) => {}
}
match self
.endorsements_input_sender
.try_send(Command::AddItems(endorsements))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add endorsements to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add endorsements to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn add_denunciation_precursor(&self, denunciation_precursor: DenunciationPrecursor) {
match self
.denunciations_input_sender
.try_send(Command::AddDenunciationPrecursor(denunciation_precursor))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not add denunciation precursor to pool: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!("Could not add denunciation precursor to pool: worker channel is full.");
}
Ok(_) => {}
}
}
fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) {
self.last_cs_final_periods = final_cs_periods.to_vec();
match self
.operations_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!("Could not notify operation pool of new final slots: worker is unreachable.");
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify operation pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
match self
.endorsements_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker is unreachable."
);
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
match self
.denunciations_input_sender
.try_send(Command::NotifyFinalCsPeriods(final_cs_periods.to_vec()))
{
Err(TrySendError::Disconnected(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker is unreachable."
);
}
Err(TrySendError::Full(_)) => {
warn!(
"Could not notify endorsement pool of new final slots: worker channel is full."
);
}
Ok(_) => {}
}
}
fn get_block_operations(&self, slot: &Slot) -> (Vec<OperationId>, Storage) {
self.operation_pool.read().get_block_operations(slot)
}
fn get_block_endorsements(
&self,
target_block: &BlockId,
target_slot: &Slot,
) -> (Vec<Option<EndorsementId>>, Storage) {
self.endorsement_pool
.read()
.get_block_endorsements(target_slot, target_block)
}
fn get_block_denunciations(&self, target_slot: &Slot) -> Vec<Denunciation> {
self.denunciation_pool
.read()
.get_block_denunciations(target_slot)
}
fn get_endorsement_count(&self) -> usize {
self.endorsement_pool.read().len()
}
fn get_operation_count(&self) -> usize {
self.operation_pool.read().len()
}
fn contains_endorsements(&self, endorsements: &[EndorsementId]) -> Vec<bool> {
let lck = self.endorsement_pool.read();
endorsements.iter().map(|id| lck.contains(id)).collect()
}
fn contains_operations(&self, operations: &[OperationId]) -> Vec<bool> {
let lck = self.operation_pool.read();
operations.iter().map(|id| lck.contains(id)).collect()
}
fn get_denunciation_count(&self) -> usize {
self.denunciation_pool.read().len()
}
fn clone_box(&self) -> Box<dyn PoolController> {
Box::new(self.clone())
}
fn get_final_cs_periods(&self) -> Vec<u64> {
self.last_cs_final_periods.clone()
}
}
pub struct PoolManagerImpl {
pub(crate) operations_thread_handle: Option<std::thread::JoinHandle<()>>,
pub(crate) endorsements_thread_handle: Option<std::thread::JoinHandle<()>>,
pub(crate) denunciations_thread_handle: Option<std::thread::JoinHandle<()>>,
pub(crate) operations_input_sender: SyncSender<Command>,
pub(crate) endorsements_input_sender: SyncSender<Command>,
pub(crate) denunciations_input_sender: SyncSender<Command>,
}
impl PoolManager for PoolManagerImpl {
fn stop(&mut self) {
info!("stopping pool workers...");
let _ = self.operations_input_sender.send(Command::Stop);
let _ = self.endorsements_input_sender.send(Command::Stop);
let _ = self.denunciations_input_sender.send(Command::Stop);
if let Some(join_handle) = self.operations_thread_handle.take() {
join_handle
.join()
.expect("operations pool thread panicked on try to join");
}
if let Some(join_handle) = self.endorsements_thread_handle.take() {
join_handle
.join()
.expect("endorsements pool thread panicked on try to join");
}
if let Some(join_handle) = self.denunciations_thread_handle.take() {
join_handle
.join()
.expect("denunciations pool thread panicked on try to join");
}
info!("pool workers stopped");
}
}