reth_optimism_cli/commands/
import_receipts.rs

1//! Command that imports OP mainnet receipts from Bedrock datadir, exported via
2//! <https://github.com/testinprod-io/op-geth/pull/1>.
3
4use std::path::{Path, PathBuf};
5
6use clap::Parser;
7use reth_cli::chainspec::ChainSpecParser;
8use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
9use reth_db_api::tables;
10use reth_downloaders::{
11    file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
12    receipt_file_client::ReceiptFileClient,
13};
14use reth_execution_types::ExecutionOutcome;
15use reth_node_builder::ReceiptTy;
16use reth_node_core::version::SHORT_VERSION;
17use reth_optimism_chainspec::OpChainSpec;
18use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
19use reth_primitives::NodePrimitives;
20use reth_provider::{
21    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
22    OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
23    StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
24};
25use reth_stages::{StageCheckpoint, StageId};
26use reth_static_file_types::StaticFileSegment;
27use tracing::{debug, info, trace, warn};
28
29use crate::receipt_file_codec::OpGethReceiptFileCodec;
30
31/// Initializes the database with the genesis block.
32#[derive(Debug, Parser)]
33pub struct ImportReceiptsOpCommand<C: ChainSpecParser> {
34    #[command(flatten)]
35    env: EnvironmentArgs<C>,
36
37    /// Chunk byte length to read from file.
38    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
39    chunk_len: Option<u64>,
40
41    /// The path to a receipts file for import. File must use `OpGethReceiptFileCodec` (used for
42    /// exporting OP chain segment below Bedrock block via testinprod/op-geth).
43    ///
44    /// <https://github.com/testinprod-io/op-geth/pull/1>
45    #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
46    path: PathBuf,
47}
48
49impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportReceiptsOpCommand<C> {
50    /// Execute `import` command
51    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = OpPrimitives>>(
52        self,
53    ) -> eyre::Result<()> {
54        info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
55
56        debug!(target: "reth::cli",
57            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
58            "Chunking receipts import"
59        );
60
61        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
62
63        import_receipts_from_file(
64            provider_factory,
65            self.path,
66            self.chunk_len,
67            |first_block, receipts| {
68                let mut total_filtered_out_dup_txns = 0;
69                for (index, receipts_for_block) in receipts.iter_mut().enumerate() {
70                    if is_dup_tx(first_block + index as u64) {
71                        receipts_for_block.clear();
72                        total_filtered_out_dup_txns += 1;
73                    }
74                }
75
76                total_filtered_out_dup_txns
77            },
78        )
79        .await
80    }
81}
82
83/// Imports receipts to static files from file in chunks. See [`import_receipts_from_reader`].
84pub async fn import_receipts_from_file<N, P, F>(
85    provider_factory: ProviderFactory<N>,
86    path: P,
87    chunk_len: Option<u64>,
88    filter: F,
89) -> eyre::Result<()>
90where
91    N: ProviderNodeTypes<ChainSpec = OpChainSpec, Primitives: NodePrimitives<Receipt = OpReceipt>>,
92    P: AsRef<Path>,
93    F: FnMut(u64, &mut Vec<Vec<OpReceipt>>) -> usize,
94{
95    for stage in StageId::ALL {
96        let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?;
97        trace!(target: "reth::cli",
98            ?stage,
99            ?checkpoint,
100            "Read stage checkpoints from db"
101        );
102    }
103
104    // open file
105    let reader = ChunkedFileReader::new(&path, chunk_len).await?;
106
107    // import receipts
108    let _ = import_receipts_from_reader(&provider_factory, reader, filter).await?;
109
110    info!(target: "reth::cli",
111        "Receipt file imported"
112    );
113
114    Ok(())
115}
116
117/// Imports receipts to static files. Takes a filter callback as parameter, that returns the total
118/// number of filtered out receipts.
119///
120/// Caution! Filter callback must replace completely filtered out receipts for a block, with empty
121/// vectors, rather than `vec!(None)`. This is since the code for writing to static files, expects
122/// indices in the receipts list, to map to sequential block numbers.
123pub async fn import_receipts_from_reader<N, F>(
124    provider_factory: &ProviderFactory<N>,
125    mut reader: ChunkedFileReader,
126    mut filter: F,
127) -> eyre::Result<ImportReceiptsResult>
128where
129    N: ProviderNodeTypes<Primitives: NodePrimitives<Receipt = OpReceipt>>,
130    F: FnMut(u64, &mut Vec<Vec<ReceiptTy<N>>>) -> usize,
131{
132    let static_file_provider = provider_factory.static_file_provider();
133
134    // Ensure that receipts hasn't been initialized apart from `init_genesis`.
135    if let Some(num_receipts) =
136        static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
137    {
138        if num_receipts > 0 {
139            eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
140        }
141    }
142    match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
143        Some(receipts_block) => {
144            if receipts_block > 0 {
145                eyre::bail!("Expected highest receipt block to be 0, but found {receipts_block}.");
146            }
147        }
148        None => {
149            eyre::bail!("Receipts was not initialized. Please import blocks and transactions before calling this command.");
150        }
151    }
152
153    let provider = provider_factory.database_provider_rw()?;
154    let mut total_decoded_receipts = 0;
155    let mut total_receipts = 0;
156    let mut total_filtered_out_dup_txns = 0;
157    let mut highest_block_receipts = 0;
158
159    let highest_block_transactions = static_file_provider
160        .get_highest_static_file_block(StaticFileSegment::Transactions)
161        .expect("transaction static files must exist before importing receipts");
162
163    while let Some(file_client) =
164        reader.next_receipts_chunk::<ReceiptFileClient<OpGethReceiptFileCodec<OpReceipt>>>().await?
165    {
166        if highest_block_receipts == highest_block_transactions {
167            warn!(target: "reth::cli",  highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");
168            break
169        }
170
171        // create a new file client from chunk read from file
172        let ReceiptFileClient {
173            mut receipts,
174            mut first_block,
175            total_receipts: total_receipts_chunk,
176            ..
177        } = file_client;
178
179        // mark these as decoded
180        total_decoded_receipts += total_receipts_chunk;
181
182        total_filtered_out_dup_txns += filter(first_block, &mut receipts);
183
184        info!(target: "reth::cli",
185            first_receipts_block=?first_block,
186            total_receipts_chunk,
187            "Importing receipt file chunk"
188        );
189
190        // It is possible for the first receipt returned by the file client to be the genesis
191        // block. In this case, we just prepend empty receipts to the current list of receipts.
192        // When initially writing to static files, the provider expects the first block to be block
193        // one. So, if the first block returned by the file client is the genesis block, we remove
194        // those receipts.
195        if first_block == 0 {
196            // remove the first empty receipts
197            let genesis_receipts = receipts.remove(0);
198            debug_assert!(genesis_receipts.is_empty());
199            // this ensures the execution outcome and static file producer start at block 1
200            first_block = 1;
201        }
202        highest_block_receipts = first_block + receipts.len() as u64 - 1;
203
204        // RLP file may have too many blocks. We ignore the excess, but warn the user.
205        if highest_block_receipts > highest_block_transactions {
206            let excess = highest_block_receipts - highest_block_transactions;
207            highest_block_receipts -= excess;
208
209            // Remove the last `excess` blocks
210            receipts.truncate(receipts.len() - excess as usize);
211
212            warn!(target: "reth::cli", highest_block_receipts, "Too many decoded blocks, ignoring the last {excess}.");
213        }
214
215        // Update total_receipts after all filtering
216        total_receipts += receipts.iter().map(|v| v.len()).sum::<usize>();
217
218        // We're reusing receipt writing code internal to
219        // `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty
220        // `BundleState`.
221        let execution_outcome =
222            ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
223
224        // finally, write the receipts
225        provider.write_state(
226            &execution_outcome,
227            OriginalValuesKnown::Yes,
228            StorageLocation::StaticFiles,
229        )?;
230    }
231
232    // Only commit if we have imported as many receipts as the number of transactions.
233    let total_imported_txns = static_file_provider
234        .count_entries::<tables::Transactions>()
235        .expect("transaction static files must exist before importing receipts");
236
237    if total_receipts != total_imported_txns {
238        eyre::bail!("Number of receipts ({total_receipts}) inconsistent with transactions {total_imported_txns}")
239    }
240
241    // Only commit if the receipt block height matches the one from transactions.
242    if highest_block_receipts != highest_block_transactions {
243        eyre::bail!("Receipt block height ({highest_block_receipts}) inconsistent with transactions' {highest_block_transactions}")
244    }
245
246    // Required or any access-write provider factory will attempt to unwind to 0.
247    provider
248        .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
249
250    UnifiedStorageWriter::commit(provider)?;
251
252    Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
253}
254
/// Result of importing receipts in chunks.
///
/// Plain counters only, so the type is cheap to copy and can be compared directly (e.g. in
/// tests).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ImportReceiptsResult {
    /// Total number of receipts decoded from the file, summed over all processed chunks.
    pub total_decoded_receipts: usize,
    /// Sum of the values returned by the filter callback, i.e. the number of receipts it reported
    /// as filtered out.
    pub total_filtered_out_dup_txns: usize,
}
263
#[cfg(test)]
mod test {
    use super::*;

    use alloy_primitives::hex;
    use reth_db_common::init::init_genesis;
    use reth_optimism_chainspec::OP_MAINNET;
    use reth_optimism_node::OpNode;
    use reth_provider::test_utils::create_test_provider_factory_with_node_types;
    use reth_stages::test_utils::TestStageDB;
    use tempfile::tempfile;
    use tokio::{
        fs::File,
        io::{AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    use crate::receipt_file_codec::test::{
        HACK_RECEIPT_ENCODED_BLOCK_1, HACK_RECEIPT_ENCODED_BLOCK_2, HACK_RECEIPT_ENCODED_BLOCK_3,
    };

    /// Encoded empty receipts list, standing in for the genesis block which has no receipts.
    const EMPTY_RECEIPTS_GENESIS_BLOCK: &[u8] = &hex!("c0");

    /// Writes an empty genesis entry followed by three encoded blocks to a temporary file and
    /// imports it, expecting three decoded receipts and nothing filtered out.
    #[ignore]
    #[tokio::test]
    async fn filter_out_genesis_block_receipts() {
        // File contents, in the order they are written.
        let encoded_blocks: [&[u8]; 4] = [
            EMPTY_RECEIPTS_GENESIS_BLOCK,
            HACK_RECEIPT_ENCODED_BLOCK_1,
            HACK_RECEIPT_ENCODED_BLOCK_2,
            HACK_RECEIPT_ENCODED_BLOCK_3,
        ];

        let mut file: File = tempfile().unwrap().into();
        for encoded in encoded_blocks {
            file.write_all(encoded).await.unwrap();
        }
        file.flush().await.unwrap();
        // rewind so the reader starts at the beginning of the file
        file.seek(SeekFrom::Start(0)).await.unwrap();

        let reader =
            ChunkedFileReader::from_file(file, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE).await.unwrap();

        let db = TestStageDB::default();
        init_genesis(&db.factory).unwrap();

        // todo: where does import command init receipts ? probably somewhere in pipeline
        let provider_factory =
            create_test_provider_factory_with_node_types::<OpNode>(OP_MAINNET.clone());
        let result =
            import_receipts_from_reader(&provider_factory, reader, |_, _| 0).await.unwrap();

        assert_eq!(result.total_decoded_receipts, 3);
        assert_eq!(result.total_filtered_out_dup_txns, 0);
    }
}