1use std::{fmt, io};
2
3use futures::Future;
4use tokio::io::AsyncReadExt;
5use tokio_stream::StreamExt;
6use tokio_util::codec::{Decoder, FramedRead};
7use tracing::{trace, warn};
8
9use crate::{DecodedFileChunk, FileClientError};
10
/// A [`Decoder`] whose decoded items are `Option<ReceiptWithBlockNumber<Self::Receipt>>`.
///
/// The trait only names the receipt type carried by the decoder's items so that
/// generic code can refer to it as `D::Receipt`. A blanket implementation in this
/// file covers every [`Decoder`] with a matching `Item` type.
pub trait ReceiptDecoder: Decoder<Item = Option<ReceiptWithBlockNumber<Self::Receipt>>> {
    /// The receipt type wrapped by each decoded [`ReceiptWithBlockNumber`].
    type Receipt;
}
16
17impl<T, R> ReceiptDecoder for T
18where
19 T: Decoder<Item = Option<ReceiptWithBlockNumber<R>>>,
20{
21 type Receipt = R;
22}
23
24#[derive(Debug)]
27pub struct ReceiptFileClient<D: ReceiptDecoder> {
28 pub receipts: Vec<Vec<D::Receipt>>,
30 pub first_block: u64,
32 pub total_receipts: usize,
34}
35
36pub trait FromReceiptReader {
38 type Error: From<io::Error>;
40
41 fn from_receipt_reader<B>(
43 reader: B,
44 num_bytes: u64,
45 prev_chunk_highest_block: Option<u64>,
46 ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
47 where
48 Self: Sized,
49 B: AsyncReadExt + Unpin;
50}
51
52impl<D> FromReceiptReader for ReceiptFileClient<D>
53where
54 D: ReceiptDecoder<Error = FileClientError> + fmt::Debug + Default,
55{
56 type Error = D::Error;
57
58 fn from_receipt_reader<B>(
61 reader: B,
62 num_bytes: u64,
63 prev_chunk_highest_block: Option<u64>,
64 ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
65 where
66 B: AsyncReadExt + Unpin,
67 {
68 let mut receipts = Vec::default();
69
70 let mut stream = FramedRead::with_capacity(reader, D::default(), num_bytes as usize);
72
73 trace!(target: "downloaders::file",
74 target_num_bytes=num_bytes,
75 capacity=stream.read_buffer().capacity(),
76 codec=?D::default(),
77 "init decode stream"
78 );
79
80 let mut remaining_bytes = vec![];
81
82 let mut log_interval = 0;
83 let mut log_interval_start_block = 0;
84
85 let mut block_number = 0;
86 let mut total_receipts = 0;
87 let mut receipts_for_block = vec![];
88 let mut first_block = None;
89
90 async move {
91 while let Some(receipt_res) = stream.next().await {
92 let receipt = match receipt_res {
93 Ok(receipt) => receipt,
94 Err(FileClientError::Rlp(err, bytes)) => {
95 trace!(target: "downloaders::file",
96 %err,
97 bytes_len=bytes.len(),
98 "partial receipt returned from decoding chunk"
99 );
100
101 remaining_bytes = bytes;
102
103 break
104 }
105 Err(err) => return Err(err),
106 };
107
108 match receipt {
109 Some(ReceiptWithBlockNumber { receipt, number }) => {
110 if block_number > number {
111 warn!(target: "downloaders::file", previous_block_number = block_number, "skipping receipt from a lower block: {number}");
112 continue
113 }
114
115 total_receipts += 1;
116
117 if first_block.is_none() {
118 first_block = Some(number);
119 block_number = number;
120 }
121
122 if block_number == number {
123 receipts_for_block.push(receipt);
124 } else {
125 receipts.push(receipts_for_block);
126
127 block_number = number;
129 receipts_for_block = vec![receipt];
130 }
131 }
132 None => {
133 match first_block {
134 Some(num) => {
135 receipts.push(receipts_for_block);
138 block_number = num + receipts.len() as u64;
140 }
141 None => {
142 if let Some(highest_block) = prev_chunk_highest_block {
144 block_number = highest_block + 1;
146 } else {
147 block_number = 0;
150 }
151 first_block = Some(block_number);
152 }
153 }
154
155 receipts_for_block = vec![];
156 }
157 }
158
159 if log_interval == 0 {
160 trace!(target: "downloaders::file",
161 block_number,
162 total_receipts,
163 "read first receipt"
164 );
165 log_interval_start_block = block_number;
166 } else if log_interval % 100_000 == 0 {
167 trace!(target: "downloaders::file",
168 blocks=?log_interval_start_block..=block_number,
169 total_receipts,
170 "read receipts from file"
171 );
172 log_interval_start_block = block_number + 1;
173 }
174 log_interval += 1;
175 }
176
177 trace!(target: "downloaders::file",
178 blocks=?log_interval_start_block..=block_number,
179 total_receipts,
180 "read receipts from file"
181 );
182
183 receipts.push(receipts_for_block);
185
186 trace!(target: "downloaders::file",
187 blocks = receipts.len(),
188 total_receipts,
189 "Initialized receipt file client"
190 );
191
192 Ok(DecodedFileChunk {
193 file_client: Self {
194 receipts,
195 first_block: first_block.unwrap_or_default(),
196 total_receipts,
197 },
198 remaining_bytes,
199 highest_block: Some(block_number),
200 })
201 }
202 }
203}
204
/// A receipt paired with the number of the block it belongs to.
#[derive(Debug, PartialEq, Eq)]
pub struct ReceiptWithBlockNumber<R> {
    /// The receipt itself.
    pub receipt: R,
    /// Number of the block the receipt belongs to.
    pub number: u64,
}
213
#[cfg(test)]
mod test {
    use alloy_primitives::{
        address, b256,
        bytes::{Buf, BytesMut},
        hex, Bytes, Log, LogData,
    };
    use alloy_rlp::{Decodable, RlpDecodable};
    use reth_ethereum_primitives::{Receipt, TxType};
    use reth_tracing::init_test_tracing;
    use tokio_util::codec::Decoder;

    use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber};
    use crate::{DecodedFileChunk, FileClientError};

    /// Receipt layout used by the mock file format; the field order is the RLP
    /// decoding order.
    #[derive(Debug, PartialEq, Eq, RlpDecodable)]
    struct MockReceipt {
        tx_type: u8,
        status: u64,
        cumulative_gas_used: u64,
        logs: Vec<Log>,
        block_number: u64,
    }

    /// One entry of the mock file: either a receipt or nothing.
    #[derive(Debug, PartialEq, Eq, RlpDecodable)]
    #[rlp(trailing)]
    struct MockReceiptContainer(Option<MockReceipt>);

    impl TryFrom<MockReceipt> for ReceiptWithBlockNumber<Receipt> {
        type Error = FileClientError;

        /// Converts a mock receipt, failing when `tx_type` is not a valid [`TxType`].
        fn try_from(exported_receipt: MockReceipt) -> Result<Self, Self::Error> {
            let MockReceipt { tx_type: raw_type, status, cumulative_gas_used, logs, block_number } =
                exported_receipt;

            let tx_type = TxType::try_from(raw_type)
                .map_err(|err| FileClientError::Rlp(err.into(), vec![raw_type]))?;

            Ok(Self {
                receipt: Receipt { tx_type, success: status != 0, cumulative_gas_used, logs },
                number: block_number,
            })
        }
    }

    /// Codec that decodes [`MockReceiptContainer`] entries from a byte buffer.
    #[derive(Debug, Default)]
    struct MockReceiptFileCodec;

    impl Decoder for MockReceiptFileCodec {
        type Item = Option<ReceiptWithBlockNumber<Receipt>>;
        type Error = FileClientError;

        fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            if src.is_empty() {
                return Ok(None)
            }

            // Decode from a slice view so that a failed decode leaves `src` untouched;
            // on failure the whole buffer is attached to the error.
            let mut rest = &src[..];
            let MockReceiptContainer(entry) = MockReceiptContainer::decode(&mut rest)
                .map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;

            // Drop exactly the bytes the RLP decoder consumed.
            let consumed = src.len() - rest.len();
            src.advance(consumed);

            let item = match entry {
                Some(mock) => Some(ReceiptWithBlockNumber::try_from(mock)?),
                None => None,
            };

            Ok(Some(item))
        }
    }

    /// Encoded entry that decodes to a container without a receipt.
    const MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS: &[u8] = &hex!("c0");

    /// Encoded entry matching [`receipt_block_1`].
    const MOCK_RECEIPT_ENCODED_BLOCK_1: &[u8] = &hex!(
        "f901a4f901a1800183031843f90197f89b948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef863a00109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027ba00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2da000000000000000000000000000000000000000000000000000000000618d8837f89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ba000000000000000000000000000000000000000000000000000000000d0e3ebf0a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d80f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007edc6ca0bb683480008001"
    );

    /// Encoded entry matching [`receipt_block_2`].
    const MOCK_RECEIPT_ENCODED_BLOCK_2: &[u8] = &hex!(
        "f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d0ea0e40a00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b24080f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007eda7867e0c7d480008002"
    );

    /// Encoded entry matching [`receipt_block_3`].
    const MOCK_RECEIPT_ENCODED_BLOCK_3: &[u8] = &hex!(
        "f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d101e54ba00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a9980f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007ed8842f06277480008003"
    );

    /// Flattens an expected receipt into the mock layout used by the encoded fixtures.
    fn as_mock(expected: ReceiptWithBlockNumber<Receipt>) -> MockReceipt {
        let ReceiptWithBlockNumber { receipt, number } = expected;
        MockReceipt {
            tx_type: receipt.tx_type as u8,
            status: receipt.success as u64,
            cumulative_gas_used: receipt.cumulative_gas_used,
            logs: receipt.logs,
            block_number: number,
        }
    }

    fn mock_receipt_1() -> MockReceipt {
        as_mock(receipt_block_1())
    }

    fn mock_receipt_2() -> MockReceipt {
        as_mock(receipt_block_2())
    }

    fn mock_receipt_3() -> MockReceipt {
        as_mock(receipt_block_3())
    }

    /// Wraps `data` in a log emitted by the address shared by all fixture logs.
    fn fixture_log(data: LogData) -> Log {
        Log { address: address!("0x8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae"), data }
    }

    /// Builds a successful legacy receipt for block `number`.
    fn legacy_receipt(
        number: u64,
        cumulative_gas_used: u64,
        logs: Vec<Log>,
    ) -> ReceiptWithBlockNumber<Receipt> {
        let receipt = Receipt { tx_type: TxType::Legacy, success: true, cumulative_gas_used, logs };
        ReceiptWithBlockNumber { receipt, number }
    }

    fn receipt_block_1() -> ReceiptWithBlockNumber<Receipt> {
        let logs = vec![
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0x0109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027b"),
                        b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                        b256!("0x00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"),
                    ],
                    Bytes::from(hex!(
                        "00000000000000000000000000000000000000000000000000000000618d8837"
                    )),
                )
                .unwrap(),
            ),
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68b"),
                        b256!("0x00000000000000000000000000000000000000000000000000000000d0e3ebf0"),
                        b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                        b256!("0x00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"),
                        b256!("0x00000000000000000000000000000000000000000000007edc6ca0bb68348000"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
        ];

        legacy_receipt(1, 202819, logs)
    }

    fn receipt_block_2() -> ReceiptWithBlockNumber<Receipt> {
        let logs = vec![
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"),
                        b256!("0x00000000000000000000000000000000000000000000000000000000d0ea0e40"),
                        b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                        b256!("0x000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b240"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"),
                        b256!("0x00000000000000000000000000000000000000000000007eda7867e0c7d48000"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
        ];

        legacy_receipt(2, 116237, logs)
    }

    fn receipt_block_3() -> ReceiptWithBlockNumber<Receipt> {
        let logs = vec![
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"),
                        b256!("0x00000000000000000000000000000000000000000000000000000000d101e54b"),
                        b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                        b256!("0x000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a99"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
            fixture_log(
                LogData::new(
                    vec![
                        b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"),
                        b256!("0x00000000000000000000000000000000000000000000007ed8842f0627748000"),
                    ],
                    Bytes::default(),
                )
                .unwrap(),
            ),
        ];

        legacy_receipt(3, 116237, logs)
    }

    /// Runs the file client over `encoded` as a single chunk with no previous chunk.
    async fn decode_chunk(encoded: &[u8]) -> ReceiptFileClient<MockReceiptFileCodec> {
        let mut reader = encoded;

        let DecodedFileChunk { file_client, .. } =
            ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
                &mut reader,
                encoded.len() as u64,
                None,
            )
            .await
            .unwrap();

        file_client
    }

    #[test]
    fn decode_mock_receipt() {
        let cases = [
            (MOCK_RECEIPT_ENCODED_BLOCK_1, mock_receipt_1()),
            (MOCK_RECEIPT_ENCODED_BLOCK_2, mock_receipt_2()),
            (MOCK_RECEIPT_ENCODED_BLOCK_3, mock_receipt_3()),
        ];

        for (mut encoded, expected) in cases {
            let decoded = MockReceiptContainer::decode(&mut encoded).unwrap().0.unwrap();
            assert_eq!(expected, decoded);
        }
    }

    #[test]
    fn receipts_codec() {
        // Three encoded receipts back to back in one buffer.
        let receipt_1_to_3 = [
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let mut encoded = BytesMut::from(&receipt_1_to_3[..]);
        let mut codec = MockReceiptFileCodec;

        // Each call must consume exactly one receipt, in order.
        for expected in [receipt_block_1(), receipt_block_2(), receipt_block_3()] {
            let decoded = codec.decode(&mut encoded).unwrap().unwrap().unwrap();
            assert_eq!(expected, decoded);
        }
    }

    #[tokio::test]
    async fn receipt_file_client_ovm_codec() {
        init_test_tracing();

        // empty block, block 1, block 2, empty block
        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
        ]
        .concat();

        let ReceiptFileClient { receipts, first_block, total_receipts, .. } =
            decode_chunk(&encoded_receipts).await;

        assert_eq!(2, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][0]);
        assert!(receipts[3].is_empty());
    }

    #[tokio::test]
    async fn no_receipts_middle_block() {
        init_test_tracing();

        // empty block, block 1, empty block, block 3
        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let ReceiptFileClient { receipts, first_block, total_receipts, .. } =
            decode_chunk(&encoded_receipts).await;

        assert_eq!(2, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert!(receipts[2].is_empty());
        assert_eq!(receipt_block_3().receipt, receipts[3][0]);
    }

    #[tokio::test]
    async fn two_receipts_same_block() {
        init_test_tracing();

        // empty block, block 1, block 2 (two receipts), block 3
        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let ReceiptFileClient { receipts, first_block, total_receipts, .. } =
            decode_chunk(&encoded_receipts).await;

        assert_eq!(4, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][1]);
        assert_eq!(receipt_block_3().receipt, receipts[3][0]);
    }
}