use alloy_primitives::{BlockNumber, TxNumber};
use reth_config::config::EtlConfig;
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::sharded_key::NUM_OF_INDICES_IN_SHARD,
    table::{Decompress, Table},
    transaction::{DbTx, DbTxMut},
    BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_provider::{
    providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
    StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

/// Number of blocks to accumulate in the cache before flushing its indices to the [`Collector`].
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
/// [`Collector`].
///
/// ## Process
/// Maintains a `HashMap` cache from the partial key (`P`), e.g. an `Address` or an
/// `(Address, StorageKey)` pair, to the list of block numbers in which that key changed. Every
/// `DEFAULT_CACHE_THRESHOLD` blocks, the cache contents are moved into the [`Collector`], where
/// each entry is keyed by the partial key combined with the highest block number in its list.
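///
/// ## Usage sketch
/// A hedged sketch of how a history stage might wire the two factories to concrete tables; the
/// table and model names (`tables::AccountChangeSets`, `tables::AccountsHistory`, `ShardedKey`)
/// are assumptions borrowed from `reth_db`, not part of this function's contract:
///
/// ```ignore
/// let collector = collect_history_indices::<_, tables::AccountChangeSets, tables::AccountsHistory, _>(
///     provider,
///     start_block..=target_block,
///     ShardedKey::new,
///     // A changeset entry is keyed by block number; its value carries the changed address.
///     |(block_number, account_before_tx)| (block_number, account_before_tx.address),
///     &etl_config,
/// )?;
/// ```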
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();

    // Moves the cached (partial key -> block numbers) entries into the collector, keying each
    // entry by the partial key and the highest block number in its list.
    let mut collect = |cache: &HashMap<P, Vec<u64>>| {
        for (key, indices) in cache {
            let last = indices.last().expect("qed");
            collector.insert(
                sharded_key_factory(*key, *last),
                BlockNumberList::new_pre_sorted(indices.iter().copied()),
            )?;
        }
        Ok::<(), StageError>(())
    };

    // Observability: report progress roughly every 0.1% of the changeset entries.
    let total_changesets = provider.tx_ref().entries::<CS>()?;
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    // Sentinel value guaranteeing the first entry is counted as a new block.
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Entered a new block: bump the counter so the cache is only flushed once every
        // `DEFAULT_CACHE_THRESHOLD` blocks.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&cache)?;
                cache.clear();
                flush_counter = 0;
            }
        }
    }
    // Flush whatever remains in the cache.
    collect(&cache)?;

    Ok(collector)
}

/// Given a [`Collector`] created by [`collect_history_indices`], iterates over all of its entries
/// and loads the history indices into the database in shards.
///
/// ## Process
/// Entries are grouped by their partial key (`P`), e.g. an `Address` or an
/// `(Address, StorageKey)` pair. Indices are flushed to disk whenever a shard reaches its maximum
/// length (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, which also stores the
/// final shard of the previous partial key.
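///
/// ## Usage sketch
/// A hedged sketch of a matching call site for account history; as above, the table and model
/// names are assumptions borrowed from `reth_db`:
///
/// ```ignore
/// load_history_indices::<_, tables::AccountsHistory, _>(
///     provider,
///     collector,
///     append_only,
///     ShardedKey::new,
///     ShardedKey::<Address>::decode_owned,
///     |key| key.key,
/// )?;
/// ```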
pub(crate) fn load_history_indices<Provider, H, P>(
    provider: &Provider,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut>,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
    let mut current_partial = P::default();
    let mut current_list = Vec::<u64>::new();

    // Observability: report progress roughly every 10% of the collector entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // The partial key, e.g. `Address` for account history or `(Address, StorageKey)` for
        // storage history.
        let partial_key = get_partial(sharded_key);

        if current_partial != partial_key {
            // We have reached the end of this partial key's entries, so flush its last shard.
            load_indices(
                &mut write_cursor,
                current_partial,
                &mut current_list,
                &sharded_key_factory,
                append_only,
                LoadMode::Flush,
            )?;

            current_partial = partial_key;
            current_list.clear();

            // If this is not the first sync, an older shard may already exist in the database,
            // so merge it with the indices coming from the collector.
            if !append_only &&
                let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
            {
                current_list.extend(last_database_shard.iter());
            }
        }

        current_list.extend(new_list.iter());
        load_indices(
            &mut write_cursor,
            current_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // Flush the one remaining shard to the database.
    load_indices(
        &mut write_cursor,
        current_partial,
        &mut current_list,
        &sharded_key_factory,
        append_only,
        LoadMode::Flush,
    )?;

    Ok(())
}

/// Shards the given indices list and inserts it into the database according to [`LoadMode`] and
/// the list's length.
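///
/// ## Example
/// A hedged sketch of the resulting shard layout, pretending `NUM_OF_INDICES_IN_SHARD` were 3 for
/// brevity (it is much larger in practice):
///
/// ```ignore
/// let mut list = vec![1, 2, 3, 4, 5, 6, 7];
/// load_indices(&mut cursor, address, &mut list, &ShardedKey::new, true, LoadMode::Flush)?;
/// // Written shards:
/// //   (address, 3)        -> [1, 2, 3]
/// //   (address, 6)        -> [4, 5, 6]
/// //   (address, u64::MAX) -> [7]   // the most recent shard is keyed with `u64::MAX`
/// //
/// // With `LoadMode::KeepLast`, the last chunk `[7]` would instead stay in `list` for
/// // further appends.
/// ```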
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        let chunks = list
            .chunks(NUM_OF_INDICES_IN_SHARD)
            .map(|chunk| chunk.to_vec())
            .collect::<Vec<Vec<u64>>>();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // Keep the last chunk in memory for further appends.
                *list = chunk;
            } else {
                // The most recent shard is keyed with `u64::MAX` so lookups by block number
                // always find it.
                if iter.peek().is_none() {
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                if append_only {
                    cursor.append(key, &value)?;
                } else {
                    cursor.upsert(key, &value)?;
                }
            }
        }
    }

    Ok(())
}

/// The mode in which [`load_indices`] loads index shards into the database.
pub(crate) enum LoadMode {
    /// Keep the last shard in memory and don't flush it to the database.
    KeepLast,
    /// Flush all shards to the database.
    Flush,
}

impl LoadMode {
    /// Returns `true` if the mode is [`Self::Flush`].
    const fn is_flush(&self) -> bool {
        matches!(self, Self::Flush)
    }
}

/// Builds the [`StageError`] to return when the database is ahead of the static files: walks back
/// from the highest static file block to find the first block with missing static file data.
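///
/// ## Usage sketch
/// A hedged sketch of how a stage might surface this error once it notices the static files lag
/// behind the database; the surrounding variables are assumptions for illustration only:
///
/// ```ignore
/// if next_static_file_tx_num < next_db_tx_num {
///     return Err(missing_static_data_error(
///         next_static_file_tx_num.saturating_sub(1),
///         &static_file_provider,
///         provider,
///         StaticFileSegment::Transactions,
///     )?)
/// }
/// ```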
pub(crate) fn missing_static_data_error<Provider>(
    last_tx_num: TxNumber,
    static_file_provider: &StaticFileProvider<Provider::Primitives>,
    provider: &Provider,
    segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
    Provider: BlockReader + StaticFileProviderFactory,
{
    let mut last_block =
        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();

    // To be extra safe, walk back until the block's last tx number is actually covered by the
    // static files.
    loop {
        if let Some(indices) = provider.block_body_indices(last_block)? &&
            indices.last_tx_num() <= last_tx_num
        {
            break
        }
        if last_block == 0 {
            break
        }
        last_block -= 1;
    }

    // The first block missing data is the one after the last complete block.
    let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

    Ok(StageError::MissingStaticFileData {
        block: Box::new(missing_block.block_with_parent()),
        segment,
    })
}