1use std::sync::Arc;
2
3use hashbrown::hash_map::Entry;
4use polars_error::{PolarsResult, polars_bail};
5use polars_utils::aliases::{InitHashMaps, PlHashMap};
6use polars_utils::itertools::Itertools;
7use polars_utils::vec::PushUnchecked;
8
9use crate::array::*;
10use crate::bitmap::{Bitmap, BitmapBuilder};
11use crate::buffer::Buffer;
12use crate::datatypes::PhysicalType;
13use crate::offset::Offsets;
14use crate::types::{NativeType, Offset};
15use crate::with_match_primitive_type_full;
16
17pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {
19 if arrays.is_empty() {
20 polars_bail!(InvalidOperation: "concat requires input of at least one array")
21 }
22
23 if arrays
24 .iter()
25 .any(|array| array.dtype() != arrays[0].dtype())
26 {
27 polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")
28 }
29
30 concatenate_unchecked(arrays)
31}
32
33fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {
34 let mut len = 0;
35 let mut null_count = 0;
36 for arr in arrays {
37 let arr = arr.as_ref();
38 len += arr.len();
39 null_count += arr.null_count();
40 }
41 (len, null_count)
42}
43
44pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {
46 let (len, null_count) = len_null_count(arrays);
47 concatenate_validities_with_len_null_count(arrays, len, null_count)
48}
49
50fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(
51 arrays: &[A],
52 len: usize,
53 null_count: usize,
54) -> Option<Bitmap> {
55 if null_count == 0 {
56 return None;
57 }
58
59 let mut bitmap = BitmapBuilder::with_capacity(len);
60 for arr in arrays {
61 let arr = arr.as_ref();
62 if arr.null_count() == arr.len() {
63 bitmap.extend_constant(arr.len(), false);
64 } else if arr.null_count() == 0 {
65 bitmap.extend_constant(arr.len(), true);
66 } else {
67 bitmap.extend_from_bitmap(arr.validity().unwrap());
68 }
69 }
70 bitmap.into_opt_validity()
71}
72
73pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {
76 if arrays.is_empty() {
77 polars_bail!(InvalidOperation: "concat requires input of at least one array")
78 }
79
80 if arrays.len() == 1 {
81 return Ok(arrays[0].as_ref().to_boxed());
82 }
83
84 use PhysicalType::*;
85 match arrays[0].as_ref().dtype().to_physical_type() {
86 Null => Ok(Box::new(concatenate_null(arrays))),
87 Boolean => Ok(Box::new(concatenate_bool(arrays))),
88 Primitive(ptype) => {
89 with_match_primitive_type_full!(ptype, |$T| {
90 Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))
91 })
92 },
93 Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),
94 LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),
95 Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),
96 LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),
97 BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),
98 Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),
99 List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),
100 LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),
101 FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),
102 FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),
103 Struct => Ok(Box::new(concatenate_struct(arrays)?)),
104 Union => unimplemented!(),
105 Map => unimplemented!(),
106 Dictionary(_) => unimplemented!(),
107 }
108}
109
110fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {
111 let dtype = arrays[0].as_ref().dtype().clone();
112 let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();
113 NullArray::new(dtype, total_len)
114}
115
116fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {
117 let dtype = arrays[0].as_ref().dtype().clone();
118 let (total_len, null_count) = len_null_count(arrays);
119 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
120
121 let mut bitmap = BitmapBuilder::with_capacity(total_len);
122 for arr in arrays {
123 let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();
124 bitmap.extend_from_bitmap(arr.values());
125 }
126 BooleanArray::new(dtype, bitmap.freeze(), validity)
127}
128
129fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {
130 let dtype = arrays[0].as_ref().dtype().clone();
131 let (total_len, null_count) = len_null_count(arrays);
132 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
133
134 let mut out = Vec::with_capacity(total_len);
135 for arr in arrays {
136 let arr: &PrimitiveArray<T> = arr.as_ref().as_any().downcast_ref().unwrap();
137 out.extend_from_slice(arr.values());
138 }
139 unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(out), validity) }
140}
141
142fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(
143 arrays: &[A],
144) -> PolarsResult<BinaryArray<O>> {
145 let dtype = arrays[0].as_ref().dtype().clone();
146 let (total_len, null_count) = len_null_count(arrays);
147 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
148
149 let total_bytes = arrays
150 .iter()
151 .map(|arr| {
152 let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
153 arr.get_values_size()
154 })
155 .sum();
156
157 let mut values = Vec::with_capacity(total_bytes);
158 let mut offsets = Offsets::<O>::with_capacity(total_len);
159
160 for arr in arrays {
161 let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
162 let first_offset = arr.offsets().first().to_usize();
163 let last_offset = arr.offsets().last().to_usize();
164 values.extend_from_slice(&arr.values()[first_offset..last_offset]);
165 for len in arr.offsets().lengths() {
166 offsets.try_push(len)?;
167 }
168 }
169
170 Ok(unsafe { BinaryArray::new(dtype, offsets.into(), values.into(), validity) })
171}
172
173fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {
174 let dtype = arrays[0].as_ref().dtype().clone();
175 let (total_len, null_count) = len_null_count(arrays);
176 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
177
178 let total_bytes = arrays
179 .iter()
180 .map(|arr| {
181 let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
182 arr.get_values_size()
183 })
184 .sum();
185
186 let mut bytes = Vec::with_capacity(total_bytes);
187 let mut offsets = Offsets::<O>::with_capacity(total_len);
188
189 for arr in arrays {
190 let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
191 let first_offset = arr.offsets().first().to_usize();
192 let last_offset = arr.offsets().last().to_usize();
193 bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);
194 for len in arr.offsets().lengths() {
195 offsets.try_push(len)?;
196 }
197 }
198
199 Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })
200}
201
/// Concatenates binary-view / utf8-view arrays into one
/// [`BinaryViewArrayGeneric`].
///
/// The views of all inputs are appended in order. How the data buffers of the
/// result are produced depends on the inputs:
///
/// 1. If every input holds the very same buffer set as the first input, that
///    buffer set is shared and the views are copied verbatim.
/// 2. Otherwise, if there are more elements than (counted) buffers, the input
///    buffers are deduplicated and the views' buffer indices are remapped.
/// 3. Otherwise the bytes referenced by non-inline views are copied into freshly
///    allocated buffers.
///
/// Panics if an input is not a `BinaryViewArrayGeneric<V>`, or (in case 2) if
/// there are `u32::MAX` or more input arrays.
fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(
    arrays: &[A],
) -> BinaryViewArrayGeneric<V> {
    let dtype = arrays[0].as_ref().dtype().clone();
    let (total_len, null_count) = len_null_count(arrays);
    // Nothing to concatenate: return an empty array of the right type.
    if total_len == 0 {
        return BinaryViewArrayGeneric::new_empty(dtype);
    }
    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);

    // Gather statistics about the buffer sets of the inputs.
    let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();
    // NOTE(review): this starts at the first array's buffer count and the loop
    // below adds that count again, so the first array appears to be counted
    // twice. The value is only used as a capacity and as a heuristic threshold;
    // confirm whether the double count is intended.
    let mut total_nondedup_buffers = first_arr.data_buffers().len();
    let mut max_arr_bufferset_len = 0;
    let mut all_same_bufs = true;
    for arr in arrays {
        let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
        max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());
        total_nondedup_buffers += arr.data_buffers().len();
        // Pointer identity of the `Arc` contents: true only while every array
        // shares the first array's buffer set. (If the `Arc` holds a slice,
        // this compares fat pointers, i.e. address and length - TODO confirm.)
        all_same_bufs &= std::ptr::eq(
            Arc::as_ptr(arr.data_buffers()),
            Arc::as_ptr(first_arr.data_buffers()),
        );
    }

    let mut total_bytes_len = 0;
    // Capacity for one view per output element; the `push_unchecked` calls
    // below rely on this (assumes each array has exactly `len()` views).
    let mut views = Vec::with_capacity(total_len);

    let mut total_buffer_len = 0;
    let buffers = if all_same_bufs {
        // Case 1: shared buffer set. Views stay valid as-is, so copy them and
        // reuse the first array's buffers and buffer length.
        total_buffer_len = first_arr.total_buffer_len();
        for arr in arrays {
            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
            views.extend_from_slice(arr.views());
            total_bytes_len += arr.total_bytes_len();
        }
        Arc::clone(first_arr.data_buffers())

    // Case 2: deduplicate buffers, but only when there are more elements than
    // counted buffers.
    } else if total_len > total_nondedup_buffers {
        // `u32::MAX` is the "not cached" tag below, so no array index may be
        // equal to it.
        assert!(arrays.len() < u32::MAX as usize);

        let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);
        // Maps a buffer identity (data pointer, length) to its index in
        // `dedup_buffers`.
        let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);
        // Per-array cache indexed by the array-local buffer index. Each entry
        // is `(index in dedup_buffers, index of the array that wrote it)`; an
        // entry is only trusted when its tag equals the current array index.
        let mut local_dedup_buffer_idx = Vec::new();
        local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));

        for (arr_idx, arr) in arrays.iter().enumerate() {
            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();

            // SAFETY: NOTE(review) - the unchecked indexing assumes that every
            // non-inline view's `buffer_idx` is in bounds for its array's buffer
            // set (and hence for the cache, sized to the largest buffer set);
            // `push_unchecked` assumes the total number of views does not
            // exceed `total_len`. Confirm against the array invariants.
            unsafe {
                for mut view in arr.views().iter().copied() {
                    // Views up to `MAX_INLINE_SIZE` do not go through the
                    // buffer remapping.
                    if view.length > View::MAX_INLINE_SIZE {
                        // Look up the cached translation of the array-local
                        // buffer index.
                        let (mut new_buffer_idx, cache_tag) =
                            *local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);
                        if cache_tag != arr_idx as u32 {
                            // Cache entry belongs to another array (or was never
                            // written): resolve the buffer through the global map.
                            let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);
                            let buf_id = (buffer.as_slice().as_ptr(), buffer.len());
                            let idx = match global_dedup_buffer_idx.entry(buf_id) {
                                Entry::Occupied(o) => *o.get(),
                                Entry::Vacant(v) => {
                                    // First time this buffer is seen: keep it and
                                    // account for its length.
                                    let idx = dedup_buffers.len() as u32;
                                    dedup_buffers.push(buffer.clone());
                                    total_buffer_len += buffer.len();
                                    v.insert(idx);
                                    idx
                                },
                            };

                            // Remember the translation for later views of this
                            // array.
                            *local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =
                                (idx, arr_idx as u32);
                            new_buffer_idx = idx;
                        }
                        view.buffer_idx = new_buffer_idx;
                    }

                    total_bytes_len += view.length as usize;
                    views.push_unchecked(view);
                }
            }
        }

        dedup_buffers.into_iter().collect()
    } else {
        // Case 3: copy the referenced bytes into new buffers.
        // First pass: total number of bytes held by views longer than 12 bytes
        // (12 is presumably `View::MAX_INLINE_SIZE` - TODO confirm).
        for arr in arrays {
            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
            total_buffer_len += arr
                .len_iter()
                .map(|l| if l > 12 { l as usize } else { 0 })
                .sum::<usize>();
        }

        // Bytes that still have to be copied; used to size newly started
        // buffers, each capped at `u32::MAX` bytes of capacity.
        let mut unprocessed_buffer_len = total_buffer_len;
        let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(
            unprocessed_buffer_len.min(u32::MAX as usize),
        )];
        for arr in arrays {
            let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
            let buffers = arr.data_buffers();

            // SAFETY: NOTE(review) - `new_buffers` always holds at least one
            // buffer, so `last()`/`last_mut()` are `Some`; `get_slice_unchecked`
            // presumably requires the view to be valid for `buffers`, and
            // `push_unchecked` assumes the total number of views does not
            // exceed `total_len`. Confirm against the array invariants.
            unsafe {
                for mut view in arr.views().iter().copied() {
                    total_bytes_len += view.length as usize;
                    if view.length > 12 {
                        // Start a new buffer when appending would bring the
                        // current one to `u32::MAX` bytes or more, so that
                        // offsets keep fitting in a `u32`.
                        if new_buffers.last().unwrap_unchecked().len() + view.length as usize
                            >= u32::MAX as usize
                        {
                            new_buffers.push(Vec::with_capacity(
                                unprocessed_buffer_len.min(u32::MAX as usize),
                            ));
                        }
                        // Copy the bytes and point the view at their new
                        // location (last buffer, offset before the append).
                        let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;
                        new_buffers
                            .last_mut()
                            .unwrap_unchecked()
                            .extend_from_slice(view.get_slice_unchecked(buffers));
                        view.offset = new_offset;
                        view.buffer_idx = new_buffers.len() as u32 - 1;
                        unprocessed_buffer_len -= view.length as usize;
                    }
                    views.push_unchecked(view);
                }
            }
        }

        new_buffers.into_iter().map(Buffer::from).collect()
    };

    // SAFETY: NOTE(review) - relies on the views, buffers and the two byte
    // totals assembled above being consistent with each other; confirm against
    // the contract of `BinaryViewArrayGeneric::new_unchecked`.
    unsafe {
        BinaryViewArrayGeneric::new_unchecked(
            dtype,
            views.into(),
            buffers,
            validity,
            total_bytes_len,
            total_buffer_len,
        )
    }
}
347
348fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<ListArray<O>> {
349 let dtype = arrays[0].as_ref().dtype().clone();
350 let (total_len, null_count) = len_null_count(arrays);
351 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
352
353 let mut num_sliced = 0;
354 let mut offsets = Offsets::<O>::with_capacity(total_len);
355 for arr in arrays {
356 let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
357 for len in arr.offsets().lengths() {
358 offsets.try_push(len)?;
359 }
360 let first_offset = arr.offsets().first().to_usize();
361 let offset_range = arr.offsets().range().to_usize();
362 num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;
363 }
364
365 let values = if num_sliced > 0 {
366 let inner_sliced_arrays = arrays
367 .iter()
368 .map(|arr| {
369 let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
370 let first_offset = arr.offsets().first().to_usize();
371 let offset_range = arr.offsets().range().to_usize();
372 if first_offset != 0 || offset_range != arr.values().len() {
373 arr.values().sliced(first_offset, offset_range)
374 } else {
375 arr.values().to_boxed()
376 }
377 })
378 .collect_vec();
379 concatenate_unchecked(&inner_sliced_arrays[..])?
380 } else {
381 let inner_arrays = arrays
382 .iter()
383 .map(|arr| {
384 let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
385 &**arr.values()
386 })
387 .collect_vec();
388 concatenate_unchecked(&inner_arrays)?
389 };
390
391 Ok(ListArray::new(dtype, offsets.into(), values, validity))
392}
393
394fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(
395 arrays: &[A],
396) -> PolarsResult<FixedSizeBinaryArray> {
397 let dtype = arrays[0].as_ref().dtype().clone();
398 let (total_len, null_count) = len_null_count(arrays);
399 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
400
401 let total_bytes = arrays
402 .iter()
403 .map(|arr| {
404 let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
405 arr.values().len()
406 })
407 .sum();
408
409 let mut bytes = Vec::with_capacity(total_bytes);
410 for arr in arrays {
411 let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
412 bytes.extend_from_slice(arr.values());
413 }
414
415 Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))
416}
417
418fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(
419 arrays: &[A],
420) -> PolarsResult<FixedSizeListArray> {
421 let dtype = arrays[0].as_ref().dtype().clone();
422 let (total_len, null_count) = len_null_count(arrays);
423
424 let inner_arrays = arrays
425 .iter()
426 .map(|arr| {
427 let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();
428 &**arr.values()
429 })
430 .collect_vec();
431 let values = concatenate_unchecked(&inner_arrays)?;
432 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
433 Ok(FixedSizeListArray::new(dtype, total_len, values, validity))
434}
435
436fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {
437 let dtype = arrays[0].as_ref().dtype().clone();
438 let (total_len, null_count) = len_null_count(arrays);
439 let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
440
441 let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();
442 let num_fields = first_arr.values().len();
443
444 let mut inner_arrays = Vec::with_capacity(arrays.len());
445 let values = (0..num_fields)
446 .map(|f| {
447 inner_arrays.clear();
448 for arr in arrays {
449 let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();
450 inner_arrays.push(&arr.values()[f]);
451 }
452 concatenate_unchecked(&inner_arrays)
453 })
454 .try_collect_vec()?;
455
456 Ok(StructArray::new(dtype, total_len, values, validity))
457}