1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// This file is part of a fork of Substrate which has had various changes.

// Copyright (C) Parity Technologies (UK) Ltd.
// Copyright (C) 2022-2023 Luke Parker
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use futures::channel::oneshot;
use parking_lot::RwLock;
use sc_client_api::Backend;
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};

mod error;
mod inner;

pub use error::SubscriptionManagementError;
pub use inner::BlockGuard;
use inner::SubscriptionsInner;

/// Manage block pinning / unpinning for subscription IDs.
pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
	/// Manage subscription by mapping the subscription ID
	/// to a set of block hashes.
	inner: RwLock<SubscriptionsInner<Block, BE>>,
}

impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
	/// Construct a new [`SubscriptionManagement`].
	pub fn new(
		global_max_pinned_blocks: usize,
		local_max_pin_duration: Duration,
		backend: Arc<BE>,
	) -> Self {
		SubscriptionManagement {
			inner: RwLock::new(SubscriptionsInner::new(
				global_max_pinned_blocks,
				local_max_pin_duration,
				backend,
			)),
		}
	}

	/// Insert a new subscription ID.
	///
	/// If the subscription was not previously inserted, returns the receiver that is
	/// triggered upon the "Stop" event. Otherwise, if the subscription ID was already
	/// inserted returns none.
	pub fn insert_subscription(
		&self,
		sub_id: String,
		runtime_updates: bool,
	) -> Option<oneshot::Receiver<()>> {
		let mut inner = self.inner.write();
		inner.insert_subscription(sub_id, runtime_updates)
	}

	/// Remove the subscription ID with associated pinned blocks.
	pub fn remove_subscription(&self, sub_id: &str) {
		let mut inner = self.inner.write();
		inner.remove_subscription(sub_id)
	}

	/// The block is pinned in the backend only once when the block's hash is first encountered.
	///
	/// Each subscription is expected to call this method twice:
	/// - once from the `NewBlock` import
	/// - once from the `Finalized` import
	///
	/// Returns
	/// - Ok(true) if the subscription did not previously contain this block
	/// - Ok(false) if the subscription already contained this this
	/// - Error if the backend failed to pin the block or the subscription ID is invalid
	pub fn pin_block(
		&self,
		sub_id: &str,
		hash: Block::Hash,
	) -> Result<bool, SubscriptionManagementError> {
		let mut inner = self.inner.write();
		inner.pin_block(sub_id, hash)
	}

	/// Unpin the block from the subscription.
	///
	/// The last subscription that unpins the block is also unpinning the block
	/// from the backend.
	///
	/// This method is called only once per subscription.
	///
	/// Returns an error if the block is not pinned for the subscription or
	/// the subscription ID is invalid.
	pub fn unpin_block(
		&self,
		sub_id: &str,
		hash: Block::Hash,
	) -> Result<(), SubscriptionManagementError> {
		let mut inner = self.inner.write();
		inner.unpin_block(sub_id, hash)
	}

	/// Ensure the block remains pinned until the return object is dropped.
	///
	/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner.
	/// Returns an error if the block hash is not pinned for the subscription or
	/// the subscription ID is invalid.
	pub fn lock_block(
		&self,
		sub_id: &str,
		hash: Block::Hash,
	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
		let mut inner = self.inner.write();
		inner.lock_block(sub_id, hash)
	}
}