1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright (c) 2023 MASSA LABS <info@massa.net>

use crate::{error::GrpcError, server::MassaPublicGrpc};
use futures_util::StreamExt;
use massa_proto_rs::massa::api::v1 as grpc_api;
use std::pin::Pin;
use std::time::Duration;
use tokio::{select, time};
use tracing::error;

/// Default throughput reporting interval, in seconds
///
/// Set to a relatively high value to avoid spamming clients with updates they do not need
///
/// The end user can override this value by sending a request with a custom interval
const DEFAULT_THROUGHPUT_INTERVAL: u64 = 10;

/// Stream type returned by `transactions_throughput`
///
/// A pinned, boxed, `Send + 'static` stream whose items are either a
/// `TransactionsThroughputResponse` or a `tonic::Status` error
pub type TransactionsThroughputStreamType = Pin<
    Box<
        dyn futures_util::Stream<
                Item = Result<grpc_api::TransactionsThroughputResponse, tonic::Status>,
            > + Send
            + 'static,
    >,
>;

/// Stream type returned by `transactions_throughput_server`
///
/// A pinned, boxed, `Send + 'static` stream whose items are either a
/// `TransactionsThroughputServerResponse` or a `tonic::Status` error
pub type TransactionsThroughputServerStreamType = Pin<
    Box<
        dyn futures_util::Stream<
                Item = Result<grpc_api::TransactionsThroughputServerResponse, tonic::Status>,
            > + Send
            + 'static,
    >,
>;

/// Returns a stream of transaction throughput statistics (bidirectional streaming)
///
/// A background task is spawned that sends one `TransactionsThroughputResponse` on every
/// timer tick. The timer starts with a period of `DEFAULT_THROUGHPUT_INTERVAL` seconds and
/// is restarted every time the client sends a request: with `interval` seconds when the
/// request sets it, with the default otherwise. A requested interval of `0` is raised to
/// one second. The task stops when the request stream ends or yields an error, or when a
/// response can no longer be sent on the channel.
///
/// # Arguments
///
/// * `grpc` - server state providing the execution controller and the channel size
/// * `request` - incoming stream of `TransactionsThroughputRequest` messages
///
/// # Returns
///
/// The receiving side of the response channel, wrapped as a
/// `TransactionsThroughputStreamType`
pub(crate) async fn transactions_throughput(
    grpc: &MassaPublicGrpc,
    request: tonic::Request<tonic::Streaming<grpc_api::TransactionsThroughputRequest>>,
) -> Result<TransactionsThroughputStreamType, GrpcError> {
    let execution_controller = grpc.execution_controller.clone();

    // Create a channel for sending responses to the client
    let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
    // Extract the incoming stream of interval update requests
    let mut in_stream = request.into_inner();

    // Spawn a new Tokio task to handle the stream processing
    tokio::spawn(async move {
        let mut interval = time::interval(Duration::from_secs(DEFAULT_THROUGHPUT_INTERVAL));

        // Continuously loop until the stream ends or an error occurs
        loop {
            select! {
                // Receive a new message from the in_stream
                res = in_stream.next() => {
                    match res {
                        Some(Ok(req)) => {
                            // Update the interval timer based on the request (or use the default).
                            // `time::interval` panics on a zero period, so a client-supplied 0
                            // is raised to one second instead of killing this task.
                            let new_timer = req
                                .interval
                                .unwrap_or(DEFAULT_THROUGHPUT_INTERVAL)
                                .max(1);
                            interval = time::interval(Duration::from_secs(new_timer));
                            // A fresh interval ticks immediately; reset it so the next tick
                            // happens one full period from now
                            interval.reset();
                        },
                        Some(Err(_)) | None => {
                            // Request stream failed or ended: client disconnected
                            break;
                        }
                    }
                },
                // Execute the code block whenever the timer ticks
                _ = interval.tick() => {
                    let stats = execution_controller.get_stats();
                    // Calculate the throughput over the time window
                    let nb_sec_range = stats
                        .time_window_end
                        .saturating_sub(stats.time_window_start)
                        .to_duration()
                        .as_secs();
                    // Divide in u64 so the divisor is never truncated; a window shorter
                    // than one second (divisor 0) yields a throughput of 0
                    let ops_per_sec = (stats.final_executed_operations_count as u64)
                        .checked_div(nb_sec_range)
                        .unwrap_or_default();
                    // Saturate instead of silently wrapping when the value exceeds u32
                    let throughput = ops_per_sec.min(u64::from(u32::MAX)) as u32;
                    // Send the throughput response back to the client
                    if let Err(e) = tx
                        .send(Ok(grpc_api::TransactionsThroughputResponse {
                            throughput,
                        }))
                        .await
                    {
                        // Log an error if sending the response fails
                        error!("failed to send back transactions_throughput response: {}", e);
                        break;
                    }
                }
            }
        }
    });

    let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
    Ok(Box::pin(out_stream) as TransactionsThroughputStreamType)
}

/// The function returns a stream unidirectional of transaction throughput statistics
pub(crate) async fn transactions_throughput_server(
    grpc: &MassaPublicGrpc,
    request: tonic::Request<grpc_api::TransactionsThroughputServerRequest>,
) -> Result<TransactionsThroughputServerStreamType, GrpcError> {
    let execution_controller = grpc.execution_controller.clone();

    // Create a channel for sending responses to the client
    let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
    // Extract the incoming stream of operations messages
    let request = request.into_inner();

    // Spawn a new Tokio task to handle the stream processing
    tokio::spawn(async move {
        let mut interval = if let Some(interval_user) = request.interval {
            time::interval(Duration::from_secs(interval_user))
        } else {
            time::interval(Duration::from_secs(DEFAULT_THROUGHPUT_INTERVAL))
        };

        // Continuously loop until the stream ends or an error occurs
        loop {
            // Execute the code block whenever the timer ticks
            interval.tick().await;

            let stats = execution_controller.get_stats();
            // Calculate the throughput over the time window
            let nb_sec_range = stats
                .time_window_end
                .saturating_sub(stats.time_window_start)
                .to_duration()
                .as_secs();
            let throughput = stats
                .final_executed_operations_count
                .checked_div(nb_sec_range as usize)
                .unwrap_or_default() as u32;
            // Send the throughput response back to the client
            if let Err(e) = tx
                .send(Ok(grpc_api::TransactionsThroughputServerResponse {
                    throughput,
                }))
                .await
            {
                // Log an error if sending the response fails
                error!(
                    "failed to send back transactions_throughput response: {}",
                    e
                );
                break;
            }
        }
    });

    let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
    Ok(Box::pin(out_stream) as TransactionsThroughputServerStreamType)
}