use crate::{
    compression::{Compression, Compressors, Zstd},
    DataReader, NippyJar, NippyJarError, NippyJarHeader, RefRow,
};
use std::{ops::Range, sync::Arc};
use zstd::bulk::Decompressor;

/// Simple cursor implementation to retrieve data from [`NippyJar`].
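///
/// # Example
///
/// A minimal iteration sketch (illustrative; assumes `jar` is an already
/// configured [`NippyJar`] whose data and offset files exist on disk):
///
/// ```ignore
/// let mut cursor = NippyJarCursor::new(&jar)?;
/// while let Some(row) = cursor.next_row()? {
///     // `row` holds one borrowed byte slice per column of the current row.
///     for column_value in &row {
///         // process the raw (decompressed) column bytes
///     }
/// }
/// ```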
#[derive(Clone)]
pub struct NippyJarCursor<'a, H = ()> {
    /// [`NippyJar`] which holds most of the required configuration to read from the file.
    jar: &'a NippyJar<H>,
    /// Data and offset reader.
    reader: Arc<DataReader>,
    /// Internal buffer to decompress data into, avoiding a reallocation on each retrieval.
    internal_buffer: Vec<u8>,
    /// Cursor row position.
    row: u64,
}

impl<'a, H: NippyJarHeader> std::fmt::Debug for NippyJarCursor<'a, H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJarCursor").field("config", &self.jar).finish_non_exhaustive()
    }
}

impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
    /// Creates a new [`NippyJarCursor`] for `jar`, opening a new [`DataReader`] for its files.
    pub fn new(jar: &'a NippyJar<H>) -> Result<Self, NippyJarError> {
        let max_row_size = jar.max_row_size;
        Ok(NippyJarCursor {
            jar,
            reader: Arc::new(jar.open_data_reader()?),
            // Makes sure that we have enough buffer capacity to decompress any row of data.
            internal_buffer: Vec::with_capacity(max_row_size),
            row: 0,
        })
    }

    /// Creates a new [`NippyJarCursor`] for `jar`, reusing an existing [`DataReader`].
    pub fn with_reader(
        jar: &'a NippyJar<H>,
        reader: Arc<DataReader>,
    ) -> Result<Self, NippyJarError> {
        let max_row_size = jar.max_row_size;
        Ok(NippyJarCursor {
            jar,
            reader,
            // Makes sure that we have enough buffer capacity to decompress any row of data.
            internal_buffer: Vec::with_capacity(max_row_size),
            row: 0,
        })
    }

    /// Returns a reference to the related [`NippyJar`].
    pub const fn jar(&self) -> &NippyJar<H> {
        self.jar
    }

    /// Returns the current row index of the cursor.
    pub const fn row_index(&self) -> u64 {
        self.row
    }

    /// Resets cursor to the beginning.
    pub fn reset(&mut self) {
        self.row = 0;
    }

    /// Returns a row by its number.
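    ///
    /// This also positions the cursor, so a subsequent [`Self::next_row`] call
    /// returns the following row. Illustrative sketch:
    ///
    /// ```ignore
    /// // Jump straight to the row at index 100 (zero-based).
    /// if let Some(row) = cursor.row_by_number(100)? {
    ///     // `row` holds one borrowed byte slice per column.
    /// }
    /// ```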
    pub fn row_by_number(&mut self, row: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.row = row as u64;
        self.next_row()
    }

    /// Returns the current row and advances the cursor.
    pub fn next_row(&mut self) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.internal_buffer.clear();

        if self.row as usize >= self.jar.rows {
            // Has reached the end
            return Ok(None)
        }

        let mut row = Vec::with_capacity(self.jar.columns);

        // Retrieve all column values from the row
        for column in 0..self.jar.columns {
            self.read_value(column, &mut row)?;
        }

        self.row += 1;

        Ok(Some(
            row.into_iter()
                .map(|v| match v {
                    ValueRange::Mmap(range) => self.reader.data(range),
                    ValueRange::Internal(range) => &self.internal_buffer[range],
                })
                .collect(),
        ))
    }

    /// Returns a row by its number, using a `mask` to read only certain columns from the row.
    pub fn row_by_number_with_cols(
        &mut self,
        row: usize,
        mask: usize,
    ) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.row = row as u64;
        self.next_row_with_cols(mask)
    }

    /// Returns the current row and advances the cursor.
    ///
    /// Uses a `mask` to read only certain columns from the row.
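    ///
    /// Each set bit in `mask` selects the column at that bit position, so the
    /// returned row contains one value per set bit. Illustrative sketch:
    ///
    /// ```ignore
    /// // Read only the first and third columns (bits 0 and 2) of the current row,
    /// // assuming the jar has at least three columns.
    /// if let Some(row) = cursor.next_row_with_cols(0b101)? {
    ///     // `row` now holds exactly two values: columns 0 and 2.
    /// }
    /// ```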
    pub fn next_row_with_cols(&mut self, mask: usize) -> Result<Option<RefRow<'_>>, NippyJarError> {
        self.internal_buffer.clear();

        if self.row as usize >= self.jar.rows {
            // Has reached the end
            return Ok(None)
        }

        let columns = self.jar.columns;
        let mut row = Vec::with_capacity(columns);

        for column in 0..columns {
            if mask & (1 << column) != 0 {
                self.read_value(column, &mut row)?
            }
        }
        self.row += 1;

        Ok(Some(
            row.into_iter()
                .map(|v| match v {
                    ValueRange::Mmap(range) => self.reader.data(range),
                    ValueRange::Internal(range) => &self.internal_buffer[range],
                })
                .collect(),
        ))
    }

    /// Reads the value of the given `column` for the current row and pushes its range to `row`.
    fn read_value(
        &mut self,
        column: usize,
        row: &mut Vec<ValueRange>,
    ) -> Result<(), NippyJarError> {
        // Find out the offset of the column value
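        // Offsets are stored row-major: with `columns` offsets per row, the value of
        // (row, column) lives at index `row * columns + column`. E.g. with 3 columns,
        // row 2 / column 1 maps to offset index 7.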
        let offset_pos = self.row as usize * self.jar.columns + column;
        let value_offset = self.reader.offset(offset_pos)? as usize;

        let column_offset_range = if self.jar.rows * self.jar.columns == offset_pos + 1 {
            // It's the last column of the last row
            value_offset..self.reader.size()
        } else {
            let next_value_offset = self.reader.offset(offset_pos + 1)? as usize;
            value_offset..next_value_offset
        };

        if let Some(compression) = self.jar.compressor() {
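            // Mark where this value's decompressed bytes will begin in the shared buffer.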
            let from = self.internal_buffer.len();
            match compression {
                Compressors::Zstd(z) if z.use_dict => {
                    // Reaching this point means the necessary dictionaries exist and are
                    // loaded (this happens during deserialization). Otherwise, there's an
                    // issue somewhere else and we can't recover here anyway.
                    let dictionary = z.dictionaries.as_ref().expect("dictionaries to exist")
                        [column]
                        .loaded()
                        .expect("dictionary to be loaded");
                    let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?;
                    Zstd::decompress_with_dictionary(
                        self.reader.data(column_offset_range),
                        &mut self.internal_buffer,
                        &mut decompressor,
                    )?;
                }
                _ => {
                    // Uses the chosen default decompressor
                    compression.decompress_to(
                        self.reader.data(column_offset_range),
                        &mut self.internal_buffer,
                    )?;
                }
            }
            let to = self.internal_buffer.len();

            row.push(ValueRange::Internal(from..to));
        } else {
            // Not compressed
            row.push(ValueRange::Mmap(column_offset_range));
        }

        Ok(())
    }
}

/// Helper type that stores the range of the decompressed column value either on a `mmap` slice or
/// on the internal buffer.
enum ValueRange {
    Mmap(Range<usize>),
    Internal(Range<usize>),
}