1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
use massa_proto_rs::massa::api::v1::{self as grpc_api};
#[cfg(feature = "execution-info")]
use tonic::Request;
use std::pin::Pin;
#[cfg(feature = "execution-info")]
use crate::{error::GrpcError, server::MassaPublicGrpc};
#[cfg(feature = "execution-info")]
use super::trait_filters_impl::{FilterGrpc, NewExecutionInfoFilter};
/// Type declaration for New execution Info server
pub type NewTransferInfoServerStreamType = Pin<
Box<
dyn futures_util::Stream<
Item = Result<grpc_api::NewTransfersInfoServerResponse, tonic::Status>,
> + Send
+ 'static,
>,
>;
#[cfg(feature = "execution-info")]
pub(crate) async fn new_transfer_info_server(
grpc: &MassaPublicGrpc,
request: Request<grpc_api::NewTransfersInfoServerRequest>,
) -> Result<NewTransferInfoServerStreamType, GrpcError> {
use std::time::Duration;
use tokio::{select, time};
use tracing::error;
// Create a channel to handle communication with the client
let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size);
// Get the inner request
let request = request.into_inner();
// Subscribe to the new operations channel
let mut subscriber = grpc
.execution_channels
.slot_execution_info_sender
.subscribe();
// Clone grpc to be able to use it in the spawned task
let config = grpc.grpc_config.clone();
tokio::spawn(async move {
let filter = match NewExecutionInfoFilter::build_from_request(request.address, &config) {
Ok(filter) => filter,
Err(err) => {
error!("failed to get filter: {}", err);
// Send the error response back to the client
if let Err(e) = tx.send(Err(err.into())).await {
error!("failed to send back NewOperations error response: {}", e);
}
return;
}
};
// Create a timer that ticks every 10 seconds to check if the client is still connected
// otherwise the server has no way to check if client has disconnected (and can help to save some resources)
let mut interval = time::interval(Duration::from_secs(
config.unidirectional_stream_interval_check,
));
// Continuously loop until the stream ends or an error occurs
loop {
select! {
// Receive a new filled block from the subscriber
event = subscriber.recv() => {
match event {
Ok(massa_operation) => {
// Check if the operation should be sent
if let Some(data) = filter.filter_output(massa_operation, &config) {
// Send the new operation through the channel
if let Err(e) = tx
.send(Ok(grpc_api::NewTransfersInfoServerResponse::from(data)))
.await
{
error!("failed to send operation : {}", e);
break;
}
}
}
Err(e) => error!("error on receive new operation: {}", e)
}
},
// Execute the code block whenever the timer ticks
_ = interval.tick() => {
if tx.is_closed() {
// Client disconnected
break;
}
}
}
}
});
let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(out_stream) as NewTransferInfoServerStreamType)
}