use crate::{
chain_head::{
api::ChainHeadApiServer,
chain_head_follow::ChainHeadFollower,
error::Error as ChainHeadRpcError,
event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig},
subscription::{SubscriptionManagement, SubscriptionManagementError},
},
SubscriptionTaskExecutor,
};
use codec::Encode;
use futures::future::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult},
SubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc, time::Duration};
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
genesis_hash: String,
_phantom: PhantomData<Block>,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
pub fn new<GenesisHash: AsRef<[u8]>>(
client: Arc<Client>,
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
) -> Self {
let genesis_hash = format!("0x{:?}", HexDisplay::from(&genesis_hash.as_ref()));
Self {
client,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
backend,
)),
genesis_hash,
_phantom: PhantomData,
}
}
fn accept_subscription(
&self,
sink: &mut SubscriptionSink,
) -> Result<String, SubscriptionEmptyError> {
sink.accept()?;
let Some(sub_id) = sink.subscription_id() else {
return Err(SubscriptionEmptyError)
};
let sub_id = match sub_id {
SubscriptionId::Num(num) => num.to_string(),
SubscriptionId::Str(id) => id.into_owned().into(),
};
Ok(sub_id)
}
}
fn parse_hex_param(
sink: &mut SubscriptionSink,
param: String,
) -> Result<Vec<u8>, SubscriptionEmptyError> {
if param.is_empty() {
return Ok(Default::default())
}
match array_bytes::hex2bytes(¶m) {
Ok(bytes) => Ok(bytes),
Err(_) => {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(param));
Err(SubscriptionEmptyError)
},
}
}
#[async_trait]
impl<BE, Block, Client> ChainHeadApiServer<Block::Hash> for ChainHead<BE, Block, Client>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ ExecutorProvider<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ StorageProvider<Block, BE>
+ 'static,
{
fn chain_head_unstable_follow(
&self,
mut sink: SubscriptionSink,
with_runtime: bool,
) -> SubscriptionResult {
let sub_id = match self.accept_subscription(&mut sink) {
Ok(sub_id) => sub_id,
Err(err) => {
sink.close(ChainHeadRpcError::InvalidSubscriptionID);
return Err(err)
},
};
let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return Ok(())
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let fut = async move {
let mut chain_head_follow = ChainHeadFollower::new(
client,
backend,
subscriptions.clone(),
with_runtime,
sub_id.clone(),
);
chain_head_follow.generate_events(sink, rx_stop).await;
subscriptions.remove_subscription(&sub_id);
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_body(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
let _block_guard = block_guard;
let event = match client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block.block.extrinsics();
let result = format!("0x{:?}", HexDisplay::from(&extrinsics.encode()));
ChainHeadEvent::Done(ChainHeadResult { result })
},
Ok(None) => {
debug!(
target: LOG_TARGET,
"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
&follow_subscription,
hash
);
subscriptions.remove_subscription(&follow_subscription);
ChainHeadEvent::<String>::Disjoint
},
Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
};
let _ = sink.send(&event);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_header(
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
return Ok(None)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
self.client
.header(hash)
.map(|opt_header| opt_header.map(|h| format!("0x{:?}", HexDisplay::from(&h.encode()))))
.map_err(ChainHeadRpcError::FetchBlockHeader)
.map_err(Into::into)
}
fn chain_head_unstable_genesis_hash(&self) -> RpcResult<String> {
Ok(self.genesis_hash.clone())
}
fn chain_head_unstable_storage(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
key: String,
child_key: Option<String>,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let key = StorageKey(parse_hex_param(&mut sink, key)?);
let child_key = child_key
.map(|child_key| parse_hex_param(&mut sink, child_key))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
let _block_guard = block_guard;
if let Some(child_key) = child_key {
if well_known_keys::is_default_child_storage_key(child_key.storage_key()) ||
well_known_keys::is_child_storage_key(child_key.storage_key())
{
let _ = sink
.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
let res = client
.child_storage(hash, &child_key, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
return
}
if well_known_keys::is_default_child_storage_key(&key.0) ||
well_known_keys::is_child_storage_key(&key.0)
{
let _ =
sink.send(&ChainHeadEvent::Done(ChainHeadResult { result: None::<String> }));
return
}
let res = client
.storage(hash, &key)
.map(|result| {
let result =
result.map(|storage| format!("0x{:?}", HexDisplay::from(&storage.0)));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_call(
&self,
mut sink: SubscriptionSink,
follow_subscription: String,
hash: Block::Hash,
function: String,
call_parameters: String,
_network_config: Option<NetworkConfig>,
) -> SubscriptionResult {
let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?);
let client = self.client.clone();
let subscriptions = self.subscriptions.clone();
let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
return Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
return Ok(())
},
Err(error) => {
let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
error: error.to_string(),
}));
return Ok(())
},
};
let fut = async move {
if !block_guard.has_runtime() {
let _ = sink.reject(ChainHeadRpcError::InvalidParam(
"The runtime updates flag must be set".into(),
));
return
}
let res = client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
let result = format!("0x{:?}", HexDisplay::from(&result));
ChainHeadEvent::Done(ChainHeadResult { result })
})
.unwrap_or_else(|error| {
ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
});
let _ = sink.send(&res);
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
fn chain_head_unstable_unpin(
&self,
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<()> {
match self.subscriptions.unpin_block(&follow_subscription, hash) {
Ok(()) => Ok(()),
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Ok(())
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}
}
}