//! Optimized datastore key traversal algorithm,
//! shared between execution.rs and speculative_ledger.rs.

use std::{
    collections::{BTreeMap, BTreeSet, VecDeque},
    ops::Bound,
    sync::Arc,
};

use massa_final_state::FinalStateController;
use massa_ledger_exports::LedgerChanges;
use massa_models::{
    address::Address,
    datastore::{get_prefix_bounds, range_intersection},
    types::{SetOrDelete, SetUpdateOrDelete},
};
use parking_lot::RwLock;

use crate::active_history::ActiveHistory;

/// Gets a copy of the datastore keys of a given address, both as seen by the
/// final state and as seen by the speculative state (final state with the
/// active history and the optional added changes applied on top).
///
/// # Arguments
/// * `addr`: address to query
/// * `prefix`: prefix to filter keys
/// * `start_key`: start bound of the key range
/// * `end_key`: end bound of the key range
/// * `count`: maximum number of keys to return
/// * `final_state`: shared handle to the final state
/// * `active_history`: shared handle to the speculative execution history
/// * `added_changes`: optional ledger changes applied on top of the active history
///
/// # Returns
/// A tuple of two `Option<BTreeSet<Vec<u8>>>`:
/// the first element contains the final state keys, the second the speculative keys.
/// `None` means that the address does not exist in the corresponding view.
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
pub fn scan_datastore(
    addr: &Address,
    prefix: &[u8],
    start_key: Bound<Vec<u8>>,
    end_key: Bound<Vec<u8>>,
    count: Option<u32>,
    final_state: Arc<RwLock<dyn FinalStateController>>,
    active_history: Arc<RwLock<ActiveHistory>>,
    added_changes: Option<&LedgerChanges>,
) -> (Option<BTreeSet<Vec<u8>>>, Option<BTreeSet<Vec<u8>>>) {
    // get final keys
    let final_keys = final_state.read().get_ledger().get_datastore_keys(
        addr,
        prefix,
        start_key.clone(),
        end_key.clone(),
        count,
    );

    // the iteration range is the intersection of the prefix range and the selection range
    let key_range = range_intersection(
        get_prefix_bounds(prefix),
        (start_key.clone(), end_key.clone()),
    );

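    // Describes how the speculative history resets the address's datastore:
    // `Set` replaces the whole key set, `Delete` removes the address entirely,
    // and `None` means no reset was found (only incremental updates, if any).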
    enum SpeculativeResetType {
        None,
        Set,
        Delete,
    }

    // process speculative history
    let mut speculative_reset = SpeculativeResetType::None;
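    // Latest speculative status of each key within the queried range:
    // `true` means the key is set, `false` means it is deleted.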
    let mut key_updates = BTreeMap::new();
    {
        let mut update_indices = VecDeque::new();
        let history_lock = active_history.read();

        let it = history_lock
            .0
            .iter()
            .map(|v| &v.state_changes.ledger_changes)
            .chain(added_changes.iter().copied());
        let mut index = history_lock.0.len() + if added_changes.is_some() { 1 } else { 0 };
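        // Walk the changes from the most recent to the oldest; `index` is
        // decremented at the top of the loop so it matches the element being visited.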
        for output in it.rev() {
            index -= 1;
            match output.get(addr) {
                // address absent from the changes
                None => (),

                // address ledger entry being reset to a completely new set of keys
                Some(SetUpdateOrDelete::Set(v)) => {
                    if let Some(k_range) = key_range.as_ref() {
                        key_updates = v
                            .datastore
                            .range(k_range.clone())
                            .map(|(k, _v)| (k.clone(), true))
                            .collect();
                    }
                    speculative_reset = SpeculativeResetType::Set;
                    break;
                }

                // address ledger entry being updated within the key range of interest
                Some(SetUpdateOrDelete::Update(updates)) => {
                    if let Some(k_range) = key_range.as_ref() {
                        if updates.datastore.range(k_range.clone()).next().is_some() {
                            update_indices.push_front(index);
                        }
                    }
                }

                // address ledger entry being deleted
                Some(SetUpdateOrDelete::Delete) => {
                    speculative_reset = SpeculativeResetType::Delete;
                    break;
                }
            }
        }
        if matches!(speculative_reset, SpeculativeResetType::Delete) && !update_indices.is_empty() {
            // if there are updates after an address deletion, consider it a Set
            speculative_reset = SpeculativeResetType::Set;
        }

        // aggregate key updates
        for idx in update_indices {
            let changes = if idx < history_lock.0.len() {
                &history_lock.0[idx].state_changes.ledger_changes
            } else if let Some(added_changes) = added_changes.as_ref() {
                *added_changes
            } else {
                panic!("unexpected index out of bounds")
            };

            if let SetUpdateOrDelete::Update(updates) = changes
                .get(addr)
                .expect("address unexpectedly absent from the changes")
            {
                if let Some(k_range) = key_range.as_ref() {
                    for (k, update) in updates.datastore.range(k_range.clone()) {
                        match update {
                            SetOrDelete::Set(_) => {
                                key_updates.insert(k.clone(), true);
                            }
                            SetOrDelete::Delete => {
                                key_updates.insert(k.clone(), false);
                            }
                        }
                    }
                }
            } else {
                panic!("unexpected state change");
            }
        }
    }

    // process reset-related edge cases
    match speculative_reset {
        SpeculativeResetType::Delete => {
            // the address was deleted in the speculative history without further updates
            return (final_keys, None);
        }
        SpeculativeResetType::Set => {
            // the address was reset in the speculative history
            let filter_it = key_updates
                .into_iter()
                .filter_map(|(k, is_set)| if is_set { Some(k) } else { None });
            if let Some(cnt) = count {
                return (final_keys, Some(filter_it.take(cnt as usize).collect()));
            } else {
                return (final_keys, Some(filter_it.collect()));
            }
        }
        SpeculativeResetType::None => {
            // there was no reset
            if key_updates.is_empty() {
                // there were no updates: return the same as final
                return (final_keys.clone(), final_keys);
            } else if final_keys.is_none() {
                // there were updates but the address is absent from the final state
                let filter_it =
                    key_updates
                        .into_iter()
                        .filter_map(|(k, is_set)| if is_set { Some(k) } else { None });
                if let Some(cnt) = count {
                    return (None, Some(filter_it.take(cnt as usize).collect()));
                } else {
                    return (None, Some(filter_it.collect()));
                }
            }
        }
    }

    // If we reach this point, it means that all of the following is true:
    //   * the final key list is present
    //   * there was no reset/delete in the speculative history
    //   * there were updates in the speculative history
    // This means that we need to merge the final and speculative key lists,
    // querying more final keys if necessary to reach the desired count.

    let mut final_keys_queue: VecDeque<_> = final_keys
        .as_ref()
        .expect("expected final keys to be non-None")
        .iter()
        .cloned()
        .collect();
    let mut speculative_keys: BTreeSet<_> = Default::default();
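    // Last key of the most recent batch of final keys, used to fetch the next
    // batch from the final state once the queue is exhausted.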
    let mut last_final_batch_key = final_keys_queue.back().cloned();
    let mut key_updates_it = key_updates.into_iter().peekable();
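    // Merge the sorted final key stream with the sorted speculative updates,
    // stopping as soon as `count` keys have been gathered.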
    loop {
        if let Some(cnt) = count {
            if speculative_keys.len() >= cnt as usize {
                return (final_keys, Some(speculative_keys));
            }
        }
        match (final_keys_queue.front(), key_updates_it.peek()) {
            (Some(_f), None) => {
                // only final keys remain
                let k = final_keys_queue
                    .pop_front()
                    .expect("expected final key queue to be non-empty");
                speculative_keys.insert(k);
            }
            (Some(f), Some((u, _is_set))) => {
                // both a final key and a speculative update are available: compare them
                match f.cmp(u) {
                    std::cmp::Ordering::Less => {
                        // take into account final only
                        let k = final_keys_queue
                            .pop_front()
                            .expect("expected final key queue to be non-empty");
                        speculative_keys.insert(k);
                    }
                    std::cmp::Ordering::Equal => {
                        // take into account the change but pop both
                        let (k, is_set) = key_updates_it
                            .next()
                            .expect("expected key update queue to be non-empty");
                        final_keys_queue.pop_front();
                        if is_set {
                            speculative_keys.insert(k);
                        }
                    }
                    std::cmp::Ordering::Greater => {
                        // take into account the update only
                        let (k, is_set) = key_updates_it
                            .next()
                            .expect("expected key update queue to be non-empty");
                        if is_set {
                            speculative_keys.insert(k);
                        }
                    }
                }
            }
            (None, Some((_u, _is_set))) => {
                // no final key left: only speculative updates remain
                let (k, is_set) = key_updates_it
                    .next()
                    .expect("expected key update queue to be non-empty");
                if is_set {
                    speculative_keys.insert(k);
                }
            }
            (None, None) => {
                // nothing is left
                return (final_keys, Some(speculative_keys));
            }
        }

        if final_keys_queue.is_empty() {
            if let Some(last_k) = last_final_batch_key.take() {
                // the last final item was consumed: replenish the queue by querying more
                final_keys_queue = final_state
                    .read()
                    .get_ledger()
                    .get_datastore_keys(
                        addr,
                        prefix,
                        Bound::Excluded(last_k),
                        end_key.clone(),
                        count,
                    )
                    .expect("address expected to exist in final state")
                    .iter()
                    .cloned()
                    .collect();
                last_final_batch_key = final_keys_queue.back().cloned();
            }
        }
    }
}