From f24af467ec4ba1dad0e599a28d5bc51f9aa1156e Mon Sep 17 00:00:00 2001 From: CamZalewski Date: Wed, 20 May 2026 14:13:41 +0100 Subject: [PATCH] fix: align typed column buffers to T in decode paths cast_slice::<u8, T> panics with TargetAlignmentGreaterAndInputNotAligned on KDB IPC payloads where a variable-length column leaves a numeric column at a misaligned wire offset. The sync decode path's alignment fallback used Bytes::copy_from_slice (Vec<u8> layout, align=1), which only happens to work because most allocators over-align byte blocks -- not guaranteed by Rust's allocator API. The async pipelined path went through read_bytes(len * size) directly, with no alignment branch at all, and panicked in arrow projection's as_*_slice on Windows release builds under AsyncPool.query. Both paths now back typed columns with Vec<T> (Layout::array::<T> guarantees align_of::<T>()), exposed as bytes::Bytes via a new AlignedTBuf<T> AsRef<[u8]> owner passed to Bytes::from_owner. Sync fallback uses the same wrapper. Pipelined typed reads route through a new read_typed_bytes::<T> helper that swaps in for every typed Primitive arm in decode_vector_async. Regression test in pipelined::tests constructs a table with an odd-length symbol column followed by Long, exercising the previously panicking path. --- crates/qroissant-core/src/decode.rs | 26 ++++-- crates/qroissant-core/src/pipelined.rs | 106 ++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 18 deletions(-) diff --git a/crates/qroissant-core/src/decode.rs b/crates/qroissant-core/src/decode.rs index a3aad59..1708e44 100644 --- a/crates/qroissant-core/src/decode.rs +++ b/crates/qroissant-core/src/decode.rs @@ -70,6 +70,16 @@ impl Default for DecodeOptions { } } +/// Owns a `Vec<T>` and exposes it as `&[u8]` so `bytes::Bytes::from_owner` +/// can keep a T-aligned allocation alive while presenting a byte view. 
+struct AlignedTBuf(Vec); + +impl AsRef<[u8]> for AlignedTBuf { + fn as_ref(&self) -> &[u8] { + bytemuck::cast_slice(&self.0) + } +} + struct BodyReader { bytes: bytes::Bytes, offset: usize, @@ -128,8 +138,13 @@ impl BodyReader { /// Returns a `Bytes` wrapper of `count * size_of::()` bytes, aligned for `T`. /// /// If the current offset is already aligned for `T`, this is zero-copy - /// (a `Bytes::slice`). Otherwise it copies into a new aligned allocation. - fn read_bytes_aligned(&mut self, count: usize) -> CoreResult { + /// (a `Bytes::slice`). Otherwise it copies into a `Vec`-backed allocation, + /// guaranteeing T-alignment regardless of the global allocator's behavior + /// for `Vec` (whose layout only requires align=1). + fn read_bytes_aligned(&mut self, count: usize) -> CoreResult + where + T: bytemuck::Pod + Send + Sync + 'static, + { let byte_len = count .checked_mul(std::mem::size_of::()) .ok_or(CoreError::LengthOverflow(count))?; @@ -143,11 +158,12 @@ impl BodyReader { let ptr = self.bytes[self.offset..].as_ptr(); let align = std::mem::align_of::(); let result = if (ptr as usize) % align == 0 { - // Already aligned — zero-copy slice. self.bytes.slice(self.offset..end) } else { - // Misaligned — must copy into an aligned allocation. - bytes::Bytes::copy_from_slice(&self.bytes[self.offset..end]) + let mut aligned = vec![T::zeroed(); count]; + let dst: &mut [u8] = bytemuck::cast_slice_mut(&mut aligned); + dst.copy_from_slice(&self.bytes[self.offset..end]); + bytes::Bytes::from_owner(AlignedTBuf(aligned)) }; self.offset = end; Ok(result) diff --git a/crates/qroissant-core/src/pipelined.rs b/crates/qroissant-core/src/pipelined.rs index a18a4e4..800f509 100644 --- a/crates/qroissant-core/src/pipelined.rs +++ b/crates/qroissant-core/src/pipelined.rs @@ -129,6 +129,27 @@ impl PipelinedReader { self.reader.read_exact(dst).await?; Ok(values) } + + /// Reads `count` elements of `T` into a T-aligned allocation and exposes + /// the result as a `bytes::Bytes`. 
Required for downstream `cast_slice` + /// callers (Arrow projection, `VectorData::as_*_slice`) that need the + /// payload aligned to `align_of::()`. A raw `Vec` would only + /// guarantee align=1 and can panic on `cast_slice::`. + pub async fn read_typed_bytes(&mut self, count: usize) -> CoreResult + where + T: bytemuck::Pod + bytemuck::AnyBitPattern + Send + Sync + 'static, + { + let values: Vec = self.read_vec::(count).await?; + Ok(bytes::Bytes::from_owner(AlignedTBuf(values))) + } +} + +struct AlignedTBuf(Vec); + +impl AsRef<[u8]> for AlignedTBuf { + fn as_ref(&self) -> &[u8] { + bytemuck::cast_slice(&self.0) + } } pub async fn decode_value_async( @@ -249,11 +270,11 @@ async fn decode_vector_async( VectorData::Guid(reader.read_bytes(byte_len).await?) } Primitive::Byte => VectorData::Byte(reader.read_bytes(length).await?), - Primitive::Short => VectorData::Short(reader.read_bytes(length * 2).await?), - Primitive::Int => VectorData::Int(reader.read_bytes(length * 4).await?), - Primitive::Long => VectorData::Long(reader.read_bytes(length * 8).await?), - Primitive::Real => VectorData::Real(reader.read_bytes(length * 4).await?), - Primitive::Float => VectorData::Float(reader.read_bytes(length * 8).await?), + Primitive::Short => VectorData::Short(reader.read_typed_bytes::(length).await?), + Primitive::Int => VectorData::Int(reader.read_typed_bytes::(length).await?), + Primitive::Long => VectorData::Long(reader.read_typed_bytes::(length).await?), + Primitive::Real => VectorData::Real(reader.read_typed_bytes::(length).await?), + Primitive::Float => VectorData::Float(reader.read_typed_bytes::(length).await?), Primitive::Char => VectorData::Char(reader.read_bytes(length).await?), Primitive::Symbol => { let mut values = Vec::with_capacity(length); @@ -262,14 +283,14 @@ async fn decode_vector_async( } VectorData::Symbol(values) } - Primitive::Timestamp => VectorData::Timestamp(reader.read_bytes(length * 8).await?), - Primitive::Month => 
VectorData::Month(reader.read_bytes(length * 4).await?), - Primitive::Date => VectorData::Date(reader.read_bytes(length * 4).await?), - Primitive::Datetime => VectorData::Datetime(reader.read_bytes(length * 8).await?), - Primitive::Timespan => VectorData::Timespan(reader.read_bytes(length * 8).await?), - Primitive::Minute => VectorData::Minute(reader.read_bytes(length * 4).await?), - Primitive::Second => VectorData::Second(reader.read_bytes(length * 4).await?), - Primitive::Time => VectorData::Time(reader.read_bytes(length * 4).await?), + Primitive::Timestamp => VectorData::Timestamp(reader.read_typed_bytes::(length).await?), + Primitive::Month => VectorData::Month(reader.read_typed_bytes::(length).await?), + Primitive::Date => VectorData::Date(reader.read_typed_bytes::(length).await?), + Primitive::Datetime => VectorData::Datetime(reader.read_typed_bytes::(length).await?), + Primitive::Timespan => VectorData::Timespan(reader.read_typed_bytes::(length).await?), + Primitive::Minute => VectorData::Minute(reader.read_typed_bytes::(length).await?), + Primitive::Second => VectorData::Second(reader.read_typed_bytes::(length).await?), + Primitive::Time => VectorData::Time(reader.read_typed_bytes::(length).await?), Primitive::Mixed => unreachable!("mixed values are not encoded as vectors"), }; @@ -371,6 +392,65 @@ mod tests { )); } + /// Regression: KDB IPC packs columns with no inter-column padding. A + /// symbol col of odd byte length leaves the following numeric column + /// at a misaligned wire offset. Async pipelined decode used to back + /// typed columns with a plain `Vec` (align=1), which let arrow + /// `cast_slice::` panic on consumers / allocators that don't + /// happen to over-align byte allocations. 
+ #[tokio::test] + async fn test_decode_misaligned_table_async() -> CoreResult<()> { + let mut data = Vec::new(); + data.push(TypeCode::Table as u8); + data.push(0_u8); + + data.push(TypeCode::Dictionary as u8); + + data.push(TypeCode::SymbolVector as u8); + data.push(0_u8); + data.extend_from_slice(&2_i32.to_le_bytes()); + data.extend_from_slice(b"sym\0"); + data.extend_from_slice(b"val\0"); + + data.push(TypeCode::GeneralList as u8); + data.push(0_u8); + data.extend_from_slice(&2_i32.to_le_bytes()); + + // Column 1: Symbol vector of length 2 with odd total payload to + // throw downstream Long column off 8-byte alignment. + data.push(TypeCode::SymbolVector as u8); + data.push(0_u8); + data.extend_from_slice(&2_i32.to_le_bytes()); + data.extend_from_slice(b"ab\0"); + data.extend_from_slice(b"cdef\0"); + + // Column 2: Long vector [123, 456] — wire-offset now misaligned. + data.push(TypeCode::LongVector as u8); + data.push(0_u8); + data.extend_from_slice(&2_i32.to_le_bytes()); + data.extend_from_slice(&123_i64.to_le_bytes()); + data.extend_from_slice(&456_i64.to_le_bytes()); + + let mut reader = PipelinedReader::new(Cursor::new(data), Encoding::LittleEndian).unwrap(); + let value = decode_value_async(&mut reader).await?; + + match &value { + Value::Table(table) => { + assert_eq!(table.num_columns(), 2); + match &table.columns()[1] { + Value::Vector(v) => { + // This call previously panicked under cast_slice on + // misaligned backing storage. + assert_eq!(v.data().as_i64_slice(), &[123, 456]); + } + _ => panic!("Expected Long Vector"), + } + } + _ => panic!("Expected Table, got {:?}", value), + } + Ok(()) + } + #[tokio::test] async fn test_negative_length_gives_proper_error() -> CoreResult<()> { let mut data = Vec::new();