1use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
3use reth_config::config::EtlConfig;
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 models::{
7 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
8 AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
9 },
10 table::{Decode, Decompress, Table},
11 transaction::DbTx,
12 BlockNumberList,
13};
14use reth_etl::Collector;
15use reth_primitives_traits::NodePrimitives;
16use reth_provider::{
17 providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
18 StaticFileProviderFactory,
19};
20use reth_stages_api::StageError;
21use reth_static_file_types::StaticFileSegment;
22use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
23use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
24use tracing::info;
25
/// Number of distinct block numbers buffered in the in-memory changeset cache before it is
/// drained into the ETL collector (see the `flush_counter` logic in the collect functions below).
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
28
/// Collects history indices for the changeset table `CS` over `range` into an ETL [`Collector`]
/// keyed by the history table `H`.
///
/// Each changeset entry is reduced by `partial_key_factory` to a `(block_number, partial_key)`
/// pair. Block numbers are accumulated per partial key in an in-memory cache and periodically
/// drained into the collector under `sharded_key_factory(partial_key, highest_buffered_block)`.
///
/// Returns the filled collector; writing it to the database happens elsewhere.
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();

    // Drains the cache into the collector, sharding each key by its highest buffered block.
    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
        for (key, indices) in cache.drain() {
            let last = *indices.last().expect("qed");
            collector
                .insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
        }
        Ok::<(), StageError>(())
    };

    let total_changesets = provider.tx_ref().entries::<CS>()?;
    // Log progress roughly every 0.1% of processed entries.
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    // Sentinel so the first processed entry registers as a block-number change.
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Count distinct block numbers; once more than `DEFAULT_CACHE_THRESHOLD` blocks have
        // been buffered, drain the cache to bound memory usage.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&mut cache)?;
                flush_counter = 0;
            }
        }
    }
    // Drain whatever is left after the walk.
    collect(&mut cache)?;

    Ok(collector)
}
101
102fn collect_indices<K, F>(
104 cache: impl Iterator<Item = (K, Vec<u64>)>,
105 mut insert_fn: F,
106) -> Result<(), StageError>
107where
108 F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
109{
110 for (key, indices) in cache {
111 insert_fn(key, indices)?
112 }
113 Ok(())
114}
115
116pub(crate) fn collect_account_history_indices<Provider>(
118 provider: &Provider,
119 range: impl RangeBounds<BlockNumber>,
120 etl_config: &EtlConfig,
121) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
122where
123 Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
124{
125 let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
126 let mut cache: AddressMap<Vec<u64>> = AddressMap::default();
127
128 let mut insert_fn = |address: Address, indices: Vec<u64>| {
129 let last = indices.last().expect("indices is non-empty");
130 collector
131 .insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
132 Ok(())
133 };
134
135 let range = to_range(range);
137
138 let static_file_provider = provider.static_file_provider();
140
141 let total_changesets = static_file_provider.account_changeset_count()?;
143 let interval = (total_changesets / 1000).max(1);
144
145 let walker = static_file_provider.walk_account_changeset_range(range);
146
147 let mut flush_counter = 0;
148 let mut current_block_number = u64::MAX;
149
150 for (idx, changeset_result) in walker.enumerate() {
151 let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
152 cache.entry(address).or_default().push(block_number);
153
154 if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
155 info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
156 }
157
158 if block_number != current_block_number {
159 current_block_number = block_number;
160 flush_counter += 1;
161 }
162
163 if flush_counter > DEFAULT_CACHE_THRESHOLD {
164 collect_indices(cache.drain(), &mut insert_fn)?;
165 flush_counter = 0;
166 }
167 }
168 collect_indices(cache.into_iter(), insert_fn)?;
169
170 Ok(collector)
171}
172
173pub(crate) fn collect_storage_history_indices<Provider>(
175 provider: &Provider,
176 range: impl RangeBounds<BlockNumber>,
177 etl_config: &EtlConfig,
178) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
179where
180 Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
181{
182 let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
183 let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();
184
185 let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
186 let last = indices.last().expect("qed");
187 collector.insert(
188 StorageShardedKey::new(key.0 .0, key.0 .1, *last),
189 BlockNumberList::new_pre_sorted(indices),
190 )?;
191 Ok::<(), StageError>(())
192 };
193
194 let range = to_range(range);
195 let static_file_provider = provider.static_file_provider();
196
197 let total_changesets = static_file_provider.storage_changeset_count()?;
198 let interval = (total_changesets / 1000).max(1);
199
200 let walker = static_file_provider.walk_storage_changeset_range(range);
201
202 let mut flush_counter = 0;
203 let mut current_block_number = u64::MAX;
204
205 for (idx, changeset_result) in walker.enumerate() {
206 let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
207 cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);
208
209 if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
210 info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
211 }
212
213 if block_number != current_block_number {
214 current_block_number = block_number;
215 flush_counter += 1;
216 }
217
218 if flush_counter > DEFAULT_CACHE_THRESHOLD {
219 collect_indices(cache.drain(), &mut insert_fn)?;
220 flush_counter = 0;
221 }
222 }
223
224 collect_indices(cache.into_iter(), insert_fn)?;
225
226 Ok(collector)
227}
228
/// Writes the account-history indices buffered in `collector` into the `AccountsHistory`
/// table via `writer`, splitting each address' block-number list into shards of at most
/// [`NUM_OF_INDICES_IN_SHARD`] entries.
///
/// Assumes the collector yields entries sorted by key, so all shards belonging to one address
/// appear consecutively. When `append_only` is `false` (upsert mode), the address' existing
/// last shard is merged in first so old and new indices are re-sharded together.
pub(crate) fn load_account_history<N, CURSOR>(
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    // Address currently being accumulated and its pending (pre-sorted) block numbers.
    let mut current_address: Option<Address> = None;
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    // Log progress roughly every 10% of entries.
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;

        // On an address change: finalize the previous address, then start accumulating anew.
        if current_address != Some(address) {
            if let Some(prev_addr) = current_address {
                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
            }

            current_address = Some(address);
            current_list.clear();

            // In upsert mode, pull the address' existing last shard from the table so it is
            // re-sharded together with the new indices.
            if !append_only &&
                let Some(last_shard) = writer.get_last_account_history_shard(address)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Eagerly write out complete shards to bound memory usage; keeps at least one
        // index buffered for the final `u64::MAX` shard.
        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
    }

    // Finalize the last accumulated address.
    if let Some(addr) = current_address {
        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
301
302fn flush_account_history_shards_partial<N, CURSOR>(
308 address: Address,
309 list: &mut Vec<u64>,
310 append_only: bool,
311 writer: &mut EitherWriter<'_, CURSOR, N>,
312) -> Result<(), StageError>
313where
314 N: NodePrimitives,
315 CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
316 + DbCursorRO<reth_db_api::tables::AccountsHistory>,
317{
318 if list.len() <= NUM_OF_INDICES_IN_SHARD {
320 return Ok(());
321 }
322
323 let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
324
325 let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
328 num_full_shards - 1
329 } else {
330 num_full_shards
331 };
332
333 if shards_to_flush == 0 {
334 return Ok(());
335 }
336
337 let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
339 let remainder = list.split_off(flush_len);
340
341 for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
343 let highest = *chunk.last().expect("chunk is non-empty");
344 let key = ShardedKey::new(address, highest);
345 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
346
347 if append_only {
348 writer.append_account_history(key, &value)?;
349 } else {
350 writer.upsert_account_history(key, &value)?;
351 }
352 }
353
354 *list = remainder;
356 Ok(())
357}
358
359fn flush_account_history_shards<N, CURSOR>(
364 address: Address,
365 list: &mut Vec<u64>,
366 append_only: bool,
367 writer: &mut EitherWriter<'_, CURSOR, N>,
368) -> Result<(), StageError>
369where
370 N: NodePrimitives,
371 CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
372 + DbCursorRO<reth_db_api::tables::AccountsHistory>,
373{
374 if list.is_empty() {
375 return Ok(());
376 }
377
378 let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
379
380 for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
381 let is_last = i == num_chunks - 1;
382
383 let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
386
387 let key = ShardedKey::new(address, highest);
388 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
389
390 if append_only {
391 writer.append_account_history(key, &value)?;
392 } else {
393 writer.upsert_account_history(key, &value)?;
394 }
395 }
396
397 list.clear();
398 Ok(())
399}
400
401pub(crate) fn missing_static_data_error<Provider>(
404 last_tx_num: TxNumber,
405 static_file_provider: &StaticFileProvider<Provider::Primitives>,
406 provider: &Provider,
407 segment: StaticFileSegment,
408) -> Result<StageError, ProviderError>
409where
410 Provider: BlockReader + StaticFileProviderFactory,
411{
412 let mut last_block =
413 static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
414
415 loop {
418 if let Some(indices) = provider.block_body_indices(last_block)? &&
419 indices.last_tx_num() <= last_tx_num
420 {
421 break
422 }
423 if last_block == 0 {
424 break
425 }
426 last_block -= 1;
427 }
428
429 let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
430
431 Ok(StageError::MissingStaticFileData {
432 block: Box::new(missing_block.block_with_parent()),
433 segment,
434 })
435}
436
/// Writes the storage-history indices buffered in `collector` into the `StoragesHistory`
/// table via `writer`, splitting each `(address, storage key)` slot's block-number list into
/// shards of at most [`NUM_OF_INDICES_IN_SHARD`] entries.
///
/// Assumes the collector yields entries sorted by key, so all shards belonging to one slot
/// appear consecutively. When `append_only` is `false` (upsert mode), the slot's existing
/// last shard is merged in first so old and new indices are re-sharded together.
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // Slot currently being accumulated and its pending (pre-sorted) block numbers.
    let mut current_key: Option<(Address, B256)> = None;
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    // Log progress roughly every 10% of entries.
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // (address, storage key) identifies the slot, independent of the shard boundary.
        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        // On a slot change: finalize the previous slot, then start accumulating anew.
        if current_key != Some(partial_key) {
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // In upsert mode, pull the slot's existing last shard from the table so it is
            // re-sharded together with the new indices.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Eagerly write out complete shards to bound memory usage; keeps at least one
        // index buffered for the final `u64::MAX` shard.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Finalize the last accumulated slot.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
522
523fn flush_storage_history_shards_partial<N, CURSOR>(
529 address: Address,
530 storage_key: B256,
531 list: &mut Vec<u64>,
532 append_only: bool,
533 writer: &mut EitherWriter<'_, CURSOR, N>,
534) -> Result<(), StageError>
535where
536 N: NodePrimitives,
537 CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
538 + DbCursorRO<reth_db_api::tables::StoragesHistory>,
539{
540 if list.len() <= NUM_OF_INDICES_IN_SHARD {
542 return Ok(());
543 }
544
545 let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
546
547 let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
550 num_full_shards - 1
551 } else {
552 num_full_shards
553 };
554
555 if shards_to_flush == 0 {
556 return Ok(());
557 }
558
559 let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
561 let remainder = list.split_off(flush_len);
562
563 for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
565 let highest = *chunk.last().expect("chunk is non-empty");
566 let key = StorageShardedKey::new(address, storage_key, highest);
567 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
568
569 if append_only {
570 writer.append_storage_history(key, &value)?;
571 } else {
572 writer.upsert_storage_history(key, &value)?;
573 }
574 }
575
576 *list = remainder;
578 Ok(())
579}
580
581fn flush_storage_history_shards<N, CURSOR>(
587 address: Address,
588 storage_key: B256,
589 list: &mut Vec<u64>,
590 append_only: bool,
591 writer: &mut EitherWriter<'_, CURSOR, N>,
592) -> Result<(), StageError>
593where
594 N: NodePrimitives,
595 CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
596 + DbCursorRO<reth_db_api::tables::StoragesHistory>,
597{
598 if list.is_empty() {
599 return Ok(());
600 }
601
602 let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
603
604 for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
605 let is_last = i == num_chunks - 1;
606
607 let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
610
611 let key = StorageShardedKey::new(address, storage_key, highest);
612 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
613
614 if append_only {
615 writer.append_storage_history(key, &value)?;
616 } else {
617 writer.upsert_storage_history(key, &value)?;
618 }
619 }
620
621 list.clear();
622 Ok(())
623}