1use alloy_primitives::{BlockNumber, TxNumber};
3use reth_config::config::EtlConfig;
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 models::sharded_key::NUM_OF_INDICES_IN_SHARD,
7 table::{Decompress, Table},
8 transaction::{DbTx, DbTxMut},
9 BlockNumberList, DatabaseError,
10};
11use reth_etl::Collector;
12use reth_provider::{
13 providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
14 StaticFileProviderFactory,
15};
16use reth_stages_api::StageError;
17use reth_static_file_types::StaticFileSegment;
18use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
19use tracing::info;
20
21const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
23
24pub(crate) fn collect_history_indices<Provider, CS, H, P>(
42 provider: &Provider,
43 range: impl RangeBounds<CS::Key>,
44 sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
45 partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
46 etl_config: &EtlConfig,
47) -> Result<Collector<H::Key, H::Value>, StageError>
48where
49 Provider: DBProvider,
50 CS: Table,
51 H: Table<Value = BlockNumberList>,
52 P: Copy + Eq + Hash,
53{
54 let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;
55
56 let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
57 let mut cache: HashMap<P, Vec<u64>> = HashMap::default();
58
59 let mut collect = |cache: &HashMap<P, Vec<u64>>| {
60 for (key, indices) in cache {
61 let last = indices.last().expect("qed");
62 collector.insert(
63 sharded_key_factory(*key, *last),
64 BlockNumberList::new_pre_sorted(indices.iter().copied()),
65 )?;
66 }
67 Ok::<(), StageError>(())
68 };
69
70 let total_changesets = provider.tx_ref().entries::<CS>()?;
72 let interval = (total_changesets / 1000).max(1);
73
74 let mut flush_counter = 0;
75 let mut current_block_number = u64::MAX;
76 for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
77 let (block_number, key) = partial_key_factory(entry?);
78 cache.entry(key).or_default().push(block_number);
79
80 if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
81 info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
82 }
83
84 if current_block_number != block_number {
86 current_block_number = block_number;
87 flush_counter += 1;
88 if flush_counter > DEFAULT_CACHE_THRESHOLD {
89 collect(&cache)?;
90 cache.clear();
91 flush_counter = 0;
92 }
93 }
94 }
95 collect(&cache)?;
96
97 Ok(collector)
98}
99
/// Reads sorted (sharded key, block list) entries from `collector` and writes
/// them into the history table `H` in shards of at most
/// `NUM_OF_INDICES_IN_SHARD` block numbers (see [`load_indices`]).
///
/// When `append_only` is `true`, shards are appended with a cursor append
/// (the table region is assumed fresh); otherwise the last existing shard of
/// each partial key is read back via `seek_exact(.., u64::MAX)` and merged
/// before upserting.
pub(crate) fn load_history_indices<Provider, H, P>(
    provider: &Provider,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut>,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
    // Partial key of the shard currently being accumulated, and its pending
    // (not yet fully written) block-number list.
    let mut current_partial = P::default();
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        // Emit a progress line roughly every 10% for larger collections.
        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // Strip the block-number component: entries with the same partial key
        // belong to the same logical shard chain.
        let partial_key = get_partial(sharded_key);

        if current_partial != partial_key {
            // Entered a new partial key: the previous key's remaining indices
            // must be flushed as its final shard (keyed with `u64::MAX` inside
            // `load_indices`).
            load_indices(
                &mut write_cursor,
                current_partial,
                &mut current_list,
                &sharded_key_factory,
                append_only,
                LoadMode::Flush,
            )?;

            current_partial = partial_key;
            current_list.clear();

            // On an incremental run a last shard for this key may already be
            // in the database; pull its indices in front so the merged list
            // stays sorted and the shard is rewritten as a whole.
            if !append_only &&
                let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
            {
                current_list.extend(last_database_shard.iter());
            }
        }

        current_list.extend(new_list.iter());
        // Write out any full shards, keeping the trailing partial chunk in
        // `current_list` for further accumulation.
        load_indices(
            &mut write_cursor,
            current_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // One final (possibly partial) shard remains for the last partial key.
    load_indices(
        &mut write_cursor,
        current_partial,
        &mut current_list,
        &sharded_key_factory,
        append_only,
        LoadMode::Flush,
    )?;

    Ok(())
}
190
/// Splits `list` into shards of at most `NUM_OF_INDICES_IN_SHARD` indices and
/// writes them to table `H` under keys built by `sharded_key_factory`.
///
/// Nothing happens unless the list exceeds one shard or `mode` is
/// [`LoadMode::Flush`]. With [`LoadMode::KeepLast`], the trailing (possibly
/// partial) chunk is written back into `list` instead of the database so the
/// caller can keep extending it. The final shard written for a key uses
/// `u64::MAX` as its block-number component, marking it as the "last shard"
/// that lookups and future merges can find via `seek_exact`.
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        // Chunks are materialized into owned `Vec`s up front: the loop below
        // may assign the last chunk back into `list`, which would alias the
        // borrow held by `list.chunks(..)` if iterated lazily.
        let chunks = list
            .chunks(NUM_OF_INDICES_IN_SHARD)
            .map(|chunks| chunks.to_vec())
            .collect::<Vec<Vec<u64>>>();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // KeepLast: hand the trailing chunk back to the caller
                // instead of persisting it.
                *list = chunk;
            } else {
                if iter.peek().is_none() {
                    // Mark the final shard of this key with the sentinel so
                    // it can be located without knowing its highest block.
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                // Append is valid only when the table region is known to be
                // fresh (first sync); otherwise upsert to overwrite a
                // previously written shard with the same key.
                if append_only {
                    cursor.append(key, &value)?;
                } else {
                    cursor.upsert(key, &value)?;
                }
            }
        }
    }

    Ok(())
}
235
/// Mode controlling how [`load_indices`] handles the trailing shard.
pub(crate) enum LoadMode {
    /// Keep the last (possibly partial) shard in memory instead of writing it
    /// to the database, so the caller can keep extending it.
    KeepLast,
    /// Flush every shard, including the last partial one, to the database.
    Flush,
}
243
244impl LoadMode {
245 const fn is_flush(&self) -> bool {
246 matches!(self, Self::Flush)
247 }
248}
249
250pub(crate) fn missing_static_data_error<Provider>(
253 last_tx_num: TxNumber,
254 static_file_provider: &StaticFileProvider<Provider::Primitives>,
255 provider: &Provider,
256 segment: StaticFileSegment,
257) -> Result<StageError, ProviderError>
258where
259 Provider: BlockReader + StaticFileProviderFactory,
260{
261 let mut last_block =
262 static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
263
264 loop {
267 if let Some(indices) = provider.block_body_indices(last_block)? &&
268 indices.last_tx_num() <= last_tx_num
269 {
270 break
271 }
272 if last_block == 0 {
273 break
274 }
275 last_block -= 1;
276 }
277
278 let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
279
280 Ok(StageError::MissingStaticFileData {
281 block: Box::new(missing_block.block_with_parent()),
282 segment,
283 })
284}