#![warn(missing_docs)]
#![warn(unused_crate_dependencies)]
use api_trait::MassaApiServer;
use http::Method;
use jsonrpsee::core::{client::Error as JsonRpseeError, RpcResult};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::middleware::http::HostFilterLayer;
use jsonrpsee::server::{BatchRequestConfig, PingConfig, ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use massa_api_exports::execution::{
DeferredCallResponse, DeferredCallsQuoteRequest, DeferredCallsQuoteResponse,
DeferredCallsSlotResponse, Transfer,
};
use massa_api_exports::{
address::{AddressFilter, AddressInfo},
block::{BlockInfo, BlockSummary},
config::APIConfig,
datastore::{DatastoreEntryInput, DatastoreEntryOutput},
endorsement::EndorsementInfo,
error::ApiError::WrongAPI,
execution::{ExecuteReadOnlyResponse, ReadOnlyBytecodeExecution, ReadOnlyCall},
node::NodeStatus,
operation::{OperationInfo, OperationInput},
page::{PageRequest, PagedVec},
TimeInterval,
};
use massa_consensus_exports::{ConsensusBroadcasts, ConsensusController};
use massa_execution_exports::ExecutionController;
use massa_models::clique::Clique;
use massa_models::composite::PubkeySig;
use massa_models::node::NodeId;
use massa_models::operation::OperationId;
use massa_models::output_event::SCOutputEvent;
use massa_models::prehash::PreHashSet;
use massa_models::{
address::Address, block::Block, block_id::BlockId, endorsement::EndorsementId,
execution::EventFilter, slot::Slot, version::Version,
};
use massa_pool_exports::{PoolBroadcasts, PoolController};
use massa_pos_exports::SelectorController;
use massa_protocol_exports::{ProtocolConfig, ProtocolController};
use massa_storage::Storage;
use massa_versioning::keypair_factory::KeyPairFactory;
use massa_wallet::Wallet;
use parking_lot::RwLock;
use serde_json::Value;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Condvar, Mutex};
use tower_http::cors::{Any, CorsLayer};
use tracing::{info, warn};
#[cfg(feature = "test-exports")]
use massa_channel as _;
#[cfg(feature = "test-exports")]
use massa_grpc as _;
mod api;
mod api_trait;
mod private;
mod public;
#[cfg(test)]
mod tests;
pub struct Public {
pub consensus_controller: Box<dyn ConsensusController>,
pub execution_controller: Box<dyn ExecutionController>,
pub selector_controller: Box<dyn SelectorController>,
pub pool_command_sender: Box<dyn PoolController>,
pub protocol_controller: Box<dyn ProtocolController>,
pub storage: Storage,
pub api_settings: APIConfig,
pub protocol_config: ProtocolConfig,
pub version: Version,
pub node_id: NodeId,
pub keypair_factory: KeyPairFactory,
}
pub struct Private {
pub protocol_controller: Box<dyn ProtocolController>,
pub execution_controller: Box<dyn ExecutionController>,
pub api_settings: APIConfig,
pub stop_cv: Arc<(Mutex<bool>, Condvar)>,
pub node_wallet: Arc<RwLock<Wallet>>,
}
pub struct ApiV2 {
pub consensus_controller: Box<dyn ConsensusController>,
pub consensus_broadcasts: ConsensusBroadcasts,
pub execution_controller: Box<dyn ExecutionController>,
pub pool_broadcasts: PoolBroadcasts,
pub api_settings: APIConfig,
pub version: Version,
}
pub struct API<T>(T);
#[async_trait::async_trait]
pub trait RpcServer: MassaRpcServer {
async fn serve(
self,
url: &SocketAddr,
api_config: &APIConfig,
) -> Result<StopHandle, JsonRpseeError>;
}
#[async_trait::async_trait]
pub trait ApiServer: MassaApiServer {
async fn serve(
self,
url: &SocketAddr,
api_config: &APIConfig,
) -> Result<StopHandle, JsonRpseeError>;
}
async fn serve<T>(
api: RpcModule<T>,
url: &SocketAddr,
api_config: &APIConfig,
) -> Result<StopHandle, JsonRpseeError> {
let ping_config = PingConfig::new().ping_interval(api_config.ping_interval.to_duration());
let mut server_builder = ServerBuilder::new()
.max_request_body_size(api_config.max_request_body_size)
.max_response_body_size(api_config.max_response_body_size)
.max_connections(api_config.max_connections)
.set_batch_request_config(if api_config.batch_request_limit > 0 {
BatchRequestConfig::Limit(api_config.batch_request_limit)
} else {
BatchRequestConfig::Disabled
})
.enable_ws_ping(ping_config);
if api_config.enable_http && !api_config.enable_ws {
server_builder = server_builder.http_only();
} else if api_config.enable_ws && !api_config.enable_http {
server_builder = server_builder.ws_only()
} else if !api_config.enable_http && !api_config.enable_ws {
panic!("wrong server configuration, you can't disable both http and ws");
}
let cors = CorsLayer::new()
.allow_methods([Method::POST, Method::OPTIONS])
.allow_origin(Any)
.allow_headers([http::header::CONTENT_TYPE]);
let hosts = if api_config.allow_hosts.is_empty() {
vec!["*:*"]
} else {
api_config
.allow_hosts
.iter()
.map(|hostname| hostname.as_str())
.collect()
};
let allowed_hosts = HostFilterLayer::new(hosts).expect("failed to build allowed hosts filter");
let middleware = tower::ServiceBuilder::new()
.layer(cors)
.layer(allowed_hosts);
let server = server_builder
.set_http_middleware(middleware)
.build(url)
.await
.expect("failed to build server");
let server_handler = server.start(api);
let stop_handler = StopHandle { server_handler };
Ok(stop_handler)
}
pub struct StopHandle {
server_handler: ServerHandle,
}
impl StopHandle {
pub async fn stop(self) {
match self.server_handler.stop() {
Ok(_) => {
info!("API stop signal sent successfully");
}
Err(err) => warn!("API thread panicked: {:?}", err),
}
self.server_handler.stopped().await;
}
}
#[rpc(server)]
pub trait MassaRpc {
#[method(name = "stop_node")]
fn stop_node(&self) -> RpcResult<()>;
#[method(name = "node_sign_message")]
async fn node_sign_message(&self, arg: Vec<u8>) -> RpcResult<PubkeySig>;
#[method(name = "add_staking_secret_keys")]
async fn add_staking_secret_keys(&self, arg: Vec<String>) -> RpcResult<()>;
#[method(name = "execute_read_only_bytecode")]
async fn execute_read_only_bytecode(
&self,
arg: Vec<ReadOnlyBytecodeExecution>,
) -> RpcResult<Vec<ExecuteReadOnlyResponse>>;
#[method(name = "execute_read_only_call")]
async fn execute_read_only_call(
&self,
arg: Vec<ReadOnlyCall>,
) -> RpcResult<Vec<ExecuteReadOnlyResponse>>;
#[method(name = "remove_staking_addresses")]
async fn remove_staking_addresses(&self, arg: Vec<Address>) -> RpcResult<()>;
#[method(name = "get_staking_addresses")]
async fn get_staking_addresses(&self) -> RpcResult<PreHashSet<Address>>;
#[method(name = "node_ban_by_ip")]
async fn node_ban_by_ip(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_ban_by_id")]
async fn node_ban_by_id(&self, arg: Vec<NodeId>) -> RpcResult<()>;
#[method(name = "node_peers_whitelist")]
async fn node_peers_whitelist(&self) -> RpcResult<Vec<IpAddr>>;
#[method(name = "node_add_to_peers_whitelist")]
async fn node_add_to_peers_whitelist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_remove_from_peers_whitelist")]
async fn node_remove_from_peers_whitelist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_bootstrap_whitelist")]
async fn node_bootstrap_whitelist(&self) -> RpcResult<Vec<IpAddr>>;
#[method(name = "node_bootstrap_whitelist_allow_all")]
async fn node_bootstrap_whitelist_allow_all(&self) -> RpcResult<()>;
#[method(name = "node_add_to_bootstrap_whitelist")]
async fn node_add_to_bootstrap_whitelist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_remove_from_bootstrap_whitelist")]
async fn node_remove_from_bootstrap_whitelist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_bootstrap_blacklist")]
async fn node_bootstrap_blacklist(&self) -> RpcResult<Vec<IpAddr>>;
#[method(name = "node_add_to_bootstrap_blacklist")]
async fn node_add_to_bootstrap_blacklist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_remove_from_bootstrap_blacklist")]
async fn node_remove_from_bootstrap_blacklist(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_unban_by_ip")]
async fn node_unban_by_ip(&self, arg: Vec<IpAddr>) -> RpcResult<()>;
#[method(name = "node_unban_by_id")]
async fn node_unban_by_id(&self, arg: Vec<NodeId>) -> RpcResult<()>;
#[method(name = "get_status")]
async fn get_status(&self) -> RpcResult<NodeStatus>;
#[method(name = "get_cliques")]
async fn get_cliques(&self) -> RpcResult<Vec<Clique>>;
#[method(name = "get_stakers")]
async fn get_stakers(
&self,
page_request: Option<PageRequest>,
) -> RpcResult<PagedVec<(Address, u64)>>;
#[method(name = "get_operations")]
async fn get_operations(&self, arg: Vec<OperationId>) -> RpcResult<Vec<OperationInfo>>;
#[method(name = "get_endorsements")]
async fn get_endorsements(&self, arg: Vec<EndorsementId>) -> RpcResult<Vec<EndorsementInfo>>;
#[method(name = "get_blocks")]
async fn get_blocks(&self, arg: Vec<BlockId>) -> RpcResult<Vec<BlockInfo>>;
#[method(name = "get_blockclique_block_by_slot")]
async fn get_blockclique_block_by_slot(&self, arg: Slot) -> RpcResult<Option<Block>>;
#[method(name = "get_graph_interval")]
async fn get_graph_interval(&self, arg: TimeInterval) -> RpcResult<Vec<BlockSummary>>;
#[method(name = "get_datastore_entries")]
async fn get_datastore_entries(
&self,
arg: Vec<DatastoreEntryInput>,
) -> RpcResult<Vec<DatastoreEntryOutput>>;
#[method(name = "get_addresses")]
async fn get_addresses(&self, arg: Vec<Address>) -> RpcResult<Vec<AddressInfo>>;
#[method(name = "get_addresses_bytecode")]
async fn get_addresses_bytecode(&self, args: Vec<AddressFilter>) -> RpcResult<Vec<Vec<u8>>>;
#[method(name = "get_slots_transfers")]
async fn get_slots_transfers(&self, arg: Vec<Slot>) -> RpcResult<Vec<Vec<Transfer>>>;
#[method(name = "send_operations")]
async fn send_operations(&self, arg: Vec<OperationInput>) -> RpcResult<Vec<OperationId>>;
#[method(name = "get_filtered_sc_output_event")]
async fn get_filtered_sc_output_event(&self, arg: EventFilter)
-> RpcResult<Vec<SCOutputEvent>>;
#[method(name = "rpc.discover")]
async fn get_openrpc_spec(&self) -> RpcResult<Value>;
#[method(name = "get_deferred_call_quote")]
async fn get_deferred_call_quote(
&self,
arg: Vec<DeferredCallsQuoteRequest>,
) -> RpcResult<Vec<DeferredCallsQuoteResponse>>;
#[method(name = "get_deferred_call_info")]
async fn get_deferred_call_info(
&self,
arg: Vec<String>,
) -> RpcResult<Vec<DeferredCallResponse>>;
#[method(name = "get_deferred_call_ids_by_slot")]
async fn get_deferred_call_ids_by_slot(
&self,
arg: Vec<Slot>,
) -> RpcResult<Vec<DeferredCallsSlotResponse>>;
}
fn wrong_api<T>() -> RpcResult<T> {
Err((WrongAPI).into())
}