use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
use reth_config::config::EtlConfig;
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
        AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
    },
    table::{Decode, Decompress, Table},
    transaction::DbTx,
    BlockNumberList,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
    StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

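/// Number of blocks to process before flushing cached indices into the [`Collector`].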
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

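/// Collects all history (`H`) indices for a range of changesets (`CS`) and inserts them into a
/// [`Collector`].
///
/// Indices are buffered per partial key (`P`) in an in-memory cache and flushed into the
/// collector every [`DEFAULT_CACHE_THRESHOLD`] blocks to bound memory usage.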
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();

    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
        for (key, indices) in cache.drain() {
            let last = *indices.last().expect("indices are non-empty");
            collector.insert(
                sharded_key_factory(key, last),
                BlockNumberList::new_pre_sorted(indices.into_iter()),
            )?;
        }
        Ok::<(), StageError>(())
    };

    let total_changesets = provider.tx_ref().entries::<CS>()?;
    // Log progress roughly every 0.1% of the total entries.
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Track the flush threshold in whole blocks, since one block may span many changesets.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&mut cache)?;
                flush_counter = 0;
            }
        }
    }
    collect(&mut cache)?;

    Ok(collector)
}

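/// Drains the given cache entries and inserts each `(key, indices)` pair via `insert_fn`.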
fn collect_indices<K, F>(
    cache: impl Iterator<Item = (K, Vec<u64>)>,
    mut insert_fn: F,
) -> Result<(), StageError>
where
    F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
{
    for (key, indices) in cache {
        insert_fn(key, indices)?;
    }
    Ok(())
}

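/// Collects account history indices from the account changesets in static files and inserts
/// them into a [`Collector`], flushing the in-memory cache every [`DEFAULT_CACHE_THRESHOLD`]
/// blocks.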
pub(crate) fn collect_account_history_indices<Provider>(
    provider: &Provider,
    range: impl RangeBounds<BlockNumber>,
    etl_config: &EtlConfig,
) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
where
    Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
{
    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: AddressMap<Vec<u64>> = AddressMap::default();

    let mut insert_fn = |address: Address, indices: Vec<u64>| {
        let last = indices.last().expect("indices are non-empty");
        collector.insert(
            ShardedKey::new(address, *last),
            BlockNumberList::new_pre_sorted(indices.into_iter()),
        )?;
        Ok(())
    };

    let range = to_range(range);

    let static_file_provider = provider.static_file_provider();

    // Log progress roughly every 0.1% of the total changesets.
    let total_changesets = static_file_provider.account_changeset_count()?;
    let interval = (total_changesets / 1000).max(1);

    let walker = static_file_provider.walk_account_changeset_range(range);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;

    for (idx, changeset_result) in walker.enumerate() {
        let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
        cache.entry(address).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Track the flush threshold in whole blocks, since one block may span many changesets.
        if block_number != current_block_number {
            current_block_number = block_number;
            flush_counter += 1;
        }

        if flush_counter > DEFAULT_CACHE_THRESHOLD {
            collect_indices(cache.drain(), &mut insert_fn)?;
            flush_counter = 0;
        }
    }
    collect_indices(cache.into_iter(), insert_fn)?;

    Ok(collector)
}

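/// Collects storage history indices from the storage changesets in static files and inserts
/// them into a [`Collector`], flushing the in-memory cache every [`DEFAULT_CACHE_THRESHOLD`]
/// blocks.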
pub(crate) fn collect_storage_history_indices<Provider>(
    provider: &Provider,
    range: impl RangeBounds<BlockNumber>,
    etl_config: &EtlConfig,
) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
where
    Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
{
    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();

    let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
        let last = indices.last().expect("indices are non-empty");
        collector.insert(
            StorageShardedKey::new(key.0 .0, key.0 .1, *last),
            BlockNumberList::new_pre_sorted(indices.into_iter()),
        )?;
        Ok::<(), StageError>(())
    };

    let range = to_range(range);
    let static_file_provider = provider.static_file_provider();

    // Log progress roughly every 0.1% of the total changesets.
    let total_changesets = static_file_provider.storage_changeset_count()?;
    let interval = (total_changesets / 1000).max(1);

    let walker = static_file_provider.walk_storage_changeset_range(range);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;

    for (idx, changeset_result) in walker.enumerate() {
        let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
        cache
            .entry(AddressStorageKey((address, storage.key.as_b256())))
            .or_default()
            .push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Track the flush threshold in whole blocks, since one block may span many changesets.
        if block_number != current_block_number {
            current_block_number = block_number;
            flush_counter += 1;
        }

        if flush_counter > DEFAULT_CACHE_THRESHOLD {
            collect_indices(cache.drain(), &mut insert_fn)?;
            flush_counter = 0;
        }
    }

    collect_indices(cache.into_iter(), insert_fn)?;

    Ok(collector)
}

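/// Loads account history indices from the [`Collector`] into the account history table via the
/// given writer, splitting the indices for each address into shards of at most
/// [`NUM_OF_INDICES_IN_SHARD`] entries.
///
/// When `append_only` is `false`, the last stored shard for an address is first merged with the
/// incoming indices and upserted; otherwise shards are appended directly.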
pub(crate) fn load_account_history<N, CURSOR>(
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    let mut current_address: Option<Address> = None;
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;

        if current_address != Some(address) {
            // Flush all shards of the previous address before starting a new one.
            if let Some(prev_addr) = current_address {
                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
            }

            current_address = Some(address);
            current_list.clear();

            // When updating in place, seed the list with the last stored shard so its indices
            // are merged with the incoming ones.
            if !append_only &&
                let Some(last_shard) = writer.get_last_account_history_shard(address)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Flush any full shards, keeping the (possibly partial) tail in `current_list`.
        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
    }

    // Flush whatever remains for the final address.
    if let Some(addr) = current_address {
        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
    }

    Ok(())
}

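/// Flushes all full shards for `address` from `list`, leaving the remaining indices in place.
///
/// The last shard is always held back, even when it is exactly full, so that
/// [`flush_account_history_shards`] can later store it under the `u64::MAX` sentinel key.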
fn flush_account_history_shards_partial<N, CURSOR>(
    address: Address,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    // Nothing to flush until more than one shard's worth of indices has accumulated.
    if list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(());
    }

    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;

    // If the list length is an exact multiple of the shard size, hold the last full shard back
    // so it can become the final `u64::MAX` shard.
    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
        num_full_shards - 1
    } else {
        num_full_shards
    };

    if shards_to_flush == 0 {
        return Ok(());
    }

    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
    let remainder = list.split_off(flush_len);

    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
        let highest = *chunk.last().expect("chunk is non-empty");
        let key = ShardedKey::new(address, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_account_history(key, &value)?;
        } else {
            writer.upsert_account_history(key, &value)?;
        }
    }

    *list = remainder;
    Ok(())
}

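/// Flushes all remaining shards for `address` from `list` and clears it.
///
/// The final shard is stored under the `u64::MAX` sentinel key so that lookups past the highest
/// indexed block still resolve to it.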
fn flush_account_history_shards<N, CURSOR>(
    address: Address,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    if list.is_empty() {
        return Ok(());
    }

    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);

    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
        let is_last = i == num_chunks - 1;

        // The final shard is keyed by `u64::MAX` so that lookups beyond the last indexed block
        // still resolve to it.
        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };

        let key = ShardedKey::new(address, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_account_history(key, &value)?;
        } else {
            writer.upsert_account_history(key, &value)?;
        }
    }

    list.clear();
    Ok(())
}

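/// Builds a [`StageError::MissingStaticFileData`] for the first block whose data is missing from
/// the static files of the given `segment`, i.e. the block after the highest one whose body ends
/// at or before `last_tx_num`.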
pub(crate) fn missing_static_data_error<Provider>(
    last_tx_num: TxNumber,
    static_file_provider: &StaticFileProvider<Provider::Primitives>,
    provider: &Provider,
    segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
    Provider: BlockReader + StaticFileProviderFactory,
{
    let mut last_block =
        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();

    // Walk backwards until we find the highest block whose body ends at or before `last_tx_num`.
    loop {
        if let Some(indices) = provider.block_body_indices(last_block)? &&
            indices.last_tx_num() <= last_tx_num
        {
            break
        }
        if last_block == 0 {
            break
        }
        last_block -= 1;
    }

    let missing_block = provider.sealed_header(last_block + 1)?.unwrap_or_default();

    Ok(StageError::MissingStaticFileData {
        block: Box::new(missing_block.block_with_parent()),
        segment,
    })
}

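/// Loads storage history indices from the [`Collector`] into the storage history table via the
/// given writer, splitting the indices for each `(address, storage key)` pair into shards of at
/// most [`NUM_OF_INDICES_IN_SHARD`] entries.
///
/// When `append_only` is `false`, the last stored shard for a key is first merged with the
/// incoming indices and upserted; otherwise shards are appended directly.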
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    let mut current_key: Option<(Address, B256)> = None;
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        if current_key != Some(partial_key) {
            // Flush all shards of the previous (address, storage key) pair before starting a
            // new one.
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // When updating in place, seed the list with the last stored shard so its indices
            // are merged with the incoming ones.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Flush any full shards, keeping the (possibly partial) tail in `current_list`.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Flush whatever remains for the final key.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}

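/// Flushes all full shards for the `(address, storage_key)` pair from `list`, leaving the
/// remaining indices in place.
///
/// The last shard is always held back, even when it is exactly full, so that
/// [`flush_storage_history_shards`] can later store it under the `u64::MAX` sentinel key.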
fn flush_storage_history_shards_partial<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // Nothing to flush until more than one shard's worth of indices has accumulated.
    if list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(());
    }

    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;

    // If the list length is an exact multiple of the shard size, hold the last full shard back
    // so it can become the final `u64::MAX` shard.
    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
        num_full_shards - 1
    } else {
        num_full_shards
    };

    if shards_to_flush == 0 {
        return Ok(());
    }

    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
    let remainder = list.split_off(flush_len);

    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
        let highest = *chunk.last().expect("chunk is non-empty");
        let key = StorageShardedKey::new(address, storage_key, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_storage_history(key, &value)?;
        } else {
            writer.upsert_storage_history(key, &value)?;
        }
    }

    *list = remainder;
    Ok(())
}

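/// Flushes all remaining shards for the `(address, storage_key)` pair from `list` and clears it.
///
/// The final shard is stored under the `u64::MAX` sentinel key so that lookups past the highest
/// indexed block still resolve to it.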
fn flush_storage_history_shards<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    if list.is_empty() {
        return Ok(());
    }

    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);

    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
        let is_last = i == num_chunks - 1;

        // The final shard is keyed by `u64::MAX` so that lookups beyond the last indexed block
        // still resolve to it.
        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };

        let key = StorageShardedKey::new(address, storage_key, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_storage_history(key, &value)?;
        } else {
            writer.upsert_storage_history(key, &value)?;
        }
    }

    list.clear();
    Ok(())
}