polars_arrow/io/ipc/write/schema.rs

1use arrow_format::ipc::planus::Builder;
2
3use super::super::IpcField;
4use crate::datatypes::{
5    ArrowDataType, ArrowSchema, Field, IntegerType, IntervalUnit, Metadata, TimeUnit, UnionMode,
6};
7use crate::io::ipc::endianness::is_native_little_endian;
8
9/// Converts a [ArrowSchema] and [IpcField]s to a flatbuffers-encoded [arrow_format::ipc::Message].
10pub fn schema_to_bytes(
11    schema: &ArrowSchema,
12    ipc_fields: &[IpcField],
13    custom_metadata: Option<&Metadata>,
14) -> Vec<u8> {
15    let schema = serialize_schema(schema, ipc_fields, custom_metadata);
16
17    let message = arrow_format::ipc::Message {
18        version: arrow_format::ipc::MetadataVersion::V5,
19        header: Some(arrow_format::ipc::MessageHeader::Schema(Box::new(schema))),
20        body_length: 0,
21        custom_metadata: None, // todo: allow writing custom metadata
22    };
23    let mut builder = Builder::new();
24    let footer_data = builder.finish(&message, None);
25    footer_data.to_vec()
26}
27
28pub fn serialize_schema(
29    schema: &ArrowSchema,
30    ipc_fields: &[IpcField],
31    custom_schema_metadata: Option<&Metadata>,
32) -> arrow_format::ipc::Schema {
33    let endianness = if is_native_little_endian() {
34        arrow_format::ipc::Endianness::Little
35    } else {
36        arrow_format::ipc::Endianness::Big
37    };
38
39    let fields = schema
40        .iter_values()
41        .zip(ipc_fields.iter())
42        .map(|(field, ipc_field)| serialize_field(field, ipc_field))
43        .collect::<Vec<_>>();
44
45    let custom_metadata = custom_schema_metadata.and_then(|custom_meta| {
46        let as_kv = custom_meta
47            .iter()
48            .map(|(key, val)| key_value(key.clone().into_string(), val.clone().into_string()))
49            .collect::<Vec<_>>();
50        (!as_kv.is_empty()).then_some(as_kv)
51    });
52
53    arrow_format::ipc::Schema {
54        endianness,
55        fields: Some(fields),
56        custom_metadata,
57        features: None, // todo add this one
58    }
59}
60
61fn key_value(key: impl Into<String>, val: impl Into<String>) -> arrow_format::ipc::KeyValue {
62    arrow_format::ipc::KeyValue {
63        key: Some(key.into()),
64        value: Some(val.into()),
65    }
66}
67
68fn write_metadata(metadata: &Metadata, kv_vec: &mut Vec<arrow_format::ipc::KeyValue>) {
69    for (k, v) in metadata {
70        if k.as_str() != "ARROW:extension:name" && k.as_str() != "ARROW:extension:metadata" {
71            kv_vec.push(key_value(k.clone().into_string(), v.clone().into_string()));
72        }
73    }
74}
75
76fn write_extension(
77    name: &str,
78    metadata: Option<&str>,
79    kv_vec: &mut Vec<arrow_format::ipc::KeyValue>,
80) {
81    if let Some(metadata) = metadata {
82        kv_vec.push(key_value("ARROW:extension:metadata".to_string(), metadata));
83    }
84
85    kv_vec.push(key_value("ARROW:extension:name".to_string(), name));
86}
87
88/// Create an IPC Field from an Arrow Field
89pub(crate) fn serialize_field(field: &Field, ipc_field: &IpcField) -> arrow_format::ipc::Field {
90    // custom metadata.
91    let mut kv_vec = vec![];
92    if let ArrowDataType::Extension(ext) = field.dtype() {
93        write_extension(
94            &ext.name,
95            ext.metadata.as_ref().map(|x| x.as_str()),
96            &mut kv_vec,
97        );
98    }
99
100    let type_ = serialize_type(field.dtype());
101    let children = serialize_children(field.dtype(), ipc_field);
102
103    let dictionary = if let ArrowDataType::Dictionary(index_type, inner, is_ordered) = field.dtype()
104    {
105        if let ArrowDataType::Extension(ext) = inner.as_ref() {
106            write_extension(
107                ext.name.as_str(),
108                ext.metadata.as_ref().map(|x| x.as_str()),
109                &mut kv_vec,
110            );
111        }
112        Some(serialize_dictionary(
113            index_type,
114            ipc_field
115                .dictionary_id
116                .expect("All Dictionary types have `dict_id`"),
117            *is_ordered,
118        ))
119    } else {
120        None
121    };
122
123    if let Some(metadata) = &field.metadata {
124        write_metadata(metadata, &mut kv_vec);
125    }
126
127    let custom_metadata = if !kv_vec.is_empty() {
128        Some(kv_vec)
129    } else {
130        None
131    };
132
133    arrow_format::ipc::Field {
134        name: Some(field.name.to_string()),
135        nullable: field.is_nullable,
136        type_: Some(type_),
137        dictionary: dictionary.map(Box::new),
138        children: Some(children),
139        custom_metadata,
140    }
141}
142
143fn serialize_time_unit(unit: &TimeUnit) -> arrow_format::ipc::TimeUnit {
144    match unit {
145        TimeUnit::Second => arrow_format::ipc::TimeUnit::Second,
146        TimeUnit::Millisecond => arrow_format::ipc::TimeUnit::Millisecond,
147        TimeUnit::Microsecond => arrow_format::ipc::TimeUnit::Microsecond,
148        TimeUnit::Nanosecond => arrow_format::ipc::TimeUnit::Nanosecond,
149    }
150}
151
152fn serialize_type(dtype: &ArrowDataType) -> arrow_format::ipc::Type {
153    use ArrowDataType::*;
154    use arrow_format::ipc;
155    match dtype {
156        Null => ipc::Type::Null(Box::new(ipc::Null {})),
157        Boolean => ipc::Type::Bool(Box::new(ipc::Bool {})),
158        UInt8 => ipc::Type::Int(Box::new(ipc::Int {
159            bit_width: 8,
160            is_signed: false,
161        })),
162        UInt16 => ipc::Type::Int(Box::new(ipc::Int {
163            bit_width: 16,
164            is_signed: false,
165        })),
166        UInt32 => ipc::Type::Int(Box::new(ipc::Int {
167            bit_width: 32,
168            is_signed: false,
169        })),
170        UInt64 => ipc::Type::Int(Box::new(ipc::Int {
171            bit_width: 64,
172            is_signed: false,
173        })),
174        Int8 => ipc::Type::Int(Box::new(ipc::Int {
175            bit_width: 8,
176            is_signed: true,
177        })),
178        Int16 => ipc::Type::Int(Box::new(ipc::Int {
179            bit_width: 16,
180            is_signed: true,
181        })),
182        Int32 => ipc::Type::Int(Box::new(ipc::Int {
183            bit_width: 32,
184            is_signed: true,
185        })),
186        Int64 => ipc::Type::Int(Box::new(ipc::Int {
187            bit_width: 64,
188            is_signed: true,
189        })),
190        Int128 => ipc::Type::Int(Box::new(ipc::Int {
191            bit_width: 128,
192            is_signed: true,
193        })),
194        Float16 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
195            precision: ipc::Precision::Half,
196        })),
197        Float32 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
198            precision: ipc::Precision::Single,
199        })),
200        Float64 => ipc::Type::FloatingPoint(Box::new(ipc::FloatingPoint {
201            precision: ipc::Precision::Double,
202        })),
203        Decimal(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
204            precision: *precision as i32,
205            scale: *scale as i32,
206            bit_width: 128,
207        })),
208        Decimal32(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
209            precision: *precision as i32,
210            scale: *scale as i32,
211            bit_width: 32,
212        })),
213        Decimal64(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
214            precision: *precision as i32,
215            scale: *scale as i32,
216            bit_width: 64,
217        })),
218        Decimal256(precision, scale) => ipc::Type::Decimal(Box::new(ipc::Decimal {
219            precision: *precision as i32,
220            scale: *scale as i32,
221            bit_width: 256,
222        })),
223        Binary => ipc::Type::Binary(Box::new(ipc::Binary {})),
224        LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),
225        Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})),
226        LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})),
227        FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary {
228            byte_width: *size as i32,
229        })),
230        Date32 => ipc::Type::Date(Box::new(ipc::Date {
231            unit: ipc::DateUnit::Day,
232        })),
233        Date64 => ipc::Type::Date(Box::new(ipc::Date {
234            unit: ipc::DateUnit::Millisecond,
235        })),
236        Duration(unit) => ipc::Type::Duration(Box::new(ipc::Duration {
237            unit: serialize_time_unit(unit),
238        })),
239        Time32(unit) => ipc::Type::Time(Box::new(ipc::Time {
240            unit: serialize_time_unit(unit),
241            bit_width: 32,
242        })),
243        Time64(unit) => ipc::Type::Time(Box::new(ipc::Time {
244            unit: serialize_time_unit(unit),
245            bit_width: 64,
246        })),
247        Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {
248            unit: serialize_time_unit(unit),
249            timezone: tz.as_ref().map(|x| x.to_string()),
250        })),
251        Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval {
252            unit: match unit {
253                IntervalUnit::YearMonth => ipc::IntervalUnit::YearMonth,
254                IntervalUnit::DayTime => ipc::IntervalUnit::DayTime,
255                IntervalUnit::MonthDayNano => ipc::IntervalUnit::MonthDayNano,
256            },
257        })),
258        List(_) => ipc::Type::List(Box::new(ipc::List {})),
259        LargeList(_) => ipc::Type::LargeList(Box::new(ipc::LargeList {})),
260        FixedSizeList(_, size) => ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList {
261            list_size: *size as i32,
262        })),
263        Union(u) => ipc::Type::Union(Box::new(ipc::Union {
264            mode: match u.mode {
265                UnionMode::Dense => ipc::UnionMode::Dense,
266                UnionMode::Sparse => ipc::UnionMode::Sparse,
267            },
268            type_ids: u.ids.clone(),
269        })),
270        Map(_, keys_sorted) => ipc::Type::Map(Box::new(ipc::Map {
271            keys_sorted: *keys_sorted,
272        })),
273        Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})),
274        Dictionary(_, v, _) => serialize_type(v),
275        Extension(ext) => serialize_type(&ext.inner),
276        Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})),
277        BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})),
278        Unknown => unimplemented!(),
279    }
280}
281
282fn serialize_children(
283    dtype: &ArrowDataType,
284    ipc_field: &IpcField,
285) -> Vec<arrow_format::ipc::Field> {
286    use ArrowDataType::*;
287    match dtype {
288        Null
289        | Boolean
290        | Int8
291        | Int16
292        | Int32
293        | Int64
294        | UInt8
295        | UInt16
296        | UInt32
297        | UInt64
298        | Int128
299        | Float16
300        | Float32
301        | Float64
302        | Timestamp(_, _)
303        | Date32
304        | Date64
305        | Time32(_)
306        | Time64(_)
307        | Duration(_)
308        | Interval(_)
309        | Binary
310        | FixedSizeBinary(_)
311        | LargeBinary
312        | Utf8
313        | LargeUtf8
314        | Utf8View
315        | BinaryView
316        | Decimal(_, _)
317        | Decimal32(_, _)
318        | Decimal64(_, _)
319        | Decimal256(_, _) => vec![],
320        FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => {
321            vec![serialize_field(inner, &ipc_field.fields[0])]
322        },
323        Struct(fields) => fields
324            .iter()
325            .zip(ipc_field.fields.iter())
326            .map(|(field, ipc)| serialize_field(field, ipc))
327            .collect(),
328        Union(u) => u
329            .fields
330            .iter()
331            .zip(ipc_field.fields.iter())
332            .map(|(field, ipc)| serialize_field(field, ipc))
333            .collect(),
334        Dictionary(_, inner, _) => serialize_children(inner, ipc_field),
335        Extension(ext) => serialize_children(&ext.inner, ipc_field),
336        Unknown => unimplemented!(),
337    }
338}
339
340/// Create an IPC dictionary encoding
341pub(crate) fn serialize_dictionary(
342    index_type: &IntegerType,
343    dict_id: i64,
344    dict_is_ordered: bool,
345) -> arrow_format::ipc::DictionaryEncoding {
346    use IntegerType::*;
347    let is_signed = match index_type {
348        Int8 | Int16 | Int32 | Int64 | Int128 => true,
349        UInt8 | UInt16 | UInt32 | UInt64 => false,
350    };
351
352    let bit_width = match index_type {
353        Int8 | UInt8 => 8,
354        Int16 | UInt16 => 16,
355        Int32 | UInt32 => 32,
356        Int64 | UInt64 => 64,
357        Int128 => 128,
358    };
359
360    let index_type = arrow_format::ipc::Int {
361        bit_width,
362        is_signed,
363    };
364
365    arrow_format::ipc::DictionaryEncoding {
366        id: dict_id,
367        index_type: Some(Box::new(index_type)),
368        is_ordered: dict_is_ordered,
369        dictionary_kind: arrow_format::ipc::DictionaryKind::DenseArray,
370    }
371}