use std::sync::Arc;
use std::thread;
use parking_lot::{Condvar, Mutex, RwLock};
use tracing::{debug, info};
use crate::config::EventCacheConfig;
use crate::controller::{
EventCacheController, EventCacheControllerImpl, EventCacheWriterInputData,
};
use crate::event_cache::EventCache;
pub(crate) struct EventCacheWriterThread {
input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
cache: Arc<RwLock<EventCache>>,
}
impl EventCacheWriterThread {
fn new(
input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
event_cache: Arc<RwLock<EventCache>>,
) -> Self {
Self {
input_data,
cache: event_cache,
}
}
fn wait_loop_event(&mut self) -> (EventCacheWriterInputData, bool) {
loop {
let mut input_data_lock = self.input_data.1.lock();
let input_data: EventCacheWriterInputData = std::mem::take(&mut input_data_lock);
if !input_data.events.is_empty() {
return (input_data, false);
}
if input_data.stop {
return (input_data, true);
}
self.input_data.0.wait(&mut input_data_lock);
}
}
pub fn main_loop(&mut self) {
loop {
let (input_data, stop) = self.wait_loop_event();
debug!(
"Event cache writer loop triggered, input_data = {:?}",
input_data
);
if stop {
break;
}
{
let mut lock = self.cache.write();
lock.insert_multi_it(input_data.events.into_iter());
drop(lock);
}
}
}
}
pub trait EventCacheManager {
fn stop(&mut self);
}
pub struct EventCacheWriterManagerImpl {
pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
pub(crate) thread_handle: Option<std::thread::JoinHandle<()>>,
}
impl EventCacheManager for EventCacheWriterManagerImpl {
fn stop(&mut self) {
info!("Stopping Execution controller...");
{
let mut input_wlock = self.input_data.1.lock();
input_wlock.stop = true;
self.input_data.0.notify_one();
}
if let Some(join_handle) = self.thread_handle.take() {
join_handle.join().expect("VM controller thread panicked");
}
info!("Execution controller stopped");
}
}
pub fn start_event_cache_writer_worker(
cfg: EventCacheConfig,
) -> (Box<dyn EventCacheManager>, Box<dyn EventCacheController>) {
let event_cache = Arc::new(RwLock::new(EventCache::new(
cfg.event_cache_path.as_path(),
cfg.max_event_cache_length,
cfg.snip_amount,
cfg.thread_count,
cfg.max_call_stack_length,
cfg.max_event_data_length,
cfg.max_events_per_operation,
cfg.max_operations_per_block,
cfg.max_events_per_query,
)));
let input_data = Arc::new((Condvar::new(), Mutex::new(EventCacheWriterInputData::new())));
let input_data_clone = input_data.clone();
let controller = EventCacheControllerImpl {
input_data: input_data.clone(),
cache: event_cache.clone(),
};
let thread_builder = thread::Builder::new().name("event_cache".into());
let thread_handle = thread_builder
.spawn(move || {
EventCacheWriterThread::new(input_data_clone, event_cache).main_loop();
})
.expect("failed to spawn thread : event_cache");
let manager = EventCacheWriterManagerImpl {
input_data,
thread_handle: Some(thread_handle),
};
(Box::new(manager), Box::new(controller))
}