use crate::{
config::{self, NonReservedPeerMode},
error,
types::ProtocolName,
ReputationChange,
};
use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::Endpoint,
swarm::{
behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use log::{debug, error, warn};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{HashMap, HashSet},
future::Future,
iter,
pin::Pin,
task::Poll,
};
use message::{generic::Message as GenericMessage, Message};
use notifications::{Notifications, NotificationsOut};
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
mod notifications;
pub mod message;
/// Size limit of 16 MiB, in bytes.
///
/// NOTE(review): judging by the name this bounds notifications on the block-announces and
/// transactions substreams; it is not referenced in this file, so confirm at the use sites.
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024;
/// Identifier (index 0) of the peerset used for the sync / block-announces protocol.
///
/// `Protocol::new` registers that set and that protocol first, so index 0 always refers to it.
const HARDCODED_PEERSETS_SYNC: crate::peerset::SetId = crate::peerset::SetId::from(0);
/// Number of peersets that are always registered, independently of the configured notification
/// protocols. Used by `Protocol::new` to size the list of peerset configurations.
const NUM_HARDCODED_PEERSETS: usize = 1;
/// Reputation changes applied to peers by this module.
mod rep {
use crate::ReputationChange as Rep;
/// Penalty of `-(1 << 12)` reported when a peer sends a handshake that cannot be decoded or
/// that is not of the expected kind.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
/// Future tracking the validation of a freshly opened sync substream.
///
/// Resolves to `Ok((peer, roles))` when the substream was accepted and to `Err(peer)` when it
/// was rejected or the answering channel was dropped.
type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;
/// Network behaviour wrapping [`Notifications`] and translating its events into
/// [`CustomMessageOutcome`]s (non-sync protocols) or `SyncEvent`s sent over a channel (sync
/// protocol).
pub struct Protocol<B: BlockT> {
/// Handle used to report peers and to manage reserved peers / reserved-only mode.
peerset_handle: crate::peerset::PeersetHandle,
/// Underlying notifications behaviour; all `NetworkBehaviour` calls are forwarded to it.
behaviour: Notifications,
/// Names of the notification protocols, indexed by peerset `SetId`. The block-announces
/// protocol is always at index 0.
notification_protocols: Vec<ProtocolName>,
/// `(peer, set)` pairs whose handshake failed to decode on a non-sync protocol. Events for
/// these substreams are swallowed until the matching "closed" event removes the entry.
bad_handshake_substreams: HashSet<(PeerId, crate::peerset::SetId)>,
/// Peers whose sync substream has been accepted, together with the roles they announced.
peers: HashMap<PeerId, Roles>,
/// Sync substream validations that have been requested but not answered yet.
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
/// Channel over which sync-related events are sent.
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
/// Ties the block type `B` to the struct without storing a value of it.
_marker: std::marker::PhantomData<B>,
}
impl<B: BlockT> Protocol<B> {
    /// Creates a new `Protocol` together with the handle of its peerset.
    ///
    /// One peerset is configured for the default (sync) set, built from
    /// `network_config.default_peers_set` and `network_config.boot_nodes`, followed by one
    /// peerset per entry of `notification_protocols`, in order. The notification protocols are
    /// registered in the same order, with `block_announces_protocol` first, so that a protocol's
    /// index equals its peerset `SetId`.
    ///
    /// Protocols in `notification_protocols` that carry no explicit handshake use the encoded
    /// `roles` as their handshake.
    ///
    /// # Returns
    ///
    /// `(protocol, peerset_handle, known_addresses)` where `known_addresses` lists the addresses
    /// of the configured reserved nodes.
    ///
    /// # Errors
    ///
    /// The visible code always returns `Ok`.
    ///
    /// # Panics
    ///
    /// Panics if `block_announces_protocol.handshake` is `None`.
    pub fn new(
        roles: Roles,
        network_config: &config::NetworkConfiguration,
        notification_protocols: Vec<config::NonDefaultSetConfig>,
        block_announces_protocol: config::NonDefaultSetConfig,
        tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
    ) -> error::Result<(Self, crate::peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
        let mut known_addresses = Vec::new();

        let (peerset, peerset_handle) = {
            let mut sets =
                Vec::with_capacity(NUM_HARDCODED_PEERSETS + notification_protocols.len());

            // Reserved nodes of the default set; only non-empty addresses are remembered.
            let mut default_sets_reserved = HashSet::new();
            for reserved in network_config.default_peers_set.reserved_nodes.iter() {
                default_sets_reserved.insert(reserved.peer_id);

                if !reserved.multiaddr.is_empty() {
                    known_addresses.push((reserved.peer_id, reserved.multiaddr.clone()));
                }
            }

            let bootnodes = network_config
                .boot_nodes
                .iter()
                .map(|bootnode| bootnode.peer_id)
                .collect::<Vec<_>>();

            // Set number 0 is used for block announces.
            sets.push(crate::peerset::SetConfig {
                in_peers: network_config.default_peers_set.in_peers,
                out_peers: network_config.default_peers_set.out_peers,
                bootnodes,
                reserved_nodes: default_sets_reserved,
                reserved_only: network_config.default_peers_set.non_reserved_mode ==
                    NonReservedPeerMode::Deny,
            });

            // One additional set per configured notification protocol, in order.
            for set_cfg in &notification_protocols {
                let mut reserved_nodes = HashSet::new();
                for reserved in set_cfg.set_config.reserved_nodes.iter() {
                    reserved_nodes.insert(reserved.peer_id);
                    known_addresses.push((reserved.peer_id, reserved.multiaddr.clone()));
                }

                let reserved_only =
                    set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny;

                sets.push(crate::peerset::SetConfig {
                    in_peers: set_cfg.set_config.in_peers,
                    out_peers: set_cfg.set_config.out_peers,
                    bootnodes: Vec::new(),
                    reserved_nodes,
                    reserved_only,
                });
            }

            crate::peerset::Peerset::from_config(crate::peerset::PeersetConfig { sets })
        };

        let behaviour = {
            Notifications::new(
                peerset,
                // NOTE: Block announcement protocol is still very much hardcoded into
                // `Protocol`. It must therefore come first so that it gets set id 0.
                iter::once(notifications::ProtocolConfig {
                    name: block_announces_protocol.notifications_protocol.clone(),
                    fallback_names: block_announces_protocol.fallback_names.clone(),
                    handshake: block_announces_protocol.handshake.as_ref().unwrap().to_vec(),
                    max_notification_size: block_announces_protocol.max_notification_size,
                })
                .chain(notification_protocols.iter().map(|s| notifications::ProtocolConfig {
                    name: s.notifications_protocol.clone(),
                    fallback_names: s.fallback_names.clone(),
                    // Only encode the roles when no explicit handshake was configured.
                    handshake: s
                        .handshake
                        .as_ref()
                        .map_or_else(|| roles.encode(), |h| (*h).to_vec()),
                    max_notification_size: s.max_notification_size,
                })),
            )
        };

        let protocol = Self {
            peerset_handle: peerset_handle.clone(),
            behaviour,
            notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
                .chain(notification_protocols.iter().map(|s| s.notifications_protocol.clone()))
                .collect(),
            bad_handshake_substreams: Default::default(),
            peers: HashMap::new(),
            sync_substream_validations: FuturesUnordered::new(),
            tx,
            _marker: Default::default(),
        };

        Ok((protocol, peerset_handle, known_addresses))
    }

    /// Returns the peerset `SetId` under which `protocol` was registered, or `None` if the
    /// protocol name is unknown.
    fn set_id_of(&self, protocol: &ProtocolName) -> Option<crate::peerset::SetId> {
        self.notification_protocols
            .iter()
            .position(|name| name == protocol)
            .map(crate::peerset::SetId::from)
    }

    /// Returns the open peers as reported by the underlying notifications behaviour.
    pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.behaviour.open_peers()
    }

    /// Returns the number of discovered peers as reported by the underlying notifications
    /// behaviour.
    pub fn num_discovered_peers(&self) -> usize {
        self.behaviour.num_discovered_peers()
    }

    /// Disconnects `peer_id` from the set belonging to `protocol_name`.
    ///
    /// The peer is also removed from the list of accepted sync peers. An unknown protocol name
    /// only produces a warning.
    pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
        match self.set_id_of(&protocol_name) {
            Some(set_id) => {
                self.behaviour.disconnect_peer(peer_id, set_id);
                self.peers.remove(peer_id);
            },
            None => warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name"),
        }
    }

    /// Returns the state of the peerset manager, for debugging purposes.
    pub fn peerset_debug_info(&mut self) -> serde_json::Value {
        self.behaviour.peerset_debug_info()
    }

    /// Returns the number of peers whose sync substream has been accepted.
    pub fn num_connected_peers(&self) -> usize {
        self.peers.len()
    }

    /// Forwards a reputation change for `who` to the peerset.
    pub fn report_peer(&self, who: PeerId, reputation: ReputationChange) {
        self.peerset_handle.report_peer(who, reputation)
    }

    /// Replaces the handshake of the notification protocol `protocol`.
    ///
    /// An unknown protocol name is logged as an error and otherwise ignored.
    pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) {
        if let Some(set_id) = self.set_id_of(&protocol) {
            self.behaviour.set_notif_protocol_handshake(set_id, handshake);
        } else {
            error!(
                target: "sub-libp2p",
                "set_notification_handshake with unknown protocol: {}",
                protocol
            );
        }
    }

    /// Switches reserved-only mode of the sync peerset on or off.
    pub fn set_reserved_only(&self, reserved_only: bool) {
        self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only);
    }

    /// Removes `peer` from the reserved peers of the sync peerset.
    pub fn remove_reserved_peer(&self, peer: PeerId) {
        self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer);
    }

    /// Requests the reserved peers of the sync peerset from the notifications behaviour.
    ///
    /// `pending_response` is handed to the behaviour, which is expected to answer through it.
    pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
        self.behaviour.reserved_peers(HARDCODED_PEERSETS_SYNC, pending_response);
    }

    /// Adds `peer` to the reserved peers of the sync peerset.
    pub fn add_reserved_peer(&self, peer: PeerId) {
        self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer);
    }

    /// Replaces the reserved peers of the sync peerset with `peers`.
    pub fn set_reserved_peers(&self, peers: HashSet<PeerId>) {
        self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers);
    }

    /// Replaces the reserved peers of the peerset belonging to `protocol` with `peers`.
    ///
    /// An unknown protocol name is logged as an error and otherwise ignored.
    pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet<PeerId>) {
        if let Some(set_id) = self.set_id_of(&protocol) {
            self.peerset_handle.set_reserved_peers(set_id, peers);
        } else {
            error!(
                target: "sub-libp2p",
                "set_reserved_peerset_peers with unknown protocol: {}",
                protocol
            );
        }
    }

    /// Removes `peer` from the reserved peers of the peerset belonging to `protocol`.
    ///
    /// An unknown protocol name is logged as an error and otherwise ignored.
    pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
        if let Some(set_id) = self.set_id_of(&protocol) {
            self.peerset_handle.remove_reserved_peer(set_id, peer);
        } else {
            error!(
                target: "sub-libp2p",
                "remove_set_reserved_peer with unknown protocol: {}",
                protocol
            );
        }
    }

    /// Adds `peer` to the reserved peers of the peerset belonging to `protocol`.
    ///
    /// An unknown protocol name is logged as an error and otherwise ignored.
    pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
        if let Some(set_id) = self.set_id_of(&protocol) {
            self.peerset_handle.add_reserved_peer(set_id, peer);
        } else {
            error!(
                target: "sub-libp2p",
                "add_set_reserved_peer with unknown protocol: {}",
                protocol
            );
        }
    }

    /// Tells the peerset about the existence of `peer_id`.
    pub fn add_known_peer(&mut self, peer_id: PeerId) {
        self.peerset_handle.add_known_peer(peer_id);
    }
}
/// Event produced by [`Protocol`] when it is polled as a `NetworkBehaviour`.
#[derive(Debug)]
#[must_use]
pub enum CustomMessageOutcome {
/// A notifications substream has been opened on a non-sync protocol.
NotificationStreamOpened {
/// Peer the substream was opened with.
remote: PeerId,
/// Name of the protocol the substream belongs to.
protocol: ProtocolName,
/// Fallback protocol name that was negotiated, if any.
negotiated_fallback: Option<ProtocolName>,
/// Roles of the remote, decoded from its handshake or, for an empty handshake, taken from
/// the roles recorded for its accepted sync substream.
roles: Roles,
/// Raw handshake bytes received from the remote.
received_handshake: Vec<u8>,
/// Sink associated with the opened substream.
notifications_sink: NotificationsSink,
},
/// The notifications sink of an already open non-sync substream has been replaced.
NotificationStreamReplaced {
/// Peer the substream is open with.
remote: PeerId,
/// Name of the protocol the substream belongs to.
protocol: ProtocolName,
/// The replacement sink.
notifications_sink: NotificationsSink,
},
/// A notifications substream on a non-sync protocol has been closed.
NotificationStreamClosed { remote: PeerId, protocol: ProtocolName },
/// Notifications have been received from `remote` on non-sync protocols; each message is
/// paired with the name of the protocol it arrived on.
NotificationsReceived { remote: PeerId, messages: Vec<(ProtocolName, Bytes)> },
/// Nothing to report. `Protocol::poll` never forwards this variant to the swarm.
None,
}
impl<B: BlockT> NetworkBehaviour for Protocol<B> {
    type ConnectionHandler = <Notifications as NetworkBehaviour>::ConnectionHandler;
    type ToSwarm = CustomMessageOutcome;

    /// Forwarded unchanged to the wrapped notifications behaviour.
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.behaviour.handle_established_inbound_connection(
            connection_id,
            peer,
            local_addr,
            remote_addr,
        )
    }

    /// Forwarded unchanged to the wrapped notifications behaviour.
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.behaviour.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
        )
    }

    /// Never contributes any address to an outbound dial attempt.
    fn handle_pending_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        _maybe_peer: Option<PeerId>,
        _addresses: &[Multiaddr],
        _effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        // NOTE(review): presumably addresses are supplied by a different behaviour; this one
        // deliberately returns none.
        Ok(Vec::new())
    }

    /// Forwarded unchanged to the wrapped notifications behaviour.
    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
        self.behaviour.on_swarm_event(event);
    }

    /// Forwarded unchanged to the wrapped notifications behaviour.
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        event: THandlerOutEvent<Self>,
    ) {
        self.behaviour.on_connection_handler_event(peer_id, connection_id, event);
    }

    /// Drives the behaviour.
    ///
    /// First applies every finished sync substream validation, then polls the wrapped
    /// notifications behaviour once. Non-event actions are passed through to the swarm as they
    /// are. Events on the sync set are turned into `SyncEvent`s sent over `self.tx`; events on
    /// any other set become a [`CustomMessageOutcome`] returned to the swarm.
    fn poll(
        &mut self,
        cx: &mut std::task::Context,
        params: &mut impl PollParameters,
    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
        // Apply the verdicts of all validations that have completed so far.
        while let Poll::Ready(Some(verdict)) = self.sync_substream_validations.poll_next_unpin(cx)
        {
            match verdict {
                Ok((accepted_peer, roles)) => {
                    self.peers.insert(accepted_peer, roles);
                },
                Err(rejected_peer) => {
                    log::debug!(
                        target: "sub-libp2p",
                        "`SyncingEngine` rejected stream"
                    );
                    self.behaviour.disconnect_peer(&rejected_peer, HARDCODED_PEERSETS_SYNC);
                },
            }
        }

        let action = match self.behaviour.poll(cx, params) {
            Poll::Ready(action) => action,
            Poll::Pending => return Poll::Pending,
        };

        // Everything except a generated event is handed to the swarm untouched.
        let event = match action {
            ToSwarm::GenerateEvent(ev) => ev,
            ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }),
            ToSwarm::NotifyHandler { peer_id, handler, event } =>
                return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }),
            ToSwarm::CloseConnection { peer_id, connection } =>
                return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }),
            ToSwarm::NewExternalAddrCandidate(observed) =>
                return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)),
            ToSwarm::ExternalAddrConfirmed(addr) =>
                return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)),
            ToSwarm::ExternalAddrExpired(addr) =>
                return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)),
            ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
            ToSwarm::RemoveListener { id } => return Poll::Ready(ToSwarm::RemoveListener { id }),
        };

        let outcome = match event {
            NotificationsOut::CustomProtocolOpen {
                peer_id,
                set_id,
                received_handshake,
                notifications_sink,
                negotiated_fallback,
                inbound,
            } => {
                if set_id == HARDCODED_PEERSETS_SYNC {
                    // The handshake is either a legacy `Status` message or a
                    // `BlockAnnouncesHandshake`; normalise both into the latter. A handshake
                    // that is neither costs the peer reputation and is otherwise dropped.
                    let decoded = match <Message<B> as DecodeAll>::decode_all(
                        &mut &received_handshake[..],
                    ) {
                        Ok(GenericMessage::Status(status)) => Some(BlockAnnouncesHandshake::<B> {
                            roles: status.roles,
                            best_number: status.best_number,
                            best_hash: status.best_hash,
                            genesis_hash: status.genesis_hash,
                        }),
                        Ok(msg) => {
                            debug!(
                                target: "sync",
                                "Expected Status message from {}, but got {:?}",
                                peer_id,
                                msg,
                            );
                            self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
                            None
                        },
                        Err(err) => {
                            match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
                                &mut &received_handshake[..],
                            ) {
                                Ok(handshake) => Some(handshake),
                                Err(err2) => {
                                    log::debug!(
                                        target: "sync",
                                        "Couldn't decode handshake sent by {}: {:?}: {} & {}",
                                        peer_id,
                                        received_handshake,
                                        err,
                                        err2,
                                    );
                                    self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
                                    None
                                },
                            }
                        },
                    };

                    if let Some(handshake) = decoded {
                        let roles = handshake.roles;
                        let (tx, rx) = oneshot::channel();
                        // The receiver of this event answers through `tx` whether the
                        // substream is accepted.
                        let _ = self.tx.unbounded_send(
                            crate::SyncEvent::NotificationStreamOpened {
                                inbound,
                                remote: peer_id,
                                received_handshake: handshake,
                                sink: notifications_sink,
                                tx,
                            },
                        );
                        // A dropped sender counts as a rejection.
                        self.sync_substream_validations.push(Box::pin(async move {
                            match rx.await {
                                Ok(true) => Ok((peer_id, roles)),
                                Ok(false) | Err(_) => Err(peer_id),
                            }
                        }));
                    }

                    CustomMessageOutcome::None
                } else {
                    // Roles come from the handshake; an empty handshake falls back to the
                    // roles recorded for the peer's accepted sync substream.
                    let roles = match Roles::decode_all(&mut &received_handshake[..]) {
                        Ok(roles) => Ok(roles),
                        Err(err) => match self.peers.get(&peer_id) {
                            Some(known_roles) if received_handshake.is_empty() =>
                                Ok(*known_roles),
                            _ => Err(err),
                        },
                    };

                    match roles {
                        Ok(roles) => CustomMessageOutcome::NotificationStreamOpened {
                            remote: peer_id,
                            protocol: self.notification_protocols[usize::from(set_id)].clone(),
                            negotiated_fallback,
                            roles,
                            received_handshake,
                            notifications_sink,
                        },
                        Err(err) => {
                            debug!(target: "sync", "Failed to parse remote handshake: {}", err);
                            self.bad_handshake_substreams.insert((peer_id, set_id));
                            self.behaviour.disconnect_peer(&peer_id, set_id);
                            self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
                            self.peers.remove(&peer_id);
                            CustomMessageOutcome::None
                        },
                    }
                }
            },
            NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => {
                let bad_handshake = self.bad_handshake_substreams.contains(&(peer_id, set_id));
                match (bad_handshake, set_id == HARDCODED_PEERSETS_SYNC) {
                    (true, _) => CustomMessageOutcome::None,
                    (false, true) => {
                        let _ =
                            self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
                                remote: peer_id,
                                sink: notifications_sink,
                            });
                        CustomMessageOutcome::None
                    },
                    (false, false) => CustomMessageOutcome::NotificationStreamReplaced {
                        remote: peer_id,
                        protocol: self.notification_protocols[usize::from(set_id)].clone(),
                        notifications_sink,
                    },
                }
            },
            NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
                // Closing a substream also ends the suppression of its events.
                let bad_handshake = self.bad_handshake_substreams.remove(&(peer_id, set_id));
                match (bad_handshake, set_id == HARDCODED_PEERSETS_SYNC) {
                    (true, _) => CustomMessageOutcome::None,
                    (false, true) => {
                        let _ =
                            self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
                                remote: peer_id,
                            });
                        self.peers.remove(&peer_id);
                        CustomMessageOutcome::None
                    },
                    (false, false) => CustomMessageOutcome::NotificationStreamClosed {
                        remote: peer_id,
                        protocol: self.notification_protocols[usize::from(set_id)].clone(),
                    },
                }
            },
            NotificationsOut::Notification { peer_id, set_id, message } => {
                let bad_handshake = self.bad_handshake_substreams.contains(&(peer_id, set_id));
                match (bad_handshake, set_id == HARDCODED_PEERSETS_SYNC) {
                    (true, _) => CustomMessageOutcome::None,
                    (false, true) => {
                        let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
                            remote: peer_id,
                            messages: vec![message.freeze()],
                        });
                        CustomMessageOutcome::None
                    },
                    (false, false) => {
                        let protocol_name =
                            self.notification_protocols[usize::from(set_id)].clone();
                        CustomMessageOutcome::NotificationsReceived {
                            remote: peer_id,
                            messages: vec![(protocol_name, message.freeze())],
                        }
                    },
                }
            },
        };

        match outcome {
            CustomMessageOutcome::None => {
                // An event was consumed from the behaviour but produced nothing to report.
                // More events may be waiting, so schedule this task to be polled again.
                cx.waker().wake_by_ref();
                Poll::Pending
            },
            reportable => Poll::Ready(ToSwarm::GenerateEvent(reportable)),
        }
    }
}