1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// std
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
// third-party
use parking_lot::{Condvar, Mutex, RwLock};
// internal
use crate::event_cache::EventCache;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;

/// structure used to communicate with controller
#[derive(Debug, Default)]
pub(crate) struct EventCacheWriterInputData {
    /// set stop to true to stop the thread
    pub stop: bool,
    pub(crate) events: VecDeque<SCOutputEvent>,
}

impl EventCacheWriterInputData {
    pub fn new() -> Self {
        Self {
            stop: Default::default(),
            events: Default::default(),
        }
    }
}

/// interface that communicates with the worker thread
#[cfg_attr(feature = "test-exports", mockall_wrap::wrap, mockall::automock)]
pub trait EventCacheController: Send + Sync {
    fn save_events(&self, events: VecDeque<SCOutputEvent>);

    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent>;
}

#[derive(Clone)]
/// implementation of the event cache controller
pub struct EventCacheControllerImpl {
    /// input data to process in the VM loop
    /// with a wake-up condition variable that needs to be triggered when the data changes
    pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
    /// Event cache
    pub(crate) cache: Arc<RwLock<EventCache>>,
}

impl EventCacheController for EventCacheControllerImpl {
    fn save_events(&self, events: VecDeque<SCOutputEvent>) {
        // lock input data
        let mut input_data = self.input_data.1.lock();
        input_data.events.extend(events);
        massa_metrics::set_event_cache_vec_len(input_data.events.len());
        // Wake up the condvar in EventCacheWriterThread waiting for events
        self.input_data.0.notify_all();
    }

    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent> {
        let mut res_0 = {
            // Read from new events first
            let lock_0 = self.input_data.1.lock();
            #[allow(clippy::unnecessary_filter_map)]
            let it = lock_0.events.iter().filter_map(|event| {
                if let Some(start) = filter.start {
                    if event.context.slot < start {
                        return None;
                    }
                }
                if let Some(end) = filter.end {
                    if event.context.slot >= end {
                        return None;
                    }
                }
                if let Some(is_final) = filter.is_final {
                    if event.context.is_final != is_final {
                        return None;
                    }
                }
                if let Some(is_error) = filter.is_error {
                    if event.context.is_error != is_error {
                        return None;
                    }
                }
                match (
                    filter.original_caller_address,
                    event.context.call_stack.front(),
                ) {
                    (Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                match (filter.emitter_address, event.context.call_stack.back()) {
                    (Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                match (
                    filter.original_operation_id,
                    event.context.origin_operation_id,
                ) {
                    (Some(addr1), Some(addr2)) if addr1 != addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                Some(event)
            });

            let res_0: BTreeSet<SCOutputEvent> = it.cloned().collect();
            // Drop the lock on the queue as soon as possible to avoid deadlocks
            drop(lock_0);
            res_0
        };

        let res_1 = {
            // Read from db (on disk) events
            let lock = self.cache.read();
            let (_, res_1) = lock.get_filtered_sc_output_events(filter);
            // Drop the lock on the event cache db asap
            drop(lock);
            res_1
        };

        // Merge results
        let res_1: BTreeSet<SCOutputEvent> = BTreeSet::from_iter(res_1);
        res_0.extend(res_1);
        Vec::from_iter(res_0)
    }
}