polars_arrow/mmap/
mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2//! Memory maps regions defined on the IPC format into [`Array`].
3use std::collections::VecDeque;
4use std::sync::Arc;
5
6mod array;
7
8use arrow_format::ipc::planus::ReadAsRoot;
9use arrow_format::ipc::{Block, DictionaryBatchRef, MessageRef, RecordBatchRef};
10use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::array::Array;
14use crate::datatypes::{ArrowDataType, ArrowSchema, Field};
15use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
16use crate::io::ipc::read::{
17    Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind, first_dict_field,
18};
19use crate::io::ipc::{CONTINUATION_MARKER, IpcField};
20use crate::record_batch::RecordBatchT;
21
22fn read_message(
23    mut bytes: &[u8],
24    block: arrow_format::ipc::Block,
25) -> PolarsResult<(MessageRef<'_>, usize)> {
26    let offset: usize = block.offset.try_into().map_err(
27        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
28    )?;
29
30    let block_length: usize = block.meta_data_length.try_into().map_err(
31        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
32    )?;
33
34    bytes = &bytes[offset..];
35    let mut message_length = bytes[..4].try_into().unwrap();
36    bytes = &bytes[4..];
37
38    if message_length == CONTINUATION_MARKER {
39        // continuation marker encountered, read message next
40        message_length = bytes[..4].try_into().unwrap();
41        bytes = &bytes[4..];
42    };
43
44    let message_length: usize = i32::from_le_bytes(message_length).try_into().map_err(
45        |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
46    )?;
47
48    let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
49        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
50
51    Ok((message, offset + block_length))
52}
53
54fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque<IpcBuffer>, VecDeque<Node>)> {
55    let compression = batch.compression().map_err(to_compute_err)?;
56    if compression.is_some() {
57        polars_bail!(ComputeError: "memory_map can only be done on uncompressed IPC files")
58    }
59
60    let buffers = batch
61        .buffers()
62        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
63        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageBuffers))?;
64    let buffers = buffers.iter().collect::<VecDeque<_>>();
65
66    let field_nodes = batch
67        .nodes()
68        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferNodes(err)))?
69        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageNodes))?;
70    let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
71
72    Ok((buffers, field_nodes))
73}
74
75pub(crate) unsafe fn mmap_record<T: AsRef<[u8]>>(
76    fields: &ArrowSchema,
77    ipc_fields: &[IpcField],
78    data: Arc<T>,
79    batch: RecordBatchRef,
80    offset: usize,
81    dictionaries: &Dictionaries,
82) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
83    let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
84    let mut variadic_buffer_counts = batch
85        .variadic_buffer_counts()
86        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
87        .map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
88        .unwrap_or_else(VecDeque::new);
89
90    let length = batch
91        .length()
92        .map_err(|_| polars_err!(oos = OutOfSpecKind::MissingData))
93        .unwrap()
94        .try_into()
95        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
96
97    fields
98        .iter_values()
99        .map(|f| &f.dtype)
100        .cloned()
101        .zip(ipc_fields)
102        .map(|(dtype, ipc_field)| {
103            array::mmap(
104                data.clone(),
105                offset,
106                dtype,
107                ipc_field,
108                dictionaries,
109                &mut field_nodes,
110                &mut variadic_buffer_counts,
111                &mut buffers,
112            )
113        })
114        .collect::<PolarsResult<_>>()
115        .and_then(|arr| {
116            RecordBatchT::try_new(
117                length,
118                Arc::new(fields.iter_values().cloned().collect()),
119                arr,
120            )
121        })
122}
123
124/// Memory maps an record batch from an IPC file into a [`RecordBatchT`].
125/// # Errors
126/// This function errors when:
127/// * The IPC file is not valid
128/// * the buffers on the file are un-aligned with their corresponding data. This can happen when:
129///     * the file was written with 8-bit alignment
130///     * the file contains type decimal 128 or 256
131/// # Safety
132/// The caller must ensure that `data` contains a valid buffers, for example:
133/// * Offsets in variable-sized containers must be in-bounds and increasing
134/// * Utf8 data is valid
135pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
136    metadata: &FileMetadata,
137    dictionaries: &Dictionaries,
138    data: Arc<T>,
139    chunk: usize,
140) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
141    let block = metadata.blocks[chunk];
142
143    let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
144    let batch = get_record_batch(message)?;
145    mmap_record(
146        &metadata.schema,
147        &metadata.ipc_schema.fields,
148        data.clone(),
149        batch,
150        offset,
151        dictionaries,
152    )
153}
154
155unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
156    schema: &ArrowSchema,
157    ipc_fields: &[IpcField],
158    data: Arc<T>,
159    block: Block,
160    dictionaries: &mut Dictionaries,
161) -> PolarsResult<()> {
162    let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
163    let batch = get_dictionary_batch(&message)?;
164    mmap_dictionary_from_batch(schema, ipc_fields, &data, batch, dictionaries, offset)
165}
166
167pub(crate) unsafe fn mmap_dictionary_from_batch<T: AsRef<[u8]>>(
168    schema: &ArrowSchema,
169    ipc_fields: &[IpcField],
170    data: &Arc<T>,
171    batch: DictionaryBatchRef,
172    dictionaries: &mut Dictionaries,
173    offset: usize,
174) -> PolarsResult<()> {
175    let id = batch
176        .id()
177        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?;
178    let (first_field, first_ipc_field) = first_dict_field(id, schema, ipc_fields)?;
179
180    let batch = batch
181        .data()
182        .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferData(err)))?
183        .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingData))?;
184
185    let value_type = if let ArrowDataType::Dictionary(_, value_type, _) =
186        first_field.dtype.to_logical_type()
187    {
188        value_type.as_ref()
189    } else {
190        polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidIdDataType {requested_id: id} )
191    };
192
193    // Make a fake schema for the dictionary batch.
194    let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false);
195
196    let chunk = mmap_record(
197        &std::iter::once((field.name.clone(), field)).collect(),
198        std::slice::from_ref(first_ipc_field),
199        data.clone(),
200        batch,
201        offset,
202        dictionaries,
203    )?;
204
205    dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
206
207    Ok(())
208}
209
210/// Memory maps dictionaries from an IPC file into
211/// # Safety
212/// The caller must ensure that `data` contains a valid buffers, for example:
213/// * Offsets in variable-sized containers must be in-bounds and increasing
214/// * Utf8 data is valid
215pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
216    metadata: &FileMetadata,
217    data: Arc<T>,
218) -> PolarsResult<Dictionaries> {
219    mmap_dictionaries_unchecked2(
220        metadata.schema.as_ref(),
221        &metadata.ipc_schema.fields,
222        metadata.dictionaries.as_ref(),
223        data,
224    )
225}
226
227pub(crate) unsafe fn mmap_dictionaries_unchecked2<T: AsRef<[u8]>>(
228    schema: &ArrowSchema,
229    ipc_fields: &[IpcField],
230    dictionaries: Option<&Vec<arrow_format::ipc::Block>>,
231    data: Arc<T>,
232) -> PolarsResult<Dictionaries> {
233    let blocks = if let Some(blocks) = &dictionaries {
234        blocks
235    } else {
236        return Ok(Default::default());
237    };
238
239    let mut dictionaries = Default::default();
240
241    blocks.iter().cloned().try_for_each(|block| {
242        mmap_dictionary(schema, ipc_fields, data.clone(), block, &mut dictionaries)
243    })?;
244    Ok(dictionaries)
245}