1#![allow(unsafe_op_in_unsafe_fn)]
2use std::collections::VecDeque;
4use std::sync::Arc;
5
6mod array;
7
8use arrow_format::ipc::planus::ReadAsRoot;
9use arrow_format::ipc::{Block, DictionaryBatchRef, MessageRef, RecordBatchRef};
10use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::array::Array;
14use crate::datatypes::{ArrowDataType, ArrowSchema, Field};
15use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
16use crate::io::ipc::read::{
17 Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind, first_dict_field,
18};
19use crate::io::ipc::{CONTINUATION_MARKER, IpcField};
20use crate::record_batch::RecordBatchT;
21
22fn read_message(
23 mut bytes: &[u8],
24 block: arrow_format::ipc::Block,
25) -> PolarsResult<(MessageRef<'_>, usize)> {
26 let offset: usize = block.offset.try_into().map_err(
27 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
28 )?;
29
30 let block_length: usize = block.meta_data_length.try_into().map_err(
31 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
32 )?;
33
34 bytes = &bytes[offset..];
35 let mut message_length = bytes[..4].try_into().unwrap();
36 bytes = &bytes[4..];
37
38 if message_length == CONTINUATION_MARKER {
39 message_length = bytes[..4].try_into().unwrap();
41 bytes = &bytes[4..];
42 };
43
44 let message_length: usize = i32::from_le_bytes(message_length).try_into().map_err(
45 |_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
46 )?;
47
48 let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
49 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
50
51 Ok((message, offset + block_length))
52}
53
54fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque<IpcBuffer>, VecDeque<Node>)> {
55 let compression = batch.compression().map_err(to_compute_err)?;
56 if compression.is_some() {
57 polars_bail!(ComputeError: "memory_map can only be done on uncompressed IPC files")
58 }
59
60 let buffers = batch
61 .buffers()
62 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
63 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageBuffers))?;
64 let buffers = buffers.iter().collect::<VecDeque<_>>();
65
66 let field_nodes = batch
67 .nodes()
68 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferNodes(err)))?
69 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageNodes))?;
70 let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
71
72 Ok((buffers, field_nodes))
73}
74
75pub(crate) unsafe fn mmap_record<T: AsRef<[u8]>>(
76 fields: &ArrowSchema,
77 ipc_fields: &[IpcField],
78 data: Arc<T>,
79 batch: RecordBatchRef,
80 offset: usize,
81 dictionaries: &Dictionaries,
82) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
83 let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
84 let mut variadic_buffer_counts = batch
85 .variadic_buffer_counts()
86 .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
87 .map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
88 .unwrap_or_else(VecDeque::new);
89
90 let length = batch
91 .length()
92 .map_err(|_| polars_err!(oos = OutOfSpecKind::MissingData))
93 .unwrap()
94 .try_into()
95 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
96
97 fields
98 .iter_values()
99 .map(|f| &f.dtype)
100 .cloned()
101 .zip(ipc_fields)
102 .map(|(dtype, ipc_field)| {
103 array::mmap(
104 data.clone(),
105 offset,
106 dtype,
107 ipc_field,
108 dictionaries,
109 &mut field_nodes,
110 &mut variadic_buffer_counts,
111 &mut buffers,
112 )
113 })
114 .collect::<PolarsResult<_>>()
115 .and_then(|arr| {
116 RecordBatchT::try_new(
117 length,
118 Arc::new(fields.iter_values().cloned().collect()),
119 arr,
120 )
121 })
122}
123
124pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
136 metadata: &FileMetadata,
137 dictionaries: &Dictionaries,
138 data: Arc<T>,
139 chunk: usize,
140) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
141 let block = metadata.blocks[chunk];
142
143 let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
144 let batch = get_record_batch(message)?;
145 mmap_record(
146 &metadata.schema,
147 &metadata.ipc_schema.fields,
148 data.clone(),
149 batch,
150 offset,
151 dictionaries,
152 )
153}
154
155unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
156 schema: &ArrowSchema,
157 ipc_fields: &[IpcField],
158 data: Arc<T>,
159 block: Block,
160 dictionaries: &mut Dictionaries,
161) -> PolarsResult<()> {
162 let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
163 let batch = get_dictionary_batch(&message)?;
164 mmap_dictionary_from_batch(schema, ipc_fields, &data, batch, dictionaries, offset)
165}
166
167pub(crate) unsafe fn mmap_dictionary_from_batch<T: AsRef<[u8]>>(
168 schema: &ArrowSchema,
169 ipc_fields: &[IpcField],
170 data: &Arc<T>,
171 batch: DictionaryBatchRef,
172 dictionaries: &mut Dictionaries,
173 offset: usize,
174) -> PolarsResult<()> {
175 let id = batch
176 .id()
177 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?;
178 let (first_field, first_ipc_field) = first_dict_field(id, schema, ipc_fields)?;
179
180 let batch = batch
181 .data()
182 .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferData(err)))?
183 .ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingData))?;
184
185 let value_type = if let ArrowDataType::Dictionary(_, value_type, _) =
186 first_field.dtype.to_logical_type()
187 {
188 value_type.as_ref()
189 } else {
190 polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidIdDataType {requested_id: id} )
191 };
192
193 let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false);
195
196 let chunk = mmap_record(
197 &std::iter::once((field.name.clone(), field)).collect(),
198 std::slice::from_ref(first_ipc_field),
199 data.clone(),
200 batch,
201 offset,
202 dictionaries,
203 )?;
204
205 dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
206
207 Ok(())
208}
209
210pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
216 metadata: &FileMetadata,
217 data: Arc<T>,
218) -> PolarsResult<Dictionaries> {
219 mmap_dictionaries_unchecked2(
220 metadata.schema.as_ref(),
221 &metadata.ipc_schema.fields,
222 metadata.dictionaries.as_ref(),
223 data,
224 )
225}
226
227pub(crate) unsafe fn mmap_dictionaries_unchecked2<T: AsRef<[u8]>>(
228 schema: &ArrowSchema,
229 ipc_fields: &[IpcField],
230 dictionaries: Option<&Vec<arrow_format::ipc::Block>>,
231 data: Arc<T>,
232) -> PolarsResult<Dictionaries> {
233 let blocks = if let Some(blocks) = &dictionaries {
234 blocks
235 } else {
236 return Ok(Default::default());
237 };
238
239 let mut dictionaries = Default::default();
240
241 blocks.iter().cloned().try_for_each(|block| {
242 mmap_dictionary(schema, ipc_fields, data.clone(), block, &mut dictionaries)
243 })?;
244 Ok(dictionaries)
245}