Compare commits
No commits in common. "a1a621ddfd78fac123b08e17a5c85a9736e979d4" and "f24af467ec4ba1dad0e599a28d5bc51f9aa1156e" have entirely different histories.
a1a621ddfd
...
f24af467ec
9 changed files with 1 additions and 186 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -912,7 +912,6 @@ version = "0.3.0"
|
|||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"arrow-cast",
|
||||
"arrow-schema",
|
||||
"arrow-select",
|
||||
"bytemuck",
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ path = "src/lib.rs"
|
|||
[dependencies]
|
||||
arrow-array = "58.0.0"
|
||||
arrow-buffer = "58.0.0"
|
||||
arrow-cast = "58.0.0"
|
||||
arrow-schema = "58.0.0"
|
||||
arrow-select = "58.0.0"
|
||||
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc"] }
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ pub use error::IngestionError;
|
|||
pub use ingestion::ingest_array;
|
||||
pub use ingestion::ingest_record_batch;
|
||||
pub use ingestion::ingest_record_batch_reader;
|
||||
pub use options::HeterogeneousListMode;
|
||||
pub use options::ListProjection;
|
||||
pub use options::ProjectionOptions;
|
||||
pub use options::StringProjection;
|
||||
|
|
|
|||
|
|
@ -52,28 +52,6 @@ pub enum UnionMode {
|
|||
Sparse,
|
||||
}
|
||||
|
||||
/// How heterogeneous q general lists are projected when child Arrow types differ.
|
||||
///
|
||||
/// KDB returns mixed-typed columns (most commonly: temporal nulls of varying
|
||||
/// precision interleaved in a single column) as q general lists. The
|
||||
/// faithful representation is an Arrow Union, but consumers such as Polars
|
||||
/// reject Union types outright.
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub enum HeterogeneousListMode {
|
||||
/// Faithful: emit Arrow Union (Dense or Sparse per `union_mode`).
|
||||
/// Lossless. Default.
|
||||
#[default]
|
||||
Union,
|
||||
/// Lossy: when every arm of the union is a temporal type (Timestamp,
|
||||
/// Date32, Date64, Time32, Time64, Duration), promote all values to
|
||||
/// `Timestamp(Nanosecond, None)` and emit a flat array. Falls back to
|
||||
/// Union if any arm is non-temporal or the cast fails.
|
||||
///
|
||||
/// Use this when the downstream consumer cannot handle Arrow Union
|
||||
/// (Polars) and the column is known to be temporal-only.
|
||||
CoalesceTemporals,
|
||||
}
|
||||
|
||||
/// Combined projection options threaded through `project()` / `project_table()`.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ProjectionOptions {
|
||||
|
|
@ -81,9 +59,6 @@ pub struct ProjectionOptions {
|
|||
pub string: StringProjection,
|
||||
pub list: ListProjection,
|
||||
pub union_mode: UnionMode,
|
||||
/// How heterogeneous general lists are projected when child Arrow types
|
||||
/// differ. Default: `Union` (faithful, lossless, may be rejected by Polars).
|
||||
pub heterogeneous_list_mode: HeterogeneousListMode,
|
||||
/// When `true`, q infinity sentinels (e.g. `0Wi`, `0Wj`, `0w`) are mapped
|
||||
/// to Arrow nulls alongside the standard null sentinels. Default: `false`.
|
||||
pub treat_infinity_as_null: bool,
|
||||
|
|
@ -102,7 +77,6 @@ impl Default for ProjectionOptions {
|
|||
string: StringProjection::default(),
|
||||
list: ListProjection::default(),
|
||||
union_mode: UnionMode::default(),
|
||||
heterogeneous_list_mode: HeterogeneousListMode::default(),
|
||||
treat_infinity_as_null: false,
|
||||
parallel: true,
|
||||
assume_symbol_utf8: true,
|
||||
|
|
|
|||
|
|
@ -99,7 +99,6 @@ use rayon::prelude::*;
|
|||
use crate::error::ProjectionError;
|
||||
use crate::error::ProjectionResult;
|
||||
use crate::metadata::q_field;
|
||||
use crate::options::HeterogeneousListMode;
|
||||
use crate::options::ListProjection;
|
||||
use crate::options::ProjectionOptions;
|
||||
use crate::options::StringProjection;
|
||||
|
|
@ -965,55 +964,11 @@ fn project_list(list: &List, opts: &ProjectionOptions) -> ProjectionResult<Array
|
|||
// Heterogeneous list → Union projection
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn is_temporal_dt(dt: &DataType) -> bool {
|
||||
matches!(
|
||||
dt,
|
||||
DataType::Timestamp(_, _)
|
||||
| DataType::Date32
|
||||
| DataType::Date64
|
||||
| DataType::Time32(_)
|
||||
| DataType::Time64(_)
|
||||
| DataType::Duration(_)
|
||||
)
|
||||
}
|
||||
|
||||
/// Attempts to coalesce a heterogeneous list of temporal Arrow arrays into a
|
||||
/// flat `Timestamp(Nanosecond, None)` array. Returns `None` if any arm is
|
||||
/// non-temporal or `arrow_cast::cast` rejects an arm. Callers fall back to
|
||||
/// Union encoding in that case.
|
||||
fn try_coalesce_temporals_to_ns(child_exports: &[ArrayExport]) -> Option<ArrayRef> {
|
||||
let target = DataType::Timestamp(TimeUnit::Nanosecond, None);
|
||||
let mut cast_arrays: Vec<ArrayRef> = Vec::with_capacity(child_exports.len());
|
||||
for export in child_exports {
|
||||
let dt = export.array.data_type();
|
||||
if !is_temporal_dt(dt) {
|
||||
return None;
|
||||
}
|
||||
match arrow_cast::cast(export.array.as_ref(), &target) {
|
||||
Ok(arr) => cast_arrays.push(arr),
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
let refs: Vec<&dyn arrow_array::Array> = cast_arrays.iter().map(|a| a.as_ref()).collect();
|
||||
arrow_select::concat::concat(&refs).ok()
|
||||
}
|
||||
|
||||
fn project_heterogeneous_list(
|
||||
values: &[Value],
|
||||
child_exports: Vec<ArrayExport>,
|
||||
opts: &ProjectionOptions,
|
||||
) -> ProjectionResult<ArrayExport> {
|
||||
// Opt-in: collapse all-temporal heterogeneous lists into Timestamp(ns).
|
||||
// Polars and similar consumers cannot ingest Arrow Union; this lets users
|
||||
// accept the lossy promotion in exchange for a flat, ingestible column.
|
||||
if opts.heterogeneous_list_mode == HeterogeneousListMode::CoalesceTemporals
|
||||
&& let Some(flat) = try_coalesce_temporals_to_ns(&child_exports)
|
||||
{
|
||||
let dt = DataType::Timestamp(TimeUnit::Nanosecond, None);
|
||||
let field = q_field(dt, false, "list", None, None, None);
|
||||
return Ok(ArrayExport { array: flat, field });
|
||||
}
|
||||
|
||||
// Assign a type_id (i8) to each unique Arrow DataType in insertion order.
|
||||
let mut type_id_map: Vec<(DataType, i8)> = Vec::new();
|
||||
let mut type_ids: Vec<i8> = Vec::with_capacity(values.len());
|
||||
|
|
@ -1203,70 +1158,3 @@ fn attribute_meta_str(a: Attribute) -> &'static str {
|
|||
Attribute::Grouped => "grouped",
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use qroissant_core::Atom;
|
||||
use qroissant_core::List;
|
||||
|
||||
fn mixed_temporal_list() -> Value {
|
||||
Value::List(List::new(
|
||||
Attribute::None,
|
||||
vec![
|
||||
Value::Atom(Atom::Timestamp(0)),
|
||||
Value::Atom(Atom::Datetime(0.0)),
|
||||
],
|
||||
))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heterogeneous_temporals_emit_union_by_default() {
|
||||
let value = mixed_temporal_list();
|
||||
let opts = ProjectionOptions::default();
|
||||
let export = project(&value, &opts).expect("projection");
|
||||
assert!(
|
||||
matches!(export.array.data_type(), DataType::Union(_, _)),
|
||||
"expected Union under default mode, got {:?}",
|
||||
export.array.data_type()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heterogeneous_temporals_coalesce_to_timestamp_ns() {
|
||||
let value = mixed_temporal_list();
|
||||
let opts = ProjectionOptions {
|
||||
heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals,
|
||||
..ProjectionOptions::default()
|
||||
};
|
||||
let export = project(&value, &opts).expect("projection");
|
||||
assert_eq!(
|
||||
export.array.data_type(),
|
||||
&DataType::Timestamp(TimeUnit::Nanosecond, None),
|
||||
"expected flat Timestamp(ns) under CoalesceTemporals"
|
||||
);
|
||||
assert_eq!(export.array.len(), 2, "expected 2 rows, one per arm");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn coalesce_falls_back_to_union_for_non_temporal_arm() {
|
||||
// Mixed temporal + int: not all-temporal, should fall through to Union.
|
||||
let value = Value::List(List::new(
|
||||
Attribute::None,
|
||||
vec![
|
||||
Value::Atom(Atom::Timestamp(0)),
|
||||
Value::Atom(Atom::Int(42)),
|
||||
],
|
||||
));
|
||||
let opts = ProjectionOptions {
|
||||
heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals,
|
||||
..ProjectionOptions::default()
|
||||
};
|
||||
let export = project(&value, &opts).expect("projection");
|
||||
assert!(
|
||||
matches!(export.array.data_type(), DataType::Union(_, _)),
|
||||
"non-temporal arm must force Union fallback, got {:?}",
|
||||
export.array.data_type()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
//! Each transform function uses `portable_simd` for the aligned middle of the
|
||||
//! slice and falls back to a scalar loop for the head and tail.
|
||||
|
||||
use std::simd::Select;
|
||||
use std::simd::prelude::*;
|
||||
|
||||
use crate::nulls::Q_NULL_DATE;
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ use std::sync::Arc;
|
|||
use pyo3::prelude::*;
|
||||
use pyo3::types::PyAny;
|
||||
use pyo3::types::PyBytes;
|
||||
use qroissant_arrow::HeterogeneousListMode;
|
||||
use qroissant_arrow::ListProjection;
|
||||
use qroissant_arrow::ProjectionOptions;
|
||||
use qroissant_arrow::StringProjection;
|
||||
|
|
@ -57,11 +56,6 @@ pub fn decode_options_to_proj_opts(opts: Option<&DecodeOptions>) -> Arc<Projecti
|
|||
crate::types::UnionMode::Dense => qroissant_arrow::UnionMode::Dense,
|
||||
crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse,
|
||||
},
|
||||
heterogeneous_list_mode: if opts.coalesce_temporals_value() {
|
||||
HeterogeneousListMode::CoalesceTemporals
|
||||
} else {
|
||||
HeterogeneousListMode::Union
|
||||
},
|
||||
treat_infinity_as_null: opts.treat_infinity_as_null(),
|
||||
parallel: opts.parallel_value(),
|
||||
assume_symbol_utf8: opts.assume_symbol_utf8_value(),
|
||||
|
|
|
|||
|
|
@ -817,7 +817,6 @@ pub struct DecodeOptions {
|
|||
validate_compressed_trailing_bytes: bool,
|
||||
temporal_nulls: bool,
|
||||
treat_infinity_as_null: bool,
|
||||
coalesce_temporals: bool,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
|
|
@ -876,11 +875,6 @@ impl DecodeOptions {
|
|||
fn get_treat_infinity_as_null(&self) -> bool {
|
||||
self.treat_infinity_as_null
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn coalesce_temporals(&self) -> bool {
|
||||
self.coalesce_temporals
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DecodeOptions {
|
||||
|
|
@ -896,7 +890,6 @@ impl Default for DecodeOptions {
|
|||
validate_compressed_trailing_bytes: true,
|
||||
temporal_nulls: true,
|
||||
treat_infinity_as_null: false,
|
||||
coalesce_temporals: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -933,10 +926,6 @@ impl DecodeOptions {
|
|||
pub(crate) fn assume_symbol_utf8_value(&self) -> bool {
|
||||
self.assume_symbol_utf8
|
||||
}
|
||||
|
||||
pub(crate) fn coalesce_temporals_value(&self) -> bool {
|
||||
self.coalesce_temporals
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(module = "qroissant", frozen, eq)]
|
||||
|
|
@ -1007,12 +996,6 @@ impl DecodeOptionsBuilder {
|
|||
next
|
||||
}
|
||||
|
||||
fn with_coalesce_temporals(&self, value: bool) -> Self {
|
||||
let mut next = self.clone();
|
||||
next.options.coalesce_temporals = value;
|
||||
next
|
||||
}
|
||||
|
||||
fn build(&self) -> DecodeOptions {
|
||||
self.options.clone()
|
||||
}
|
||||
|
|
@ -1066,11 +1049,6 @@ impl DecodeOptionsBuilder {
|
|||
fn treat_infinity_as_null(&self) -> bool {
|
||||
self.options.treat_infinity_as_null
|
||||
}
|
||||
|
||||
#[getter]
|
||||
fn coalesce_temporals(&self) -> bool {
|
||||
self.options.coalesce_temporals
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(module = "qroissant", frozen, eq)]
|
||||
|
|
|
|||
|
|
@ -260,14 +260,6 @@ class DecodeOptions:
|
|||
def treat_infinity_as_null(self) -> bool:
|
||||
"""Whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
||||
...
|
||||
@property
|
||||
def coalesce_temporals(self) -> bool:
|
||||
"""Whether heterogeneous all-temporal lists are flattened to ``Timestamp(ns)``
|
||||
instead of being emitted as Arrow ``Union``. Use this when the
|
||||
downstream consumer (e.g. Polars) cannot ingest ``Union`` types.
|
||||
Lossy: mixed precisions are promoted to nanoseconds.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class DecodeOptionsBuilder:
|
||||
|
|
@ -311,15 +303,6 @@ class DecodeOptionsBuilder:
|
|||
def with_treat_infinity_as_null(self, value: bool, /) -> DecodeOptionsBuilder:
|
||||
"""Set whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
||||
...
|
||||
def with_coalesce_temporals(self, value: bool, /) -> DecodeOptionsBuilder:
|
||||
"""Set whether heterogeneous all-temporal lists are flattened to
|
||||
``Timestamp(ns)`` instead of being emitted as Arrow ``Union``.
|
||||
|
||||
Set to ``True`` for consumers (e.g. Polars) that reject Arrow union
|
||||
types. Lossy: mixed precisions are promoted to nanoseconds. Default
|
||||
``False`` (faithful ``Union`` representation).
|
||||
"""
|
||||
...
|
||||
def build(self) -> DecodeOptions:
|
||||
"""Finalize the builder into an immutable :class:`DecodeOptions` instance."""
|
||||
...
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue