1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//! Copyright (c) 2022 MASSA LABS <info@massa.net>

use massa_models::{
    block_id::BlockId,
    endorsement::EndorsementId,
    prehash::{CapacityAllocator, PreHashSet},
    slot::Slot,
};
use massa_pool_exports::{PoolChannels, PoolConfig};
use massa_storage::Storage;
use massa_wallet::Wallet;
use parking_lot::RwLock;
use std::{
    collections::{hash_map::Entry, BTreeMap, HashMap},
    sync::Arc,
};
use tracing::{trace, warn};

pub struct EndorsementPool {
    /// configuration
    config: PoolConfig,

    /// endorsements indexed by slot, index and block ID
    endorsements_indexed: HashMap<(Slot, u32, BlockId), EndorsementId>,

    /// endorsements sorted by increasing inclusion slot for pruning
    /// indexed by thread, then `BTreeMap<(inclusion_slot, index, target_block), endorsement_id>`
    endorsements_sorted: Vec<BTreeMap<(Slot, u32, BlockId), EndorsementId>>,

    /// storage
    storage: Storage,

    /// last consensus final periods, per thread
    last_cs_final_periods: Vec<u64>,

    /// channels used by the pool worker
    channels: PoolChannels,

    /// staking wallet, to know which addresses we are using to stake
    wallet: Arc<RwLock<Wallet>>,
}

impl EndorsementPool {
    pub fn init(
        config: PoolConfig,
        storage: &Storage,
        channels: PoolChannels,
        wallet: Arc<RwLock<Wallet>>,
    ) -> Self {
        EndorsementPool {
            last_cs_final_periods: vec![0u64; config.thread_count as usize],
            endorsements_indexed: Default::default(),
            endorsements_sorted: vec![Default::default(); config.thread_count as usize],
            config,
            storage: storage.clone_without_refs(),
            channels,
            wallet,
        }
    }

    /// Get the number of stored elements
    pub fn len(&self) -> usize {
        self.storage.get_endorsement_refs().len()
    }

    /// Checks whether an element is stored in the pool.
    pub fn contains(&self, id: &EndorsementId) -> bool {
        self.storage.get_endorsement_refs().contains(id)
    }

    /// notify of new final CS periods
    pub(crate) fn notify_final_cs_periods(&mut self, final_cs_periods: &[u64]) {
        // update internal final CS period counter
        self.last_cs_final_periods = final_cs_periods.to_vec();

        // remove all endorsements whose periods <= last_cs_final_periods[endorsement.thread]
        let mut removed: PreHashSet<EndorsementId> = Default::default();
        for thread in 0..self.config.thread_count {
            while let Some((&(inclusion_slot, index, block_id), &endo_id)) =
                self.endorsements_sorted[thread as usize].first_key_value()
            {
                if inclusion_slot.period <= self.last_cs_final_periods[thread as usize] {
                    self.endorsements_sorted[thread as usize].pop_first();
                    self.endorsements_indexed
                        .remove(&(inclusion_slot, index, block_id))
                        .expect("endorsement should be in endorsements_indexed at this point");
                    removed.insert(endo_id);
                } else {
                    break;
                }
            }
        }
        self.storage.drop_endorsement_refs(&removed);
    }

    /// Add a list of endorsements to the pool
    pub(crate) fn add_endorsements(&mut self, mut endorsement_storage: Storage) {
        let items = endorsement_storage
            .get_endorsement_refs()
            .iter()
            .copied()
            .collect::<Vec<_>>();

        let mut added = PreHashSet::with_capacity(items.len());
        let mut removed = PreHashSet::with_capacity(items.len());

        // add items to pool
        {
            let endo_store = endorsement_storage.read_endorsements();
            for endo_id in items {
                let endo = endo_store
                    .get(&endo_id)
                    .expect("attempting to add endorsement to pool, but it is absent from storage");

                // check endorsement expiry
                if endo.content.slot.period
                    <= self.last_cs_final_periods[endo.content.slot.thread as usize]
                {
                    continue;
                }

                // check PoS draw
                let pos_draws = match self.channels.selector.get_selection(endo.content.slot) {
                    Ok(draw) => draw,
                    Err(err) => {
                        warn!(
                            "error, failed to get PoS draw for endorsement with id {} at slot {}: {}",
                            endo.id.clone(), endo.content.slot, err
                        );
                        continue;
                    }
                };
                if !pos_draws
                    .endorsements
                    .get(endo.content.index as usize)
                    .map_or(false, |a| a == &endo.content_creator_address)
                {
                    warn!(
                        "error, endorsement with id {} at slot {} is not selected for PoS draw",
                        endo.id.clone(),
                        endo.content.slot
                    );
                    continue;
                }

                // Broadcast endorsement to active channel subscribers.
                if self.config.broadcast_enabled {
                    if let Err(err) = self
                        .channels
                        .broadcasts
                        .endorsement_sender
                        .send(endo.clone())
                    {
                        trace!(
                            "error, failed to broadcast endorsement {}: {}",
                            endo.id.clone(),
                            err
                        );
                    }
                }

                // Only keep endorsements that one of our addresses can include
                if !self.wallet.read().keys.contains_key(&pos_draws.producer) {
                    continue;
                }

                // insert
                let key = (
                    endo.content.slot,
                    endo.content.index,
                    endo.content.endorsed_block,
                );
                // note that we don't want equivalent endorsements (slot, index, block etc...) to overwrite each other
                if let Entry::Vacant(e) = self.endorsements_indexed.entry(key) {
                    e.insert(endo.id);
                    if self.endorsements_sorted[endo.content.slot.thread as usize]
                        .insert(key, endo.id)
                        .is_some()
                    {
                        panic!("endorsement is expected to be absent from endorsements_sorted at this point");
                    }
                    added.insert(endo.id);
                }
            }
        }

        // prune excess endorsements
        for thread in 0..self.config.thread_count {
            while self.endorsements_sorted[thread as usize].len()
                > self.config.max_endorsements_pool_size_per_thread
            {
                // won't panic because len was checked above
                let (_key, endo_id) = self.endorsements_sorted[thread as usize]
                    .pop_last()
                    .unwrap();
                if !added.remove(&endo_id) {
                    removed.insert(endo_id);
                }
            }
        }

        // take ownership on added endorsements
        self.storage.extend(endorsement_storage.split_off(
            &Default::default(),
            &Default::default(),
            &added,
        ));

        // drop removed endorsements from storage
        self.storage.drop_endorsement_refs(&removed);
    }

    /// get endorsements for block creation
    pub fn get_block_endorsements(
        &self,
        slot: &Slot, // slot of the block that will contain the endorsement
        target_block: &BlockId,
    ) -> (Vec<Option<EndorsementId>>, Storage) {
        // init list of selected endorsement IDs
        let mut endo_ids = Vec::with_capacity(self.config.max_block_endorsement_count as usize);

        // gather endorsements
        for index in 0..self.config.max_block_endorsement_count {
            endo_ids.push(
                self.endorsements_indexed
                    .get(&(*slot, index, *target_block))
                    .copied(),
            );
        }

        // setup endorsement storage
        let mut endo_storage = self.storage.clone_without_refs();
        let claim_endos: PreHashSet<EndorsementId> =
            endo_ids.iter().filter_map(|&opt| opt).collect();
        let claimed_endos = endo_storage.claim_endorsement_refs(&claim_endos);
        if claimed_endos.len() != claim_endos.len() {
            panic!("could not claim all endorsements from storage");
        }

        (endo_ids, endo_storage)
    }
}