reth_node_builder/launch/
exex.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//! Support for launching execution extensions.

use std::{fmt, fmt::Debug};

use futures::future;
use reth_chain_state::ForkChoiceSubscriptions;
use reth_chainspec::EthChainSpec;
use reth_exex::{
    ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
    DEFAULT_EXEX_MANAGER_CAPACITY,
};
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_primitives::Head;
use reth_provider::CanonStateSubscriptions;
use reth_tracing::tracing::{debug, info};
use tracing::Instrument;

use crate::{common::WithConfigs, exex::BoxedLaunchExEx};

/// Can launch execution extensions.
pub struct ExExLauncher<Node: FullNodeComponents> {
    head: Head,
    extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
    components: Node,
    config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
}

impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
    /// Create a new `ExExLauncher` with the given extensions.
    pub const fn new(
        head: Head,
        components: Node,
        extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
        config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
    ) -> Self {
        Self { head, extensions, components, config_container }
    }

    /// Launches all execution extensions.
    ///
    /// Spawns all extensions and returns the handle to the exex manager if any extensions are
    /// installed.
    pub async fn launch(self) -> eyre::Result<Option<ExExManagerHandle>> {
        let Self { head, extensions, components, config_container } = self;

        if extensions.is_empty() {
            // nothing to launch
            return Ok(None)
        }

        info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
        let exex_wal = Wal::new(
            config_container
                .config
                .datadir
                .clone()
                .resolve_datadir(config_container.config.chain.chain())
                .exex_wal(),
        )?;

        let mut exex_handles = Vec::with_capacity(extensions.len());
        let mut exexes = Vec::with_capacity(extensions.len());

        for (id, exex) in extensions {
            // create a new exex handle
            let (handle, events, notifications) = ExExHandle::new(
                id.clone(),
                head,
                components.provider().clone(),
                components.block_executor().clone(),
                exex_wal.handle(),
            );
            exex_handles.push(handle);

            // create the launch context for the exex
            let context = ExExContext {
                head,
                config: config_container.config.clone(),
                reth_config: config_container.toml_config.clone(),
                components: components.clone(),
                events,
                notifications,
            };

            let executor = components.task_executor().clone();
            exexes.push(async move {
                debug!(target: "reth::cli", id, "spawning exex");
                let span = reth_tracing::tracing::info_span!("exex", id);

                // init the exex
                let exex = exex.launch(context).instrument(span.clone()).await.unwrap();

                // spawn it as a crit task
                executor.spawn_critical(
                    "exex",
                    async move {
                        info!(target: "reth::cli", "ExEx started");
                        match exex.await {
                            Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
                            Err(err) => panic!("ExEx {id} crashed: {err}"),
                        }
                    }
                    .instrument(span),
                );
            });
        }

        future::join_all(exexes).await;

        // spawn exex manager
        debug!(target: "reth::cli", "spawning exex manager");
        let exex_manager = ExExManager::new(
            components.provider().clone(),
            exex_handles,
            DEFAULT_EXEX_MANAGER_CAPACITY,
            exex_wal,
            components.provider().finalized_block_stream(),
        );
        let exex_manager_handle = exex_manager.handle();
        components.task_executor().spawn_critical("exex manager", async move {
            exex_manager.await.expect("exex manager crashed");
        });

        // send notifications from the blockchain tree to exex manager
        let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
        let mut handle = exex_manager_handle.clone();
        components.task_executor().spawn_critical(
            "exex manager blockchain tree notifications",
            async move {
                while let Ok(notification) = canon_state_notifications.recv().await {
                    handle
                        .send_async(ExExNotificationSource::BlockchainTree, notification.into())
                        .await
                        .expect("blockchain tree notification could not be sent to exex manager");
                }
            },
        );

        info!(target: "reth::cli", "ExEx Manager started");

        Ok(Some(exex_manager_handle))
    }
}

impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExExLauncher")
            .field("head", &self.head)
            .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
            .field("components", &"...")
            .field("config_container", &self.config_container)
            .finish()
    }
}