1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
// Copyright (c) 2022 MASSA LABS <info@massa.net>
//! This file defines a generic finite-size execution request queue with an MPSC-based result sender.
use massa_channel::sender::MassaSender;
use massa_execution_exports::ExecutionError;
use std::collections::VecDeque;
/// Represents an execution request T coupled with an MPSC sender for a result of type R
#[derive(Debug)]
pub(crate) struct RequestWithResponseSender<T, R> {
/// The underlying execution request
request: T,
/// An std::mpsc::Sender to later send the execution output R (or an error)
response_tx: MassaSender<Result<R, ExecutionError>>,
}
impl<T, R> RequestWithResponseSender<T, R> {
/// Create a new request with response sender
///
/// # Arguments
/// * `request`: the underlying request of type T
/// * `response_tx`: an `std::mpsc::Sender` to later send the execution output R (or an error)
pub fn new(request: T, response_tx: MassaSender<Result<R, ExecutionError>>) -> Self {
RequestWithResponseSender {
request,
response_tx,
}
}
/// Cancel the request by consuming the object and sending an error through the response channel.
///
/// # Arguments
/// * err: the error to send through the response channel
pub fn cancel(self, err: ExecutionError) {
// Send a message to the request's sender to signal the cancellation.
// Ignore errors because they just mean that the emitter of the request
// has dropped the receiver and does not need the response anymore.
let _ = self.response_tx.send(Err(err));
}
/// Destructure self into a (request, response sender) pair
pub fn into_request_sender_pair(self) -> (T, MassaSender<Result<R, ExecutionError>>) {
(self.request, self.response_tx)
}
}
/// Structure representing an execution request queue with maximal length.
/// Each request is a `RequestWithResponseSender` that comes with an MPSC sender
/// to return the execution result when the execution is over (or an error).
#[derive(Debug)]
pub(crate) struct RequestQueue<T, R> {
/// Max number of item in the queue.
/// When the queue is full, extra new items are cancelled and dropped.
max_items: usize,
/// The actual underlying queue
queue: VecDeque<RequestWithResponseSender<T, R>>,
}
impl<T, R> RequestQueue<T, R> {
/// Create a new request queue
///
/// # Arguments
/// * `max_items`: the maximal number of items in the queue. When full, extra new elements are cancelled and dropped.
pub fn new(max_items: usize) -> Self {
RequestQueue {
max_items,
queue: VecDeque::with_capacity(max_items),
}
}
/// Returns the max number of items the queue can contain
pub fn capacity(&self) -> usize {
self.max_items
}
/// Extends Self with the contents of another `RequestQueue`.
/// The contents of the incoming queue are appended last.
/// Excess items with respect to `self.max_items` are canceled and dropped.
pub fn extend(&mut self, mut other: RequestQueue<T, R>) {
// compute the number of available item slots
let free_slots = self.max_items.saturating_sub(self.queue.len());
// if there are no available slots remaining, do nothing
if free_slots == 0 {
return;
}
// if there are not enough available slots to fit the entire incoming queue
if free_slots < other.queue.len() {
// truncate the incoming queue to the size that fits, cancelling excess items
other.queue.drain(free_slots..).for_each(|req| {
req.cancel(ExecutionError::ChannelError(
"maximal request queue capacity reached".into(),
))
});
}
// append the kept part of the incoming queue
self.queue.extend(other.queue);
}
/// Cancel all queued items.
///
/// # Arguments
/// * err: the error to send through the response channel of cancelled items
pub fn cancel(&mut self, err: ExecutionError) {
for req in self.queue.drain(..) {
req.cancel(err.clone());
}
}
/// Pop out the oldest element of the queue
///
/// # Returns
/// The oldest element of the queue, or None if the queue is empty
pub fn pop(&mut self) -> Option<RequestWithResponseSender<T, R>> {
self.queue.pop_front()
}
/// Push a new element at the end of the queue.
/// May fail if maximum capacity is reached,
/// in which case the request is canceled and dropped.
///
/// # Returns
/// The oldest element of the queue, or None if the queue is empty
pub fn push(&mut self, req: RequestWithResponseSender<T, R>) {
// If the queue is already full, cancel the incoming request and return.
if self.queue.len() >= self.max_items {
req.cancel(ExecutionError::ChannelError(
"maximal request queue capacity reached".into(),
));
return;
}
// Append the incoming request to the end of the queue.
self.queue.push_back(req);
}
/// Take all the elements into a new queue and reset the current queue
/*pub fn take(&mut self) -> Self {
RequestQueue {
max_items: self.max_items,
queue: std::mem::take(&mut self.queue),
}
}*/
/// Checks whether the queue is full
///
/// # Returns
/// true if the queue is full, false otherwise
pub fn is_full(&self) -> bool {
self.queue.len() >= self.max_items
}
/// Checks whether the queue is empty
///
/// # Returns
/// true if the queue is empty, false otherwise
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
}