feat(arrow): opt-in coalesce of heterogeneous temporal lists to Timestamp(ns)

KDB returns mixed-typed columns (most commonly temporal nulls of
varying precision -- 0Np, 0Nz, 0Nd -- interleaved in one column) as q
general lists. The faithful Arrow projection emits a `Union` of the
per-element DataTypes. Polars and most DataFrame consumers reject
`Union` outright, making such columns unusable downstream without an
extra conversion round-trip.

Add `HeterogeneousListMode::CoalesceTemporals` (off by default) on
`ProjectionOptions`. When set, `project_heterogeneous_list` checks
whether every arm is temporal (Timestamp, Date32, Date64, Time32,
Time64, Duration) and, if so, casts each child to
`Timestamp(Nanosecond, None)` via `arrow_cast::cast`, concatenates, and
emits a flat array. Any non-temporal arm or cast error falls back to
the existing Union path, so the flag is safe to enable globally.

Plumbed through the Python `DecodeOptions` API as
`with_coalesce_temporals(bool)` with matching getter and pyi stub. The
default stays `False`; users opt in when they know the consumer
(Polars) can't handle Union and accept the lossy precision promotion.

Tests cover (a) default-Union, (b) all-temporal coalesce, and
(c) non-temporal fallback to Union.
This commit is contained in:
Cam Zalewski 2026-05-20 14:42:00 +01:00
parent aa2c0a2ec7
commit a1a621ddfd
8 changed files with 186 additions and 0 deletions

View file

@ -12,6 +12,7 @@ path = "src/lib.rs"
[dependencies]
arrow-array = "58.0.0"
arrow-buffer = "58.0.0"
arrow-cast = "58.0.0"
arrow-schema = "58.0.0"
arrow-select = "58.0.0"
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc"] }

View file

@ -14,6 +14,7 @@ pub use error::IngestionError;
pub use ingestion::ingest_array;
pub use ingestion::ingest_record_batch;
pub use ingestion::ingest_record_batch_reader;
pub use options::HeterogeneousListMode;
pub use options::ListProjection;
pub use options::ProjectionOptions;
pub use options::StringProjection;

View file

@ -52,6 +52,28 @@ pub enum UnionMode {
Sparse,
}
/// Selects how a heterogeneous q general list is projected when its child
/// Arrow types differ.
///
/// KDB returns mixed-typed columns (most commonly: temporal nulls of varying
/// precision interleaved in a single column) as q general lists. The
/// faithful representation is an Arrow Union, but consumers such as Polars
/// reject Union types outright.
///
/// The default variant is [`HeterogeneousListMode::Union`], so existing
/// callers keep the lossless behaviour unless they opt in to coalescing.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum HeterogeneousListMode {
/// Faithful: emit Arrow Union (Dense or Sparse per `union_mode`).
/// Lossless. Default.
#[default]
Union,
/// Lossy: when every arm of the union is a temporal type (Timestamp,
/// Date32, Date64, Time32, Time64, Duration), promote all values to
/// `Timestamp(Nanosecond, None)` and emit a flat array. Falls back to
/// Union if any arm is non-temporal or the cast fails.
///
/// The promotion discards each arm's original Arrow type: every value in
/// the output is a timezone-less nanosecond timestamp, regardless of
/// whether it started out as a date, a time or a duration.
///
/// Use this when the downstream consumer cannot handle Arrow Union
/// (Polars) and the column is known to be temporal-only.
CoalesceTemporals,
}
/// Combined projection options threaded through `project()` / `project_table()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProjectionOptions {
@ -59,6 +81,9 @@ pub struct ProjectionOptions {
pub string: StringProjection,
pub list: ListProjection,
pub union_mode: UnionMode,
/// How heterogeneous general lists are projected when child Arrow types
/// differ. Default: `Union` (faithful, lossless, may be rejected by Polars).
pub heterogeneous_list_mode: HeterogeneousListMode,
/// When `true`, q infinity sentinels (e.g. `0Wi`, `0Wj`, `0w`) are mapped
/// to Arrow nulls alongside the standard null sentinels. Default: `false`.
pub treat_infinity_as_null: bool,
@ -77,6 +102,7 @@ impl Default for ProjectionOptions {
string: StringProjection::default(),
list: ListProjection::default(),
union_mode: UnionMode::default(),
heterogeneous_list_mode: HeterogeneousListMode::default(),
treat_infinity_as_null: false,
parallel: true,
assume_symbol_utf8: true,

View file

@ -99,6 +99,7 @@ use rayon::prelude::*;
use crate::error::ProjectionError;
use crate::error::ProjectionResult;
use crate::metadata::q_field;
use crate::options::HeterogeneousListMode;
use crate::options::ListProjection;
use crate::options::ProjectionOptions;
use crate::options::StringProjection;
@ -964,11 +965,55 @@ fn project_list(list: &List, opts: &ProjectionOptions) -> ProjectionResult<Array
// Heterogeneous list → Union projection
// ---------------------------------------------------------------------------
fn is_temporal_dt(dt: &DataType) -> bool {
matches!(
dt,
DataType::Timestamp(_, _)
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
)
}
/// Attempts to coalesce a heterogeneous list of temporal Arrow arrays into a
/// flat `Timestamp(Nanosecond, None)` array.
///
/// Each child array is cast to the target type and the results are
/// concatenated in `child_exports` order.
///
/// Returns `None` — signalling that the caller should fall back to Union
/// encoding — when:
/// * any arm is not a temporal type (see [`is_temporal_dt`]),
/// * `arrow_cast` cannot cast an arm to `Timestamp(Nanosecond, None)`,
/// * a value does not fit the target type (the cast is run in strict mode), or
/// * the final concatenation fails.
fn try_coalesce_temporals_to_ns(child_exports: &[ArrayExport]) -> Option<ArrayRef> {
    let target = DataType::Timestamp(TimeUnit::Nanosecond, None);

    // The default cast options are `safe: true`, which silently replaces
    // values that overflow the target type with nulls. That would corrupt
    // data without ever reaching the Union fallback, so run the cast in
    // strict mode and treat overflow like any other cast failure.
    let strict = arrow_cast::cast::CastOptions {
        safe: false,
        ..Default::default()
    };

    // Collecting into `Option<Vec<_>>` short-circuits on the first arm that
    // is non-temporal or fails to cast.
    let cast_arrays = child_exports
        .iter()
        .map(|export| {
            if !is_temporal_dt(export.array.data_type()) {
                return None;
            }
            arrow_cast::cast::cast_with_options(export.array.as_ref(), &target, &strict).ok()
        })
        .collect::<Option<Vec<ArrayRef>>>()?;

    let refs: Vec<&dyn arrow_array::Array> = cast_arrays.iter().map(|a| a.as_ref()).collect();
    arrow_select::concat::concat(&refs).ok()
}
fn project_heterogeneous_list(
values: &[Value],
child_exports: Vec<ArrayExport>,
opts: &ProjectionOptions,
) -> ProjectionResult<ArrayExport> {
// Opt-in: collapse all-temporal heterogeneous lists into Timestamp(ns).
// Polars and similar consumers cannot ingest Arrow Union; this lets users
// accept the lossy promotion in exchange for a flat, ingestible column.
if opts.heterogeneous_list_mode == HeterogeneousListMode::CoalesceTemporals
&& let Some(flat) = try_coalesce_temporals_to_ns(&child_exports)
{
let dt = DataType::Timestamp(TimeUnit::Nanosecond, None);
let field = q_field(dt, false, "list", None, None, None);
return Ok(ArrayExport { array: flat, field });
}
// Assign a type_id (i8) to each unique Arrow DataType in insertion order.
let mut type_id_map: Vec<(DataType, i8)> = Vec::new();
let mut type_ids: Vec<i8> = Vec::with_capacity(values.len());
@ -1158,3 +1203,70 @@ fn attribute_meta_str(a: Attribute) -> &'static str {
Attribute::Grouped => "grouped",
}
}
#[cfg(test)]
mod tests {
    use qroissant_core::Atom;
    use qroissant_core::List;

    use super::*;

    /// Wraps `items` in a q general list carrying no attribute.
    fn general_list(items: Vec<Value>) -> Value {
        Value::List(List::new(Attribute::None, items))
    }

    /// A two-element general list mixing a timestamp atom with a datetime atom.
    fn mixed_temporal_list() -> Value {
        general_list(vec![
            Value::Atom(Atom::Timestamp(0)),
            Value::Atom(Atom::Datetime(0.0)),
        ])
    }

    /// Default projection options with only the heterogeneous-list mode
    /// switched to `CoalesceTemporals`.
    fn coalescing_opts() -> ProjectionOptions {
        ProjectionOptions {
            heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals,
            ..ProjectionOptions::default()
        }
    }

    /// (a) Without opting in, mixed temporals keep the lossless Union encoding.
    #[test]
    fn heterogeneous_temporals_emit_union_by_default() {
        let list = mixed_temporal_list();
        let export = project(&list, &ProjectionOptions::default()).expect("projection");
        let actual = export.array.data_type();
        assert!(
            matches!(actual, DataType::Union(_, _)),
            "expected Union under default mode, got {:?}",
            actual
        );
    }

    /// (b) With the flag on, an all-temporal list becomes a flat Timestamp(ns).
    #[test]
    fn heterogeneous_temporals_coalesce_to_timestamp_ns() {
        let list = mixed_temporal_list();
        let export = project(&list, &coalescing_opts()).expect("projection");
        let expected = DataType::Timestamp(TimeUnit::Nanosecond, None);
        assert_eq!(
            export.array.data_type(),
            &expected,
            "expected flat Timestamp(ns) under CoalesceTemporals"
        );
        assert_eq!(export.array.len(), 2, "expected 2 rows, one per arm");
    }

    /// (c) A single non-temporal arm disables coalescing even when the flag is on.
    #[test]
    fn coalesce_falls_back_to_union_for_non_temporal_arm() {
        // Mixed temporal + int: not all-temporal, should fall through to Union.
        let list = general_list(vec![
            Value::Atom(Atom::Timestamp(0)),
            Value::Atom(Atom::Int(42)),
        ]);
        let export = project(&list, &coalescing_opts()).expect("projection");
        let actual = export.array.data_type();
        assert!(
            matches!(actual, DataType::Union(_, _)),
            "non-temporal arm must force Union fallback, got {:?}",
            actual
        );
    }
}

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use pyo3::prelude::*;
use pyo3::types::PyAny;
use pyo3::types::PyBytes;
use qroissant_arrow::HeterogeneousListMode;
use qroissant_arrow::ListProjection;
use qroissant_arrow::ProjectionOptions;
use qroissant_arrow::StringProjection;
@ -56,6 +57,11 @@ pub fn decode_options_to_proj_opts(opts: Option<&DecodeOptions>) -> Arc<Projecti
crate::types::UnionMode::Dense => qroissant_arrow::UnionMode::Dense,
crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse,
},
heterogeneous_list_mode: if opts.coalesce_temporals_value() {
HeterogeneousListMode::CoalesceTemporals
} else {
HeterogeneousListMode::Union
},
treat_infinity_as_null: opts.treat_infinity_as_null(),
parallel: opts.parallel_value(),
assume_symbol_utf8: opts.assume_symbol_utf8_value(),

View file

@ -817,6 +817,7 @@ pub struct DecodeOptions {
validate_compressed_trailing_bytes: bool,
temporal_nulls: bool,
treat_infinity_as_null: bool,
coalesce_temporals: bool,
}
#[pymethods]
@ -875,6 +876,11 @@ impl DecodeOptions {
fn get_treat_infinity_as_null(&self) -> bool {
self.treat_infinity_as_null
}
/// Whether heterogeneous all-temporal lists are coalesced into a flat
/// nanosecond timestamp column instead of an Arrow Union.
///
/// Returns the stored `coalesce_temporals` flag unchanged.
#[getter]
fn coalesce_temporals(&self) -> bool {
self.coalesce_temporals
}
}
impl Default for DecodeOptions {
@ -890,6 +896,7 @@ impl Default for DecodeOptions {
validate_compressed_trailing_bytes: true,
temporal_nulls: true,
treat_infinity_as_null: false,
coalesce_temporals: false,
}
}
}
@ -926,6 +933,10 @@ impl DecodeOptions {
pub(crate) fn assume_symbol_utf8_value(&self) -> bool {
self.assume_symbol_utf8
}
/// Crate-internal accessor for the `coalesce_temporals` flag, following the
/// `*_value` naming used by the neighbouring accessors in this impl.
pub(crate) fn coalesce_temporals_value(&self) -> bool {
self.coalesce_temporals
}
}
#[pyclass(module = "qroissant", frozen, eq)]
@ -996,6 +1007,12 @@ impl DecodeOptionsBuilder {
next
}
/// Returns a copy of this builder with the `coalesce_temporals` option set
/// to `value`. The receiver itself is not modified.
fn with_coalesce_temporals(&self, value: bool) -> Self {
// The receiver is only borrowed, so the change is applied to a clone.
let mut next = self.clone();
next.options.coalesce_temporals = value;
next
}
fn build(&self) -> DecodeOptions {
self.options.clone()
}
@ -1049,6 +1066,11 @@ impl DecodeOptionsBuilder {
fn treat_infinity_as_null(&self) -> bool {
self.options.treat_infinity_as_null
}
/// The `coalesce_temporals` value currently held by the builder's pending
/// options.
#[getter]
fn coalesce_temporals(&self) -> bool {
self.options.coalesce_temporals
}
}
#[pyclass(module = "qroissant", frozen, eq)]