1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
// Copyright (c) 2023 MASSA LABS <info@massa.net>
use crate::{error::GrpcError, server::MassaPublicGrpc};
use futures_util::StreamExt;
use massa_proto_rs::massa::api::v1 as grpc_api;
use std::pin::Pin;
use std::time::Duration;
use tokio::{select, time};
use tracing::error;
/// default throughput interval in seconds
///
/// set 'high' value to avoid spamming the client with updates who doesn't need
///
/// end user can override this value by sending a request with a custom interval
const DEFAULT_THROUGHPUT_INTERVAL: u64 = 10;
/// Type declaration for TransactionsThroughput
pub type TransactionsThroughputStreamType = Pin<
Box<
dyn futures_util::Stream<
Item = Result<grpc_api::TransactionsThroughputResponse, tonic::Status>,
> + Send
+ 'static,
>,
>;
/// Type declaration for TransactionsThroughput
pub type TransactionsThroughputServerStreamType = Pin<
Box<
dyn futures_util::Stream<
Item = Result<grpc_api::TransactionsThroughputServerResponse, tonic::Status>,
> + Send
+ 'static,
>,
>;
/// The function returns a stream of transaction throughput statistics
pub(crate) async fn transactions_throughput(
grpc: &MassaPublicGrpc,
request: tonic::Request<tonic::Streaming<grpc_api::TransactionsThroughputRequest>>,
) -> Result<TransactionsThroughputStreamType, GrpcError> {
let execution_controller = grpc.execution_controller.clone();
// Create a channel for sending responses to the client
let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
// Extract the incoming stream of operations messages
let mut in_stream = request.into_inner();
// Spawn a new Tokio task to handle the stream processing
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(DEFAULT_THROUGHPUT_INTERVAL));
// Continuously loop until the stream ends or an error occurs
loop {
select! {
// Receive a new message from the in_stream
res = in_stream.next() => {
match res {
Some(Ok(req)) => {
// Update the interval timer based on the request (or use the default)
let new_timer = req.interval.unwrap_or(DEFAULT_THROUGHPUT_INTERVAL);
interval = time::interval(Duration::from_secs(new_timer));
interval.reset();
},
_ => {
// Client disconnected
break;
}
}
},
// Execute the code block whenever the timer ticks
_ = interval.tick() => {
let stats = execution_controller.get_stats();
// Calculate the throughput over the time window
let nb_sec_range = stats
.time_window_end
.saturating_sub(stats.time_window_start)
.to_duration()
.as_secs();
let throughput = stats
.final_executed_operations_count
.checked_div(nb_sec_range as usize)
.unwrap_or_default() as u32;
// Send the throughput response back to the client
if let Err(e) = tx
.send(Ok(grpc_api::TransactionsThroughputResponse {
throughput,
}))
.await
{
// Log an error if sending the response fails
error!("failed to send back transactions_throughput response: {}", e);
break;
}
}
}
}
});
let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(out_stream) as TransactionsThroughputStreamType)
}
/// The function returns a stream unidirectional of transaction throughput statistics
pub(crate) async fn transactions_throughput_server(
grpc: &MassaPublicGrpc,
request: tonic::Request<grpc_api::TransactionsThroughputServerRequest>,
) -> Result<TransactionsThroughputServerStreamType, GrpcError> {
let execution_controller = grpc.execution_controller.clone();
// Create a channel for sending responses to the client
let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
// Extract the incoming stream of operations messages
let request = request.into_inner();
// Spawn a new Tokio task to handle the stream processing
tokio::spawn(async move {
let mut interval = if let Some(interval_user) = request.interval {
time::interval(Duration::from_secs(interval_user))
} else {
time::interval(Duration::from_secs(DEFAULT_THROUGHPUT_INTERVAL))
};
// Continuously loop until the stream ends or an error occurs
loop {
// Execute the code block whenever the timer ticks
interval.tick().await;
let stats = execution_controller.get_stats();
// Calculate the throughput over the time window
let nb_sec_range = stats
.time_window_end
.saturating_sub(stats.time_window_start)
.to_duration()
.as_secs();
let throughput = stats
.final_executed_operations_count
.checked_div(nb_sec_range as usize)
.unwrap_or_default() as u32;
// Send the throughput response back to the client
if let Err(e) = tx
.send(Ok(grpc_api::TransactionsThroughputServerResponse {
throughput,
}))
.await
{
// Log an error if sending the response fails
error!(
"failed to send back transactions_throughput response: {}",
e
);
break;
}
}
});
let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(out_stream) as TransactionsThroughputServerStreamType)
}