1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
// This file is part of a fork of Substrate which has had various changes.
// Copyright (C) Parity Technologies (UK) Ltd.
// Copyright (C) 2022-2023 Luke Parker
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
//! connected to.
//!
//! The PSM handles *sets* of nodes. A set of nodes is defined as the nodes that are believed to
//! support a certain capability, such as handling blocks and transactions of a specific chain,
//! or collating a certain parachain.
//!
//! For each node in each set, the peerset holds a flag specifying whether the node is
//! connected to us or not.
//!
//! This connected/disconnected status is specific to the node and set combination, and it is for
//! example possible for a node to be connected through a specific set but not another.
//!
//! In addition, for each, set, the peerset also holds a list of reserved nodes towards which it
//! will at all time try to maintain a connection with.
use crate::{
peer_store::{PeerStore, PeerStoreHandle, PeerStoreProvider},
protocol_controller::{ProtocolController, ProtocolHandle},
};
use futures::{
channel::oneshot,
future::{join_all, BoxFuture, JoinAll},
prelude::*,
stream::Stream,
};
use log::debug;
use sc_network_common::types::ReputationChange;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use serde_json::json;
use std::{
collections::HashSet,
pin::Pin,
task::{Context, Poll},
};
use libp2p::PeerId;
pub const LOG_TARGET: &str = "peerset";
#[derive(Debug)]
enum Action {
AddReservedPeer(SetId, PeerId),
RemoveReservedPeer(SetId, PeerId),
SetReservedPeers(SetId, HashSet<PeerId>),
SetReservedOnly(SetId, bool),
ReportPeer(PeerId, ReputationChange),
AddKnownPeer(PeerId),
PeerReputation(PeerId, oneshot::Sender<i32>),
}
/// Identifier of a set in the peerset.
///
/// Can be constructed using the `From<usize>` trait implementation based on the index of the set
/// within [`PeersetConfig::sets`]. For example, the first element of [`PeersetConfig::sets`] is
/// later referred to with `SetId::from(0)`. It is intended that the code responsible for building
/// the [`PeersetConfig`] is also responsible for constructing the [`SetId`]s.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);
impl SetId {
pub const fn from(id: usize) -> Self {
Self(id)
}
}
impl From<usize> for SetId {
fn from(id: usize) -> Self {
Self(id)
}
}
impl From<SetId> for usize {
fn from(id: SetId) -> Self {
id.0
}
}
/// Shared handle to the peer set manager (PSM). Distributed around the code.
#[derive(Debug, Clone)]
pub struct PeersetHandle {
tx: TracingUnboundedSender<Action>,
}
impl PeersetHandle {
/// Adds a new reserved peer. The peerset will make an effort to always remain connected to
/// this peer.
///
/// Has no effect if the node was already a reserved peer.
///
/// > **Note**: Keep in mind that the networking has to know an address for this node,
/// > otherwise it will not be able to connect to it.
pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id));
}
/// Remove a previously-added reserved peer.
///
/// Has no effect if the node was not a reserved peer.
pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id));
}
/// Sets whether or not the peerset only has connections with nodes marked as reserved for
/// the given set.
pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) {
let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved));
}
/// Set reserved peers to the new set.
pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet<PeerId>) {
let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids));
}
/// Reports an adjustment to the reputation of the given peer.
pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
}
/// Add a peer to the list of known peers.
pub fn add_known_peer(&self, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddKnownPeer(peer_id));
}
/// Returns the reputation value of the peer.
pub async fn peer_reputation(self, peer_id: PeerId) -> Result<i32, ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx));
// The channel can only be closed if the peerset no longer exists.
rx.await.map_err(|_| ())
}
}
/// Message that can be sent by the peer set manager (PSM).
#[derive(Debug, PartialEq)]
pub enum Message {
/// Request to open a connection to the given peer. From the point of view of the PSM, we are
/// immediately connected.
Connect {
/// Set id to connect on.
set_id: SetId,
/// Peer to connect to.
peer_id: PeerId,
},
/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
Drop {
/// Set id to disconnect on.
set_id: SetId,
/// Peer to disconnect from.
peer_id: PeerId,
},
/// Equivalent to `Connect` for the peer corresponding to this incoming index.
Accept(IncomingIndex),
/// Equivalent to `Drop` for the peer corresponding to this incoming index.
Reject(IncomingIndex),
}
/// Opaque identifier for an incoming connection. Allocated by the network.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);
impl From<u64> for IncomingIndex {
fn from(val: u64) -> Self {
Self(val)
}
}
/// Configuration to pass when creating the peer set manager.
#[derive(Debug)]
pub struct PeersetConfig {
/// List of sets of nodes the peerset manages.
pub sets: Vec<SetConfig>,
}
/// Configuration for a single set of nodes.
#[derive(Debug)]
pub struct SetConfig {
/// Maximum number of ingoing links to peers.
pub in_peers: u32,
/// Maximum number of outgoing links to peers.
pub out_peers: u32,
/// List of bootstrap nodes to initialize the set with.
///
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
/// > otherwise it will not be able to connect to them.
pub bootnodes: Vec<PeerId>,
/// Lists of nodes we should always be connected to.
///
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
/// > otherwise it will not be able to connect to them.
pub reserved_nodes: HashSet<PeerId>,
/// If true, we only accept nodes in [`SetConfig::reserved_nodes`].
pub reserved_only: bool,
}
/// Side of the peer set manager owned by the network. In other words, the "receiving" side.
///
/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never
/// errors.
pub struct Peerset {
/// Peer reputation store handle.
peer_store_handle: PeerStoreHandle,
/// Peer reputation store.
peer_store_future: BoxFuture<'static, ()>,
/// Protocol handles.
protocol_handles: Vec<ProtocolHandle>,
/// Protocol controllers responsible for connections, per `SetId`.
protocol_controller_futures: JoinAll<BoxFuture<'static, ()>>,
/// Commands sent from protocol controllers to `Notifications`. The size of this vector never
/// changes.
from_controllers: TracingUnboundedReceiver<Message>,
/// Receiver for messages from the `PeersetHandle` and from `to_self`.
from_handle: TracingUnboundedReceiver<Action>,
}
impl Peerset {
/// Builds a new peerset from the given configuration.
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
let default_set_config = &config.sets[0];
let peer_store = PeerStore::new(default_set_config.bootnodes.clone());
let (to_notifications, from_controllers) =
tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
let controllers = config
.sets
.into_iter()
.enumerate()
.map(|(set, set_config)| {
ProtocolController::new(
SetId::from(set),
set_config,
to_notifications.clone(),
Box::new(peer_store.handle()),
)
})
.collect::<Vec<_>>();
let (protocol_handles, protocol_controllers): (Vec<ProtocolHandle>, Vec<_>) =
controllers.into_iter().unzip();
let (tx, from_handle) = tracing_unbounded("mpsc_peerset_messages", 10_000);
let handle = PeersetHandle { tx };
let protocol_controller_futures =
join_all(protocol_controllers.into_iter().map(|c| c.run().boxed()));
let peerset = Peerset {
peer_store_handle: peer_store.handle(),
peer_store_future: peer_store.run().boxed(),
protocol_handles,
protocol_controller_futures,
from_controllers,
from_handle,
};
(peerset, handle)
}
/// Returns the list of reserved peers.
pub fn reserved_peers(&self, set_id: SetId, pending_response: oneshot::Sender<Vec<PeerId>>) {
self.protocol_handles[set_id.0].reserved_peers(pending_response);
}
/// Indicate that we received an incoming connection. Must be answered either with
/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
///
/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
/// `dropped`.
// Implementation note: because of concurrency issues, it is possible that we push a `Connect`
// message to the output channel with a `PeerId`, and that `incoming` gets called with the same
// `PeerId` before that message has been read by the user. In this situation we must not answer.
pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) {
self.protocol_handles[set_id.0].incoming_connection(peer_id, index);
}
/// Indicate that we dropped an active connection with a peer, or that we failed to connect.
///
/// Must only be called after the PSM has either generated a `Connect` message with this
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, _reason: DropReason) {
self.protocol_handles[set_id.0].dropped(peer_id);
}
/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
pub fn debug_info(&mut self) -> serde_json::Value {
// TODO: Check what info we can include here.
// Issue reference: https://github.com/paritytech/substrate/issues/14160.
json!("unimplemented")
}
/// Returns the number of peers that we have discovered.
pub fn num_discovered_peers(&self) -> usize {
self.peer_store_handle.num_known_peers()
}
}
impl Stream for Peerset {
type Item = Message;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Poll::Ready(msg) = self.from_controllers.poll_next_unpin(cx) {
if let Some(msg) = msg {
return Poll::Ready(Some(msg))
} else {
debug!(
target: LOG_TARGET,
"All `ProtocolController`s have terminated, terminating `Peerset`."
);
return Poll::Ready(None)
}
}
while let Poll::Ready(action) = self.from_handle.poll_next_unpin(cx) {
if let Some(action) = action {
match action {
Action::AddReservedPeer(set_id, peer_id) =>
self.protocol_handles[set_id.0].add_reserved_peer(peer_id),
Action::RemoveReservedPeer(set_id, peer_id) =>
self.protocol_handles[set_id.0].remove_reserved_peer(peer_id),
Action::SetReservedPeers(set_id, peer_ids) =>
self.protocol_handles[set_id.0].set_reserved_peers(peer_ids),
Action::SetReservedOnly(set_id, reserved_only) =>
self.protocol_handles[set_id.0].set_reserved_only(reserved_only),
Action::ReportPeer(peer_id, score_diff) =>
self.peer_store_handle.report_peer(peer_id, score_diff),
Action::AddKnownPeer(peer_id) => self.peer_store_handle.add_known_peer(peer_id),
Action::PeerReputation(peer_id, pending_response) => {
let _ =
pending_response.send(self.peer_store_handle.peer_reputation(&peer_id));
},
}
} else {
debug!(target: LOG_TARGET, "`PeersetHandle` was dropped, terminating `Peerset`.");
return Poll::Ready(None)
}
}
if let Poll::Ready(()) = self.peer_store_future.poll_unpin(cx) {
debug!(target: LOG_TARGET, "`PeerStore` has terminated, terminating `PeerSet`.");
return Poll::Ready(None)
}
if let Poll::Ready(_) = self.protocol_controller_futures.poll_unpin(cx) {
debug!(
target: LOG_TARGET,
"All `ProtocolHandle`s have terminated, terminating `PeerSet`."
);
return Poll::Ready(None)
}
Poll::Pending
}
}
/// Reason for calling [`Peerset::dropped`].
#[derive(Debug)]
pub enum DropReason {
/// Substream or connection has been closed for an unknown reason.
Unknown,
/// Substream or connection has been explicitly refused by the target. In other words, the
/// peer doesn't actually belong to this set.
Refused,
}