1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
use core::panic;

use massa_consensus_exports::{
    block_status::{BlockStatus, DiscardReason, HeaderOrBlock},
    error::ConsensusError,
};
use massa_logging::massa_trace;
use massa_models::{
    active_block::ActiveBlock,
    block_id::BlockId,
    prehash::{PreHashMap, PreHashSet},
    slot::Slot,
};
use tracing::debug;

use super::ConsensusState;

impl ConsensusState {
    /// Prune the active blocks that are no longer needed and return the discarded ones.
    ///
    /// A block is retained when it is listed by `list_required_active_blocks`, or when
    /// its period is not older than the latest final period of its thread minus
    /// `config.force_keep_final_periods_without_ops`. Retained blocks that are older
    /// than the latest final period minus `config.force_keep_final_periods` are
    /// stripped with `strip_to_block` (once) and recorded in `active_index_without_ops`.
    /// Every other active block transitions to `Discarded` with `DiscardReason::Final`.
    ///
    /// # Returns
    /// The discarded final blocks, keyed by block id.
    ///
    /// # Errors
    /// Propagates any error returned by `list_required_active_blocks`.
    ///
    /// # Panics
    /// Panics if a block selected for removal is not in the `Active` state.
    fn prune_active(&mut self) -> Result<PreHashMap<BlockId, ActiveBlock>, ConsensusError> {
        // start from the blocks reported as required
        let mut kept_ids: PreHashSet<BlockId> = self.list_required_active_blocks(None)?;

        // keep extra history according to the config
        // (useful to avoid desync on temporary connection loss);
        // the id set is cloned so that `blocks_state` can be mutably borrowed below
        let active_snapshot = self.blocks_state.active_blocks().clone();
        for block_id in active_snapshot.iter() {
            let (active_block, storage_or_block) = match self.blocks_state.get_mut(block_id) {
                Some(BlockStatus::Active {
                    a_block,
                    storage_or_block,
                }) => (a_block, storage_or_block),
                _ => continue,
            };

            let block_period = active_block.slot.period;
            let latest_final_period =
                self.latest_final_blocks_periods[active_block.slot.thread as usize].1;

            // too old to be kept as extra history: forget its "without ops" marker
            let keep_without_ops_from = latest_final_period
                .saturating_sub(self.config.force_keep_final_periods_without_ops);
            if block_period < keep_without_ops_from {
                self.active_index_without_ops.remove(block_id);
                continue;
            }

            kept_ids.insert(*block_id);

            // kept, but old enough to be stripped (only done once per block)
            let keep_full_from =
                latest_final_period.saturating_sub(self.config.force_keep_final_periods);
            if block_period < keep_full_from && !self.active_index_without_ops.contains(block_id)
            {
                storage_or_block.strip_to_block(block_id);
                self.active_index_without_ops.insert(*block_id);
                // reset the list of descendants
                active_block.descendants = Default::default();
            }
        }

        // every active block that is not retained gets discarded as final
        let removable: Vec<BlockId> = self
            .blocks_state
            .active_blocks()
            .difference(&kept_ids)
            .copied()
            .collect();
        let mut discarded_finals: PreHashMap<BlockId, ActiveBlock> = PreHashMap::default();
        for discard_active_h in removable {
            let sequence_number = self.blocks_state.sequence_counter();
            self.blocks_state.transition_map(&discard_active_h, |block_status, block_statuses| {
                let discarded_active = match block_status {
                    Some(BlockStatus::Active { a_block, .. }) => a_block,
                    _ => panic!("inconsistency inside block statuses pruning and removing unused final active blocks - {} is missing", discard_active_h),
                };

                // unlink the discarded block from the children of its active parents
                let thread_index = discarded_active.slot.thread as usize;
                for (parent_h, _parent_period) in discarded_active.parents.iter() {
                    if let Some(BlockStatus::Active {
                        a_block: parent_active_block,
                        ..
                    }) = block_statuses.get_mut(parent_h)
                    {
                        parent_active_block.children[thread_index].remove(&discard_active_h);
                    }
                }

                massa_trace!("consensus.block_graph.prune_active", {"hash": discard_active_h, "reason": DiscardReason::Final});

                // build the new status before the block itself is moved to the output
                let new_status = BlockStatus::Discarded {
                    slot: discarded_active.slot,
                    creator: discarded_active.creator_address,
                    parents: discarded_active.parents.iter().map(|(p, _)| *p).collect(),
                    reason: DiscardReason::Final,
                    sequence_number,
                };
                discarded_finals.insert(discard_active_h, *discarded_active);

                // mark as final
                Some(new_status)
            });
        }
        Ok(discarded_finals)
    }

    /// Limit the number of blocks waiting for their slot to
    /// `config.max_future_processing_blocks`, to avoid high memory consumption.
    ///
    /// Blocks are ordered by `(slot, block id)`; the first ones are kept and the
    /// excess is dropped from `blocks_state`.
    fn prune_slot_waiting(&mut self) {
        let max_kept = self.config.max_future_processing_blocks;
        if self.blocks_state.waiting_for_slot_blocks().len() <= max_kept {
            // within the limit: nothing to prune
            return;
        }

        // gather (slot, id) for every block that really is in the WaitingForSlot state
        let mut by_slot: Vec<(Slot, BlockId)> = Vec::new();
        for block_id in self.blocks_state.waiting_for_slot_blocks().iter() {
            if let Some(BlockStatus::WaitingForSlot(header_or_block)) =
                self.blocks_state.get(block_id)
            {
                by_slot.push((header_or_block.get_slot(), *block_id));
            }
        }
        by_slot.sort_unstable();

        // drop everything past the first `max_kept` entries
        for (_slot, block_id) in by_slot.iter().skip(max_kept) {
            self.blocks_state.transition_map(block_id, |_, _| None);
        }
    }

    /// Limit the number of discarded blocks to `config.max_discarded_blocks`,
    /// to avoid high memory consumption.
    ///
    /// Discarded blocks are ordered by `(sequence_number, block id)` and the ones
    /// with the lowest values are dropped from `blocks_state` until the limit is met.
    /// This function always returns `Ok(())`.
    fn prune_discarded(&mut self) -> Result<(), ConsensusError> {
        let max_kept = self.config.max_discarded_blocks;
        let discarded_count = self.blocks_state.discarded_blocks().len();
        if discarded_count <= max_kept {
            // within the limit: nothing to prune
            return Ok(());
        }

        // gather (sequence number, id) for every block that really is Discarded
        let mut by_sequence: Vec<(u64, BlockId)> = Vec::new();
        for block_id in self.blocks_state.discarded_blocks().iter() {
            if let Some(BlockStatus::Discarded {
                sequence_number, ..
            }) = self.blocks_state.get(block_id)
            {
                by_sequence.push((*sequence_number, *block_id));
            }
        }
        by_sequence.sort_unstable();

        // drop the lowest-ordered entries that exceed the limit
        let excess = discarded_count - max_kept;
        for (_sequence_number, block_id) in by_sequence.iter().take(excess) {
            self.blocks_state.transition_map(block_id, |_, _| None);
        }
        Ok(())
    }

    /// Prune the blocks that are waiting for their dependencies.
    ///
    /// The pruning happens in three phases:
    /// 1. Every waiting block that has an already discarded dependency, or whose period
    ///    is not greater than the latest final period of its thread, is marked for
    ///    discarding. The others are candidates to keep.
    /// 2. Candidates depending on a block marked for discarding are marked as well
    ///    (repeated until nothing changes), and while more than
    ///    `config.max_dependency_blocks` candidates remain, the one with the lowest
    ///    `(sequence_number, slot, id)` is marked for discarding without a reason.
    /// 3. Marked blocks with a reason transition to `Discarded` (stale ones are also
    ///    recorded in `new_stale_blocks`); marked blocks without a reason are dropped.
    ///
    /// This function always returns `Ok(())`.
    ///
    /// # Panics
    /// Panics if a block marked for discarding is not in the `WaitingForDependencies`
    /// state anymore, or if a full block is missing from storage.
    fn prune_waiting_for_dependencies(&mut self) -> Result<(), ConsensusError> {
        // block id => discard reason (`None` means: drop without keeping a Discarded entry)
        let mut to_discard: PreHashMap<BlockId, Option<DiscardReason>> = PreHashMap::default();
        let mut to_keep: PreHashMap<BlockId, (u64, Slot)> = PreHashMap::default();

        // list items that are older than the latest final blocks in their threads or have deps that are discarded
        {
            for block_id in self.blocks_state.waiting_for_dependencies_blocks().iter() {
                if let Some(BlockStatus::WaitingForDependencies {
                    header_or_block,
                    unsatisfied_dependencies,
                    sequence_number,
                }) = self.blocks_state.get(block_id)
                {
                    // has already discarded dependencies => discard (choose worst reason)
                    let mut discard_reason = None;
                    let mut discarded_dep_found = false;
                    for dep in unsatisfied_dependencies.iter() {
                        if let Some(BlockStatus::Discarded { reason, .. }) =
                            self.blocks_state.get(dep)
                        {
                            discarded_dep_found = true;
                            match reason {
                                DiscardReason::Invalid(reason) => {
                                    // report the id of the discarded dependency, not of the dependent block
                                    discard_reason = Some(DiscardReason::Invalid(format!("discarded because depend on block:{} that has discard reason:{}", dep, reason)));
                                    // Invalid is the worst reason: no need to look further
                                    break;
                                }
                                DiscardReason::Stale => discard_reason = Some(DiscardReason::Stale),
                                DiscardReason::Final => discard_reason = Some(DiscardReason::Stale),
                            }
                        }
                    }
                    if discarded_dep_found {
                        to_discard.insert(*block_id, discard_reason);
                        continue;
                    }

                    // is at least as old as the latest final block in its thread => discard as stale
                    let slot = header_or_block.get_slot();
                    if slot.period <= self.latest_final_blocks_periods[slot.thread as usize].1 {
                        to_discard.insert(*block_id, Some(DiscardReason::Stale));
                        continue;
                    }

                    // otherwise, mark as to_keep
                    to_keep.insert(*block_id, (*sequence_number, header_or_block.get_slot()));
                }
            }
        }

        // discard in chain and because of limited size
        while !to_keep.is_empty() {
            // mark entries as to_discard and remove them from to_keep
            // (iterate over a clone because to_keep is modified inside the loop)
            for (hash, _old_order) in to_keep.clone().into_iter() {
                if let Some(BlockStatus::WaitingForDependencies {
                    unsatisfied_dependencies,
                    ..
                }) = self.blocks_state.get(&hash)
                {
                    // has dependencies that will be discarded => discard (choose worst reason)
                    let mut discard_reason = None;
                    let mut dep_to_discard_found = false;
                    for dep in unsatisfied_dependencies.iter() {
                        if let Some(reason) = to_discard.get(dep) {
                            dep_to_discard_found = true;
                            match reason {
                                Some(DiscardReason::Invalid(reason)) => {
                                    // report the id of the discarded dependency, not of the dependent block
                                    discard_reason = Some(DiscardReason::Invalid(format!("discarded because depend on block:{} that has discard reason:{}", dep, reason)));
                                    // Invalid is the worst reason: no need to look further
                                    break;
                                }
                                Some(DiscardReason::Stale) => {
                                    discard_reason = Some(DiscardReason::Stale)
                                }
                                Some(DiscardReason::Final) => {
                                    discard_reason = Some(DiscardReason::Stale)
                                }
                                None => {} // leave as None
                            }
                        }
                    }
                    if dep_to_discard_found {
                        to_keep.remove(&hash);
                        to_discard.insert(hash, discard_reason);
                        continue;
                    }
                }
            }

            // remove worst excess element
            if to_keep.len() > self.config.max_dependency_blocks {
                let remove_elt = to_keep
                    .iter()
                    .filter_map(|(hash, _old_order)| {
                        if let Some(BlockStatus::WaitingForDependencies {
                            header_or_block,
                            sequence_number,
                            ..
                        }) = self.blocks_state.get(hash)
                        {
                            return Some((sequence_number, header_or_block.get_slot(), *hash));
                        }
                        None
                    })
                    .min();
                if let Some((_seq_num, _slot, hash)) = remove_elt {
                    to_keep.remove(&hash);
                    to_discard.insert(hash, None);
                    // run another pass: blocks depending on the removed one must go too
                    continue;
                }
            }

            // nothing happened: stop loop
            break;
        }

        // transition states to Discarded if there is a reason, otherwise just drop
        for (block_id, reason_opt) in to_discard.drain() {
            let sequence_number = self.blocks_state.sequence_counter();
            self.blocks_state.transition_map(&block_id, |block_status, _| {
                if let Some(BlockStatus::WaitingForDependencies {
                    header_or_block, ..
                }) = block_status {
                    // get the header, reading it from storage when only the block id is known
                    let header = match header_or_block {
                        HeaderOrBlock::Header(h) => h,
                        HeaderOrBlock::Block { id: block_id, .. } => self
                            .storage
                            .read_blocks()
                            .get(&block_id)
                            .unwrap_or_else(|| panic!("block {} should be in storage", block_id))
                            .content
                            .header
                            .clone()
                    };
                    massa_trace!("consensus.block_graph.prune_waiting_for_dependencies", {"hash": block_id, "reason": reason_opt});
                    if let Some(reason) = reason_opt {
                        // add to stats if reason is Stale
                        if reason == DiscardReason::Stale {
                            self.new_stale_blocks.insert(
                                block_id,
                                (header.content_creator_address, header.content.slot),
                            );
                        }
                        // transition to Discarded only if there is a reason
                        Some(BlockStatus::Discarded {
                                slot: header.content.slot,
                                creator: header.content_creator_address,
                                parents: header.content.parents,
                                reason,
                                sequence_number,
                            },
                        )
                    } else {
                        None
                    }
                } else {
                    panic!("block {} should be in WaitingForDependencies state", block_id);
                }
            });
        }

        Ok(())
    }

    /// Prune the cache of non-final active blocks indexed by slot.
    ///
    /// An entry is kept only while its slot period is strictly greater than the
    /// latest final period of its thread; every other entry is removed.
    fn prune_nonfinal_blocks_per_slot(&mut self) {
        let latest_finals = &self.latest_final_blocks_periods;
        self.nonfinal_active_blocks_per_slot.retain(|slot, _| {
            let final_period = latest_finals[slot.thread as usize].1;
            final_period < slot.period
        });
    }

    /// Run every pruning step to limit memory usage.
    ///
    /// The steps run in this order: active blocks, blocks waiting for their slot,
    /// blocks waiting for dependencies, discarded blocks, and finally the cache of
    /// non-final blocks per slot. The blocks returned by `prune_active` are not used
    /// here. A debug message is logged when the number of max cliques differs
    /// before and after pruning.
    ///
    /// # Errors
    /// Returns the first error reported by one of the fallible pruning steps.
    pub fn prune(&mut self) -> Result<(), ConsensusError> {
        let cliques_before = self.max_cliques.len();

        // 1. discard the final blocks that are not useful to the graph anymore
        self.prune_active()?;
        // 2. blocks waiting for their slot
        self.prune_slot_waiting();
        // 3. blocks waiting for their dependencies
        self.prune_waiting_for_dependencies()?;
        // 4. discarded blocks
        self.prune_discarded()?;
        // 5. non-final blocks indexed by slot
        self.prune_nonfinal_blocks_per_slot();

        let cliques_after = self.max_cliques.len();
        if cliques_after != cliques_before {
            debug!(
                "clique number went from {} to {} after pruning",
                cliques_before, cliques_after
            );
        }

        Ok(())
    }
}