reth_optimism_cli/commands/
import_receipts.rs
1use std::path::{Path, PathBuf};
5
6use clap::Parser;
7use reth_cli::chainspec::ChainSpecParser;
8use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
9use reth_db_api::tables;
10use reth_downloaders::{
11 file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
12 receipt_file_client::ReceiptFileClient,
13};
14use reth_execution_types::ExecutionOutcome;
15use reth_node_builder::ReceiptTy;
16use reth_node_core::version::SHORT_VERSION;
17use reth_optimism_chainspec::OpChainSpec;
18use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
19use reth_primitives::NodePrimitives;
20use reth_provider::{
21 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
22 OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
23 StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
24};
25use reth_stages::{StageCheckpoint, StageId};
26use reth_static_file_types::StaticFileSegment;
27use tracing::{debug, info, trace, warn};
28
29use crate::receipt_file_codec::OpGethReceiptFileCodec;
30
31#[derive(Debug, Parser)]
33pub struct ImportReceiptsOpCommand<C: ChainSpecParser> {
34 #[command(flatten)]
35 env: EnvironmentArgs<C>,
36
37 #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
39 chunk_len: Option<u64>,
40
41 #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
46 path: PathBuf,
47}
48
49impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportReceiptsOpCommand<C> {
50 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = OpPrimitives>>(
52 self,
53 ) -> eyre::Result<()> {
54 info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
55
56 debug!(target: "reth::cli",
57 chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
58 "Chunking receipts import"
59 );
60
61 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
62
63 import_receipts_from_file(
64 provider_factory,
65 self.path,
66 self.chunk_len,
67 |first_block, receipts| {
68 let mut total_filtered_out_dup_txns = 0;
69 for (index, receipts_for_block) in receipts.iter_mut().enumerate() {
70 if is_dup_tx(first_block + index as u64) {
71 receipts_for_block.clear();
72 total_filtered_out_dup_txns += 1;
73 }
74 }
75
76 total_filtered_out_dup_txns
77 },
78 )
79 .await
80 }
81}
82
83pub async fn import_receipts_from_file<N, P, F>(
85 provider_factory: ProviderFactory<N>,
86 path: P,
87 chunk_len: Option<u64>,
88 filter: F,
89) -> eyre::Result<()>
90where
91 N: ProviderNodeTypes<ChainSpec = OpChainSpec, Primitives: NodePrimitives<Receipt = OpReceipt>>,
92 P: AsRef<Path>,
93 F: FnMut(u64, &mut Vec<Vec<OpReceipt>>) -> usize,
94{
95 for stage in StageId::ALL {
96 let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?;
97 trace!(target: "reth::cli",
98 ?stage,
99 ?checkpoint,
100 "Read stage checkpoints from db"
101 );
102 }
103
104 let reader = ChunkedFileReader::new(&path, chunk_len).await?;
106
107 let _ = import_receipts_from_reader(&provider_factory, reader, filter).await?;
109
110 info!(target: "reth::cli",
111 "Receipt file imported"
112 );
113
114 Ok(())
115}
116
117pub async fn import_receipts_from_reader<N, F>(
124 provider_factory: &ProviderFactory<N>,
125 mut reader: ChunkedFileReader,
126 mut filter: F,
127) -> eyre::Result<ImportReceiptsResult>
128where
129 N: ProviderNodeTypes<Primitives: NodePrimitives<Receipt = OpReceipt>>,
130 F: FnMut(u64, &mut Vec<Vec<ReceiptTy<N>>>) -> usize,
131{
132 let static_file_provider = provider_factory.static_file_provider();
133
134 if let Some(num_receipts) =
136 static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
137 {
138 if num_receipts > 0 {
139 eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
140 }
141 }
142 match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
143 Some(receipts_block) => {
144 if receipts_block > 0 {
145 eyre::bail!("Expected highest receipt block to be 0, but found {receipts_block}.");
146 }
147 }
148 None => {
149 eyre::bail!("Receipts was not initialized. Please import blocks and transactions before calling this command.");
150 }
151 }
152
153 let provider = provider_factory.database_provider_rw()?;
154 let mut total_decoded_receipts = 0;
155 let mut total_receipts = 0;
156 let mut total_filtered_out_dup_txns = 0;
157 let mut highest_block_receipts = 0;
158
159 let highest_block_transactions = static_file_provider
160 .get_highest_static_file_block(StaticFileSegment::Transactions)
161 .expect("transaction static files must exist before importing receipts");
162
163 while let Some(file_client) =
164 reader.next_receipts_chunk::<ReceiptFileClient<OpGethReceiptFileCodec<OpReceipt>>>().await?
165 {
166 if highest_block_receipts == highest_block_transactions {
167 warn!(target: "reth::cli", highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");
168 break
169 }
170
171 let ReceiptFileClient {
173 mut receipts,
174 mut first_block,
175 total_receipts: total_receipts_chunk,
176 ..
177 } = file_client;
178
179 total_decoded_receipts += total_receipts_chunk;
181
182 total_filtered_out_dup_txns += filter(first_block, &mut receipts);
183
184 info!(target: "reth::cli",
185 first_receipts_block=?first_block,
186 total_receipts_chunk,
187 "Importing receipt file chunk"
188 );
189
190 if first_block == 0 {
196 let genesis_receipts = receipts.remove(0);
198 debug_assert!(genesis_receipts.is_empty());
199 first_block = 1;
201 }
202 highest_block_receipts = first_block + receipts.len() as u64 - 1;
203
204 if highest_block_receipts > highest_block_transactions {
206 let excess = highest_block_receipts - highest_block_transactions;
207 highest_block_receipts -= excess;
208
209 receipts.truncate(receipts.len() - excess as usize);
211
212 warn!(target: "reth::cli", highest_block_receipts, "Too many decoded blocks, ignoring the last {excess}.");
213 }
214
215 total_receipts += receipts.iter().map(|v| v.len()).sum::<usize>();
217
218 let execution_outcome =
222 ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
223
224 provider.write_state(
226 &execution_outcome,
227 OriginalValuesKnown::Yes,
228 StorageLocation::StaticFiles,
229 )?;
230 }
231
232 let total_imported_txns = static_file_provider
234 .count_entries::<tables::Transactions>()
235 .expect("transaction static files must exist before importing receipts");
236
237 if total_receipts != total_imported_txns {
238 eyre::bail!("Number of receipts ({total_receipts}) inconsistent with transactions {total_imported_txns}")
239 }
240
241 if highest_block_receipts != highest_block_transactions {
243 eyre::bail!("Receipt block height ({highest_block_receipts}) inconsistent with transactions' {highest_block_transactions}")
244 }
245
246 provider
248 .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
249
250 UnifiedStorageWriter::commit(provider)?;
251
252 Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
253}
254
255#[derive(Debug)]
257pub struct ImportReceiptsResult {
258 pub total_decoded_receipts: usize,
260 pub total_filtered_out_dup_txns: usize,
262}
263
264#[cfg(test)]
265mod test {
266 use alloy_primitives::hex;
267 use reth_db_common::init::init_genesis;
268 use reth_optimism_chainspec::OP_MAINNET;
269 use reth_optimism_node::OpNode;
270 use reth_provider::test_utils::create_test_provider_factory_with_node_types;
271 use reth_stages::test_utils::TestStageDB;
272 use tempfile::tempfile;
273 use tokio::{
274 fs::File,
275 io::{AsyncSeekExt, AsyncWriteExt, SeekFrom},
276 };
277
278 use crate::receipt_file_codec::test::{
279 HACK_RECEIPT_ENCODED_BLOCK_1, HACK_RECEIPT_ENCODED_BLOCK_2, HACK_RECEIPT_ENCODED_BLOCK_3,
280 };
281
282 use super::*;
283
284 const EMPTY_RECEIPTS_GENESIS_BLOCK: &[u8] = &hex!("c0");
286
287 #[ignore]
288 #[tokio::test]
289 async fn filter_out_genesis_block_receipts() {
290 let mut f: File = tempfile().unwrap().into();
291 f.write_all(EMPTY_RECEIPTS_GENESIS_BLOCK).await.unwrap();
292 f.write_all(HACK_RECEIPT_ENCODED_BLOCK_1).await.unwrap();
293 f.write_all(HACK_RECEIPT_ENCODED_BLOCK_2).await.unwrap();
294 f.write_all(HACK_RECEIPT_ENCODED_BLOCK_3).await.unwrap();
295 f.flush().await.unwrap();
296 f.seek(SeekFrom::Start(0)).await.unwrap();
297
298 let reader =
299 ChunkedFileReader::from_file(f, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE).await.unwrap();
300
301 let db = TestStageDB::default();
302 init_genesis(&db.factory).unwrap();
303
304 let provider_factory =
306 create_test_provider_factory_with_node_types::<OpNode>(OP_MAINNET.clone());
307 let ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns } =
308 import_receipts_from_reader(&provider_factory, reader, |_, _| 0).await.unwrap();
309
310 assert_eq!(total_decoded_receipts, 3);
311 assert_eq!(total_filtered_out_dup_txns, 0);
312 }
313}