use std::{collections::VecDeque, thread::JoinHandle, time::Instant};
use crossbeam::{channel::tick, select};
use massa_channel::{receiver::MassaReceiver, sender::MassaSender};
use massa_logging::massa_trace;
use massa_metrics::MassaMetrics;
use massa_models::{
operation::{OperationPrefixId, OperationPrefixIds, SecureShareOperation},
prehash::{CapacityAllocator, PreHashMap, PreHashSet},
secure_share::Id,
slot::Slot,
timeslots::get_block_slot_timestamp,
};
use massa_pool_exports::PoolController;
use massa_protocol_exports::PeerId;
use massa_protocol_exports::{ProtocolConfig, ProtocolError};
use massa_serialization::{DeserializeError, Deserializer};
use massa_storage::Storage;
use massa_time::{MassaTime, TimeError};
use schnellru::{ByLength, LruMap};
use crate::{
handlers::peer_handler::models::{PeerManagementCmd, PeerMessageTuple},
messages::MessagesSerializer,
sig_verifier::verify_sigs_batch,
wrap_network::ActiveConnectionsTrait,
};
use tracing::{debug, info, warn};
use super::{
cache::SharedOperationCache,
commands_propagation::OperationHandlerPropagationCommand,
commands_retrieval::OperationHandlerRetrievalCommand,
messages::{OperationMessage, OperationMessageDeserializer, OperationMessageDeserializerArgs},
OperationMessageSerializer,
};
/// OS-visible name given to the operation retrieval thread.
const THREAD_NAME: &str = "poh-retrieval";
// Compile-time check on the name length. Linux limits thread names to 15 bytes (16 including
// the NUL terminator), so this presumably guards against the name being truncated.
const _: () = assert!(THREAD_NAME.len() <= 15);
/// A batch of announced operation prefixes whose request was deferred.
///
/// Items are pushed into `RetrievalThread::op_batch_buffer` when the announced operations were
/// already asked to another peer recently. `update_ask_operation` re-processes them once
/// `instant` has been reached.
pub struct OperationBatchItem {
/// Earliest time at which this batch should be re-processed.
pub instant: Instant,
/// Peer that announced the operations.
pub peer_id: PeerId,
/// Announced operation prefixes whose request was postponed.
pub operations_prefix_ids: OperationPrefixIds,
}
/// State owned by the operation retrieval thread.
///
/// The thread receives operation messages from peers, requests announced operations it does
/// not yet have, serves operations requested by peers, and hands newly verified operations to
/// the pool and to the propagation handler.
pub struct RetrievalThread {
/// Incoming operation messages from peers, as `(peer_id, serialized message)` tuples.
receiver: MassaReceiver<PeerMessageTuple>,
/// Pool that newly received and verified operations are added to.
pool_controller: Box<dyn PoolController>,
/// Cache shared with the other operation handlers (checked operations, operations known by each peer).
cache: SharedOperationCache,
/// LRU of prefixes we already asked for: time of the last ask and the peers asked so far.
asked_operations: LruMap<OperationPrefixId, (Instant, Vec<PeerId>)>,
/// Used to check which peers are connected and to send messages to them.
active_connections: Box<dyn ActiveConnectionsTrait>,
/// Announcements whose request was deferred; re-processed on each batch-processing tick.
op_batch_buffer: VecDeque<OperationBatchItem>,
/// Storage used to look up operations requested by peers and as a base for new storages.
storage: Storage,
config: ProtocolConfig,
/// Sends newly received operations to the propagation thread.
internal_sender: MassaSender<OperationHandlerPropagationCommand>,
/// Commands sent to this thread by the rest of the protocol (currently only `Stop`).
receiver_ext: MassaReceiver<OperationHandlerRetrievalCommand>,
/// Serializer for outgoing operation messages.
operation_message_serializer: MessagesSerializer,
/// Sends peer-management commands (bans) to the peer handler.
peer_cmd_sender: MassaSender<PeerManagementCmd>,
/// Kept alive by this thread but not otherwise used here.
_massa_metrics: MassaMetrics,
}
impl RetrievalThread {
    /// Main event loop of the operation retrieval thread.
    ///
    /// Waits (via `select!`) on three sources:
    /// - `self.receiver`: serialized operation messages from peers. Each one is deserialized
    ///   and dispatched to the handler matching its `OperationMessage` variant.
    /// - `self.receiver_ext`: commands from the rest of the protocol; `Stop` ends the loop.
    /// - a ticker firing every `operation_batch_proc_period`, which re-processes deferred
    ///   announcements through `update_ask_operation`.
    ///
    /// Returns on `Stop` or as soon as either channel is disconnected. Problems with a single
    /// peer message (malformed bytes, handler errors) are logged and never stop the loop.
    fn run(&mut self) {
        // Deserialization limits are taken from the protocol config.
        let operation_message_deserializer =
            OperationMessageDeserializer::new(OperationMessageDeserializerArgs {
                max_operations_prefix_ids: self.config.max_operations_per_message as u32,
                max_operations: self.config.max_operations_per_message as u32,
                max_datastore_value_length: self.config.max_op_datastore_value_length,
                max_function_name_length: self.config.max_size_function_name,
                max_parameters_size: self.config.max_size_call_sc_parameter,
                max_op_datastore_entry_count: self.config.max_op_datastore_entry_count,
                max_op_datastore_key_length: self.config.max_op_datastore_key_length,
                max_op_datastore_value_length: self.config.max_op_datastore_value_length,
                chain_id: self.config.chain_id,
            });
        let tick_ask_operations = tick(self.config.operation_batch_proc_period.to_duration());
        loop {
            select! {
                recv(self.receiver) -> msg => {
                    self.receiver.update_metrics();
                    match msg {
                        Ok((peer_id, message)) => {
                            let (rest, message) = match operation_message_deserializer
                                .deserialize::<DeserializeError>(&message) {
                                Ok((rest, message)) => (rest, message),
                                Err(err) => {
                                    warn!("Error when deserializing message from peer {}: Err = {}", peer_id, err);
                                    continue;
                                }
                            };
                            // Trailing bytes mean the message is malformed. Drop it and keep
                            // serving: returning here would shut down the whole retrieval
                            // thread because of a single (possibly malicious) peer message.
                            if !rest.is_empty() {
                                warn!("Error when deserializing message from peer {}: message not fully consumed", peer_id);
                                continue;
                            }
                            match message {
                                OperationMessage::Operations(ops) => {
                                    debug!("Received operation message: Operations from {}", peer_id);
                                    // Failures here (oversized operation, bad signature) are
                                    // treated as misbehavior and the peer is banned.
                                    if let Err(err) = note_operations_from_peer(
                                        &self.storage,
                                        &mut self.cache,
                                        &self.config,
                                        ops,
                                        &peer_id,
                                        &mut self.internal_sender,
                                        &mut self.pool_controller
                                    ) {
                                        warn!("peer {} sent us critically incorrect operation, which may be an attack attempt by the remote peer or a loss of sync between us and the remote peer. Err = {}", peer_id, err);
                                        if let Err(e) = self.ban_node(&peer_id) {
                                            warn!("Error when banning node: {}", e);
                                        }
                                    }
                                }
                                OperationMessage::OperationsAnnouncement(announcement) => {
                                    debug!("Received operation message: OperationsAnnouncement from {}", peer_id);
                                    if let Err(err) =
                                        self.on_operations_announcements_received(announcement, &peer_id)
                                    {
                                        warn!("error when processing announcement received from peer {}: Err = {}", peer_id, err);
                                    }
                                }
                                OperationMessage::AskForOperations(ask) => {
                                    debug!("Received operation message: AskForOperations from {}", peer_id);
                                    if let Err(err) = self.on_asked_operations_received(&peer_id, ask) {
                                        warn!("error when processing asked operations received from peer {}: Err = {}", peer_id, err);
                                    }
                                }
                            }
                        }
                        Err(_) => {
                            info!("Stop operation retrieval thread");
                            return;
                        }
                    }
                },
                recv(self.receiver_ext) -> msg => {
                    self.receiver_ext.update_metrics();
                    match msg {
                        Ok(cmd) => match cmd {
                            OperationHandlerRetrievalCommand::Stop => {
                                info!("Stop operation retrieval thread");
                                return;
                            }
                        },
                        Err(_) => {
                            info!("Stop operation retrieval thread");
                            return;
                        }
                    }
                }
                recv(tick_ask_operations) -> _ => {
                    if let Err(err) = self.update_ask_operation() {
                        warn!("Error in update_ask_operation: {}", err);
                    };
                }
            }
        }
    }

    /// Handles an `OperationsAnnouncement` (a batch of operation prefix ids) from `peer_id`.
    ///
    /// - Ignores the announcement if `peer_id` is not currently connected.
    /// - Records every announced prefix as known by `peer_id` in the shared cache.
    /// - Drops prefixes already present in the cache's `checked_operations_prefix`.
    /// - For each remaining prefix, consults `asked_operations` (prefix -> last ask time and
    ///   peers asked):
    ///   - already asked to `peer_id`: skipped;
    ///   - asked to other peers more than `operation_batch_proc_period` ago: asked again,
    ///     this time to `peer_id`;
    ///   - asked to other peers more recently: deferred into `op_batch_buffer` to be
    ///     reconsidered one period later (dropped if the buffer is at capacity);
    ///   - never asked: asked to `peer_id` now.
    /// - Sends the resulting `AskForOperations` messages in chunks of at most
    ///   `max_operations_per_message` prefixes, stopping early if the peer disconnected.
    ///
    /// # Errors
    /// Returns an error if computing the deferral instant overflows. Send failures are only
    /// logged.
    fn on_operations_announcements_received(
        &mut self,
        mut op_batch: OperationPrefixIds,
        peer_id: &PeerId,
    ) -> Result<(), ProtocolError> {
        if !self
            .active_connections
            .get_peer_ids_connected()
            .contains(peer_id)
        {
            return Ok(());
        }
        self.cache
            .write()
            .insert_peer_known_ops(peer_id, &op_batch.iter().copied().collect::<Vec<_>>());
        {
            // Scoped so the read lock is released before the loop below.
            let cache_read = self.cache.read();
            op_batch.retain(|prefix| cache_read.checked_operations_prefix.peek(prefix).is_none());
        }
        let mut ask_set = OperationPrefixIds::with_capacity(op_batch.len());
        let mut future_set = OperationPrefixIds::with_capacity(op_batch.len());
        let now = Instant::now();
        let mut count_reask = 0;
        for op_id in op_batch {
            let opt_previous_ask = match self.asked_operations.get(&op_id) {
                Some(previous_ask) => {
                    if previous_ask.1.contains(peer_id) {
                        // Already asked to this very peer: do not ask it twice.
                        continue;
                    }
                    Some(previous_ask)
                }
                None => None,
            };
            if let Some((previous_ask_time, previous_ask_peers)) = opt_previous_ask {
                if now
                    .checked_duration_since(*previous_ask_time)
                    .unwrap_or_default()
                    > self.config.operation_batch_proc_period.to_duration()
                {
                    // The previous ask went unanswered for too long: ask this peer as well.
                    count_reask += 1;
                    ask_set.insert(op_id);
                    *previous_ask_time = now;
                    previous_ask_peers.push(*peer_id);
                } else {
                    // Another peer was asked recently: give it time to answer first.
                    future_set.insert(op_id);
                }
            } else {
                ask_set.insert(op_id);
                self.asked_operations.insert(op_id, (now, vec![*peer_id]));
            }
        }
        if count_reask > 0 {
            massa_trace!("re-ask operations.", { "count": count_reask });
        }
        if self.op_batch_buffer.len() < self.config.operation_batch_buffer_capacity
            && !future_set.is_empty()
        {
            self.op_batch_buffer.push_back(OperationBatchItem {
                instant: now
                    .checked_add(self.config.operation_batch_proc_period.into())
                    .ok_or(TimeError::TimeOverflowError)?,
                peer_id: *peer_id,
                operations_prefix_ids: future_set,
            });
        }
        if !ask_set.is_empty() {
            debug!(
                "Send ask operations of len {} to {}",
                ask_set.len(),
                peer_id
            );
            for sub_list in ask_set
                .into_iter()
                .collect::<Vec<OperationPrefixId>>()
                .chunks(self.config.max_operations_per_message as usize)
            {
                if let Err(err) = self.active_connections.send_to_peer(
                    peer_id,
                    &self.operation_message_serializer,
                    OperationMessage::AskForOperations(
                        sub_list.iter().cloned().collect::<OperationPrefixIds>(),
                    )
                    .into(),
                    false,
                ) {
                    warn!("Failed to send AskForOperations message to peer: {}", err);
                    if let ProtocolError::PeerDisconnected(_) = err {
                        break;
                    }
                }
            }
        }
        Ok(())
    }

    /// Re-processes deferred announcements whose `instant` has been reached.
    ///
    /// Due items are popped from the front of `op_batch_buffer` and fed back into
    /// `on_operations_announcements_received`. That call may ask for the operations now or
    /// defer them again with `instant = Instant::now() + operation_batch_proc_period`.
    /// Only the front is inspected: items are appended with that same formula, so they are
    /// presumably in non-decreasing `instant` order.
    ///
    /// # Errors
    /// Stops at the first error and returns it; remaining items wait for the next tick.
    fn update_ask_operation(&mut self) -> Result<(), ProtocolError> {
        let now = Instant::now();
        while matches!(self.op_batch_buffer.front(), Some(item) if item.instant <= now) {
            if let Some(op_batch_item) = self.op_batch_buffer.pop_front() {
                self.on_operations_announcements_received(
                    op_batch_item.operations_prefix_ids,
                    &op_batch_item.peer_id,
                )?;
            }
        }
        Ok(())
    }

    /// Answers an `AskForOperations` request from `peer_id`.
    ///
    /// Each requested prefix is looked up in storage. When the prefix matches operations, the
    /// first id yielded by the prefix's id set is sent; unknown prefixes are skipped silently.
    /// Operations are sent in `Operations` messages of at most `max_operations_per_message`
    /// items, stopping early if the peer disconnected. Send failures are only logged.
    fn on_asked_operations_received(
        &mut self,
        peer_id: &PeerId,
        op_pre_ids: OperationPrefixIds,
    ) -> Result<(), ProtocolError> {
        if op_pre_ids.is_empty() {
            return Ok(());
        }
        let mut ops: Vec<SecureShareOperation> = Vec::with_capacity(op_pre_ids.len());
        {
            // Scoped so the storage read guard is released before sending.
            let stored_ops = self.storage.read_operations();
            for prefix in op_pre_ids {
                let opt_op = match stored_ops
                    .get_operations_by_prefix(&prefix)
                    .and_then(|ids| ids.iter().next())
                {
                    Some(id) => stored_ops.get(id),
                    None => continue,
                };
                if let Some(op) = opt_op {
                    ops.push(op.clone());
                }
            }
        }
        debug!("Send full operations of len {} to {}", ops.len(), peer_id);
        for sub_list in ops.chunks(self.config.max_operations_per_message as usize) {
            if let Err(err) = self.active_connections.send_to_peer(
                peer_id,
                &self.operation_message_serializer,
                OperationMessage::Operations(sub_list.to_vec()).into(),
                false,
            ) {
                warn!("Failed to send Operations message to peer: {}", err);
                if let ProtocolError::PeerDisconnected(_) = err {
                    break;
                }
            }
        }
        Ok(())
    }

    /// Asks the peer handler to ban `peer_id`.
    ///
    /// Uses a non-blocking `try_send`. If that fails, a `ProtocolError::SendError` holding the
    /// send error's text is returned.
    fn ban_node(&mut self, peer_id: &PeerId) -> Result<(), ProtocolError> {
        massa_trace!("ban node from retrieval thread", { "peer_id": peer_id.to_string() });
        self.peer_cmd_sender
            .try_send(PeerManagementCmd::Ban(vec![*peer_id]))
            .map_err(|err| ProtocolError::SendError(err.to_string()))
    }
}
/// Processes full operations received from `source_peer_id`.
///
/// Steps:
/// 1. Skips operations whose expiration-slot timestamp, extended by
///    `max_operations_propagation_time`, is already in the past. Also skips operations whose
///    timestamp cannot be computed.
/// 2. Rejects the whole batch, before anything is stored, if any remaining operation's
///    serialized size exceeds `max_serialized_operations_size_per_block`.
/// 3. Drops operations already in the cache's `checked_operations`, then batch-verifies the
///    signatures of the rest.
/// 4. Marks the verified operations as checked. Records the prefix of every non-expired
///    received operation (new or already checked) as known by `source_peer_id`.
/// 5. If any operations are new, stores them, forwards them to the propagation handler
///    (best effort: a send failure is only logged), and adds them to the pool.
///
/// # Errors
/// - `ProtocolError::InvalidOperationError` if an operation is too large.
/// - Any error returned by `verify_sigs_batch`.
///
/// The retrieval thread bans the source peer when this function returns an error.
pub(crate) fn note_operations_from_peer(
    base_storage: &Storage,
    operations_cache: &mut SharedOperationCache,
    config: &ProtocolConfig,
    operations: Vec<SecureShareOperation>,
    source_peer_id: &PeerId,
    ops_propagation_sender: &mut MassaSender<OperationHandlerPropagationCommand>,
    pool_controller: &mut Box<dyn PoolController>,
) -> Result<(), ProtocolError> {
    massa_trace!("protocol.protocol_worker.note_operations_from_peer", { "peer": source_peer_id, "operations": operations });
    let now = MassaTime::now();
    let mut new_operations = PreHashMap::with_capacity(operations.len());
    for operation in operations {
        // Operations that expired more than `max_operations_propagation_time` ago are of no
        // use to us anymore: ignore them rather than penalize the peer.
        let expire_period_timestamp = get_block_slot_timestamp(
            config.thread_count,
            config.t0,
            config.genesis_timestamp,
            Slot::new(
                operation.content.expire_period,
                operation
                    .content_creator_address
                    .get_thread(config.thread_count),
            ),
        );
        match expire_period_timestamp {
            Ok(slot_timestamp) => {
                if slot_timestamp.saturating_add(config.max_operations_propagation_time) < now {
                    continue;
                }
            }
            Err(_) => continue,
        }
        let serialized_size = operation.serialized_size();
        if serialized_size > config.max_serialized_operations_size_per_block {
            // Arguments follow the message wording: the limit first, then the actual size.
            return Err(ProtocolError::InvalidOperationError(format!(
                "Operation {} exceeds max block size, maximum authorized {} bytes but found {} bytes",
                operation.id,
                config.max_serialized_operations_size_per_block,
                serialized_size
            )));
        };
        new_operations.insert(operation.id, operation);
    }
    // Captured before the checked-cache filter: the peer knows every operation it sent us,
    // including the ones we had already checked.
    let all_received_ids: PreHashSet<_> = new_operations.keys().copied().collect();
    {
        let cache_read = operations_cache.read();
        new_operations.retain(|op_id, _| cache_read.checked_operations.peek(op_id).is_none());
    }
    verify_sigs_batch(
        &new_operations
            .iter()
            .map(|(op_id, op)| (*op_id.get_hash(), op.signature, op.content_creator_pub_key))
            .collect::<Vec<_>>(),
    )?;
    {
        let mut cache_write = operations_cache.write();
        for op_id in new_operations.keys().copied() {
            cache_write.insert_checked_operation(op_id);
        }
        cache_write.insert_peer_known_ops(
            source_peer_id,
            &all_received_ids
                .into_iter()
                .map(|id| id.into_prefix())
                .collect::<Vec<_>>(),
        );
    }
    if !new_operations.is_empty() {
        let mut ops = base_storage.clone_without_refs();
        ops.store_operations(new_operations.into_values().collect());
        if let Err(_err) = ops_propagation_sender.try_send(
            OperationHandlerPropagationCommand::PropagateOperations(ops.clone()),
        ) {
            warn!("Error sending operations to propagation channel");
        }
        pool_controller.add_operations(ops);
    }
    Ok(())
}
/// Spawns the operation retrieval thread, named after `THREAD_NAME`, and returns its handle.
///
/// The spawned thread takes ownership of every channel, controller, cache and storage passed
/// in. It builds a `RetrievalThread` from them and runs its event loop until stopped.
///
/// # Panics
/// - If the OS fails to spawn the thread.
/// - Inside the spawned thread, if `config.asked_operations_buffer_capacity` cannot be
///   converted (`try_into`) to the LRU length type.
// Every dependency of the thread is injected here, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub fn start_retrieval_thread(
    receiver: MassaReceiver<PeerMessageTuple>,
    pool_controller: Box<dyn PoolController>,
    storage: Storage,
    config: ProtocolConfig,
    cache: SharedOperationCache,
    active_connections: Box<dyn ActiveConnectionsTrait>,
    receiver_ext: MassaReceiver<OperationHandlerRetrievalCommand>,
    internal_sender: MassaSender<OperationHandlerPropagationCommand>,
    peer_cmd_sender: MassaSender<PeerManagementCmd>,
    massa_metrics: MassaMetrics,
) -> JoinHandle<()> {
    let builder = std::thread::Builder::new().name(THREAD_NAME.to_string());
    builder
        .spawn(move || {
            // NOTE(review): `try_into` only fails when the value does not fit the target
            // integer type, so the "> 0" wording of this message looks inaccurate — confirm.
            let asked_operations = LruMap::new(ByLength::new(
                config
                    .asked_operations_buffer_capacity
                    .try_into()
                    .expect("asked_operations_buffer_capacity in config must be > 0"),
            ));
            let operation_message_serializer = MessagesSerializer::new()
                .with_operation_message_serializer(OperationMessageSerializer::new());
            let mut retrieval_thread = RetrievalThread {
                receiver,
                receiver_ext,
                internal_sender,
                peer_cmd_sender,
                pool_controller,
                storage,
                cache,
                active_connections,
                asked_operations,
                op_batch_buffer: VecDeque::new(),
                operation_message_serializer,
                config,
                _massa_metrics: massa_metrics,
            };
            retrieval_thread.run();
        })
        .expect("OS failed to start operation retrieval thread")
}