1use std::{fmt, io};
2
3use futures::Future;
4use reth_primitives::Receipt;
5use tokio::io::AsyncReadExt;
6use tokio_stream::StreamExt;
7use tokio_util::codec::{Decoder, FramedRead};
8use tracing::{trace, warn};
9
10use crate::{DecodedFileChunk, FileClientError};
11
/// Helper trait for codecs that decode receipts read from a file.
///
/// The supertrait bound requires a [`Decoder`] whose items are
/// `Option<ReceiptWithBlockNumber<Self::Receipt>>`.
pub trait ReceiptDecoder: Decoder<Item = Option<ReceiptWithBlockNumber<Self::Receipt>>> {
    /// The receipt type carried inside each decoded [`ReceiptWithBlockNumber`].
    type Receipt;
}
17
/// Blanket implementation: every [`Decoder`] whose item type is
/// `Option<ReceiptWithBlockNumber<R>>` is a [`ReceiptDecoder`] with `Receipt = R`.
impl<T, R> ReceiptDecoder for T
where
    T: Decoder<Item = Option<ReceiptWithBlockNumber<R>>>,
{
    type Receipt = R;
}
24
/// Receipts decoded from a file (or from one chunk of a file), grouped per block.
#[derive(Debug)]
pub struct ReceiptFileClient<D: ReceiptDecoder> {
    /// Receipts grouped by block: one inner `Vec` per block, in the order the blocks were read.
    /// An empty inner `Vec` stands for a block without receipts.
    pub receipts: Vec<Vec<D::Receipt>>,
    /// Number of the first block covered by `receipts`.
    pub first_block: u64,
    /// Total number of receipts counted while decoding.
    pub total_receipts: usize,
}
36
/// Constructor for a file client that is built by decoding receipts from an async reader.
pub trait FromReceiptReader {
    /// Error returned when building the client fails; must be constructible from an
    /// [`io::Error`].
    type Error: From<io::Error>;

    /// Builds the file client from the bytes provided by `reader`.
    ///
    /// * `reader` - source of the encoded receipts.
    /// * `num_bytes` - size in bytes of the data to decode (the implementation in this file uses
    ///   it as the capacity of the read buffer).
    /// * `prev_chunk_highest_block` - highest block number of the previously read chunk, or
    ///   `None` if there is no previous chunk.
    ///
    /// The returned future resolves to a [`DecodedFileChunk`] wrapping the new client, or to
    /// `Self::Error` on failure.
    fn from_receipt_reader<B>(
        reader: B,
        num_bytes: u64,
        prev_chunk_highest_block: Option<u64>,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        Self: Sized,
        B: AsyncReadExt + Unpin;
}
52
53impl<D> FromReceiptReader for ReceiptFileClient<D>
54where
55 D: ReceiptDecoder<Error = FileClientError> + fmt::Debug + Default,
56{
57 type Error = D::Error;
58
59 fn from_receipt_reader<B>(
62 reader: B,
63 num_bytes: u64,
64 prev_chunk_highest_block: Option<u64>,
65 ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
66 where
67 B: AsyncReadExt + Unpin,
68 {
69 let mut receipts = Vec::default();
70
71 let mut stream = FramedRead::with_capacity(reader, D::default(), num_bytes as usize);
73
74 trace!(target: "downloaders::file",
75 target_num_bytes=num_bytes,
76 capacity=stream.read_buffer().capacity(),
77 codec=?D::default(),
78 "init decode stream"
79 );
80
81 let mut remaining_bytes = vec![];
82
83 let mut log_interval = 0;
84 let mut log_interval_start_block = 0;
85
86 let mut block_number = 0;
87 let mut total_receipts = 0;
88 let mut receipts_for_block = vec![];
89 let mut first_block = None;
90
91 async move {
92 while let Some(receipt_res) = stream.next().await {
93 let receipt = match receipt_res {
94 Ok(receipt) => receipt,
95 Err(FileClientError::Rlp(err, bytes)) => {
96 trace!(target: "downloaders::file",
97 %err,
98 bytes_len=bytes.len(),
99 "partial receipt returned from decoding chunk"
100 );
101
102 remaining_bytes = bytes;
103
104 break
105 }
106 Err(err) => return Err(err),
107 };
108
109 match receipt {
110 Some(ReceiptWithBlockNumber { receipt, number }) => {
111 if block_number > number {
112 warn!(target: "downloaders::file", previous_block_number = block_number, "skipping receipt from a lower block: {number}");
113 continue
114 }
115
116 total_receipts += 1;
117
118 if first_block.is_none() {
119 first_block = Some(number);
120 block_number = number;
121 }
122
123 if block_number == number {
124 receipts_for_block.push(receipt);
125 } else {
126 receipts.push(receipts_for_block);
127
128 block_number = number;
130 receipts_for_block = vec![receipt];
131 }
132 }
133 None => {
134 match first_block {
135 Some(num) => {
136 receipts.push(receipts_for_block);
139 block_number = num + receipts.len() as u64;
141 }
142 None => {
143 if let Some(highest_block) = prev_chunk_highest_block {
145 block_number = highest_block + 1;
147 } else {
148 block_number = 0;
151 }
152 first_block = Some(block_number);
153 }
154 }
155
156 receipts_for_block = vec![];
157 }
158 }
159
160 if log_interval == 0 {
161 trace!(target: "downloaders::file",
162 block_number,
163 total_receipts,
164 "read first receipt"
165 );
166 log_interval_start_block = block_number;
167 } else if log_interval % 100_000 == 0 {
168 trace!(target: "downloaders::file",
169 blocks=?log_interval_start_block..=block_number,
170 total_receipts,
171 "read receipts from file"
172 );
173 log_interval_start_block = block_number + 1;
174 }
175 log_interval += 1;
176 }
177
178 trace!(target: "downloaders::file",
179 blocks=?log_interval_start_block..=block_number,
180 total_receipts,
181 "read receipts from file"
182 );
183
184 receipts.push(receipts_for_block);
186
187 trace!(target: "downloaders::file",
188 blocks = receipts.len(),
189 total_receipts,
190 "Initialized receipt file client"
191 );
192
193 Ok(DecodedFileChunk {
194 file_client: Self {
195 receipts,
196 first_block: first_block.unwrap_or_default(),
197 total_receipts,
198 },
199 remaining_bytes,
200 highest_block: Some(block_number),
201 })
202 }
203 }
204}
205
/// A receipt together with the number of the block it belongs to.
#[derive(Debug, PartialEq, Eq)]
pub struct ReceiptWithBlockNumber<R = Receipt> {
    /// The receipt.
    pub receipt: R,
    /// The block number.
    pub number: u64,
}
214
#[cfg(test)]
mod test {
    use alloy_primitives::{
        address, b256,
        bytes::{Buf, BytesMut},
        hex, Bytes, Log, LogData,
    };
    use alloy_rlp::{Decodable, RlpDecodable};
    use reth_primitives::{Receipt, TxType};
    use reth_tracing::init_test_tracing;
    use tokio_util::codec::Decoder;

    use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber};
    use crate::{DecodedFileChunk, FileClientError};

    /// Receipt layout used by the mock export format. Field order matters because the RLP
    /// decoder is derived from it.
    #[derive(Debug, PartialEq, Eq, RlpDecodable)]
    struct MockReceipt {
        tx_type: u8,
        status: u64,
        cumulative_gas_used: u64,
        logs: Vec<Log>,
        block_number: u64,
    }

    /// Wrapper around an optional [`MockReceipt`]; `None` stands for a block without receipts.
    #[derive(Debug, PartialEq, Eq, RlpDecodable)]
    #[rlp(trailing)]
    struct MockReceiptContainer(Option<MockReceipt>);

    impl TryFrom<MockReceipt> for ReceiptWithBlockNumber {
        type Error = FileClientError;

        /// Converts the mock layout into a [`Receipt`] tagged with its block number. Fails if
        /// `tx_type` is not a valid [`TxType`].
        fn try_from(mock: MockReceipt) -> Result<Self, Self::Error> {
            let tx_type = TxType::try_from(mock.tx_type)
                .map_err(|err| FileClientError::Rlp(err.into(), vec![mock.tx_type]))?;

            Ok(Self {
                receipt: Receipt {
                    tx_type,
                    success: mock.status != 0,
                    cumulative_gas_used: mock.cumulative_gas_used,
                    logs: mock.logs,
                },
                number: mock.block_number,
            })
        }
    }

    /// Codec that decodes a stream of [`MockReceiptContainer`]s.
    #[derive(Debug, Default)]
    struct MockReceiptFileCodec;

    impl Decoder for MockReceiptFileCodec {
        type Item = Option<ReceiptWithBlockNumber>;
        type Error = FileClientError;

        /// Decodes one container from the front of `src` and consumes exactly the bytes that
        /// were read. On an RLP error the whole buffer content is attached to the error.
        fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            if src.is_empty() {
                return Ok(None)
            }

            let mut unread: &[u8] = src.as_ref();
            let container = MockReceiptContainer::decode(&mut unread)
                .map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
            let consumed = src.len() - unread.len();
            src.advance(consumed);

            let item = container.0.map(|mock| mock.try_into()).transpose()?;
            Ok(Some(item))
        }
    }

    /// Encoded container for a block without receipts.
    const MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS: &[u8] = &hex!("c0");

    /// Encoded container holding the receipt of block 1.
    const MOCK_RECEIPT_ENCODED_BLOCK_1: &[u8] = &hex!("f901a4f901a1800183031843f90197f89b948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef863a00109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027ba00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2da000000000000000000000000000000000000000000000000000000000618d8837f89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ba000000000000000000000000000000000000000000000000000000000d0e3ebf0a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d80f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007edc6ca0bb683480008001");

    /// Encoded container holding the receipt of block 2.
    const MOCK_RECEIPT_ENCODED_BLOCK_2: &[u8] = &hex!("f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d0ea0e40a00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b24080f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007eda7867e0c7d480008002");

    /// Encoded container holding the receipt of block 3.
    const MOCK_RECEIPT_ENCODED_BLOCK_3: &[u8] = &hex!("f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d101e54ba00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a9980f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007ed8842f06277480008003");

    /// Flattens an expected receipt into the [`MockReceipt`] layout for `block_number`.
    fn to_mock(expected: ReceiptWithBlockNumber, block_number: u64) -> MockReceipt {
        let receipt = expected.receipt;
        MockReceipt {
            tx_type: receipt.tx_type as u8,
            status: receipt.success as u64,
            cumulative_gas_used: receipt.cumulative_gas_used,
            logs: receipt.logs,
            block_number,
        }
    }

    /// Expected decoding of [`MOCK_RECEIPT_ENCODED_BLOCK_1`] in the mock layout.
    fn mock_receipt_1() -> MockReceipt {
        to_mock(receipt_block_1(), 1)
    }

    /// Expected decoding of [`MOCK_RECEIPT_ENCODED_BLOCK_2`] in the mock layout.
    fn mock_receipt_2() -> MockReceipt {
        to_mock(receipt_block_2(), 2)
    }

    /// Expected decoding of [`MOCK_RECEIPT_ENCODED_BLOCK_3`] in the mock layout.
    fn mock_receipt_3() -> MockReceipt {
        to_mock(receipt_block_3(), 3)
    }

    /// Builds a log emitted by the contract address shared by all fixtures. Panics if `data` is
    /// `None`.
    fn fixture_log(data: Option<LogData>) -> Log {
        Log { address: address!("0x8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae"), data: data.unwrap() }
    }

    /// Builds a successful legacy-transaction receipt tagged with block `number`.
    fn legacy_receipt(
        cumulative_gas_used: u64,
        logs: Vec<Log>,
        number: u64,
    ) -> ReceiptWithBlockNumber {
        let receipt = Receipt { tx_type: TxType::Legacy, success: true, cumulative_gas_used, logs };
        ReceiptWithBlockNumber { receipt, number }
    }

    /// Expected receipt for [`MOCK_RECEIPT_ENCODED_BLOCK_1`].
    fn receipt_block_1() -> ReceiptWithBlockNumber {
        let logs = vec![
            fixture_log(LogData::new(
                vec![
                    b256!("0x0109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027b"),
                    b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                    b256!("0x00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"),
                ],
                Bytes::from(hex!(
                    "00000000000000000000000000000000000000000000000000000000618d8837"
                )),
            )),
            fixture_log(LogData::new(
                vec![
                    b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68b"),
                    b256!("0x00000000000000000000000000000000000000000000000000000000d0e3ebf0"),
                    b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                    b256!("0x00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"),
                ],
                Bytes::default(),
            )),
            fixture_log(LogData::new(
                vec![
                    b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"),
                    b256!("0x00000000000000000000000000000000000000000000007edc6ca0bb68348000"),
                ],
                Bytes::default(),
            )),
        ];

        legacy_receipt(202819, logs, 1)
    }

    /// Expected receipt for [`MOCK_RECEIPT_ENCODED_BLOCK_2`].
    fn receipt_block_2() -> ReceiptWithBlockNumber {
        let logs = vec![
            fixture_log(LogData::new(
                vec![
                    b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"),
                    b256!("0x00000000000000000000000000000000000000000000000000000000d0ea0e40"),
                    b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                    b256!("0x000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b240"),
                ],
                Bytes::default(),
            )),
            fixture_log(LogData::new(
                vec![
                    b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"),
                    b256!("0x00000000000000000000000000000000000000000000007eda7867e0c7d48000"),
                ],
                Bytes::default(),
            )),
        ];

        legacy_receipt(116237, logs, 2)
    }

    /// Expected receipt for [`MOCK_RECEIPT_ENCODED_BLOCK_3`].
    fn receipt_block_3() -> ReceiptWithBlockNumber {
        let logs = vec![
            fixture_log(LogData::new(
                vec![
                    b256!("0x92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"),
                    b256!("0x00000000000000000000000000000000000000000000000000000000d101e54b"),
                    b256!("0x0000000000000000000000000000000000000000000000000000000000014218"),
                    b256!("0x000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a99"),
                ],
                Bytes::default(),
            )),
            fixture_log(LogData::new(
                vec![
                    b256!("0xfe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"),
                    b256!("0x00000000000000000000000000000000000000000000007ed8842f0627748000"),
                ],
                Bytes::default(),
            )),
        ];

        legacy_receipt(116237, logs, 3)
    }

    /// Runs `encoded_receipts` through [`ReceiptFileClient`] with the mock codec and no previous
    /// chunk, returning `(receipts, first_block, total_receipts)`.
    async fn read_chunk(encoded_receipts: &[u8]) -> (Vec<Vec<Receipt>>, u64, usize) {
        let mut reader = encoded_receipts;

        let DecodedFileChunk { file_client, .. } =
            ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
                &mut reader,
                encoded_receipts.len() as u64,
                None,
            )
            .await
            .unwrap();

        (file_client.receipts, file_client.first_block, file_client.total_receipts)
    }

    /// Each encoded fixture decodes to its expected mock receipt.
    #[test]
    fn decode_mock_receipt() {
        let cases = [
            (mock_receipt_1(), MOCK_RECEIPT_ENCODED_BLOCK_1),
            (mock_receipt_2(), MOCK_RECEIPT_ENCODED_BLOCK_2),
            (mock_receipt_3(), MOCK_RECEIPT_ENCODED_BLOCK_3),
        ];

        for (expected, mut encoded) in cases {
            let decoded = MockReceiptContainer::decode(&mut encoded).unwrap().0.unwrap();
            assert_eq!(expected, decoded);
        }
    }

    /// The codec decodes three concatenated receipts one after another.
    #[test]
    fn receipts_codec() {
        let receipt_1_to_3 = [
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let mut encoded = BytesMut::from(&receipt_1_to_3[..]);
        let mut codec = MockReceiptFileCodec;

        for expected in [receipt_block_1(), receipt_block_2(), receipt_block_3()] {
            let decoded = codec.decode(&mut encoded).unwrap().unwrap().unwrap();
            assert_eq!(expected, decoded);
        }
    }

    /// Empty block, block 1, block 2, empty block.
    #[tokio::test]
    async fn receipt_file_client_ovm_codec() {
        init_test_tracing();

        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
        ]
        .concat();

        let (receipts, first_block, total_receipts) = read_chunk(&encoded_receipts).await;

        assert_eq!(2, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][0]);
        assert!(receipts[3].is_empty());
    }

    /// Empty block, block 1, empty block, block 3.
    #[tokio::test]
    async fn no_receipts_middle_block() {
        init_test_tracing();

        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let (receipts, first_block, total_receipts) = read_chunk(&encoded_receipts).await;

        assert_eq!(2, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert!(receipts[2].is_empty());
        assert_eq!(receipt_block_3().receipt, receipts[3][0]);
    }

    /// Empty block, block 1, two receipts for block 2, block 3.
    #[tokio::test]
    async fn two_receipts_same_block() {
        init_test_tracing();

        let encoded_receipts = [
            MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS,
            MOCK_RECEIPT_ENCODED_BLOCK_1,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_2,
            MOCK_RECEIPT_ENCODED_BLOCK_3,
        ]
        .concat();

        let (receipts, first_block, total_receipts) = read_chunk(&encoded_receipts).await;

        assert_eq!(4, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(receipt_block_1().receipt, receipts[1][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][0]);
        assert_eq!(receipt_block_2().receipt, receipts[2][1]);
        assert_eq!(receipt_block_3().receipt, receipts[3][0]);
    }
}