use crate::controller::SelectorControllerImpl;
use crate::controller::SelectorManagerImpl;
use crate::draw::perform_draws;
use crate::CycleDraws;
use crate::DrawCache;
use crate::RwLockCondvar;
use crate::{Command, DrawCachePtr};
use massa_pos_exports::PosError;
use massa_pos_exports::PosResult;
use massa_pos_exports::SelectorConfig;
use massa_pos_exports::SelectorController;
use massa_pos_exports::SelectorManager;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
/// State owned by the selector worker thread.
///
/// An instance is built inside the spawned thread (see `SelectorThread::spawn`)
/// and consumed by the worker loop.
// NOTE(review): every field is read by the methods in this file, so this
// `allow(dead_code)` looks unnecessary — confirm before removing.
#[allow(dead_code)]
pub(crate) struct SelectorThread {
/// Receiving end of the command channel the worker loop reads from.
pub(crate) input_mpsc: Receiver<Command>,
/// Shared draw cache (condition variable + lock) that the worker writes
/// computed cycle draws, or an error, into.
pub(crate) cache: DrawCachePtr,
/// Selector configuration (passed to `perform_draws` and used for the
/// cache size bound).
pub(crate) cfg: SelectorConfig,
}
impl SelectorThread {
pub(crate) fn spawn(
input_mpsc: Receiver<Command>,
cache: DrawCachePtr,
cfg: SelectorConfig,
) -> JoinHandle<PosResult<()>> {
let thread_builder = thread::Builder::new().name("selector".into());
thread_builder
.spawn(|| {
let this = Self {
input_mpsc,
cache,
cfg,
};
this.run()
})
.expect("failed to spawn thread : selector")
}
fn process_draws_result(
&self,
cycle: u64,
draws_result: PosResult<CycleDraws>,
) -> PosResult<()> {
let (cache_cv, cache_lock) = &*self.cache;
let mut cache_guard = cache_lock.write();
let out_result = {
let cache = cache_guard.as_mut().map_err(|err| err.clone())?;
if let Some(last_cycle) = cache.0.back() {
if last_cycle.cycle.checked_add(1) != Some(cycle) {
return Err(PosError::ContainerInconsistency(
"discontinuity in cycle draws history".into(),
));
}
}
match draws_result {
Ok(cycle_draws) => {
cache.0.push_back(cycle_draws);
while cache.0.len() > self.cfg.max_draw_cache {
cache.0.pop_front();
}
Ok(())
}
Err(err) => Err(err),
}
};
if let Err(err) = &out_result {
*cache_guard = Err(err.clone());
}
cache_cv.notify_all();
out_result
}
fn run(self) -> PosResult<()> {
loop {
let Ok(Command::DrawInput {
cycle,
lookback_rolls,
lookback_seed,
}) = self.input_mpsc.recv()
else {
break;
};
let draws_result = perform_draws(&self.cfg, cycle, lookback_rolls, lookback_seed);
self.process_draws_result(cycle, draws_result)?;
}
Ok(())
}
}
/// Starts the selector worker thread and returns the handles used to drive it.
///
/// # Arguments
/// * `selector_config`: selector configuration; it sizes the command channel
///   and the draw cache, and is then handed over to the worker thread
///
/// # Returns
/// A pair `(manager, controller)`:
/// * the manager owns the worker thread handle and a command sender
/// * the controller holds a command sender and a handle on the shared cache
pub fn start_selector_worker(
    selector_config: SelectorConfig,
) -> PosResult<(Box<dyn SelectorManager>, Box<dyn SelectorController>)> {
    // Bounded channel carrying commands to the worker.
    let (command_tx, command_rx) = sync_channel(selector_config.channel_size);

    // Capacity is one above the cache bound (saturating).
    let draw_store = DrawCache(VecDeque::with_capacity(
        selector_config.max_draw_cache.saturating_add(1),
    ));
    let shared_cache = Arc::new((RwLockCondvar::default(), RwLock::new(Ok(draw_store))));

    // Built before the thread is spawned: it reads config fields that are
    // moved into the worker afterwards.
    let controller: Box<dyn SelectorController> = Box::new(SelectorControllerImpl {
        input_mpsc: command_tx.clone(),
        cache: Arc::clone(&shared_cache),
        periods_per_cycle: selector_config.periods_per_cycle,
        thread_count: selector_config.thread_count,
    });

    let manager: Box<dyn SelectorManager> = Box::new(SelectorManagerImpl {
        thread_handle: Some(SelectorThread::spawn(
            command_rx,
            shared_cache,
            selector_config,
        )),
        input_mpsc: command_tx,
    });

    Ok((manager, controller))
}