reth_ipc/
stream_codec.rs

1// Copyright (c) 2015-2017 Parity Technologies Limited
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27// This basis of this file has been taken from the deprecated jsonrpc codebase:
28// https://github.com/paritytech/jsonrpc
29
30use bytes::{Buf, BytesMut};
31use std::{io, str};
32
33/// Separator for enveloping messages in streaming codecs
34#[derive(Debug, Clone)]
35pub enum Separator {
36    /// No envelope is expected between messages. Decoder will try to figure out
37    /// message boundaries by accumulating incoming bytes until valid JSON is formed.
38    /// Encoder will send messages without any boundaries between requests.
39    Empty,
40    /// Byte is used as a sentinel between messages
41    Byte(u8),
42}
43
44impl Default for Separator {
45    fn default() -> Self {
46        Self::Byte(b'\n')
47    }
48}
49
50/// Stream codec for streaming protocols (ipc, tcp)
51#[derive(Debug, Default)]
52pub struct StreamCodec {
53    incoming_separator: Separator,
54    outgoing_separator: Separator,
55}
56
57impl StreamCodec {
58    /// Default codec with streaming input data. Input can be both enveloped and not.
59    pub fn stream_incoming() -> Self {
60        Self::new(Separator::Empty, Default::default())
61    }
62
63    /// New custom stream codec
64    pub const fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
65        Self { incoming_separator, outgoing_separator }
66    }
67}
68
69#[inline]
70const fn is_whitespace(byte: u8) -> bool {
71    matches!(byte, 0x0D | 0x0A | 0x20 | 0x09)
72}
73
74impl tokio_util::codec::Decoder for StreamCodec {
75    type Item = String;
76    type Error = io::Error;
77
78    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
79        if let Separator::Byte(separator) = self.incoming_separator {
80            if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) {
81                let line = buf.split_to(i);
82                let _ = buf.split_to(1);
83
84                match str::from_utf8(line.as_ref()) {
85                    Ok(s) => Ok(Some(s.to_string())),
86                    Err(_) => Err(io::Error::other("invalid UTF-8")),
87                }
88            } else {
89                Ok(None)
90            }
91        } else {
92            let mut depth = 0;
93            let mut in_str = false;
94            let mut is_escaped = false;
95            let mut start_idx = 0;
96            let mut whitespaces = 0;
97
98            for idx in 0..buf.as_ref().len() {
99                let byte = buf.as_ref()[idx];
100
101                if (byte == b'{' || byte == b'[') && !in_str {
102                    if depth == 0 {
103                        start_idx = idx;
104                    }
105                    depth += 1;
106                } else if (byte == b'}' || byte == b']') && !in_str {
107                    depth -= 1;
108                } else if byte == b'"' && !is_escaped {
109                    in_str = !in_str;
110                } else if is_whitespace(byte) {
111                    whitespaces += 1;
112                }
113                is_escaped = byte == b'\\' && !is_escaped && in_str;
114
115                if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
116                    if start_idx > 0 {
117                        buf.advance(start_idx);
118                    }
119                    let bts = buf.split_to(idx + 1 - start_idx);
120                    return match String::from_utf8(bts.into()) {
121                        Ok(val) => Ok(Some(val)),
122                        Err(_) => Ok(None),
123                    }
124                }
125            }
126            Ok(None)
127        }
128    }
129}
130
131impl tokio_util::codec::Encoder<String> for StreamCodec {
132    type Error = io::Error;
133
134    fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
135        let mut payload = msg.into_bytes();
136        if let Separator::Byte(separator) = self.outgoing_separator {
137            payload.push(separator);
138        }
139        buf.extend_from_slice(&payload);
140        Ok(())
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::*;
147    use bytes::BufMut;
148    use tokio_util::codec::Decoder;
149
150    #[test]
151    fn simple_encode() {
152        let mut buf = BytesMut::with_capacity(2048);
153        buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
154
155        let mut codec = StreamCodec::stream_incoming();
156
157        let request = codec
158            .decode(&mut buf)
159            .expect("There should be no error in simple test")
160            .expect("There should be at least one request in simple test");
161
162        assert_eq!(request, "{ test: 1 }");
163    }
164
165    #[test]
166    fn escape() {
167        let mut buf = BytesMut::with_capacity(2048);
168        buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#);
169
170        let mut codec = StreamCodec::stream_incoming();
171
172        let request = codec
173            .decode(&mut buf)
174            .expect("There should be no error in first escape test")
175            .expect("There should be a request in first escape test");
176
177        assert_eq!(request, r#"{ test: "\"\\" }"#);
178
179        let request2 = codec
180            .decode(&mut buf)
181            .expect("There should be no error in 2nd escape test")
182            .expect("There should be a request in 2nd escape test");
183        assert_eq!(request2, r#"{ test: "\ " }"#);
184
185        let request3 = codec
186            .decode(&mut buf)
187            .expect("There should be no error in 3rd escape test")
188            .expect("There should be a request in 3rd escape test");
189        assert_eq!(request3, r#"{ test: "\}" }"#);
190
191        let request4 = codec
192            .decode(&mut buf)
193            .expect("There should be no error in 4th escape test")
194            .expect("There should be a request in 4th escape test");
195        assert_eq!(request4, r#"[ test: "\]" ]"#);
196    }
197
198    #[test]
199    fn whitespace() {
200        let mut buf = BytesMut::with_capacity(2048);
201        buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 }  ");
202
203        let mut codec = StreamCodec::stream_incoming();
204
205        let request = codec
206            .decode(&mut buf)
207            .expect("There should be no error in first whitespace test")
208            .expect("There should be a request in first whitespace test");
209
210        assert_eq!(request, "{ test: 1 }");
211
212        let request2 = codec
213            .decode(&mut buf)
214            .expect("There should be no error in first 2nd test")
215            .expect("There should be a request in 2nd whitespace test");
216        assert_eq!(request2, "{ test: 2 }");
217
218        let request3 = codec
219            .decode(&mut buf)
220            .expect("There should be no error in first 3rd test")
221            .expect("There should be a request in 3rd whitespace test");
222        assert_eq!(request3, "{\n test: 3 }");
223
224        let request4 = codec.decode(&mut buf).expect("There should be no error in first 4th test");
225        assert!(
226            request4.is_none(),
227            "There should be no 4th request because it contains only whitespaces"
228        );
229    }
230
231    #[test]
232    fn fragmented_encode() {
233        let mut buf = BytesMut::with_capacity(2048);
234        buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
235
236        let mut codec = StreamCodec::stream_incoming();
237
238        let request = codec
239            .decode(&mut buf)
240            .expect("There should be no error in first fragmented test")
241            .expect("There should be at least one request in first fragmented test");
242        assert_eq!(request, "{ test: 1 }");
243        codec
244            .decode(&mut buf)
245            .expect("There should be no error in second fragmented test")
246            .expect("There should be at least one request in second fragmented test");
247        assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
248
249        buf.put_slice(b"t: 3 }");
250        let request = codec
251            .decode(&mut buf)
252            .expect("There should be no error in third fragmented test")
253            .expect("There should be at least one request in third fragmented test");
254        assert_eq!(request, "{ test: 3 }");
255    }
256
257    #[test]
258    fn huge() {
259        let request = r#"{
260			"jsonrpc":"2.0",
261			"method":"say_hello",
262			"params": [
263				42,
264				0,
265				{
266					"from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155",
267					"gas":"0x2dc6c0",
268					"data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a90046001604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f01602080910003423423094734987103498712093847102938740192387401349857109487501938475"
269				}
270			]
271		}"#;
272
273        let mut buf = BytesMut::with_capacity(65536);
274        buf.put_slice(request.as_bytes());
275
276        let mut codec = StreamCodec::stream_incoming();
277
278        let parsed_request = codec
279            .decode(&mut buf)
280            .expect("There should be no error in huge test")
281            .expect("There should be at least one request huge test");
282        assert_eq!(request, parsed_request);
283    }
284
285    #[test]
286    fn simple_line_codec() {
287        let mut buf = BytesMut::with_capacity(2048);
288        buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
289
290        let mut codec = StreamCodec::default();
291
292        let request = codec
293            .decode(&mut buf)
294            .expect("There should be no error in simple test")
295            .expect("There should be at least one request in simple test");
296        let request2 = codec
297            .decode(&mut buf)
298            .expect("There should be no error in simple test")
299            .expect("There should be at least one request in simple test");
300
301        assert_eq!(request, "{ test: 1 }");
302        assert_eq!(request2, "{ test: 2 }");
303    }
304
305    #[test]
306    fn serde_json_accepts_whitespace_wrapped_json() {
307        let json = "   { \"key\": \"value\" }   ";
308
309        #[derive(serde::Deserialize, Debug, PartialEq)]
310        struct Obj {
311            key: String,
312        }
313
314        let parsed: Result<Obj, _> = serde_json::from_str(json);
315        assert!(parsed.is_ok(), "serde_json should accept whitespace-wrapped JSON");
316        assert_eq!(parsed.unwrap(), Obj { key: "value".into() });
317    }
318}