Compare commits
No commits in common. "a1a621ddfd78fac123b08e17a5c85a9736e979d4" and "f24af467ec4ba1dad0e599a28d5bc51f9aa1156e" have entirely different histories.
a1a621ddfd
...
f24af467ec
9 changed files with 1 additions and 186 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -912,7 +912,6 @@ version = "0.3.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
"arrow-cast",
|
|
||||||
"arrow-schema",
|
"arrow-schema",
|
||||||
"arrow-select",
|
"arrow-select",
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ path = "src/lib.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
arrow-array = "58.0.0"
|
arrow-array = "58.0.0"
|
||||||
arrow-buffer = "58.0.0"
|
arrow-buffer = "58.0.0"
|
||||||
arrow-cast = "58.0.0"
|
|
||||||
arrow-schema = "58.0.0"
|
arrow-schema = "58.0.0"
|
||||||
arrow-select = "58.0.0"
|
arrow-select = "58.0.0"
|
||||||
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc"] }
|
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc"] }
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,6 @@ pub use error::IngestionError;
|
||||||
pub use ingestion::ingest_array;
|
pub use ingestion::ingest_array;
|
||||||
pub use ingestion::ingest_record_batch;
|
pub use ingestion::ingest_record_batch;
|
||||||
pub use ingestion::ingest_record_batch_reader;
|
pub use ingestion::ingest_record_batch_reader;
|
||||||
pub use options::HeterogeneousListMode;
|
|
||||||
pub use options::ListProjection;
|
pub use options::ListProjection;
|
||||||
pub use options::ProjectionOptions;
|
pub use options::ProjectionOptions;
|
||||||
pub use options::StringProjection;
|
pub use options::StringProjection;
|
||||||
|
|
|
||||||
|
|
@ -52,28 +52,6 @@ pub enum UnionMode {
|
||||||
Sparse,
|
Sparse,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// How heterogeneous q general lists are projected when child Arrow types differ.
|
|
||||||
///
|
|
||||||
/// KDB returns mixed-typed columns (most commonly: temporal nulls of varying
|
|
||||||
/// precision interleaved in a single column) as q general lists. The
|
|
||||||
/// faithful representation is an Arrow Union, but consumers such as Polars
|
|
||||||
/// reject Union types outright.
|
|
||||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
|
||||||
pub enum HeterogeneousListMode {
|
|
||||||
/// Faithful: emit Arrow Union (Dense or Sparse per `union_mode`).
|
|
||||||
/// Lossless. Default.
|
|
||||||
#[default]
|
|
||||||
Union,
|
|
||||||
/// Lossy: when every arm of the union is a temporal type (Timestamp,
|
|
||||||
/// Date32, Date64, Time32, Time64, Duration), promote all values to
|
|
||||||
/// `Timestamp(Nanosecond, None)` and emit a flat array. Falls back to
|
|
||||||
/// Union if any arm is non-temporal or the cast fails.
|
|
||||||
///
|
|
||||||
/// Use this when the downstream consumer cannot handle Arrow Union
|
|
||||||
/// (Polars) and the column is known to be temporal-only.
|
|
||||||
CoalesceTemporals,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Combined projection options threaded through `project()` / `project_table()`.
|
/// Combined projection options threaded through `project()` / `project_table()`.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
pub struct ProjectionOptions {
|
pub struct ProjectionOptions {
|
||||||
|
|
@ -81,9 +59,6 @@ pub struct ProjectionOptions {
|
||||||
pub string: StringProjection,
|
pub string: StringProjection,
|
||||||
pub list: ListProjection,
|
pub list: ListProjection,
|
||||||
pub union_mode: UnionMode,
|
pub union_mode: UnionMode,
|
||||||
/// How heterogeneous general lists are projected when child Arrow types
|
|
||||||
/// differ. Default: `Union` (faithful, lossless, may be rejected by Polars).
|
|
||||||
pub heterogeneous_list_mode: HeterogeneousListMode,
|
|
||||||
/// When `true`, q infinity sentinels (e.g. `0Wi`, `0Wj`, `0w`) are mapped
|
/// When `true`, q infinity sentinels (e.g. `0Wi`, `0Wj`, `0w`) are mapped
|
||||||
/// to Arrow nulls alongside the standard null sentinels. Default: `false`.
|
/// to Arrow nulls alongside the standard null sentinels. Default: `false`.
|
||||||
pub treat_infinity_as_null: bool,
|
pub treat_infinity_as_null: bool,
|
||||||
|
|
@ -102,7 +77,6 @@ impl Default for ProjectionOptions {
|
||||||
string: StringProjection::default(),
|
string: StringProjection::default(),
|
||||||
list: ListProjection::default(),
|
list: ListProjection::default(),
|
||||||
union_mode: UnionMode::default(),
|
union_mode: UnionMode::default(),
|
||||||
heterogeneous_list_mode: HeterogeneousListMode::default(),
|
|
||||||
treat_infinity_as_null: false,
|
treat_infinity_as_null: false,
|
||||||
parallel: true,
|
parallel: true,
|
||||||
assume_symbol_utf8: true,
|
assume_symbol_utf8: true,
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,6 @@ use rayon::prelude::*;
|
||||||
use crate::error::ProjectionError;
|
use crate::error::ProjectionError;
|
||||||
use crate::error::ProjectionResult;
|
use crate::error::ProjectionResult;
|
||||||
use crate::metadata::q_field;
|
use crate::metadata::q_field;
|
||||||
use crate::options::HeterogeneousListMode;
|
|
||||||
use crate::options::ListProjection;
|
use crate::options::ListProjection;
|
||||||
use crate::options::ProjectionOptions;
|
use crate::options::ProjectionOptions;
|
||||||
use crate::options::StringProjection;
|
use crate::options::StringProjection;
|
||||||
|
|
@ -965,55 +964,11 @@ fn project_list(list: &List, opts: &ProjectionOptions) -> ProjectionResult<Array
|
||||||
// Heterogeneous list → Union projection
|
// Heterogeneous list → Union projection
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
fn is_temporal_dt(dt: &DataType) -> bool {
|
|
||||||
matches!(
|
|
||||||
dt,
|
|
||||||
DataType::Timestamp(_, _)
|
|
||||||
| DataType::Date32
|
|
||||||
| DataType::Date64
|
|
||||||
| DataType::Time32(_)
|
|
||||||
| DataType::Time64(_)
|
|
||||||
| DataType::Duration(_)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempts to coalesce a heterogeneous list of temporal Arrow arrays into a
|
|
||||||
/// flat `Timestamp(Nanosecond, None)` array. Returns `None` if any arm is
|
|
||||||
/// non-temporal or `arrow_cast::cast` rejects an arm. Callers fall back to
|
|
||||||
/// Union encoding in that case.
|
|
||||||
fn try_coalesce_temporals_to_ns(child_exports: &[ArrayExport]) -> Option<ArrayRef> {
|
|
||||||
let target = DataType::Timestamp(TimeUnit::Nanosecond, None);
|
|
||||||
let mut cast_arrays: Vec<ArrayRef> = Vec::with_capacity(child_exports.len());
|
|
||||||
for export in child_exports {
|
|
||||||
let dt = export.array.data_type();
|
|
||||||
if !is_temporal_dt(dt) {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
match arrow_cast::cast(export.array.as_ref(), &target) {
|
|
||||||
Ok(arr) => cast_arrays.push(arr),
|
|
||||||
Err(_) => return None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let refs: Vec<&dyn arrow_array::Array> = cast_arrays.iter().map(|a| a.as_ref()).collect();
|
|
||||||
arrow_select::concat::concat(&refs).ok()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn project_heterogeneous_list(
|
fn project_heterogeneous_list(
|
||||||
values: &[Value],
|
values: &[Value],
|
||||||
child_exports: Vec<ArrayExport>,
|
child_exports: Vec<ArrayExport>,
|
||||||
opts: &ProjectionOptions,
|
opts: &ProjectionOptions,
|
||||||
) -> ProjectionResult<ArrayExport> {
|
) -> ProjectionResult<ArrayExport> {
|
||||||
// Opt-in: collapse all-temporal heterogeneous lists into Timestamp(ns).
|
|
||||||
// Polars and similar consumers cannot ingest Arrow Union; this lets users
|
|
||||||
// accept the lossy promotion in exchange for a flat, ingestible column.
|
|
||||||
if opts.heterogeneous_list_mode == HeterogeneousListMode::CoalesceTemporals
|
|
||||||
&& let Some(flat) = try_coalesce_temporals_to_ns(&child_exports)
|
|
||||||
{
|
|
||||||
let dt = DataType::Timestamp(TimeUnit::Nanosecond, None);
|
|
||||||
let field = q_field(dt, false, "list", None, None, None);
|
|
||||||
return Ok(ArrayExport { array: flat, field });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assign a type_id (i8) to each unique Arrow DataType in insertion order.
|
// Assign a type_id (i8) to each unique Arrow DataType in insertion order.
|
||||||
let mut type_id_map: Vec<(DataType, i8)> = Vec::new();
|
let mut type_id_map: Vec<(DataType, i8)> = Vec::new();
|
||||||
let mut type_ids: Vec<i8> = Vec::with_capacity(values.len());
|
let mut type_ids: Vec<i8> = Vec::with_capacity(values.len());
|
||||||
|
|
@ -1203,70 +1158,3 @@ fn attribute_meta_str(a: Attribute) -> &'static str {
|
||||||
Attribute::Grouped => "grouped",
|
Attribute::Grouped => "grouped",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use qroissant_core::Atom;
|
|
||||||
use qroissant_core::List;
|
|
||||||
|
|
||||||
fn mixed_temporal_list() -> Value {
|
|
||||||
Value::List(List::new(
|
|
||||||
Attribute::None,
|
|
||||||
vec![
|
|
||||||
Value::Atom(Atom::Timestamp(0)),
|
|
||||||
Value::Atom(Atom::Datetime(0.0)),
|
|
||||||
],
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn heterogeneous_temporals_emit_union_by_default() {
|
|
||||||
let value = mixed_temporal_list();
|
|
||||||
let opts = ProjectionOptions::default();
|
|
||||||
let export = project(&value, &opts).expect("projection");
|
|
||||||
assert!(
|
|
||||||
matches!(export.array.data_type(), DataType::Union(_, _)),
|
|
||||||
"expected Union under default mode, got {:?}",
|
|
||||||
export.array.data_type()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn heterogeneous_temporals_coalesce_to_timestamp_ns() {
|
|
||||||
let value = mixed_temporal_list();
|
|
||||||
let opts = ProjectionOptions {
|
|
||||||
heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals,
|
|
||||||
..ProjectionOptions::default()
|
|
||||||
};
|
|
||||||
let export = project(&value, &opts).expect("projection");
|
|
||||||
assert_eq!(
|
|
||||||
export.array.data_type(),
|
|
||||||
&DataType::Timestamp(TimeUnit::Nanosecond, None),
|
|
||||||
"expected flat Timestamp(ns) under CoalesceTemporals"
|
|
||||||
);
|
|
||||||
assert_eq!(export.array.len(), 2, "expected 2 rows, one per arm");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn coalesce_falls_back_to_union_for_non_temporal_arm() {
|
|
||||||
// Mixed temporal + int: not all-temporal, should fall through to Union.
|
|
||||||
let value = Value::List(List::new(
|
|
||||||
Attribute::None,
|
|
||||||
vec![
|
|
||||||
Value::Atom(Atom::Timestamp(0)),
|
|
||||||
Value::Atom(Atom::Int(42)),
|
|
||||||
],
|
|
||||||
));
|
|
||||||
let opts = ProjectionOptions {
|
|
||||||
heterogeneous_list_mode: HeterogeneousListMode::CoalesceTemporals,
|
|
||||||
..ProjectionOptions::default()
|
|
||||||
};
|
|
||||||
let export = project(&value, &opts).expect("projection");
|
|
||||||
assert!(
|
|
||||||
matches!(export.array.data_type(), DataType::Union(_, _)),
|
|
||||||
"non-temporal arm must force Union fallback, got {:?}",
|
|
||||||
export.array.data_type()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
//! Each transform function uses `portable_simd` for the aligned middle of the
|
//! Each transform function uses `portable_simd` for the aligned middle of the
|
||||||
//! slice and falls back to a scalar loop for the head and tail.
|
//! slice and falls back to a scalar loop for the head and tail.
|
||||||
|
|
||||||
|
use std::simd::Select;
|
||||||
use std::simd::prelude::*;
|
use std::simd::prelude::*;
|
||||||
|
|
||||||
use crate::nulls::Q_NULL_DATE;
|
use crate::nulls::Q_NULL_DATE;
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ use std::sync::Arc;
|
||||||
use pyo3::prelude::*;
|
use pyo3::prelude::*;
|
||||||
use pyo3::types::PyAny;
|
use pyo3::types::PyAny;
|
||||||
use pyo3::types::PyBytes;
|
use pyo3::types::PyBytes;
|
||||||
use qroissant_arrow::HeterogeneousListMode;
|
|
||||||
use qroissant_arrow::ListProjection;
|
use qroissant_arrow::ListProjection;
|
||||||
use qroissant_arrow::ProjectionOptions;
|
use qroissant_arrow::ProjectionOptions;
|
||||||
use qroissant_arrow::StringProjection;
|
use qroissant_arrow::StringProjection;
|
||||||
|
|
@ -57,11 +56,6 @@ pub fn decode_options_to_proj_opts(opts: Option<&DecodeOptions>) -> Arc<Projecti
|
||||||
crate::types::UnionMode::Dense => qroissant_arrow::UnionMode::Dense,
|
crate::types::UnionMode::Dense => qroissant_arrow::UnionMode::Dense,
|
||||||
crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse,
|
crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse,
|
||||||
},
|
},
|
||||||
heterogeneous_list_mode: if opts.coalesce_temporals_value() {
|
|
||||||
HeterogeneousListMode::CoalesceTemporals
|
|
||||||
} else {
|
|
||||||
HeterogeneousListMode::Union
|
|
||||||
},
|
|
||||||
treat_infinity_as_null: opts.treat_infinity_as_null(),
|
treat_infinity_as_null: opts.treat_infinity_as_null(),
|
||||||
parallel: opts.parallel_value(),
|
parallel: opts.parallel_value(),
|
||||||
assume_symbol_utf8: opts.assume_symbol_utf8_value(),
|
assume_symbol_utf8: opts.assume_symbol_utf8_value(),
|
||||||
|
|
|
||||||
|
|
@ -817,7 +817,6 @@ pub struct DecodeOptions {
|
||||||
validate_compressed_trailing_bytes: bool,
|
validate_compressed_trailing_bytes: bool,
|
||||||
temporal_nulls: bool,
|
temporal_nulls: bool,
|
||||||
treat_infinity_as_null: bool,
|
treat_infinity_as_null: bool,
|
||||||
coalesce_temporals: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pymethods]
|
#[pymethods]
|
||||||
|
|
@ -876,11 +875,6 @@ impl DecodeOptions {
|
||||||
fn get_treat_infinity_as_null(&self) -> bool {
|
fn get_treat_infinity_as_null(&self) -> bool {
|
||||||
self.treat_infinity_as_null
|
self.treat_infinity_as_null
|
||||||
}
|
}
|
||||||
|
|
||||||
#[getter]
|
|
||||||
fn coalesce_temporals(&self) -> bool {
|
|
||||||
self.coalesce_temporals
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for DecodeOptions {
|
impl Default for DecodeOptions {
|
||||||
|
|
@ -896,7 +890,6 @@ impl Default for DecodeOptions {
|
||||||
validate_compressed_trailing_bytes: true,
|
validate_compressed_trailing_bytes: true,
|
||||||
temporal_nulls: true,
|
temporal_nulls: true,
|
||||||
treat_infinity_as_null: false,
|
treat_infinity_as_null: false,
|
||||||
coalesce_temporals: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -933,10 +926,6 @@ impl DecodeOptions {
|
||||||
pub(crate) fn assume_symbol_utf8_value(&self) -> bool {
|
pub(crate) fn assume_symbol_utf8_value(&self) -> bool {
|
||||||
self.assume_symbol_utf8
|
self.assume_symbol_utf8
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn coalesce_temporals_value(&self) -> bool {
|
|
||||||
self.coalesce_temporals
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pyclass(module = "qroissant", frozen, eq)]
|
#[pyclass(module = "qroissant", frozen, eq)]
|
||||||
|
|
@ -1007,12 +996,6 @@ impl DecodeOptionsBuilder {
|
||||||
next
|
next
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_coalesce_temporals(&self, value: bool) -> Self {
|
|
||||||
let mut next = self.clone();
|
|
||||||
next.options.coalesce_temporals = value;
|
|
||||||
next
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build(&self) -> DecodeOptions {
|
fn build(&self) -> DecodeOptions {
|
||||||
self.options.clone()
|
self.options.clone()
|
||||||
}
|
}
|
||||||
|
|
@ -1066,11 +1049,6 @@ impl DecodeOptionsBuilder {
|
||||||
fn treat_infinity_as_null(&self) -> bool {
|
fn treat_infinity_as_null(&self) -> bool {
|
||||||
self.options.treat_infinity_as_null
|
self.options.treat_infinity_as_null
|
||||||
}
|
}
|
||||||
|
|
||||||
#[getter]
|
|
||||||
fn coalesce_temporals(&self) -> bool {
|
|
||||||
self.options.coalesce_temporals
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pyclass(module = "qroissant", frozen, eq)]
|
#[pyclass(module = "qroissant", frozen, eq)]
|
||||||
|
|
|
||||||
|
|
@ -260,14 +260,6 @@ class DecodeOptions:
|
||||||
def treat_infinity_as_null(self) -> bool:
|
def treat_infinity_as_null(self) -> bool:
|
||||||
"""Whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
"""Whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
||||||
...
|
...
|
||||||
@property
|
|
||||||
def coalesce_temporals(self) -> bool:
|
|
||||||
"""Whether heterogeneous all-temporal lists are flattened to ``Timestamp(ns)``
|
|
||||||
instead of being emitted as Arrow ``Union``. Use this when the
|
|
||||||
downstream consumer (e.g. Polars) cannot ingest ``Union`` types.
|
|
||||||
Lossy: mixed precisions are promoted to nanoseconds.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
class DecodeOptionsBuilder:
|
class DecodeOptionsBuilder:
|
||||||
|
|
@ -311,15 +303,6 @@ class DecodeOptionsBuilder:
|
||||||
def with_treat_infinity_as_null(self, value: bool, /) -> DecodeOptionsBuilder:
|
def with_treat_infinity_as_null(self, value: bool, /) -> DecodeOptionsBuilder:
|
||||||
"""Set whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
"""Set whether ±∞ sentinels are mapped to ``None`` in Arrow arrays."""
|
||||||
...
|
...
|
||||||
def with_coalesce_temporals(self, value: bool, /) -> DecodeOptionsBuilder:
|
|
||||||
"""Set whether heterogeneous all-temporal lists are flattened to
|
|
||||||
``Timestamp(ns)`` instead of being emitted as Arrow ``Union``.
|
|
||||||
|
|
||||||
Set to ``True`` for consumers (e.g. Polars) that reject Arrow union
|
|
||||||
types. Lossy: mixed precisions are promoted to nanoseconds. Default
|
|
||||||
``False`` (faithful ``Union`` representation).
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
def build(self) -> DecodeOptions:
|
def build(self) -> DecodeOptions:
|
||||||
"""Finalize the builder into an immutable :class:`DecodeOptions` instance."""
|
"""Finalize the builder into an immutable :class:`DecodeOptions` instance."""
|
||||||
...
|
...
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue