use crate::active_history::{ActiveHistory, HistorySearchResult::Present};
use massa_async_pool::{
AsyncMessage, AsyncMessageId, AsyncMessageInfo, AsyncMessageTrigger, AsyncMessageUpdate,
AsyncPoolChanges,
};
use massa_final_state::FinalStateController;
use massa_ledger_exports::LedgerChanges;
use massa_models::types::{Applicable, SetUpdateOrDelete};
use massa_models::{
config::{GENESIS_TIMESTAMP, T0, THREAD_COUNT},
slot::Slot,
timeslots::get_block_slot_timestamp,
};
use massa_versioning::versioning::MipComponent;
use parking_lot::RwLock;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
/// Speculative view of the asynchronous message pool.
///
/// The view is seeded in `new` from the final state's message info cache and the
/// async pool changes recorded in the active history, and then tracks the changes
/// made through this instance on top of that.
pub(crate) struct SpeculativeAsyncPool {
/// Shared handle to the final state; read for the async pool (messages, info cache,
/// configuration) and for the MIP store.
final_state: Arc<RwLock<dyn FinalStateController>>,
/// Shared handle to the active history; its async pool changes are replayed in `new`
/// and searched in `fetch_msgs`.
active_history: Arc<RwLock<ActiveHistory>>,
/// Async pool changes accumulated by this instance; handed out and reset by `take`.
pool_changes: AsyncPoolChanges,
/// Info of the messages currently considered part of the pool, indexed by message id.
message_infos: BTreeMap<AsyncMessageId, AsyncMessageInfo>,
}
impl SpeculativeAsyncPool {
pub fn new(
final_state: Arc<RwLock<dyn FinalStateController>>,
active_history: Arc<RwLock<ActiveHistory>>,
) -> Self {
let mut message_infos = final_state
.read()
.get_async_pool()
.message_info_cache
.clone();
for history_item in active_history.read().0.iter() {
for change in history_item.state_changes.async_pool_changes.0.iter() {
match change {
(id, SetUpdateOrDelete::Set(message)) => {
message_infos.insert(*id, AsyncMessageInfo::from(message.clone()));
}
(id, SetUpdateOrDelete::Update(message_update)) => {
message_infos.entry(*id).and_modify(|message_info| {
message_info.apply(message_update.clone());
});
}
(id, SetUpdateOrDelete::Delete) => {
message_infos.remove(id);
}
}
}
}
SpeculativeAsyncPool {
final_state,
active_history,
pool_changes: Default::default(),
message_infos,
}
}
/// Returns the accumulated async pool changes and leaves a default (empty) set
/// of changes in their place.
pub fn take(&mut self) -> AsyncPoolChanges {
    let pending_changes = &mut self.pool_changes;
    std::mem::take(pending_changes)
}
/// Returns a snapshot of the current state: clones of the accumulated pool changes
/// and of the message info map. Pass it to `reset_to_snapshot` to roll back.
pub fn get_snapshot(&self) -> (AsyncPoolChanges, BTreeMap<AsyncMessageId, AsyncMessageInfo>) {
    let changes_copy = self.pool_changes.clone();
    let infos_copy = self.message_infos.clone();
    (changes_copy, infos_copy)
}
/// Restores the state captured by `get_snapshot`, replacing both the accumulated
/// pool changes and the message info map.
///
/// # Arguments
/// * `snapshot`: the `(pool changes, message infos)` pair to restore
pub fn reset_to_snapshot(
    &mut self,
    snapshot: (AsyncPoolChanges, BTreeMap<AsyncMessageId, AsyncMessageInfo>),
) {
    let (pool_changes, message_infos) = snapshot;
    self.pool_changes = pool_changes;
    self.message_infos = message_infos;
}
pub fn push_new_message(&mut self, msg: AsyncMessage) {
self.pool_changes.push_add(msg.compute_id(), msg.clone());
self.message_infos.insert(msg.compute_id(), msg.into());
}
/// Takes a batch of asynchronous messages to execute at `slot`.
///
/// Message infos are visited in the key order of the info map. A message is selected
/// when its gas need (`max_gas` plus `async_msg_cst_gas_cost`, saturating) fits in the
/// remaining budget, it is flagged as executable, and `slot` lies within its validity
/// window for the current execution component version. A message that does not fit is
/// skipped; later messages may still be selected.
///
/// Selected messages are fetched with deletion enabled, and every message actually
/// fetched is also removed from the message info map.
///
/// # Arguments
/// * `slot`: slot at which the batch is executed
/// * `max_gas`: total gas budget available to the batch
/// * `async_msg_cst_gas_cost`: constant gas cost added to each message's `max_gas`
///
/// # Returns
/// The `(id, message)` pairs that were fetched.
pub fn take_batch_to_execute(
    &mut self,
    slot: Slot,
    max_gas: u64,
    async_msg_cst_gas_cost: u64,
) -> Vec<(AsyncMessageId, AsyncMessage)> {
    // The version only depends on `slot`: resolve it once instead of taking the
    // final state read lock for every candidate message.
    let execution_component_version = self.get_execution_component_version(&slot);

    let mut available_gas = max_gas;

    // Ids are `Copy`, so collect them by value: this ends the borrow of
    // `self.message_infos` before `fetch_msgs` needs `&mut self`, without cloning
    // the whole map.
    let mut wanted_ids: Vec<AsyncMessageId> = Vec::new();
    for (message_id, message_info) in self.message_infos.iter() {
        let corrected_max_gas = message_info.max_gas.saturating_add(async_msg_cst_gas_cost);
        if available_gas >= corrected_max_gas
            && message_info.can_be_executed
            && Self::is_message_ready_to_execute(
                &slot,
                &message_info.validity_start,
                &message_info.validity_end,
                execution_component_version,
            )
        {
            available_gas -= corrected_max_gas;
            wanted_ids.push(*message_id);
        }
    }

    let taken = self.fetch_msgs(
        wanted_ids.iter().collect(),
        true,
        execution_component_version,
    );

    // Taken messages no longer belong to the speculative view of the pool.
    for (message_id, _) in taken.iter() {
        self.message_infos.remove(message_id);
    }

    taken
}
/// Settles the pool at the end of a slot.
///
/// In order: drops expired messages, truncates the message info map to the pool's
/// configured `max_length`, activates messages whose trigger matches `ledger_changes`,
/// and finally fetches (with deletion) the messages that were eliminated.
///
/// # Arguments
/// * `slot`: the slot being settled
/// * `ledger_changes`: ledger changes checked against message triggers
/// * `fix_eliminated_msg`: when true, expired messages that were removed from
///   `pool_changes` are also appended to the returned list
///
/// # Returns
/// The eliminated `(id, message)` pairs that could be fetched.
pub fn settle_slot(
&mut self,
slot: &Slot,
ledger_changes: &LedgerChanges,
fix_eliminated_msg: bool,
) -> Vec<(AsyncMessageId, AsyncMessage)> {
let execution_component_version = self.get_execution_component_version(slot);
// Remove expired entries from the info map, remembering what was removed.
let mut eliminated_infos = Vec::new();
self.message_infos.retain(|id, info| {
if Self::is_message_expired(slot, &info.validity_end, execution_component_version) {
eliminated_infos.push((*id, info.clone()));
false
} else {
true
}
});
// Remove expired `Set` entries from the accumulated changes as well;
// `Update` and `Delete` entries are always kept.
let mut eliminated_new_messages = Vec::new();
self.pool_changes.0.retain(|k, v| match v {
SetUpdateOrDelete::Set(message) => {
if Self::is_message_expired(
slot,
&message.validity_end,
execution_component_version,
) {
eliminated_new_messages.push((*k, v.clone()));
false
} else {
true
}
}
SetUpdateOrDelete::Update(_v) => true,
SetUpdateOrDelete::Delete => true,
});
// Record the infos of the `Set` messages dropped just above.
eliminated_infos.extend(eliminated_new_messages.iter().filter_map(|(k, v)| match v {
SetUpdateOrDelete::Set(v) => Some((*k, AsyncMessageInfo::from(v.clone()))),
SetUpdateOrDelete::Update(_v) => None,
SetUpdateOrDelete::Delete => None,
}));
// Truncate the info map to the configured maximum length.
let excess_count = self
.message_infos
.len()
.saturating_sub(self.final_state.read().get_async_pool().config.max_length as usize);
eliminated_infos.reserve_exact(excess_count);
// `pop_last` removes the entry with the greatest id; the `unwrap` cannot fail
// because `excess_count` never exceeds the map's length.
for _ in 0..excess_count {
eliminated_infos.push(self.message_infos.pop_last().unwrap()); }
// Flag every message whose trigger matches the ledger changes as executable.
let mut triggered_info = Vec::new();
for (id, message_info) in self.message_infos.iter_mut() {
if let Some(filter) = &message_info.trigger {
if is_triggered(filter, ledger_changes) {
message_info.can_be_executed = true;
triggered_info.push((*id, message_info.clone()));
}
}
}
// Fetch the triggered messages without deleting them, and record an activation
// change for each one that was found.
let triggered_msg = self.fetch_msgs(
triggered_info.iter().map(|(id, _)| id).collect(),
false,
execution_component_version,
);
for (msg_id, _msg) in triggered_msg.iter() {
self.pool_changes.push_activate(*msg_id);
}
// Fetch the eliminated messages, recording a deletion for each one found.
let mut eliminated_msg = self.fetch_msgs(
eliminated_infos.iter().map(|(id, _)| id).collect(),
true,
execution_component_version,
);
// The expired `Set` entries were already removed from `pool_changes` above;
// when the flag is set they are appended here directly.
if fix_eliminated_msg {
eliminated_msg.extend(eliminated_new_messages.iter().filter_map(|(k, v)| match v {
SetUpdateOrDelete::Set(v) => Some((*k, v.clone())),
SetUpdateOrDelete::Update(_v) => None,
SetUpdateOrDelete::Delete => None,
}));
}
eliminated_msg
}
/// Fetches the messages for `wanted_ids`, searching three layers in order: the changes
/// accumulated in `pool_changes`, then the active history, then the final state.
///
/// Updates met before a message is found are accumulated per id and applied to
/// messages found in the active history or in the final state.
///
/// # Arguments
/// * `wanted_ids`: ids of the messages to fetch
/// * `delete_existing`: when true, a deletion is recorded in `pool_changes` for every
///   message found
/// * `execution_component_version`: selects how updates from the active history are
///   merged (see below)
///
/// # Returns
/// The `(id, message)` pairs found, grouped by the layer that provided them
/// (pool changes first, then active history, then final state).
fn fetch_msgs(
&mut self,
mut wanted_ids: Vec<&AsyncMessageId>,
delete_existing: bool,
execution_component_version: u32,
) -> Vec<(AsyncMessageId, AsyncMessage)> {
let mut msgs = Vec::new();
// Accumulated update per wanted id, starting from the default (empty) update.
let mut current_changes = HashMap::new();
for id in wanted_ids.iter() {
current_changes.insert(*id, AsyncMessageUpdate::default());
}
// Layer 1: accumulated pool changes. A clone is searched so that
// `self.pool_changes` can be modified inside the closure.
let pool_changes_clone = self.pool_changes.clone();
wanted_ids.retain(|&message_id| match pool_changes_clone.0.get(message_id) {
Some(SetUpdateOrDelete::Set(msg)) => {
// Found: the id is dropped from the search list.
if delete_existing {
self.pool_changes.push_delete(*message_id);
}
msgs.push((*message_id, msg.clone()));
false
}
Some(SetUpdateOrDelete::Update(msg_update)) => {
// Remember the update and keep searching in the next layers.
current_changes.entry(message_id).and_modify(|e| {
e.apply(msg_update.clone());
});
true
}
// Deleted or absent entries leave the id in the search list.
Some(SetUpdateOrDelete::Delete) => true,
None => true,
});
// Layer 2: active history.
wanted_ids.retain(|&message_id| {
match self.active_history.read().fetch_message(
message_id,
current_changes.get(message_id).cloned().unwrap_or_default(),
execution_component_version,
) {
Present(SetUpdateOrDelete::Set(mut msg)) => {
// Found: apply the accumulated update and drop the id from the search list.
msg.apply(current_changes.get(message_id).cloned().unwrap_or_default());
if delete_existing {
self.pool_changes.push_delete(*message_id);
}
msgs.push((*message_id, msg));
return false;
}
Present(SetUpdateOrDelete::Update(msg_update)) => {
// Version 0 applies the history update on top of the accumulated one;
// later versions replace the accumulated update with it.
current_changes.entry(message_id).and_modify(|e| {
match execution_component_version {
0 => {
e.apply(msg_update.clone());
}
_ => {
*e = msg_update.clone();
}
}
});
return true;
}
_ => {}
}
true
});
// Layer 3: final state, queried once for all remaining ids.
let fetched_msgs = self
.final_state
.read()
.get_async_pool()
.fetch_messages(wanted_ids);
for (message_id, message) in fetched_msgs {
if let Some(msg) = message {
let mut msg = msg.clone();
msg.apply(current_changes.get(message_id).cloned().unwrap_or_default());
if delete_existing {
self.pool_changes.push_delete(*message_id);
}
msgs.push((*message_id, msg));
}
}
msgs
}
/// Returns the latest version of the `Execution` MIP component at the timestamp
/// of `slot`, as reported by the final state's MIP store.
///
/// # Panics
/// Panics if the timestamp of `slot` cannot be computed.
fn get_execution_component_version(&self, slot: &Slot) -> u32 {
    let slot_timestamp = get_block_slot_timestamp(THREAD_COUNT, T0, *GENESIS_TIMESTAMP, *slot)
        .expect("Time overflow when getting block slot timestamp for MIP");

    let final_state = self.final_state.read();
    let mip_store = final_state.get_mip_store();
    mip_store.get_latest_component_version_at(&MipComponent::Execution, slot_timestamp)
}
/// Tells whether a message is expired at `slot`.
///
/// With execution component version 0 the validity end is exclusive (the message is
/// expired once `slot` reaches it); with any other version it is inclusive (the
/// message is expired only once `slot` is past it).
fn is_message_expired(
    slot: &Slot,
    message_validity_end: &Slot,
    execution_component_version: u32,
) -> bool {
    if execution_component_version == 0 {
        slot >= message_validity_end
    } else {
        slot > message_validity_end
    }
}
/// Tells whether a message can be executed at `slot`, i.e. whether `slot` lies in
/// the message's validity window.
///
/// The validity start is always inclusive. The validity end is exclusive with
/// execution component version 0 and inclusive with any other version.
fn is_message_ready_to_execute(
    slot: &Slot,
    message_validity_start: &Slot,
    message_validity_end: &Slot,
    execution_component_version: u32,
) -> bool {
    let has_started = slot >= message_validity_start;
    let has_not_ended = if execution_component_version == 0 {
        slot < message_validity_end
    } else {
        slot <= message_validity_end
    };
    has_started && has_not_ended
}
}
/// Tells whether `ledger_changes` contains writes matching the trigger `filter`,
/// i.e. writes on the filter's address and (cloned) datastore key as reported by
/// `LedgerChanges::has_writes`.
fn is_triggered(filter: &AsyncMessageTrigger, ledger_changes: &LedgerChanges) -> bool {
    let watched_address = &filter.address;
    let watched_key = filter.datastore_key.clone();
    ledger_changes.has_writes(watched_address, watched_key)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks both validity predicates for `slot` against the validity window
    /// `Slot::new(4, 0)` ..= `Slot::new(8, 0)` under the given execution component version.
    fn assert_validity(label: &str, slot: Slot, version: u32, expired: bool, ready: bool) {
        let validity_start = Slot::new(4, 0);
        let validity_end = Slot::new(8, 0);

        assert_eq!(
            SpeculativeAsyncPool::is_message_expired(&slot, &validity_end, version),
            expired,
            "unexpected expiry result for slot '{}' with version {}",
            label,
            version
        );
        assert_eq!(
            SpeculativeAsyncPool::is_message_ready_to_execute(
                &slot,
                &validity_start,
                &validity_end,
                version
            ),
            ready,
            "unexpected readiness result for slot '{}' with version {}",
            label,
            version
        );
    }

    /// Version 1: the validity end is inclusive.
    #[test]
    fn test_validity() {
        assert_validity("inside window", Slot::new(6, 0), 1, false, true);
        assert_validity("validity start", Slot::new(4, 0), 1, false, true);
        assert_validity("validity end", Slot::new(8, 0), 1, false, true);
        assert_validity("after window", Slot::new(9, 0), 1, true, false);
    }

    /// Version 0 (legacy): the validity end is exclusive.
    #[test]
    fn test_validity_legacy_version() {
        assert_validity("inside window", Slot::new(6, 0), 0, false, true);
        assert_validity("validity start", Slot::new(4, 0), 0, false, true);
        assert_validity("validity end", Slot::new(8, 0), 0, true, false);
        assert_validity("after window", Slot::new(9, 0), 0, true, false);
    }

    /// A slot before the validity start is neither expired nor ready, whatever the version.
    #[test]
    fn test_validity_before_start() {
        assert_validity("before window", Slot::new(3, 0), 0, false, false);
        assert_validity("before window", Slot::new(3, 0), 1, false, false);
    }
}