reth_nippy_jar/cursor.rs

use crate::{
    compression::{Compression, Compressors, Zstd},
    DataReader, NippyJar, NippyJarError, NippyJarHeader, RefRow,
};
use std::{ops::Range, sync::Arc};
use zstd::bulk::Decompressor;

/// Simple cursor implementation to retrieve data from [`NippyJar`].
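///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doctest). It assumes a
/// `NippyJar` value named `jar` has already been created and persisted elsewhere; the loading
/// API is not shown here.
///
/// ```ignore
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     // Each entry in `row` is the raw bytes of one column value, borrowed either from the
///     // memory-mapped data file or from the cursor's internal decompression buffer.
///     for column_value in &row {
///         let _ = column_value.len();
///     }
/// }
/// ```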
#[derive(Clone)]
pub struct NippyJarCursor<'a, H = ()> {
    /// [`NippyJar`] which holds most of the required configuration to read from the file.
    jar: &'a NippyJar<H>,
    /// Data and offset reader.
    reader: Arc<DataReader>,
    /// Internal buffer to decompress data into without reallocating memory on each retrieval.
    internal_buffer: Vec<u8>,
    /// Cursor row position.
    row: u64,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJarCursor<'_, H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive()
    }
}

impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
    /// Creates a new instance of [`NippyJarCursor`] for the given [`NippyJar`].
    pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
        let max_row_size = jar.max_row_size;
        Ok(Self {
            jar,
            reader: Arc::new(jar.open_data_reader()?),
            // Makes sure that we have enough buffer capacity to decompress any row of data.
            internal_buffer: Vec::with_capacity(max_row_size),
            row: 0,
        })
    }

    /// Creates a new instance of [`NippyJarCursor`] with the specified [`NippyJar`] and data
    /// reader.
    pub fn with_reader(
        jar: &'a NippyJar<H>,
        reader: Arc<DataReader>,
    ) -> Result<Self, NippyJarError> {
        let max_row_size = jar.max_row_size;
        Ok(Self {
            jar,
            reader,
            // Makes sure that we have enough buffer capacity to decompress any row of data.
            internal_buffer: Vec::with_capacity(max_row_size),
            row: 0,
        })
    }

    /// Returns a reference to the related [`NippyJar`].
    pub const fn jar(&self) -> &NippyJar<H> {
        self.jar
    }

    /// Returns the current row index of the cursor.
    pub const fn row_index(&self) -> u64 {
        self.row
    }

    /// Resets cursor to the beginning.
    pub const fn reset(&mut self) {
        self.row = 0;
    }

    /// Returns a row by its number.
    pub fn row_by_number(&mut self, row: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.row = row as u64;
        self.next_row()
    }

    /// Returns the current value and advances the row.
    pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.internal_buffer.clear();

        if self.row as usize >= self.jar.rows {
            // Has reached the end
            return Ok(None)
        }

        let mut row = Vec::with_capacity(self.jar.columns);

        // Retrieve all column values from the row
        for column in 0..self.jar.columns {
            self.read_value(column, &mut row)?;
        }

        self.row += 1;

        Ok(Some(
            row.into_iter()
                .map(|v| match v {
                    ValueRange::Mmap(range) => self.reader.data(range),
                    ValueRange::Internal(range) => &self.internal_buffer[range],
                })
                .collect(),
        ))
    }

    /// Returns a row by its number by using a `mask` to only read certain columns from the row.
    pub fn row_by_number_with_cols(
        &mut self,
        row: usize,
        mask: usize,
    ) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.row = row as u64;
        self.next_row_with_cols(mask)
    }

    /// Returns the current value and advances the row.
    ///
    /// Uses a `mask` to only read certain columns from the row.
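    ///
    /// Bit `i` of `mask` selects column `i`: a column is read only if `mask & (1 << column)` is
    /// non-zero, and for skipped columns no offset lookup or decompression is performed. A brief
    /// sketch, assuming a hypothetical three-column jar and an existing `cursor`:
    ///
    /// ```ignore
    /// // With mask 0b101, only the first and third columns are returned for each row.
    /// while let Some(partial_row) = cursor.next_row_with_cols(0b101)? {
    ///     assert_eq!(partial_row.len(), 2);
    /// }
    /// ```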
    pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.internal_buffer.clear();

        if self.row as usize >= self.jar.rows {
            // Has reached the end
            return Ok(None)
        }

        let columns = self.jar.columns;
        let mut row = Vec::with_capacity(columns);

        for column in 0..columns {
            if mask & (1 << column) != 0 {
                self.read_value(column, &mut row)?
            }
        }
        self.row += 1;

        Ok(Some(
            row.into_iter()
                .map(|v| match v {
                    ValueRange::Mmap(range) => self.reader.data(range),
                    ValueRange::Internal(range) => &self.internal_buffer[range],
                })
                .collect(),
        ))
    }

    /// Takes the column index and reads the range value for the corresponding column.
    fn read_value(
        &mut self,
        column: usize,
        row: &mut Vec<ValueRange>,
    ) -> Result<(), NippyJarError> {
        // Find out the offset of the column value
        let offset_pos = self.row as usize * self.jar.columns + column;
        let value_offset = self.reader.offset(offset_pos)? as usize;

        let column_offset_range = if self.jar.rows * self.jar.columns == offset_pos + 1 {
            // It's the last column of the last row
            value_offset..self.reader.size()
        } else {
            let next_value_offset = self.reader.offset(offset_pos + 1)? as usize;
            value_offset..next_value_offset
        };

        if let Some(compression) = self.jar.compressor() {
            let from = self.internal_buffer.len();
            match compression {
                Compressors::Zstd(z) if z.use_dict => {
                    // If we are here, then for sure we have the necessary dictionaries and they're
                    // loaded (happens during deserialization). Otherwise, there's an issue
                    // somewhere else and we can't recover here anyway.
                    let dictionaries = z.dictionaries.as_ref().expect("dictionaries to exist")
                        [column]
                        .loaded()
                        .expect("dictionary to be loaded");
                    let mut decompressor = Decompressor::with_prepared_dictionary(dictionaries)?;
                    Zstd::decompress_with_dictionary(
                        self.reader.data(column_offset_range),
                        &mut self.internal_buffer,
                        &mut decompressor,
                    )?;
                }
                _ => {
                    // Uses the chosen default decompressor
                    compression.decompress_to(
                        self.reader.data(column_offset_range),
                        &mut self.internal_buffer,
                    )?;
                }
            }
            let to = self.internal_buffer.len();

            row.push(ValueRange::Internal(from..to));
        } else {
            // Not compressed
            row.push(ValueRange::Mmap(column_offset_range));
        }

        Ok(())
    }
}

/// Helper type that stores the range of the decompressed column value either on a `mmap` slice or
/// on the internal buffer.
enum ValueRange {
    Mmap(Range<usize>),
    Internal(Range<usize>),
}