// reth_optimism_cli/commands/import_receipts.rs
1use crate::receipt_file_codec::OpGethReceiptFileCodec;
5use clap::Parser;
6use reth_cli::chainspec::ChainSpecParser;
7use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
8use reth_db_api::tables;
9use reth_downloaders::{
10 file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
11 receipt_file_client::ReceiptFileClient,
12};
13use reth_execution_types::ExecutionOutcome;
14use reth_node_builder::ReceiptTy;
15use reth_node_core::version::SHORT_VERSION;
16use reth_optimism_chainspec::OpChainSpec;
17use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
18use reth_primitives_traits::NodePrimitives;
19use reth_provider::{
20 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
21 OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
22 StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
23};
24use reth_stages::{StageCheckpoint, StageId};
25use reth_static_file_types::StaticFileSegment;
26use std::{
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use tracing::{debug, info, trace, warn};
31
32#[derive(Debug, Parser)]
34pub struct ImportReceiptsOpCommand<C: ChainSpecParser> {
35 #[command(flatten)]
36 env: EnvironmentArgs<C>,
37
38 #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
40 chunk_len: Option<u64>,
41
42 #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
47 path: PathBuf,
48}
49
50impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportReceiptsOpCommand<C> {
51 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = OpPrimitives>>(
53 self,
54 ) -> eyre::Result<()> {
55 info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
56
57 debug!(target: "reth::cli",
58 chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
59 "Chunking receipts import"
60 );
61
62 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
63
64 import_receipts_from_file(
65 provider_factory,
66 self.path,
67 self.chunk_len,
68 |first_block, receipts| {
69 let mut total_filtered_out_dup_txns = 0;
70 for (index, receipts_for_block) in receipts.iter_mut().enumerate() {
71 if is_dup_tx(first_block + index as u64) {
72 receipts_for_block.clear();
73 total_filtered_out_dup_txns += 1;
74 }
75 }
76
77 total_filtered_out_dup_txns
78 },
79 )
80 .await
81 }
82}
83
84impl<C: ChainSpecParser> ImportReceiptsOpCommand<C> {
85 pub const fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
87 Some(&self.env.chain)
88 }
89}
90
91pub async fn import_receipts_from_file<N, P, F>(
93 provider_factory: ProviderFactory<N>,
94 path: P,
95 chunk_len: Option<u64>,
96 filter: F,
97) -> eyre::Result<()>
98where
99 N: ProviderNodeTypes<ChainSpec = OpChainSpec, Primitives: NodePrimitives<Receipt = OpReceipt>>,
100 P: AsRef<Path>,
101 F: FnMut(u64, &mut Vec<Vec<OpReceipt>>) -> usize,
102{
103 for stage in StageId::ALL {
104 let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?;
105 trace!(target: "reth::cli",
106 ?stage,
107 ?checkpoint,
108 "Read stage checkpoints from db"
109 );
110 }
111
112 let reader = ChunkedFileReader::new(&path, chunk_len).await?;
114
115 let _ = import_receipts_from_reader(&provider_factory, reader, filter).await?;
117
118 info!(target: "reth::cli",
119 "Receipt file imported"
120 );
121
122 Ok(())
123}
124
125pub async fn import_receipts_from_reader<N, F>(
132 provider_factory: &ProviderFactory<N>,
133 mut reader: ChunkedFileReader,
134 mut filter: F,
135) -> eyre::Result<ImportReceiptsResult>
136where
137 N: ProviderNodeTypes<Primitives: NodePrimitives<Receipt = OpReceipt>>,
138 F: FnMut(u64, &mut Vec<Vec<ReceiptTy<N>>>) -> usize,
139{
140 let static_file_provider = provider_factory.static_file_provider();
141
142 if let Some(num_receipts) =
144 static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
145 {
146 if num_receipts > 0 {
147 eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
148 }
149 }
150 match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
151 Some(receipts_block) => {
152 if receipts_block > 0 {
153 eyre::bail!("Expected highest receipt block to be 0, but found {receipts_block}.");
154 }
155 }
156 None => {
157 eyre::bail!(
158 "Receipts was not initialized. Please import blocks and transactions before calling this command."
159 );
160 }
161 }
162
163 let provider = provider_factory.database_provider_rw()?;
164 let mut total_decoded_receipts = 0;
165 let mut total_receipts = 0;
166 let mut total_filtered_out_dup_txns = 0;
167 let mut highest_block_receipts = 0;
168
169 let highest_block_transactions = static_file_provider
170 .get_highest_static_file_block(StaticFileSegment::Transactions)
171 .expect("transaction static files must exist before importing receipts");
172
173 while let Some(file_client) =
174 reader.next_receipts_chunk::<ReceiptFileClient<OpGethReceiptFileCodec<OpReceipt>>>().await?
175 {
176 if highest_block_receipts == highest_block_transactions {
177 warn!(target: "reth::cli", highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");
178 break
179 }
180
181 let ReceiptFileClient {
183 mut receipts,
184 mut first_block,
185 total_receipts: total_receipts_chunk,
186 ..
187 } = file_client;
188
189 total_decoded_receipts += total_receipts_chunk;
191
192 total_filtered_out_dup_txns += filter(first_block, &mut receipts);
193
194 info!(target: "reth::cli",
195 first_receipts_block=?first_block,
196 total_receipts_chunk,
197 "Importing receipt file chunk"
198 );
199
200 if first_block == 0 {
206 let genesis_receipts = receipts.remove(0);
208 debug_assert!(genesis_receipts.is_empty());
209 first_block = 1;
211 }
212 highest_block_receipts = first_block + receipts.len() as u64 - 1;
213
214 if highest_block_receipts > highest_block_transactions {
216 let excess = highest_block_receipts - highest_block_transactions;
217 highest_block_receipts -= excess;
218
219 receipts.truncate(receipts.len() - excess as usize);
221
222 warn!(target: "reth::cli", highest_block_receipts, "Too many decoded blocks, ignoring the last {excess}.");
223 }
224
225 total_receipts += receipts.iter().map(|v| v.len()).sum::<usize>();
227
228 let execution_outcome =
232 ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
233
234 provider.write_state(
236 &execution_outcome,
237 OriginalValuesKnown::Yes,
238 StorageLocation::StaticFiles,
239 )?;
240 }
241
242 let total_imported_txns = static_file_provider
244 .count_entries::<tables::Transactions>()
245 .expect("transaction static files must exist before importing receipts");
246
247 if total_receipts != total_imported_txns {
248 eyre::bail!(
249 "Number of receipts ({total_receipts}) inconsistent with transactions {total_imported_txns}"
250 )
251 }
252
253 if highest_block_receipts != highest_block_transactions {
255 eyre::bail!(
256 "Receipt block height ({highest_block_receipts}) inconsistent with transactions' {highest_block_transactions}"
257 )
258 }
259
260 provider
262 .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
263
264 UnifiedStorageWriter::commit(provider)?;
265
266 Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
267}
268
/// Counters reported by a receipts import.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ImportReceiptsResult {
    /// Total number of receipts decoded from the file, summed over all processed chunks.
    pub total_decoded_receipts: usize,
    /// Sum of the counts returned by the import filter across all processed chunks.
    pub total_filtered_out_dup_txns: usize,
}
277
#[cfg(test)]
mod test {
    use alloy_primitives::hex;
    use reth_db_common::init::init_genesis;
    use reth_optimism_chainspec::OP_MAINNET;
    use reth_optimism_node::OpNode;
    use reth_provider::test_utils::create_test_provider_factory_with_node_types;
    use reth_stages::test_utils::TestStageDB;
    use tempfile::tempfile;
    use tokio::{
        fs::File,
        io::{AsyncSeekExt, AsyncWriteExt, SeekFrom},
    };

    use crate::receipt_file_codec::test::{
        HACK_RECEIPT_ENCODED_BLOCK_1, HACK_RECEIPT_ENCODED_BLOCK_2, HACK_RECEIPT_ENCODED_BLOCK_3,
    };

    use super::*;

    /// Encoded entry for a genesis block without receipts.
    const EMPTY_RECEIPTS_GENESIS_BLOCK: &[u8] = &hex!("c0");

    /// Imports a file holding an empty genesis entry followed by three encoded blocks and checks
    /// the reported counters.
    #[ignore]
    #[tokio::test]
    async fn filter_out_genesis_block_receipts() {
        // Assemble the receipts file: genesis first, then blocks 1 to 3.
        let mut receipts_file: File = tempfile().unwrap().into();
        receipts_file.write_all(EMPTY_RECEIPTS_GENESIS_BLOCK).await.unwrap();
        receipts_file.write_all(HACK_RECEIPT_ENCODED_BLOCK_1).await.unwrap();
        receipts_file.write_all(HACK_RECEIPT_ENCODED_BLOCK_2).await.unwrap();
        receipts_file.write_all(HACK_RECEIPT_ENCODED_BLOCK_3).await.unwrap();
        receipts_file.flush().await.unwrap();
        // Rewind so the reader starts at the beginning of the file.
        receipts_file.seek(SeekFrom::Start(0)).await.unwrap();

        let chunked_reader =
            ChunkedFileReader::from_file(receipts_file, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE)
                .await
                .unwrap();

        // NOTE(review): genesis is initialised on `db.factory`, while the import below runs
        // against a separately created provider factory - confirm this is intended.
        let db = TestStageDB::default();
        init_genesis(&db.factory).unwrap();

        let provider_factory =
            create_test_provider_factory_with_node_types::<OpNode>(OP_MAINNET.clone());
        let outcome =
            import_receipts_from_reader(&provider_factory, chunked_reader, |_, _| 0).await.unwrap();

        assert_eq!(outcome.total_decoded_receipts, 3);
        assert_eq!(outcome.total_filtered_out_dup_txns, 0);
    }
}