polars_arrow/compute/concatenate.rs

1use std::sync::Arc;
2
3use hashbrown::hash_map::Entry;
4use polars_error::{PolarsResult, polars_bail};
5use polars_utils::aliases::{InitHashMaps, PlHashMap};
6use polars_utils::itertools::Itertools;
7use polars_utils::vec::PushUnchecked;
8
9use crate::array::*;
10use crate::bitmap::{Bitmap, BitmapBuilder};
11use crate::buffer::Buffer;
12use crate::datatypes::PhysicalType;
13use crate::offset::Offsets;
14use crate::types::{NativeType, Offset};
15use crate::with_match_primitive_type_full;
16
17/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
18pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {
19    if arrays.is_empty() {
20        polars_bail!(InvalidOperation: "concat requires input of at least one array")
21    }
22
23    if arrays
24        .iter()
25        .any(|array| array.dtype() != arrays[0].dtype())
26    {
27        polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")
28    }
29
30    concatenate_unchecked(arrays)
31}
32
33fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {
34    let mut len = 0;
35    let mut null_count = 0;
36    for arr in arrays {
37        let arr = arr.as_ref();
38        len += arr.len();
39        null_count += arr.null_count();
40    }
41    (len, null_count)
42}
43
44/// Concatenate the validities of multiple [Array]s into a single Bitmap.
45pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {
46    let (len, null_count) = len_null_count(arrays);
47    concatenate_validities_with_len_null_count(arrays, len, null_count)
48}
49
50fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(
51    arrays: &[A],
52    len: usize,
53    null_count: usize,
54) -> Option<Bitmap> {
55    if null_count == 0 {
56        return None;
57    }
58
59    let mut bitmap = BitmapBuilder::with_capacity(len);
60    for arr in arrays {
61        let arr = arr.as_ref();
62        if arr.null_count() == arr.len() {
63            bitmap.extend_constant(arr.len(), false);
64        } else if arr.null_count() == 0 {
65            bitmap.extend_constant(arr.len(), true);
66        } else {
67            bitmap.extend_from_bitmap(arr.validity().unwrap());
68        }
69    }
70    bitmap.into_opt_validity()
71}
72
73/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
74/// All arrays must be of the same dtype or a panic can occur.
75pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {
76    if arrays.is_empty() {
77        polars_bail!(InvalidOperation: "concat requires input of at least one array")
78    }
79
80    if arrays.len() == 1 {
81        return Ok(arrays[0].as_ref().to_boxed());
82    }
83
84    use PhysicalType::*;
85    match arrays[0].as_ref().dtype().to_physical_type() {
86        Null => Ok(Box::new(concatenate_null(arrays))),
87        Boolean => Ok(Box::new(concatenate_bool(arrays))),
88        Primitive(ptype) => {
89            with_match_primitive_type_full!(ptype, |$T| {
90                Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))
91            })
92        },
93        Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),
94        LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),
95        Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),
96        LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),
97        BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),
98        Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),
99        List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),
100        LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),
101        FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),
102        FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),
103        Struct => Ok(Box::new(concatenate_struct(arrays)?)),
104        Union => unimplemented!(),
105        Map => unimplemented!(),
106        Dictionary(_) => unimplemented!(),
107    }
108}
109
110fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {
111    let dtype = arrays[0].as_ref().dtype().clone();
112    let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();
113    NullArray::new(dtype, total_len)
114}
115
116fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {
117    let dtype = arrays[0].as_ref().dtype().clone();
118    let (total_len, null_count) = len_null_count(arrays);
119    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
120
121    let mut bitmap = BitmapBuilder::with_capacity(total_len);
122    for arr in arrays {
123        let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();
124        bitmap.extend_from_bitmap(arr.values());
125    }
126    BooleanArray::new(dtype, bitmap.freeze(), validity)
127}
128
129fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {
130    let dtype = arrays[0].as_ref().dtype().clone();
131    let (total_len, null_count) = len_null_count(arrays);
132    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
133
134    let mut out = Vec::with_capacity(total_len);
135    for arr in arrays {
136        let arr: &PrimitiveArray<T> = arr.as_ref().as_any().downcast_ref().unwrap();
137        out.extend_from_slice(arr.values());
138    }
139    unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(out), validity) }
140}
141
142fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(
143    arrays: &[A],
144) -> PolarsResult<BinaryArray<O>> {
145    let dtype = arrays[0].as_ref().dtype().clone();
146    let (total_len, null_count) = len_null_count(arrays);
147    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
148
149    let total_bytes = arrays
150        .iter()
151        .map(|arr| {
152            let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
153            arr.get_values_size()
154        })
155        .sum();
156
157    let mut values = Vec::with_capacity(total_bytes);
158    let mut offsets = Offsets::<O>::with_capacity(total_len);
159
160    for arr in arrays {
161        let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
162        let first_offset = arr.offsets().first().to_usize();
163        let last_offset = arr.offsets().last().to_usize();
164        values.extend_from_slice(&arr.values()[first_offset..last_offset]);
165        for len in arr.offsets().lengths() {
166            offsets.try_push(len)?;
167        }
168    }
169
170    Ok(unsafe { BinaryArray::new(dtype, offsets.into(), values.into(), validity) })
171}
172
173fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {
174    let dtype = arrays[0].as_ref().dtype().clone();
175    let (total_len, null_count) = len_null_count(arrays);
176    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
177
178    let total_bytes = arrays
179        .iter()
180        .map(|arr| {
181            let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
182            arr.get_values_size()
183        })
184        .sum();
185
186    let mut bytes = Vec::with_capacity(total_bytes);
187    let mut offsets = Offsets::<O>::with_capacity(total_len);
188
189    for arr in arrays {
190        let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
191        let first_offset = arr.offsets().first().to_usize();
192        let last_offset = arr.offsets().last().to_usize();
193        bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);
194        for len in arr.offsets().lengths() {
195            offsets.try_push(len)?;
196        }
197    }
198
199    Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })
200}
201
202fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(
203    arrays: &[A],
204) -> BinaryViewArrayGeneric<V> {
205    let dtype = arrays[0].as_ref().dtype().clone();
206    let (total_len, null_count) = len_null_count(arrays);
207    if total_len == 0 {
208        return BinaryViewArrayGeneric::new_empty(dtype);
209    }
210    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
211
212    let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();
213    let mut total_nondedup_buffers = first_arr.data_buffers().len();
214    let mut max_arr_bufferset_len = 0;
215    let mut all_same_bufs = true;
216    for arr in arrays {
217        let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
218        max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());
219        total_nondedup_buffers += arr.data_buffers().len();
220        // Fat pointer equality, checks both start and length.
221        all_same_bufs &= std::ptr::eq(
222            Arc::as_ptr(arr.data_buffers()),
223            Arc::as_ptr(first_arr.data_buffers()),
224        );
225    }
226
227    let mut total_bytes_len = 0;
228    let mut views = Vec::with_capacity(total_len);
229
230    let mut total_buffer_len = 0;
231    let buffers = if all_same_bufs {
232        total_buffer_len = first_arr.total_buffer_len();
233        for arr in arrays {
234            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
235            views.extend_from_slice(arr.views());
236            total_bytes_len += arr.total_bytes_len();
237        }
238        Arc::clone(first_arr.data_buffers())
239
240    // There might be way more buffers than elements, so we only dedup if there
241    // is at least one element per buffer on average.
242    } else if total_len > total_nondedup_buffers {
243        assert!(arrays.len() < u32::MAX as usize);
244
245        let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);
246        let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);
247        let mut local_dedup_buffer_idx = Vec::new();
248        local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));
249
250        for (arr_idx, arr) in arrays.iter().enumerate() {
251            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
252
253            unsafe {
254                for mut view in arr.views().iter().copied() {
255                    if view.length > View::MAX_INLINE_SIZE {
256                        // Translate from old array-local buffer idx to global deduped buffer idx.
257                        let (mut new_buffer_idx, cache_tag) =
258                            *local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);
259                        if cache_tag != arr_idx as u32 {
260                            // This buffer index wasn't seen before for this array, do a dedup lookup.
261                            let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);
262                            let buf_id = (buffer.as_slice().as_ptr(), buffer.len());
263                            let idx = match global_dedup_buffer_idx.entry(buf_id) {
264                                Entry::Occupied(o) => *o.get(),
265                                Entry::Vacant(v) => {
266                                    let idx = dedup_buffers.len() as u32;
267                                    dedup_buffers.push(buffer.clone());
268                                    total_buffer_len += buffer.len();
269                                    v.insert(idx);
270                                    idx
271                                },
272                            };
273
274                            // Cache result for future lookups.
275                            *local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =
276                                (idx, arr_idx as u32);
277                            new_buffer_idx = idx;
278                        }
279                        view.buffer_idx = new_buffer_idx;
280                    }
281
282                    total_bytes_len += view.length as usize;
283                    views.push_unchecked(view);
284                }
285            }
286        }
287
288        dedup_buffers.into_iter().collect()
289    } else {
290        // Only very few of the total number of buffers is referenced, simply
291        // create a new direct buffer.
292        for arr in arrays {
293            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
294            total_buffer_len += arr
295                .len_iter()
296                .map(|l| if l > 12 { l as usize } else { 0 })
297                .sum::<usize>();
298        }
299
300        let mut unprocessed_buffer_len = total_buffer_len;
301        let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(
302            unprocessed_buffer_len.min(u32::MAX as usize),
303        )];
304        for arr in arrays {
305            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
306            let buffers = arr.data_buffers();
307
308            unsafe {
309                for mut view in arr.views().iter().copied() {
310                    total_bytes_len += view.length as usize;
311                    if view.length > 12 {
312                        if new_buffers.last().unwrap_unchecked().len() + view.length as usize
313                            >= u32::MAX as usize
314                        {
315                            new_buffers.push(Vec::with_capacity(
316                                unprocessed_buffer_len.min(u32::MAX as usize),
317                            ));
318                        }
319                        let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;
320                        new_buffers
321                            .last_mut()
322                            .unwrap_unchecked()
323                            .extend_from_slice(view.get_slice_unchecked(buffers));
324                        view.offset = new_offset;
325                        view.buffer_idx = new_buffers.len() as u32 - 1;
326                        unprocessed_buffer_len -= view.length as usize;
327                    }
328                    views.push_unchecked(view);
329                }
330            }
331        }
332
333        new_buffers.into_iter().map(Buffer::from).collect()
334    };
335
336    unsafe {
337        BinaryViewArrayGeneric::new_unchecked(
338            dtype,
339            views.into(),
340            buffers,
341            validity,
342            total_bytes_len,
343            total_buffer_len,
344        )
345    }
346}
347
348fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<ListArray<O>> {
349    let dtype = arrays[0].as_ref().dtype().clone();
350    let (total_len, null_count) = len_null_count(arrays);
351    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
352
353    let mut num_sliced = 0;
354    let mut offsets = Offsets::<O>::with_capacity(total_len);
355    for arr in arrays {
356        let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
357        for len in arr.offsets().lengths() {
358            offsets.try_push(len)?;
359        }
360        let first_offset = arr.offsets().first().to_usize();
361        let offset_range = arr.offsets().range().to_usize();
362        num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;
363    }
364
365    let values = if num_sliced > 0 {
366        let inner_sliced_arrays = arrays
367            .iter()
368            .map(|arr| {
369                let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
370                let first_offset = arr.offsets().first().to_usize();
371                let offset_range = arr.offsets().range().to_usize();
372                if first_offset != 0 || offset_range != arr.values().len() {
373                    arr.values().sliced(first_offset, offset_range)
374                } else {
375                    arr.values().to_boxed()
376                }
377            })
378            .collect_vec();
379        concatenate_unchecked(&inner_sliced_arrays[..])?
380    } else {
381        let inner_arrays = arrays
382            .iter()
383            .map(|arr| {
384                let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
385                &**arr.values()
386            })
387            .collect_vec();
388        concatenate_unchecked(&inner_arrays)?
389    };
390
391    Ok(ListArray::new(dtype, offsets.into(), values, validity))
392}
393
394fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(
395    arrays: &[A],
396) -> PolarsResult<FixedSizeBinaryArray> {
397    let dtype = arrays[0].as_ref().dtype().clone();
398    let (total_len, null_count) = len_null_count(arrays);
399    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
400
401    let total_bytes = arrays
402        .iter()
403        .map(|arr| {
404            let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
405            arr.values().len()
406        })
407        .sum();
408
409    let mut bytes = Vec::with_capacity(total_bytes);
410    for arr in arrays {
411        let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
412        bytes.extend_from_slice(arr.values());
413    }
414
415    Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))
416}
417
418fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(
419    arrays: &[A],
420) -> PolarsResult<FixedSizeListArray> {
421    let dtype = arrays[0].as_ref().dtype().clone();
422    let (total_len, null_count) = len_null_count(arrays);
423
424    let inner_arrays = arrays
425        .iter()
426        .map(|arr| {
427            let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();
428            &**arr.values()
429        })
430        .collect_vec();
431    let values = concatenate_unchecked(&inner_arrays)?;
432    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
433    Ok(FixedSizeListArray::new(dtype, total_len, values, validity))
434}
435
436fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {
437    let dtype = arrays[0].as_ref().dtype().clone();
438    let (total_len, null_count) = len_null_count(arrays);
439    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
440
441    let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();
442    let num_fields = first_arr.values().len();
443
444    let mut inner_arrays = Vec::with_capacity(arrays.len());
445    let values = (0..num_fields)
446        .map(|f| {
447            inner_arrays.clear();
448            for arr in arrays {
449                let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();
450                inner_arrays.push(&arr.values()[f]);
451            }
452            concatenate_unchecked(&inner_arrays)
453        })
454        .try_collect_vec()?;
455
456    Ok(StructArray::new(dtype, total_len, values, validity))
457}