1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
// Copyright (c) 2023 MASSA LABS <info@massa.net>
use crate::{error::GrpcError, server::MassaPublicGrpc};
use futures_util::StreamExt;
use massa_proto_rs::massa::api::v1 as grpc_api;
use std::pin::Pin;
use std::time::Duration;
use tokio::{select, time};
use tracing::error;
/// Default interval, in seconds, between two throughput updates sent to a client.
///
/// Deliberately set to a 'high' value to avoid spamming clients with updates they do not need.
///
/// A client can override this value by sending a request with a custom interval.
const DEFAULT_THROUGHPUT_INTERVAL: u64 = 10;
/// Output stream type of the transactions throughput endpoint.
///
/// A pinned, boxed, `Send + 'static` stream whose items are either a
/// `TransactionsThroughputResponse` or a `tonic::Status` error.
pub type TransactionsThroughputStreamType = Pin<
Box<
dyn futures_util::Stream<
Item = Result<grpc_api::TransactionsThroughputResponse, tonic::Status>,
> + Send
+ 'static,
>,
>;
/// Returns a stream of transaction throughput statistics.
///
/// A background Tokio task is spawned which, on every timer tick, reads the
/// statistics from the execution controller, divides the number of final
/// executed operations by the length of the statistics time window (in whole
/// seconds) and sends the result to the client. The throughput is reported as
/// `0` when the time window is shorter than one second.
///
/// The timer starts with a period of `DEFAULT_THROUGHPUT_INTERVAL` seconds.
/// Every request received on the incoming stream replaces the timer: the new
/// period is the requested interval, or the default when the request carries
/// none. A requested interval of `0` is raised to one second, because a
/// zero-length period makes `tokio::time::interval` panic.
///
/// The background task stops when the incoming stream ends or yields an
/// error, or when a response can no longer be sent on the channel.
///
/// # Arguments
///
/// * `grpc` - the gRPC server state; its execution controller and its
///   `grpc_config.max_channel_size` are used.
/// * `request` - the incoming stream of throughput requests.
///
/// # Errors
///
/// The return type allows a `GrpcError`, but the current implementation
/// always returns `Ok`.
pub(crate) async fn transactions_throughput(
    grpc: &MassaPublicGrpc,
    request: tonic::Request<tonic::Streaming<grpc_api::TransactionsThroughputRequest>>,
) -> Result<TransactionsThroughputStreamType, GrpcError> {
    let execution_controller = grpc.execution_controller.clone();
    // Create a channel for sending responses to the client
    let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
    // Extract the incoming stream of throughput requests
    let mut in_stream = request.into_inner();

    // Spawn a new Tokio task to handle the stream processing
    tokio::spawn(async move {
        let mut interval = time::interval(Duration::from_secs(DEFAULT_THROUGHPUT_INTERVAL));
        // Continuously loop until the stream ends or an error occurs
        loop {
            select! {
                // Receive a new message from the in_stream
                res = in_stream.next() => {
                    match res {
                        Some(Ok(req)) => {
                            // Use the requested interval (or the default), but never
                            // less than one second: `time::interval` panics on a zero
                            // period and the value comes straight from the client.
                            let period_secs = req
                                .interval
                                .unwrap_or(DEFAULT_THROUGHPUT_INTERVAL)
                                .max(1);
                            interval = time::interval(Duration::from_secs(period_secs));
                            // A fresh interval fires immediately; reset it so the next
                            // update is sent one full period from now.
                            interval.reset();
                        },
                        Some(Err(err)) => {
                            // The incoming stream failed: report it instead of
                            // silently treating it as a regular disconnection.
                            error!("failed to read transactions_throughput request: {}", err);
                            break;
                        },
                        None => {
                            // Client closed the request stream
                            break;
                        },
                    }
                },
                // Execute the code block whenever the timer ticks
                _ = interval.tick() => {
                    let stats = execution_controller.get_stats();
                    // Length of the statistics time window, in whole seconds
                    let window_secs = stats
                        .time_window_end
                        .saturating_sub(stats.time_window_start)
                        .to_duration()
                        .as_secs();
                    // Operations per second; a zero-length window yields 0.
                    // Conversions saturate rather than silently truncate.
                    let ops_per_sec = stats
                        .final_executed_operations_count
                        .checked_div(usize::try_from(window_secs).unwrap_or(usize::MAX))
                        .unwrap_or_default();
                    let throughput = u32::try_from(ops_per_sec).unwrap_or(u32::MAX);
                    // Send the throughput response back to the client
                    if let Err(e) = tx
                        .send(Ok(grpc_api::TransactionsThroughputResponse { throughput }))
                        .await
                    {
                        // The receiving side is gone: log and stop the task
                        error!("failed to send back transactions_throughput response: {}", e);
                        break;
                    }
                }
            }
        }
    });

    let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
    Ok(Box::pin(out_stream) as TransactionsThroughputStreamType)
}