//! Implementation of the `chainHead_follow` method.

use crate::chain_head::{
chain_head::LOG_TARGET,
event::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
},
subscription::{SubscriptionManagement, SubscriptionManagementError},
};
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use jsonrpsee::SubscriptionSink;
use log::{debug, error};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::HashSet, sync::Arc};
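
/// Generates the events of the `chainHead_follow` method.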
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Subscriptions handle.
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
/// Subscription was started with the runtime updates flag.
with_runtime: bool,
/// Subscription ID.
sub_id: String,
/// The best block reported by this subscription so far.
best_block_cache: Option<Block::Hash>,
}
impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
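/// Create a new [`ChainHeadFollower`].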
pub fn new(
client: Arc<Client>,
backend: Arc<BE>,
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
with_runtime: bool,
sub_id: String,
) -> Self {
Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
}
}
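
/// The source of a notification handled by the `chainHead_follow` event loop.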
enum NotificationType<Block: BlockT> {
/// The initial events generated from the node's state when the subscription starts.
InitialEvents(Vec<FollowEvent<Block::Hash>>),
/// A new block was imported.
NewBlock(BlockImportNotification<Block>),
/// A block (or a chain of blocks) was finalized.
Finalized(FinalityNotification<Block>),
}
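
/// The initial blocks that should be reported or ignored by the chainHead.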
#[derive(Clone, Debug)]
struct InitialBlocks<Block: BlockT> {
/// Children of the latest finalized block, reported as `(block, parent)` pairs.
finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>,
/// Forks of already-finalized blocks that must not be reported by this subscription.
pruned_forks: HashSet<Block::Hash>,
}
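
/// The startup point from which chainHead started to generate events.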
struct StartupPoint<Block: BlockT> {
/// Best block hash.
pub best_hash: Block::Hash,
/// The head of the finalized chain.
pub finalized_hash: Block::Hash,
/// Last finalized block number.
pub finalized_number: NumberFor<Block>,
}
impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
fn from(info: Info<Block>) -> Self {
StartupPoint::<Block> {
best_hash: info.best_hash,
finalized_hash: info.finalized_hash,
finalized_number: info.finalized_number,
}
}
}
impl<BE, Block, Client> ChainHeadFollower<BE, Block, Client>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
Client: BlockBackend<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = BlockChainError>
+ BlockchainEvents<Block>
+ CallApiAt<Block>
+ 'static,
{
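/// Conditionally generate the runtime event of the given block against its parent.
///
/// Returns `None` if the subscription was not started with the runtime flag, or if the
/// runtime version did not change compared to the parent. When no parent is provided
/// (the initial finalized block), the block's runtime is always reported.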
fn generate_runtime_event(
&self,
block: Block::Hash,
parent: Option<Block::Hash>,
) -> Option<RuntimeEvent> {
if !self.with_runtime {
return None
}
let block_rt = match self.client.runtime_version_at(block) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
let parent = match parent {
Some(parent) => parent,
None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })),
};
let parent_rt = match self.client.runtime_version_at(parent) {
Ok(rt) => rt,
Err(err) => return Some(err.into()),
};
if block_rt != parent_rt {
Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt }))
} else {
None
}
}
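
/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// Returns the descendants of the finalized block that must be reported by the initial
/// events, together with the pruned forks that must be ignored.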
fn get_init_blocks_with_forks(
&self,
startup_point: &StartupPoint<Block>,
) -> Result<InitialBlocks<Block>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let leaves = blockchain.leaves()?;
let finalized = startup_point.finalized_hash;
let mut pruned_forks = HashSet::new();
let mut finalized_block_descendants = Vec::new();
let mut unique_descendants = HashSet::new();
for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
let blocks = tree_route.enacted().iter().map(|block| block.hash);
if !tree_route.retracted().is_empty() {
// The leaf is on a fork of an already-finalized block: these blocks will be
// pruned by the database and must not be reported by the initial events.
pruned_forks.extend(blocks);
} else {
// Pair each enacted block with its parent; the first enacted block's parent is
// the finalized block itself.
let parents = std::iter::once(finalized).chain(blocks.clone());
for pair in blocks.zip(parents) {
if unique_descendants.insert(pair) {
finalized_block_descendants.push(pair);
}
}
}
}
Ok(InitialBlocks { finalized_block_descendants, pruned_forks })
}
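
/// Generate the initial events reported when the subscription starts: the `Initialized`
/// event for the finalized block, followed by a `NewBlock` event for each of its
/// descendants and, if needed, a `BestBlockChanged` event.
///
/// Also returns the pruned forks that the subscription must ignore later on.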
fn generate_init_events(
&mut self,
startup_point: &StartupPoint<Block>,
) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
{
let init = self.get_init_blocks_with_forks(startup_point)?;
let initial_blocks = init.finalized_block_descendants;
let finalized_block_hash = startup_point.finalized_hash;
self.sub_handle.pin_block(&self.sub_id, finalized_block_hash)?;
let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
let initialized_event = FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_runtime,
with_runtime: self.with_runtime,
});
let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1);
finalized_block_descendants.push(initialized_event);
for (child, parent) in initial_blocks.into_iter() {
self.sub_handle.pin_block(&self.sub_id, child)?;
let new_runtime = self.generate_runtime_event(child, Some(parent));
let event = FollowEvent::NewBlock(NewBlock {
block_hash: child,
parent_block_hash: parent,
new_runtime,
with_runtime: self.with_runtime,
});
finalized_block_descendants.push(event);
}
// Report a best block only if it differs from the finalized block that was just
// reported by the `Initialized` event.
let best_block_hash = startup_point.best_hash;
if best_block_hash != finalized_block_hash {
let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
self.best_block_cache = Some(best_block_hash);
finalized_block_descendants.push(best_block);
};
Ok((finalized_block_descendants, init.pruned_forks))
}
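
/// Generate the `NewBlock` event and, if the block is the new best block, the
/// `BestBlockChanged` event.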
fn generate_import_events(
&mut self,
block_hash: Block::Hash,
parent_block_hash: Block::Hash,
is_best_block: bool,
) -> Vec<FollowEvent<Block::Hash>> {
let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash));
let new_block = FollowEvent::NewBlock(NewBlock {
block_hash,
parent_block_hash,
new_runtime,
with_runtime: self.with_runtime,
});
if !is_best_block {
return vec![new_block]
}
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
match self.best_block_cache {
Some(block_cache) => {
// A best block was already reported; only generate a `BestBlockChanged` event
// if the best block actually changed.
if block_cache != block_hash {
self.best_block_cache = Some(block_hash);
vec![new_block, best_block_event]
} else {
vec![new_block]
}
},
None => {
// No best block was reported yet by this subscription.
self.best_block_cache = Some(block_hash);
vec![new_block, best_block_event]
},
}
}
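
/// Handle the import of a new block by generating the appropriate events.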
fn handle_import_blocks(
&mut self,
notification: BlockImportNotification<Block>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
// The block was already pinned and reported by a previous event (for example by the
// initial events or a finalized notification).
if !self.sub_handle.pin_block(&self.sub_id, notification.hash)? {
return Ok(Default::default())
}
// Ensure we only report blocks that are after the startup point.
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}
Ok(self.generate_import_events(
notification.hash,
*notification.header.parent_hash(),
notification.is_new_best,
))
}
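
/// Generate the import events (`NewBlock` / `BestBlockChanged`) for the finalized block
/// hashes that were not reported yet.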
fn generate_finalized_events(
&mut self,
finalized_block_hashes: &[Block::Hash],
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let mut events = Vec::new();
let Some(first_hash) = finalized_block_hashes.get(0) else { return Ok(Default::default()) };
let Some(first_header) = self.client.header(*first_hash)? else {
return Err(SubscriptionManagementError::BlockHeaderAbsent)
};
let parents =
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
// The block was already pinned and reported by a previous event; it will still be
// listed by the upcoming `Finalized` event.
if !self.sub_handle.pin_block(&self.sub_id, *hash)? {
continue
}
// Only the last block of the notification may also be reported as the best block.
if i + 1 != finalized_block_hashes.len() {
events.extend(self.generate_import_events(*hash, *parent, false));
} else {
// If a descendant of this finalized block was already reported as best,
// reporting the finalized block as best would move the best block backwards.
if let Some(best_block_hash) = self.best_block_cache {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
*hash,
best_block_hash,
)?;
// The finalized block is an ancestor of the cached best block.
if ancestor.hash == *hash {
return Err(SubscriptionManagementError::Custom(
"A descendent of the finalized block was already reported".into(),
))
}
}
events.extend(self.generate_import_events(*hash, *parent, true))
}
}
Ok(events)
}
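
/// Get all pruned block hashes from the provided stale heads.
///
/// Hashes found in `to_ignore` were never reported by this subscription and are removed
/// from the set instead of being reported as pruned.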
fn get_pruned_hashes(
&self,
stale_heads: &[Block::Hash],
last_finalized: Block::Hash,
to_ignore: &mut HashSet<Block::Hash>,
) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
let blockchain = self.backend.blockchain();
let mut pruned = Vec::new();
for stale_head in stale_heads {
let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?;
// The enacted path leads to the stale head and will be pruned by the database.
pruned.extend(tree_route.enacted().iter().filter_map(|block| {
if !to_ignore.remove(&block.hash) {
Some(block.hash)
} else {
None
}
}))
}
Ok(pruned)
}
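
/// Handle a finality notification by generating the `Finalized` event, preceded by any
/// `NewBlock` / `BestBlockChanged` events that were not reported yet.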
fn handle_finalized_blocks(
&mut self,
notification: FinalityNotification<Block>,
to_ignore: &mut HashSet<Block::Hash>,
startup_point: &StartupPoint<Block>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
let last_finalized = notification.hash;
// Ensure we only report blocks that are after the startup point.
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}
// The tree route contains the implicitly finalized blocks between the previously
// finalized block and this notification (exclusive on both ends); append the
// notification hash to obtain the complete ordered list.
let mut finalized_block_hashes = notification.tree_route.to_vec();
finalized_block_hashes.push(last_finalized);
let mut events = self.generate_finalized_events(&finalized_block_hashes)?;
let pruned_block_hashes =
self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});
match self.best_block_cache {
Some(block_cache) => {
// The reported best block was not pruned; the `Finalized` event can be submitted
// as is.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
events.push(finalized_event);
return Ok(events)
}
// The best block was pruned; a new best block must be signaled before submitting
// the `Finalized` event.
let best_block_hash = self.client.info().best_hash;
if best_block_hash == block_cache {
// The client has not yet exposed a different best block. `info()` is updated
// as the last step of finalization, so this is not expected to happen; there is
// nothing more the RPC layer can do here.
error!(
target: LOG_TARGET,
"[follow][id={:?}] Client does not contain different best block",
self.sub_id,
);
events.push(finalized_event);
Ok(events)
} else {
self.best_block_cache = Some(best_block_hash);
let best_block_event =
FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
events.extend([best_block_event, finalized_event]);
Ok(events)
}
},
None => {
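// No best block was reported yet; submit only the `Finalized` event.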
events.push(finalized_event);
Ok(events)
},
}
}
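
/// Submit the events from the provided stream to the RPC client for as long as the
/// subscription is active.
///
/// The subscription is terminated with a `Stop` event when the notification streams end,
/// when the `rx_stop` signal fires, or when generating or sending an event fails.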
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
mut to_ignore: HashSet<Block::Hash>,
mut sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
{
let mut stream_item = stream.next();
let mut stop_event = rx_stop;
while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
};
let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
let _ = sink.send(&FollowEvent::<String>::Stop);
return
},
};
for event in events {
let result = sink.send(&event);
if let Err(err) = result {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);
// Attempt to signal the `Stop` event before terminating; this may fail if the
// connection is already gone.
let _ = sink.send(&FollowEvent::<String>::Stop);
return
}
// `Ok(false)` means the client disconnected or the subscription was closed; there is
// nothing left to report.
if let Ok(false) = result {
return
}
}
stream_item = stream.next();
stop_event = next_stop_event;
}
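// If we reach this point the substrate streams have closed or the `Stop` signal was
// triggered: notify the client and terminate the subscription.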
let _ = sink.send(&FollowEvent::<String>::Stop);
}
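
/// Generate the block events for the `chainHead_follow` method.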
pub async fn generate_events(
&mut self,
mut sink: SubscriptionSink,
rx_stop: oneshot::Receiver<()>,
) {
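// Subscribe to the import and finality notification streams before the initial events
// are generated, so that no notification is missed in between.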
let stream_import = self
.client
.import_notification_stream()
.map(|notification| NotificationType::NewBlock(notification));
let stream_finalized = self
.client
.finality_notification_stream()
.map(|notification| NotificationType::Finalized(notification));
let startup_point = StartupPoint::from(self.client.info());
let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
Ok(blocks) => blocks,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to generate the initial events {:?}",
self.sub_id,
err
);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return
},
};
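// Prepend the initial events to the merged notification streams so that they are the
// first items reported to the client.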
let initial = NotificationType::InitialEvents(initial_events);
let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
let stream = stream::once(futures::future::ready(initial)).chain(merged);
self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop)
.await;
}
}