1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// std
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
// third-party
use parking_lot::{Condvar, Mutex, RwLock};
// internal
use crate::event_cache::EventCache;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;

/// Input data shared between the event cache controller and its worker thread.
///
/// The controller keeps this structure behind a mutex paired with a condition
/// variable (see `EventCacheControllerImpl::input_data`) and appends to it in
/// `save_events`.
// NOTE(review): the consumer side (the writer thread) is not visible in this
// file; presumably it drains `events` and checks `stop` — confirm there.
#[derive(Debug, Default)]
pub(crate) struct EventCacheWriterInputData {
    /// set stop to true to stop the thread
    pub stop: bool,
    /// Events pushed by the controller that are still waiting in the queue.
    /// They are also scanned directly by `get_filtered_sc_output_events`.
    pub(crate) events: VecDeque<SCOutputEvent>,
}

impl EventCacheWriterInputData {
    /// Builds an empty input buffer: `stop` is `false` and no event is queued.
    ///
    /// This is the same value as the one produced by the derived `Default`.
    pub fn new() -> Self {
        Self::default()
    }

    // Disabled helper, kept for reference.
    /*
    /// Takes the current input data into a clone that is returned,
    /// and resets self.
    pub fn take(&mut self) -> Self {
        Self {
            stop: std::mem::take(&mut self.stop),
            events: std::mem::take(&mut self.events),
        }
    }
    */
}

/// Interface used to communicate with the event cache worker thread.
///
/// Implementors must be `Send + Sync` so the controller can be shared across
/// threads. With the `test-exports` feature enabled, a mock is generated
/// through `mockall`.
#[cfg_attr(feature = "test-exports", mockall_wrap::wrap, mockall::automock)]
pub trait EventCacheController: Send + Sync {
    /// Hands a batch of smart contract output events over to the event cache.
    ///
    /// The implementation in this file appends them to the pending input queue
    /// and notifies the condition variable attached to it.
    fn save_events(&self, events: VecDeque<SCOutputEvent>);

    /// Returns the smart contract output events that match `filter`.
    ///
    /// The implementation in this file looks at both the pending input queue
    /// and the event cache, and returns the deduplicated union of the two.
    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent>;
}

#[derive(Clone)]
/// Implementation of the event cache controller.
///
/// Both fields are `Arc`s, so cloning the controller yields another handle on
/// the same input queue and the same cache.
pub struct EventCacheControllerImpl {
    /// Input data waiting to be processed, paired with the wake-up condition
    /// variable that needs to be triggered when the data changes.
    // NOTE(review): the previous wording mentioned "the VM loop"; judging by
    // `save_events`, the waiter is EventCacheWriterThread — confirm.
    pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
    /// Event cache, shared behind a read/write lock. This controller only
    /// takes the read side (see `get_filtered_sc_output_events`).
    pub(crate) cache: Arc<RwLock<EventCache>>,
}

impl EventCacheController for EventCacheControllerImpl {
    /// Appends `events` to the pending input queue, then wakes up every thread
    /// waiting on the associated condition variable.
    fn save_events(&self, events: VecDeque<SCOutputEvent>) {
        let (wakeup, queue) = &*self.input_data;
        // The guard lives until the end of the function, so the notification
        // below is sent while the queue is still locked.
        let mut pending = queue.lock();
        pending.events.extend(events);
        // Wake up the condvar in EventCacheWriterThread waiting for events
        wakeup.notify_all();
    }

    /// Returns every event matching `filter`, looking first at the events still
    /// waiting in the input queue and then at the event cache.
    ///
    /// Filter semantics applied to the queued events (each criterion is only
    /// checked when it is set in `filter`):
    /// - `start` is inclusive and `end` is exclusive on the event slot;
    /// - `is_final` / `is_error` must equal the flags of the event context;
    /// - `original_caller_address` is compared with the front of the call stack
    ///   and `emitter_address` with its back (an empty call stack never matches);
    /// - `original_operation_id` must equal the origin operation id of the event
    ///   (an event without origin operation id never matches).
    ///
    /// Both result sets go through a `BTreeSet`, so the returned vector is
    /// deduplicated and sorted according to the `Ord` of `SCOutputEvent`.
    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent> {
        // Tells whether a queued event satisfies every criterion set in `filter`.
        let accepts = |event: &SCOutputEvent| -> bool {
            let ctx = &event.context;

            let after_start = !matches!(filter.start, Some(start) if ctx.slot < start);
            let before_end = !matches!(filter.end, Some(end) if ctx.slot >= end);
            let finality_ok =
                !matches!(filter.is_final, Some(wanted) if ctx.is_final != wanted);
            let error_ok = !matches!(filter.is_error, Some(wanted) if ctx.is_error != wanted);

            // Original caller: first address of the call stack.
            let caller_ok = match (filter.original_caller_address, ctx.call_stack.front()) {
                (Some(wanted), Some(actual)) => wanted == *actual,
                (Some(_), None) => false,
                (None, _) => true,
            };
            // Emitter: last address of the call stack.
            let emitter_ok = match (filter.emitter_address, ctx.call_stack.back()) {
                (Some(wanted), Some(actual)) => wanted == *actual,
                (Some(_), None) => false,
                (None, _) => true,
            };
            let operation_ok = match (filter.original_operation_id, ctx.origin_operation_id) {
                (Some(wanted), Some(actual)) => wanted == actual,
                (Some(_), None) => false,
                (None, _) => true,
            };

            after_start
                && before_end
                && finality_ok
                && error_ok
                && caller_ok
                && emitter_ok
                && operation_ok
        };

        // Step 1: events still waiting in the input queue. The queue lock is
        // released at the end of this scope, before the cache lock is taken,
        // so the two locks are never held together.
        let mut merged: BTreeSet<SCOutputEvent> = {
            let pending = self.input_data.1.lock();
            let selected: BTreeSet<SCOutputEvent> = pending
                .events
                .iter()
                .filter(|&event| accepts(event))
                .cloned()
                .collect();
            selected
        };

        // Step 2: events already stored in the cache (on disk). The read lock
        // is released as soon as the query returns.
        let stored = {
            let cache = self.cache.read();
            let (_, events) = cache.get_filtered_sc_output_events(filter);
            events
        };

        // Step 3: merge both sources; the set removes duplicates and orders the
        // output.
        merged.extend(stored.into_iter().collect::<BTreeSet<SCOutputEvent>>());
        merged.into_iter().collect()
    }
}