diff --git a/Cargo.lock b/Cargo.lock index 65347bf..047ae48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -912,6 +912,7 @@ version = "0.3.0" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-cast", "arrow-schema", "arrow-select", "bytemuck", diff --git a/crates/qroissant-arrow/Cargo.toml b/crates/qroissant-arrow/Cargo.toml index 395c0d8..84c368d 100644 --- a/crates/qroissant-arrow/Cargo.toml +++ b/crates/qroissant-arrow/Cargo.toml @@ -12,6 +12,7 @@ path = "src/lib.rs" [dependencies] arrow-array = "58.0.0" arrow-buffer = "58.0.0" +arrow-cast = "58.0.0" arrow-schema = "58.0.0" arrow-select = "58.0.0" bytemuck = { version = "1", features = ["derive", "extern_crate_alloc"] } diff --git a/crates/qroissant-arrow/src/lib.rs b/crates/qroissant-arrow/src/lib.rs index d4e617a..807df23 100644 --- a/crates/qroissant-arrow/src/lib.rs +++ b/crates/qroissant-arrow/src/lib.rs @@ -14,6 +14,7 @@ pub use error::IngestionError; pub use ingestion::ingest_array; pub use ingestion::ingest_record_batch; pub use ingestion::ingest_record_batch_reader; +pub use options::HeterogeneousListMode; pub use options::ListProjection; pub use options::ProjectionOptions; pub use options::StringProjection; diff --git a/crates/qroissant-arrow/src/options.rs b/crates/qroissant-arrow/src/options.rs index 8938919..e16443c 100644 --- a/crates/qroissant-arrow/src/options.rs +++ b/crates/qroissant-arrow/src/options.rs @@ -52,6 +52,28 @@ pub enum UnionMode { Sparse, } +/// How heterogeneous q general lists are projected when child Arrow types differ. +/// +/// KDB returns mixed-typed columns (most commonly: temporal nulls of varying +/// precision interleaved in a single column) as q general lists. The +/// faithful representation is an Arrow Union, but consumers such as Polars +/// reject Union types outright. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum HeterogeneousListMode { + /// Faithful: emit Arrow Union (Dense or Sparse per `union_mode`). + /// Lossless. Default. + #[default] + Union, + /// Lossy: when every arm of the union is a temporal type (Timestamp, + /// Date32, Date64, Time32, Time64, Duration), promote all values to + /// `Timestamp(Nanosecond, None)` and emit a flat array. Falls back to + /// Union if any arm is non-temporal or the cast fails. + /// + /// Use this when the downstream consumer cannot handle Arrow Union + /// (Polars) and the column is known to be temporal-only. + CoalesceTemporals, +} + /// Combined projection options threaded through `project()` / `project_table()`. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ProjectionOptions { @@ -59,6 +81,9 @@ pub struct ProjectionOptions { pub string: StringProjection, pub list: ListProjection, pub union_mode: UnionMode, + /// How heterogeneous general lists are projected when child Arrow types + /// differ. Default: `Union` (faithful, lossless, may be rejected by Polars). + pub heterogeneous_list_mode: HeterogeneousListMode, /// When `true`, q infinity sentinels (e.g. `0Wi`, `0Wj`, `0w`) are mapped /// to Arrow nulls alongside the standard null sentinels. Default: `false`. pub treat_infinity_as_null: bool, @@ -77,6 +102,7 @@ impl Default for ProjectionOptions { string: StringProjection::default(), list: ListProjection::default(), union_mode: UnionMode::default(), + heterogeneous_list_mode: HeterogeneousListMode::default(), treat_infinity_as_null: false, parallel: true, assume_symbol_utf8: true, diff --git a/crates/qroissant-arrow/src/projection.rs b/crates/qroissant-arrow/src/projection.rs index 6219b9e..0f97f10 100644 --- a/crates/qroissant-arrow/src/projection.rs +++ b/crates/qroissant-arrow/src/projection.rs @@ -99,6 +99,7 @@ use rayon::prelude::*; use crate::error::ProjectionError; use crate::error::ProjectionResult; use crate::metadata::q_field; +use crate::options::HeterogeneousListMode; use crate::options::ListProjection; use crate::options::ProjectionOptions; use crate::options::StringProjection; @@ -964,11 +965,55 @@ fn project_list(list: &List, opts: &ProjectionOptions) -> ProjectionResult bool { + matches!( + dt, + DataType::Timestamp(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + ) +} + +/// Attempts to coalesce a heterogeneous list of temporal Arrow arrays into a +/// flat `Timestamp(Nanosecond, None)` array. Returns `None` if any arm is +/// non-temporal or `arrow_cast::cast` rejects an arm. Callers fall back to +/// Union encoding in that case. +fn try_coalesce_temporals_to_ns(child_exports: &[ArrayExport]) -> Option { + let target = DataType::Timestamp(TimeUnit::Nanosecond, None); + let mut cast_arrays: Vec = Vec::with_capacity(child_exports.len()); + for export in child_exports { + let dt = export.array.data_type(); + if !is_temporal_dt(dt) { + return None; + } + match arrow_cast::cast(export.array.as_ref(), &target) { + Ok(arr) => cast_arrays.push(arr), + Err(_) => return None, + } + } + let refs: Vec<&dyn arrow_array::Array> = cast_arrays.iter().map(|a| a.as_ref()).collect(); + arrow_select::concat::concat(&refs).ok() +} + fn project_heterogeneous_list( values: &[Value], child_exports: Vec, opts: &ProjectionOptions, ) -> ProjectionResult { + // Opt-in: collapse all-temporal heterogeneous lists into Timestamp(ns). + // Polars and similar consumers cannot ingest Arrow Union; this lets users + // accept the lossy promotion in exchange for a flat, ingestible column. + if opts.heterogeneous_list_mode == HeterogeneousListMode::CoalesceTemporals + && let Some(flat) = try_coalesce_temporals_to_ns(&child_exports) + { + let dt = DataType::Timestamp(TimeUnit::Nanosecond, None); + let field = q_field(dt, false, "list", None, None, None); + return Ok(ArrayExport { array: flat, field }); + } + // Assign a type_id (i8) to each unique Arrow DataType in insertion order. let mut type_id_map: Vec<(DataType, i8)> = Vec::new(); let mut type_ids: Vec = Vec::with_capacity(values.len()); @@ -1158,3 +1203,70 @@ fn attribute_meta_str(a: Attribute) -> &'static str { Attribute::Grouped => "grouped", } } + +#[cfg(test)] +mod tests { + use super::*; + use qroissant_core::Atom; + use qroissant_core::List; + + fn mixed_temporal_list() -> Value { + Value::List(List::new( + Attribute::None, + vec![ + Value::Atom(Atom::Timestamp(0)), + Value::Atom(Atom::Datetime(0.0)), + ], + )) + } + + #[test] + fn heterogeneous_temporals_emit_union_by_default() { + let value = mixed_temporal_list(); + let opts = ProjectionOptions::default(); + let export = project(&value, &opts).expect("projection"); + assert!( + matches!(export.array.data_type(), DataType::Union(_, _)), + "expected Union under default mode, got {:?}", + export.array.data_type() + ); + } + + #[test] + fn heterogeneous_temporals_coalesce_to_timestamp_ns() { + let value = mixed_temporal_list(); + let opts = ProjectionOptions { + heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals, + ..ProjectionOptions::default() + }; + let export = project(&value, &opts).expect("projection"); + assert_eq!( + export.array.data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, None), + "expected flat Timestamp(ns) under CoalesceTemporals" + ); + assert_eq!(export.array.len(), 2, "expected 2 rows, one per arm"); + } + + #[test] + fn coalesce_falls_back_to_union_for_non_temporal_arm() { + // Mixed temporal + int: not all-temporal, should fall through to Union. + let value = Value::List(List::new( + Attribute::None, + vec![ + Value::Atom(Atom::Timestamp(0)), + Value::Atom(Atom::Int(42)), + ], + )); + let opts = ProjectionOptions { + heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals, + ..ProjectionOptions::default() + }; + let export = project(&value, &opts).expect("projection"); + assert!( + matches!(export.array.data_type(), DataType::Union(_, _)), + "non-temporal arm must force Union fallback, got {:?}", + export.array.data_type() + ); + } +} diff --git a/crates/qroissant-python/src/serde.rs b/crates/qroissant-python/src/serde.rs index 397faae..469eb06 100644 --- a/crates/qroissant-python/src/serde.rs +++ b/crates/qroissant-python/src/serde.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyAny; use pyo3::types::PyBytes; +use qroissant_arrow::HeterogeneousListMode; use qroissant_arrow::ListProjection; use qroissant_arrow::ProjectionOptions; use qroissant_arrow::StringProjection; @@ -56,6 +57,11 @@ pub fn decode_options_to_proj_opts(opts: Option<&DecodeOptions>) -> Arc qroissant_arrow::UnionMode::Dense, crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse, }, + heterogeneous_list_mode: if opts.coalesce_temporals_value() { + HeterogeneousListMode::CoalesceTemporals + } else { + HeterogeneousListMode::Union + }, treat_infinity_as_null: opts.treat_infinity_as_null(), parallel: opts.parallel_value(), assume_symbol_utf8: opts.assume_symbol_utf8_value(), diff --git a/crates/qroissant-python/src/types.rs b/crates/qroissant-python/src/types.rs index 4f2eabd..1f54cdf 100644 --- a/crates/qroissant-python/src/types.rs +++ b/crates/qroissant-python/src/types.rs @@ -817,6 +817,7 @@ pub struct DecodeOptions { validate_compressed_trailing_bytes: bool, temporal_nulls: bool, treat_infinity_as_null: bool, + coalesce_temporals: bool, } #[pymethods] @@ -875,6 +876,11 @@ impl DecodeOptions { fn get_treat_infinity_as_null(&self) -> bool { self.treat_infinity_as_null } + + #[getter] + fn coalesce_temporals(&self) -> bool { + self.coalesce_temporals + } } impl Default for DecodeOptions { @@ -890,6 +896,7 @@ impl Default for DecodeOptions { validate_compressed_trailing_bytes: true, temporal_nulls: true, treat_infinity_as_null: false, + coalesce_temporals: false, } } } @@ -926,6 +933,10 @@ impl DecodeOptions { pub(crate) fn assume_symbol_utf8_value(&self) -> bool { self.assume_symbol_utf8 } + + pub(crate) fn coalesce_temporals_value(&self) -> bool { + self.coalesce_temporals + } } #[pyclass(module = "qroissant", frozen, eq)] @@ -996,6 +1007,12 @@ impl DecodeOptionsBuilder { next } + fn with_coalesce_temporals(&self, value: bool) -> Self { + let mut next = self.clone(); + next.options.coalesce_temporals = value; + next + } + fn build(&self) -> DecodeOptions { self.options.clone() } @@ -1049,6 +1066,11 @@ impl DecodeOptionsBuilder { fn treat_infinity_as_null(&self) -> bool { self.options.treat_infinity_as_null } + + #[getter] + fn coalesce_temporals(&self) -> bool { + self.options.coalesce_temporals + } } #[pyclass(module = "qroissant", frozen, eq)] diff --git a/python/qroissant/_config.pyi b/python/qroissant/_config.pyi index 304f569..22669a2 100644 --- a/python/qroissant/_config.pyi +++ b/python/qroissant/_config.pyi @@ -260,6 +260,14 @@ class DecodeOptions: def treat_infinity_as_null(self) -> bool: """Whether ±∞ sentinels are mapped to ``None`` in Arrow arrays.""" ... + @property + def coalesce_temporals(self) -> bool: + """Whether heterogeneous all-temporal lists are flattened to ``Timestamp(ns)`` + instead of being emitted as Arrow ``Union``. Use this when the + downstream consumer (e.g. Polars) cannot ingest ``Union`` types. + Lossy: mixed precisions are promoted to nanoseconds. + """ + ... class DecodeOptionsBuilder: @@ -303,6 +311,15 @@ class DecodeOptionsBuilder: def with_treat_infinity_as_null(self, value: bool, /) -> DecodeOptionsBuilder: """Set whether ±∞ sentinels are mapped to ``None`` in Arrow arrays.""" ... + def with_coalesce_temporals(self, value: bool, /) -> DecodeOptionsBuilder: + """Set whether heterogeneous all-temporal lists are flattened to + ``Timestamp(ns)`` instead of being emitted as Arrow ``Union``. + + Set to ``True`` for consumers (e.g. Polars) that reject Arrow union + types. Lossy: mixed precisions are promoted to nanoseconds. Default + ``False`` (faithful ``Union`` representation). + """ + ... def build(self) -> DecodeOptions: """Finalize the builder into an immutable :class:`DecodeOptions` instance.""" ...