reth_rpc_eth_types/cache/
mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::B256;
6use futures::{future::Either, Stream, StreamExt};
7use reth_chain_state::CanonStateNotification;
8use reth_errors::{ProviderError, ProviderResult};
9use reth_execution_types::Chain;
10use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
11use reth_storage_api::{BlockReader, TransactionVariant};
12use reth_tasks::{TaskSpawner, TokioTaskExecutor};
13use schnellru::{ByLength, Limiter};
14use std::{
15    future::Future,
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19};
20use tokio::sync::{
21    mpsc::{unbounded_channel, UnboundedSender},
22    oneshot, Semaphore,
23};
24use tokio_stream::wrappers::UnboundedReceiverStream;
25
26pub mod config;
27pub mod db;
28pub mod metrics;
29pub mod multi_consumer;
30
31/// The type that can send the response to a requested [`RecoveredBlock`]
32type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;
33
34/// The type that can send the response to a requested [`RecoveredBlock`]
35type BlockWithSendersResponseSender<B> =
36    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
37
38/// The type that can send the response to the requested receipts of a block.
39type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
40
41/// The type that can send the response to a requested header
42type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
43
44type BlockLruCache<B, L> = MultiConsumerLruCache<
45    B256,
46    Arc<RecoveredBlock<B>>,
47    L,
48    Either<
49        BlockWithSendersResponseSender<B>,
50        BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
51    >,
52>;
53
54type ReceiptsLruCache<R, L> =
55    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
56
57type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
58
59/// Provides async access to cached eth data
60///
61/// This is the frontend for the async caching service which manages cached data on a different
62/// task.
63#[derive(Debug)]
64pub struct EthStateCache<B: Block, R> {
65    to_service: UnboundedSender<CacheAction<B, R>>,
66}
67
68impl<B: Block, R> Clone for EthStateCache<B, R> {
69    fn clone(&self) -> Self {
70        Self { to_service: self.to_service.clone() }
71    }
72}
73
74impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
75    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
76    fn create<Provider, Tasks>(
77        provider: Provider,
78        action_task_spawner: Tasks,
79        max_blocks: u32,
80        max_receipts: u32,
81        max_headers: u32,
82        max_concurrent_db_operations: usize,
83    ) -> (Self, EthStateCacheService<Provider, Tasks>)
84    where
85        Provider: BlockReader<Block = B, Receipt = R>,
86    {
87        let (to_service, rx) = unbounded_channel();
88        let service = EthStateCacheService {
89            provider,
90            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
91            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
92            headers_cache: HeaderLruCache::new(max_headers, "headers"),
93            action_tx: to_service.clone(),
94            action_rx: UnboundedReceiverStream::new(rx),
95            action_task_spawner,
96            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
97        };
98        let cache = Self { to_service };
99        (cache, service)
100    }
101
102    /// Creates a new async LRU backed cache service task and spawns it to a new task via
103    /// [`tokio::spawn`].
104    ///
105    /// See also [`Self::spawn_with`]
106    pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
107    where
108        Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
109    {
110        Self::spawn_with(provider, config, TokioTaskExecutor::default())
111    }
112
113    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
114    /// spawner.
115    ///
116    /// The cache is memory limited by the given max bytes values.
117    pub fn spawn_with<Provider, Tasks>(
118        provider: Provider,
119        config: EthStateCacheConfig,
120        executor: Tasks,
121    ) -> Self
122    where
123        Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
124        Tasks: TaskSpawner + Clone + 'static,
125    {
126        let EthStateCacheConfig {
127            max_blocks,
128            max_receipts,
129            max_headers,
130            max_concurrent_db_requests,
131        } = config;
132        let (this, service) = Self::create(
133            provider,
134            executor.clone(),
135            max_blocks,
136            max_receipts,
137            max_headers,
138            max_concurrent_db_requests,
139        );
140        executor.spawn_critical("eth state cache", Box::pin(service));
141        this
142    }
143
144    /// Requests the  [`RecoveredBlock`] for the block hash
145    ///
146    /// Returns `None` if the block does not exist.
147    pub async fn get_recovered_block(
148        &self,
149        block_hash: B256,
150    ) -> ProviderResult<Option<Arc<RecoveredBlock<B>>>> {
151        let (response_tx, rx) = oneshot::channel();
152        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
153        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
154    }
155
156    /// Requests the receipts for the block hash
157    ///
158    /// Returns `None` if the block was not found.
159    pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Arc<Vec<R>>>> {
160        let (response_tx, rx) = oneshot::channel();
161        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
162        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
163    }
164
165    /// Fetches both receipts and block for the given block hash.
166    pub async fn get_block_and_receipts(
167        &self,
168        block_hash: B256,
169    ) -> ProviderResult<Option<(Arc<RecoveredBlock<B>>, Arc<Vec<R>>)>> {
170        let block = self.get_recovered_block(block_hash);
171        let receipts = self.get_receipts(block_hash);
172
173        let (block, receipts) = futures::try_join!(block, receipts)?;
174
175        Ok(block.zip(receipts))
176    }
177
178    /// Requests the header for the given hash.
179    ///
180    /// Returns an error if the header is not found.
181    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<B::Header> {
182        let (response_tx, rx) = oneshot::channel();
183        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
184        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
185    }
186}
187
188/// A task than manages caches for data required by the `eth` rpc implementation.
189///
190/// It provides a caching layer on top of the given
191/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
192/// memory in an LRU cache. If the requested data is missing in the cache it is fetched and inserted
193/// into the cache afterwards. While fetching data from disk is sync, this service is async since
194/// requests and data is shared via channels.
195///
196/// This type is an endless future that listens for incoming messages from the user facing
197/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
198/// that does the IO and sends the result back to it. This way the caching service only
199/// handles messages and does LRU lookups and never blocking IO.
200///
201/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
202/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
203/// permits to limit concurrent requests.
204#[must_use = "Type does nothing unless spawned"]
205pub(crate) struct EthStateCacheService<
206    Provider,
207    Tasks,
208    LimitBlocks = ByLength,
209    LimitReceipts = ByLength,
210    LimitHeaders = ByLength,
211> where
212    Provider: BlockReader,
213    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
214    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
215    LimitHeaders: Limiter<B256, Provider::Header>,
216{
217    /// The type used to lookup data from disk
218    provider: Provider,
219    /// The LRU cache for full blocks grouped by their block hash.
220    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
221    /// The LRU cache for block receipts grouped by the block hash.
222    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
223    /// The LRU cache for headers.
224    ///
225    /// Headers are cached because they are required to populate the environment for execution
226    /// (evm).
227    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
228    /// Sender half of the action channel.
229    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
230    /// Receiver half of the action channel.
231    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
232    /// The type that's used to spawn tasks that do the actual work
233    action_task_spawner: Tasks,
234    /// Rate limiter for spawned fetch tasks.
235    ///
236    /// This restricts the max concurrent fetch tasks at the same time.
237    rate_limiter: Arc<Semaphore>,
238}
239
240impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
241where
242    Provider: BlockReader + Clone + Unpin + 'static,
243    Tasks: TaskSpawner + Clone + 'static,
244{
245    fn on_new_block(
246        &mut self,
247        block_hash: B256,
248        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
249    ) {
250        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
251            // send the response to queued senders
252            for tx in queued {
253                match tx {
254                    Either::Left(block_with_senders) => {
255                        let _ = block_with_senders.send(res.clone());
256                    }
257                    Either::Right(transaction_tx) => {
258                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
259                            maybe_block.map(|block| block.body().transactions().to_vec())
260                        }));
261                    }
262                }
263            }
264        }
265
266        // cache good block
267        if let Ok(Some(block)) = res {
268            self.full_block_cache.insert(block_hash, block);
269        }
270    }
271
272    fn on_new_receipts(
273        &mut self,
274        block_hash: B256,
275        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
276    ) {
277        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
278            // send the response to queued senders
279            for tx in queued {
280                let _ = tx.send(res.clone());
281            }
282        }
283
284        // cache good receipts
285        if let Ok(Some(receipts)) = res {
286            self.receipts_cache.insert(block_hash, receipts);
287        }
288    }
289
290    fn on_reorg_block(
291        &mut self,
292        block_hash: B256,
293        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
294    ) {
295        let res = res.map(|b| b.map(Arc::new));
296        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
297            // send the response to queued senders
298            for tx in queued {
299                match tx {
300                    Either::Left(block_with_senders) => {
301                        let _ = block_with_senders.send(res.clone());
302                    }
303                    Either::Right(transaction_tx) => {
304                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
305                            maybe_block.map(|block| block.body().transactions().to_vec())
306                        }));
307                    }
308                }
309            }
310        }
311    }
312
313    fn on_reorg_receipts(
314        &mut self,
315        block_hash: B256,
316        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
317    ) {
318        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
319            // send the response to queued senders
320            for tx in queued {
321                let _ = tx.send(res.clone());
322            }
323        }
324    }
325
326    /// Shrinks the queues but leaves some space for the next requests
327    fn shrink_queues(&mut self) {
328        let min_capacity = 2;
329        self.full_block_cache.shrink_to(min_capacity);
330        self.receipts_cache.shrink_to(min_capacity);
331        self.headers_cache.shrink_to(min_capacity);
332    }
333
334    fn update_cached_metrics(&self) {
335        self.full_block_cache.update_cached_metrics();
336        self.receipts_cache.update_cached_metrics();
337        self.headers_cache.update_cached_metrics();
338    }
339}
340
341impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
342where
343    Provider: BlockReader + Clone + Unpin + 'static,
344    Tasks: TaskSpawner + Clone + 'static,
345{
346    type Output = ();
347
348    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
349        let this = self.get_mut();
350
351        loop {
352            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
353                // shrink queues if we don't have any work to do
354                this.shrink_queues();
355                return Poll::Pending;
356            };
357
358            match action {
359                None => {
360                    unreachable!("can't close")
361                }
362                Some(action) => {
363                    match action {
364                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
365                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
366                                let _ = response_tx.send(Ok(Some(block)));
367                                continue
368                            }
369
370                            // block is not in the cache, request it if this is the first consumer
371                            if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
372                                let provider = this.provider.clone();
373                                let action_tx = this.action_tx.clone();
374                                let rate_limiter = this.rate_limiter.clone();
375                                let mut action_sender =
376                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
377                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
378                                    // Acquire permit
379                                    let _permit = rate_limiter.acquire().await;
380                                    // Only look in the database to prevent situations where we
381                                    // looking up the tree is blocking
382                                    let block_sender = provider
383                                        .sealed_block_with_senders(
384                                            BlockHashOrNumber::Hash(block_hash),
385                                            TransactionVariant::WithHash,
386                                        )
387                                        .map(|maybe_block| maybe_block.map(Arc::new));
388                                    action_sender.send_block(block_sender);
389                                }));
390                            }
391                        }
392                        CacheAction::GetReceipts { block_hash, response_tx } => {
393                            // check if block is cached
394                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
395                                let _ = response_tx.send(Ok(Some(receipts)));
396                                continue
397                            }
398
399                            // block is not in the cache, request it if this is the first consumer
400                            if this.receipts_cache.queue(block_hash, response_tx) {
401                                let provider = this.provider.clone();
402                                let action_tx = this.action_tx.clone();
403                                let rate_limiter = this.rate_limiter.clone();
404                                let mut action_sender =
405                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
406                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
407                                    // Acquire permit
408                                    let _permit = rate_limiter.acquire().await;
409                                    let res = provider
410                                        .receipts_by_block(block_hash.into())
411                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
412
413                                    action_sender.send_receipts(res);
414                                }));
415                            }
416                        }
417                        CacheAction::GetHeader { block_hash, response_tx } => {
418                            // check if the header is cached
419                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
420                                let _ = response_tx.send(Ok(header));
421                                continue
422                            }
423
424                            // it's possible we have the entire block cached
425                            if let Some(block) = this.full_block_cache.get(&block_hash) {
426                                let _ = response_tx.send(Ok(block.clone_header()));
427                                continue
428                            }
429
430                            // header is not in the cache, request it if this is the first
431                            // consumer
432                            if this.headers_cache.queue(block_hash, response_tx) {
433                                let provider = this.provider.clone();
434                                let action_tx = this.action_tx.clone();
435                                let rate_limiter = this.rate_limiter.clone();
436                                let mut action_sender =
437                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
438                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
439                                    // Acquire permit
440                                    let _permit = rate_limiter.acquire().await;
441                                    let header = provider.header(&block_hash).and_then(|header| {
442                                        header.ok_or_else(|| {
443                                            ProviderError::HeaderNotFound(block_hash.into())
444                                        })
445                                    });
446                                    action_sender.send_header(header);
447                                }));
448                            }
449                        }
450                        CacheAction::ReceiptsResult { block_hash, res } => {
451                            this.on_new_receipts(block_hash, res);
452                        }
453                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
454                            Ok(Some(block_with_senders)) => {
455                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
456                            }
457                            Ok(None) => {
458                                this.on_new_block(block_hash, Ok(None));
459                            }
460                            Err(e) => {
461                                this.on_new_block(block_hash, Err(e));
462                            }
463                        },
464                        CacheAction::HeaderResult { block_hash, res } => {
465                            let res = *res;
466                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
467                                // send the response to queued senders
468                                for tx in queued {
469                                    let _ = tx.send(res.clone());
470                                }
471                            }
472
473                            // cache good header
474                            if let Ok(data) = res {
475                                this.headers_cache.insert(block_hash, data);
476                            }
477                        }
478                        CacheAction::CacheNewCanonicalChain { chain_change } => {
479                            for block in chain_change.blocks {
480                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
481                            }
482
483                            for block_receipts in chain_change.receipts {
484                                this.on_new_receipts(
485                                    block_receipts.block_hash,
486                                    Ok(Some(Arc::new(block_receipts.receipts))),
487                                );
488                            }
489                        }
490                        CacheAction::RemoveReorgedChain { chain_change } => {
491                            for block in chain_change.blocks {
492                                this.on_reorg_block(block.hash(), Ok(Some(block)));
493                            }
494
495                            for block_receipts in chain_change.receipts {
496                                this.on_reorg_receipts(
497                                    block_receipts.block_hash,
498                                    Ok(Some(Arc::new(block_receipts.receipts))),
499                                );
500                            }
501                        }
502                    };
503                    this.update_cached_metrics();
504                }
505            }
506        }
507    }
508}
509
510/// All message variants sent through the channel
511enum CacheAction<B: Block, R> {
512    GetBlockWithSenders { block_hash: B256, response_tx: BlockWithSendersResponseSender<B> },
513    GetHeader { block_hash: B256, response_tx: HeaderResponseSender<B::Header> },
514    GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender<R> },
515    BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<Arc<RecoveredBlock<B>>>> },
516    ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Arc<Vec<R>>>> },
517    HeaderResult { block_hash: B256, res: Box<ProviderResult<B::Header>> },
518    CacheNewCanonicalChain { chain_change: ChainChange<B, R> },
519    RemoveReorgedChain { chain_change: ChainChange<B, R> },
520}
521
522struct BlockReceipts<R> {
523    block_hash: B256,
524    receipts: Vec<R>,
525}
526
527/// A change of the canonical chain
528struct ChainChange<B: Block, R> {
529    blocks: Vec<RecoveredBlock<B>>,
530    receipts: Vec<BlockReceipts<R>>,
531}
532
533impl<B: Block, R: Clone> ChainChange<B, R> {
534    fn new<N>(chain: Arc<Chain<N>>) -> Self
535    where
536        N: NodePrimitives<Block = B, Receipt = R>,
537    {
538        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
539            .blocks_and_receipts()
540            .map(|(block, receipts)| {
541                let block_receipts =
542                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
543                (block.clone(), block_receipts)
544            })
545            .unzip();
546        Self { blocks, receipts }
547    }
548}
549
/// Identifies which of the caches a fetch task works for.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// The cache of full blocks.
    Block,
    /// The cache of block receipts.
    Receipt,
    /// The cache of headers.
    Header,
}
557
558/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
559/// before a result could be sent.
560///
561/// This type wraps a sender and in case the sender is still present on drop emit an error response.
562#[derive(Debug)]
563struct ActionSender<B: Block, R: Send + Sync> {
564    kind: CacheKind,
565    blockhash: B256,
566    tx: Option<UnboundedSender<CacheAction<B, R>>>,
567}
568
569impl<R: Send + Sync, B: Block> ActionSender<B, R> {
570    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
571        Self { kind, blockhash, tx: Some(tx) }
572    }
573
574    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
575        if let Some(tx) = self.tx.take() {
576            let _ = tx.send(CacheAction::BlockWithSendersResult {
577                block_hash: self.blockhash,
578                res: block_sender,
579            });
580        }
581    }
582
583    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
584        if let Some(tx) = self.tx.take() {
585            let _ =
586                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
587        }
588    }
589
590    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
591        if let Some(tx) = self.tx.take() {
592            let _ = tx.send(CacheAction::HeaderResult {
593                block_hash: self.blockhash,
594                res: Box::new(header),
595            });
596        }
597    }
598}
599impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
600    fn drop(&mut self) {
601        if let Some(tx) = self.tx.take() {
602            let msg = match self.kind {
603                CacheKind::Block => CacheAction::BlockWithSendersResult {
604                    block_hash: self.blockhash,
605                    res: Err(ProviderError::CacheServiceUnavailable),
606                },
607                CacheKind::Receipt => CacheAction::ReceiptsResult {
608                    block_hash: self.blockhash,
609                    res: Err(ProviderError::CacheServiceUnavailable),
610                },
611                CacheKind::Header => CacheAction::HeaderResult {
612                    block_hash: self.blockhash,
613                    res: Box::new(Err(ProviderError::CacheServiceUnavailable)),
614                },
615            };
616            let _ = tx.send(msg);
617        }
618    }
619}
620
621/// Awaits for new chain events and directly inserts them into the cache so they're available
622/// immediately before they need to be fetched from disk.
623///
624/// Reorged blocks are removed from the cache.
625pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
626    eth_state_cache: EthStateCache<N::Block, N::Receipt>,
627    mut events: St,
628) where
629    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
630{
631    while let Some(event) = events.next().await {
632        if let Some(reverted) = event.reverted() {
633            let chain_change = ChainChange::new(reverted);
634
635            let _ =
636                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
637        }
638
639        let chain_change = ChainChange::new(event.committed());
640
641        let _ =
642            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
643    }
644}