reth_optimism_cli/commands/
import_receipts.rs

1//! Command that imports OP mainnet receipts from Bedrock datadir, exported via
2//! <https://github.com/testinprod-io/op-geth/pull/1>.
3
4use crate::receipt_file_codec::OpGethReceiptFileCodec;
5use clap::Parser;
6use reth_cli::chainspec::ChainSpecParser;
7use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
8use reth_db_api::tables;
9use reth_downloaders::{
10    file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
11    receipt_file_client::ReceiptFileClient,
12};
13use reth_execution_types::ExecutionOutcome;
14use reth_node_builder::ReceiptTy;
15use reth_node_core::version::SHORT_VERSION;
16use reth_optimism_chainspec::OpChainSpec;
17use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
18use reth_primitives_traits::NodePrimitives;
19use reth_provider::{
20    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
21    OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
22    StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
23};
24use reth_stages::{StageCheckpoint, StageId};
25use reth_static_file_types::StaticFileSegment;
26use std::{
27    path::{Path, PathBuf},
28    sync::Arc,
29};
30use tracing::{debug, info, trace, warn};
31
32/// Initializes the database with the genesis block.
33#[derive(Debug, Parser)]
34pub struct ImportReceiptsOpCommand<C: ChainSpecParser> {
35    #[command(flatten)]
36    env: EnvironmentArgs<C>,
37
38    /// Chunk byte length to read from file.
39    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
40    chunk_len: Option<u64>,
41
42    /// The path to a receipts file for import. File must use `OpGethReceiptFileCodec` (used for
43    /// exporting OP chain segment below Bedrock block via testinprod/op-geth).
44    ///
45    /// <https://github.com/testinprod-io/op-geth/pull/1>
46    #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
47    path: PathBuf,
48}
49
50impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportReceiptsOpCommand<C> {
51    /// Execute `import` command
52    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = OpPrimitives>>(
53        self,
54    ) -> eyre::Result<()> {
55        info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
56
57        debug!(target: "reth::cli",
58            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
59            "Chunking receipts import"
60        );
61
62        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
63
64        import_receipts_from_file(
65            provider_factory,
66            self.path,
67            self.chunk_len,
68            |first_block, receipts| {
69                let mut total_filtered_out_dup_txns = 0;
70                for (index, receipts_for_block) in receipts.iter_mut().enumerate() {
71                    if is_dup_tx(first_block + index as u64) {
72                        receipts_for_block.clear();
73                        total_filtered_out_dup_txns += 1;
74                    }
75                }
76
77                total_filtered_out_dup_txns
78            },
79        )
80        .await
81    }
82}
83
84impl<C: ChainSpecParser> ImportReceiptsOpCommand<C> {
85    /// Returns the underlying chain being used to run this command
86    pub const fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
87        Some(&self.env.chain)
88    }
89}
90
91/// Imports receipts to static files from file in chunks. See [`import_receipts_from_reader`].
92pub async fn import_receipts_from_file<N, P, F>(
93    provider_factory: ProviderFactory<N>,
94    path: P,
95    chunk_len: Option<u64>,
96    filter: F,
97) -> eyre::Result<()>
98where
99    N: ProviderNodeTypes<ChainSpec = OpChainSpec, Primitives: NodePrimitives<Receipt = OpReceipt>>,
100    P: AsRef<Path>,
101    F: FnMut(u64, &mut Vec<Vec<OpReceipt>>) -> usize,
102{
103    for stage in StageId::ALL {
104        let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?;
105        trace!(target: "reth::cli",
106            ?stage,
107            ?checkpoint,
108            "Read stage checkpoints from db"
109        );
110    }
111
112    // open file
113    let reader = ChunkedFileReader::new(&path, chunk_len).await?;
114
115    // import receipts
116    let _ = import_receipts_from_reader(&provider_factory, reader, filter).await?;
117
118    info!(target: "reth::cli",
119        "Receipt file imported"
120    );
121
122    Ok(())
123}
124
125/// Imports receipts to static files. Takes a filter callback as parameter, that returns the total
126/// number of filtered out receipts.
127///
128/// Caution! Filter callback must replace completely filtered out receipts for a block, with empty
129/// vectors, rather than `vec!(None)`. This is since the code for writing to static files, expects
130/// indices in the receipts list, to map to sequential block numbers.
131pub async fn import_receipts_from_reader<N, F>(
132    provider_factory: &ProviderFactory<N>,
133    mut reader: ChunkedFileReader,
134    mut filter: F,
135) -> eyre::Result<ImportReceiptsResult>
136where
137    N: ProviderNodeTypes<Primitives: NodePrimitives<Receipt = OpReceipt>>,
138    F: FnMut(u64, &mut Vec<Vec<ReceiptTy<N>>>) -> usize,
139{
140    let static_file_provider = provider_factory.static_file_provider();
141
142    // Ensure that receipts hasn't been initialized apart from `init_genesis`.
143    if let Some(num_receipts) =
144        static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
145    {
146        if num_receipts > 0 {
147            eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
148        }
149    }
150    match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
151        Some(receipts_block) => {
152            if receipts_block > 0 {
153                eyre::bail!("Expected highest receipt block to be 0, but found {receipts_block}.");
154            }
155        }
156        None => {
157            eyre::bail!(
158                "Receipts was not initialized. Please import blocks and transactions before calling this command."
159            );
160        }
161    }
162
163    let provider = provider_factory.database_provider_rw()?;
164    let mut total_decoded_receipts = 0;
165    let mut total_receipts = 0;
166    let mut total_filtered_out_dup_txns = 0;
167    let mut highest_block_receipts = 0;
168
169    let highest_block_transactions = static_file_provider
170        .get_highest_static_file_block(StaticFileSegment::Transactions)
171        .expect("transaction static files must exist before importing receipts");
172
173    while let Some(file_client) =
174        reader.next_receipts_chunk::<ReceiptFileClient<OpGethReceiptFileCodec<OpReceipt>>>().await?
175    {
176        if highest_block_receipts == highest_block_transactions {
177            warn!(target: "reth::cli",  highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");
178            break
179        }
180
181        // create a new file client from chunk read from file
182        let ReceiptFileClient {
183            mut receipts,
184            mut first_block,
185            total_receipts: total_receipts_chunk,
186            ..
187        } = file_client;
188
189        // mark these as decoded
190        total_decoded_receipts += total_receipts_chunk;
191
192        total_filtered_out_dup_txns += filter(first_block, &mut receipts);
193
194        info!(target: "reth::cli",
195            first_receipts_block=?first_block,
196            total_receipts_chunk,
197            "Importing receipt file chunk"
198        );
199
200        // It is possible for the first receipt returned by the file client to be the genesis
201        // block. In this case, we just prepend empty receipts to the current list of receipts.
202        // When initially writing to static files, the provider expects the first block to be block
203        // one. So, if the first block returned by the file client is the genesis block, we remove
204        // those receipts.
205        if first_block == 0 {
206            // remove the first empty receipts
207            let genesis_receipts = receipts.remove(0);
208            debug_assert!(genesis_receipts.is_empty());
209            // this ensures the execution outcome and static file producer start at block 1
210            first_block = 1;
211        }
212        highest_block_receipts = first_block + receipts.len() as u64 - 1;
213
214        // RLP file may have too many blocks. We ignore the excess, but warn the user.
215        if highest_block_receipts > highest_block_transactions {
216            let excess = highest_block_receipts - highest_block_transactions;
217            highest_block_receipts -= excess;
218
219            // Remove the last `excess` blocks
220            receipts.truncate(receipts.len() - excess as usize);
221
222            warn!(target: "reth::cli", highest_block_receipts, "Too many decoded blocks, ignoring the last {excess}.");
223        }
224
225        // Update total_receipts after all filtering
226        total_receipts += receipts.iter().map(|v| v.len()).sum::<usize>();
227
228        // We're reusing receipt writing code internal to
229        // `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty
230        // `BundleState`.
231        let execution_outcome =
232            ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
233
234        // finally, write the receipts
235        provider.write_state(
236            &execution_outcome,
237            OriginalValuesKnown::Yes,
238            StorageLocation::StaticFiles,
239        )?;
240    }
241
242    // Only commit if we have imported as many receipts as the number of transactions.
243    let total_imported_txns = static_file_provider
244        .count_entries::<tables::Transactions>()
245        .expect("transaction static files must exist before importing receipts");
246
247    if total_receipts != total_imported_txns {
248        eyre::bail!(
249            "Number of receipts ({total_receipts}) inconsistent with transactions {total_imported_txns}"
250        )
251    }
252
253    // Only commit if the receipt block height matches the one from transactions.
254    if highest_block_receipts != highest_block_transactions {
255        eyre::bail!(
256            "Receipt block height ({highest_block_receipts}) inconsistent with transactions' {highest_block_transactions}"
257        )
258    }
259
260    // Required or any access-write provider factory will attempt to unwind to 0.
261    provider
262        .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
263
264    UnifiedStorageWriter::commit(provider)?;
265
266    Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
267}
268
269/// Result of importing receipts in chunks.
270#[derive(Debug)]
271pub struct ImportReceiptsResult {
272    /// Total decoded receipts.
273    pub total_decoded_receipts: usize,
274    /// Total filtered out receipts.
275    pub total_filtered_out_dup_txns: usize,
276}
277
278#[cfg(test)]
279mod test {
280    use alloy_primitives::hex;
281    use reth_db_common::init::init_genesis;
282    use reth_optimism_chainspec::OP_MAINNET;
283    use reth_optimism_node::OpNode;
284    use reth_provider::test_utils::create_test_provider_factory_with_node_types;
285    use reth_stages::test_utils::TestStageDB;
286    use tempfile::tempfile;
287    use tokio::{
288        fs::File,
289        io::{AsyncSeekExt, AsyncWriteExt, SeekFrom},
290    };
291
292    use crate::receipt_file_codec::test::{
293        HACK_RECEIPT_ENCODED_BLOCK_1, HACK_RECEIPT_ENCODED_BLOCK_2, HACK_RECEIPT_ENCODED_BLOCK_3,
294    };
295
296    use super::*;
297
298    /// No receipts for genesis block
299    const EMPTY_RECEIPTS_GENESIS_BLOCK: &[u8] = &hex!("c0");
300
301    #[ignore]
302    #[tokio::test]
303    async fn filter_out_genesis_block_receipts() {
304        let mut f: File = tempfile().unwrap().into();
305        f.write_all(EMPTY_RECEIPTS_GENESIS_BLOCK).await.unwrap();
306        f.write_all(HACK_RECEIPT_ENCODED_BLOCK_1).await.unwrap();
307        f.write_all(HACK_RECEIPT_ENCODED_BLOCK_2).await.unwrap();
308        f.write_all(HACK_RECEIPT_ENCODED_BLOCK_3).await.unwrap();
309        f.flush().await.unwrap();
310        f.seek(SeekFrom::Start(0)).await.unwrap();
311
312        let reader =
313            ChunkedFileReader::from_file(f, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE).await.unwrap();
314
315        let db = TestStageDB::default();
316        init_genesis(&db.factory).unwrap();
317
318        // todo: where does import command init receipts ? probably somewhere in pipeline
319        let provider_factory =
320            create_test_provider_factory_with_node_types::<OpNode>(OP_MAINNET.clone());
321        let ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns } =
322            import_receipts_from_reader(&provider_factory, reader, |_, _| 0).await.unwrap();
323
324        assert_eq!(total_decoded_receipts, 3);
325        assert_eq!(total_filtered_out_dup_txns, 0);
326    }
327}