use massa_models::{
block_id::BlockId,
endorsement::EndorsementId,
prehash::{CapacityAllocator, PreHashSet},
slot::Slot,
};
use massa_pool_exports::{PoolChannels, PoolConfig};
use massa_storage::Storage;
use massa_wallet::Wallet;
use parking_lot::RwLock;
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
sync::Arc,
};
use tracing::{trace, warn};
/// Pool of endorsements, indexed by `(slot, index, endorsed block)`.
///
/// `endorsements_indexed` and `endorsements_sorted` store the same
/// `(key, endorsement id)` pairs; every insertion/removal visible in this file
/// is meant to touch both.
pub struct EndorsementPool {
/// Pool configuration. Read here for `thread_count`, `broadcast_enabled`,
/// `max_endorsements_pool_size_per_thread` and `max_block_endorsement_count`.
config: PoolConfig,
/// Lookup from `(slot, endorsement index, endorsed block id)` to the id of the
/// pooled endorsement; queried by `get_block_endorsements`.
endorsements_indexed: HashMap<(Slot, u32, BlockId), EndorsementId>,
/// One ordered map per thread (indexed by the endorsement slot's thread),
/// with the same keys as `endorsements_indexed`. Entries are popped from the
/// front when their period becomes final and from the back when a thread's
/// map exceeds the configured size.
endorsements_sorted: Vec<BTreeMap<(Slot, u32, BlockId), EndorsementId>>,
/// Storage handle holding the references to the endorsements kept by the pool.
storage: Storage,
/// Per-thread final periods, as last received by `notify_final_cs_periods`
/// (initialised to zero). NOTE(review): "cs" presumably stands for consensus — confirm.
last_cs_final_periods: Vec<u64>,
/// Channels used to query the selector (PoS draws) and to broadcast endorsements.
channels: PoolChannels,
/// Shared wallet; its `keys` are checked against the drawn producer of a slot.
wallet: Arc<RwLock<Wallet>>,
}
impl EndorsementPool {
pub fn init(
config: PoolConfig,
storage: &Storage,
channels: PoolChannels,
wallet: Arc<RwLock<Wallet>>,
) -> Self {
EndorsementPool {
last_cs_final_periods: vec![0u64; config.thread_count as usize],
endorsements_indexed: Default::default(),
endorsements_sorted: vec![Default::default(); config.thread_count as usize],
config,
storage: storage.clone_without_refs(),
channels,
wallet,
}
}
/// Returns the number of endorsement references currently held by the pool's
/// storage.
pub fn len(&self) -> usize {
    let endorsement_refs = self.storage.get_endorsement_refs();
    endorsement_refs.len()
}
/// Returns `true` when the pool's storage holds a reference to the endorsement
/// identified by `id`.
pub fn contains(&self, id: &EndorsementId) -> bool {
    let endorsement_refs = self.storage.get_endorsement_refs();
    endorsement_refs.contains(id)
}
/// Records the new per-thread final periods and evicts every pooled
/// endorsement whose slot period is at or below the final period of its
/// thread.
///
/// Evicted endorsements are removed from both indexes and their references
/// are dropped from the pool's storage.
///
/// # Panics
/// Panics if an entry present in `endorsements_sorted` is missing from
/// `endorsements_indexed`, or if `final_cs_periods` has no element for a
/// thread that still holds endorsements.
pub(crate) fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) {
    self.last_cs_final_periods = final_cs_periods.to_vec();

    let mut evicted: PreHashSet<EndorsementId> = Default::default();
    for thread in 0..self.config.thread_count {
        let thread_idx = thread as usize;
        loop {
            // Peek at the smallest key of this thread; copy it out so the map
            // is free to be mutated below.
            let Some((&key, &endo_id)) = self.endorsements_sorted[thread_idx].first_key_value()
            else {
                break;
            };
            // The map is ordered, so the first non-final entry ends the scan.
            if key.0.period > self.last_cs_final_periods[thread_idx] {
                break;
            }
            self.endorsements_sorted[thread_idx].pop_first();
            self.endorsements_indexed
                .remove(&key)
                .expect("endorsement should be in endorsements_indexed at this point");
            evicted.insert(endo_id);
        }
    }

    self.storage.drop_endorsement_refs(&evicted);
}
/// Adds the endorsements referenced by `endorsement_storage` to the pool.
///
/// Each endorsement goes through the following steps, in order:
/// 1. it is skipped if its slot period is at or below the last final period
///    of its thread;
/// 2. it is skipped (with a warning) if the selector cannot provide the draw
///    for its slot, or if the address drawn at its index is not its creator;
/// 3. it is sent on the broadcast channel when `broadcast_enabled` is set
///    (a send failure is only traced);
/// 4. it is skipped unless the wallet holds a key for the drawn producer of
///    its slot;
/// 5. it is inserted in both indexes unless its `(slot, index, endorsed block)`
///    key is already present.
///
/// Afterwards each thread's map is pruned from its highest key down until it
/// holds at most `max_endorsements_pool_size_per_thread` entries. Pruned
/// entries are removed from **both** indexes so that `endorsements_indexed`
/// never references an endorsement the pool's storage does not hold.
/// Finally, the references of the surviving new endorsements are moved into
/// the pool's storage and those of pruned older ones are dropped.
///
/// # Panics
/// Panics if a referenced endorsement is absent from `endorsement_storage`,
/// if the two indexes are found out of sync, or if an endorsement's thread
/// has no entry in `last_cs_final_periods` / `endorsements_sorted`.
pub(crate) fn add_endorsements(&mut self, mut endorsement_storage: Storage) {
    let items = endorsement_storage
        .get_endorsement_refs()
        .iter()
        .copied()
        .collect::<Vec<_>>();

    let mut added = PreHashSet::with_capacity(items.len());
    let mut removed = PreHashSet::with_capacity(items.len());

    // Scope the read guard on the endorsement store to the insertion pass.
    {
        let endo_store = endorsement_storage.read_endorsements();

        for endo_id in items {
            let endo = endo_store
                .get(&endo_id)
                .expect("attempting to add endorsement to pool, but it is absent from storage");

            // Ignore endorsements whose slot is already final.
            if endo.content.slot.period
                <= self.last_cs_final_periods[endo.content.slot.thread as usize]
            {
                continue;
            }

            // Fetch the PoS draw for the endorsement's slot.
            let pos_draws = match self.channels.selector.get_selection(endo.content.slot) {
                Ok(draw) => draw,
                Err(err) => {
                    warn!(
                        "error, failed to get PoS draw for endorsement with id {} at slot {}: {}",
                        endo.id, endo.content.slot, err
                    );
                    continue;
                }
            };

            // The creator must be the address drawn for this endorsement index.
            if !pos_draws
                .endorsements
                .get(endo.content.index as usize)
                .map_or(false, |a| a == &endo.content_creator_address)
            {
                warn!(
                    "error, endorsement with id {} at slot {} is not selected for PoS draw",
                    endo.id, endo.content.slot
                );
                continue;
            }

            // Best-effort broadcast: a failure is traced but does not stop processing.
            if self.config.broadcast_enabled {
                if let Err(err) = self
                    .channels
                    .broadcasts
                    .endorsement_sender
                    .send(endo.clone())
                {
                    trace!(
                        "error, failed to broadcast endorsement {}: {}",
                        endo.id,
                        err
                    );
                }
            }

            // Only keep endorsements for slots whose drawn producer has a key
            // in the wallet.
            if !self.wallet.read().keys.contains_key(&pos_draws.producer) {
                continue;
            }

            let key = (
                endo.content.slot,
                endo.content.index,
                endo.content.endorsed_block,
            );

            // Insert in both indexes, unless the key is already taken.
            if let Entry::Vacant(e) = self.endorsements_indexed.entry(key) {
                e.insert(endo.id);
                if self.endorsements_sorted[endo.content.slot.thread as usize]
                    .insert(key, endo.id)
                    .is_some()
                {
                    panic!("endorsement is expected to be absent from endorsements_sorted at this point");
                }
                added.insert(endo.id);
            }
        }
    }

    // Prune excess endorsements, highest keys first.
    for thread in 0..self.config.thread_count {
        while self.endorsements_sorted[thread as usize].len()
            > self.config.max_endorsements_pool_size_per_thread
        {
            let (key, endo_id) = self.endorsements_sorted[thread as usize]
                .pop_last()
                .expect("endorsements_sorted cannot be empty: its length exceeds the pool limit");
            // Keep the lookup index in sync with the sorted index; otherwise
            // `get_block_endorsements` could return an id that the pool's
            // storage no longer (or never) references.
            self.endorsements_indexed
                .remove(&key)
                .expect("endorsement should be in endorsements_indexed at this point");
            // An endorsement added in this very call was never put in the
            // pool's storage, so there is nothing to drop for it.
            if !added.remove(&endo_id) {
                removed.insert(endo_id);
            }
        }
    }

    // Take ownership of the references of the endorsements that were kept.
    self.storage.extend(endorsement_storage.split_off(
        &Default::default(),
        &Default::default(),
        &added,
    ));

    // Release the references of previously pooled endorsements that were pruned.
    self.storage.drop_endorsement_refs(&removed);
}
/// Collects the pooled endorsements usable in a block at `slot` that endorse
/// `target_block`.
///
/// # Returns
/// * a vector of length `max_block_endorsement_count` whose element `i` is the
///   id of the pooled endorsement with index `i` for `(slot, target_block)`,
///   or `None` when the pool has none;
/// * a storage handle that has claimed references to every endorsement
///   listed in that vector.
///
/// # Panics
/// Panics if some of the listed endorsements cannot be claimed from storage.
pub fn get_block_endorsements(
    &self,
    slot: &Slot,
    target_block: &BlockId,
) -> (Vec<Option<EndorsementId>>, Storage) {
    // One lookup per endorsement index, keeping `None` for missing ones.
    let endo_ids: Vec<Option<EndorsementId>> = (0..self.config.max_block_endorsement_count)
        .map(|index| {
            self.endorsements_indexed
                .get(&(*slot, index, *target_block))
                .copied()
        })
        .collect();

    // Set of the ids that were actually found.
    let wanted: PreHashSet<EndorsementId> = endo_ids.iter().flatten().copied().collect();

    // Claim them in a fresh storage handle handed back to the caller.
    let mut endo_storage = self.storage.clone_without_refs();
    let claimed = endo_storage.claim_endorsement_refs(&wanted);
    assert!(
        claimed.len() == wanted.len(),
        "could not claim all endorsements from storage"
    );

    (endo_ids, endo_storage)
}
}