use std::sync::Arc; use pyo3::prelude::*; use pyo3::types::PyAny; use pyo3::types::PyBytes; use qroissant_arrow::ListProjection; use qroissant_arrow::ProjectionOptions; use qroissant_arrow::StringProjection; use qroissant_arrow::SymbolProjection; use qroissant_core::DecodeOptions as CoreDecodeOptions; use qroissant_core::Value as CoreValue; use qroissant_core::decode_message_with_options; use qroissant_core::encode_message; use qroissant_transport::extract_q_error; use crate::errors::PythonError; use crate::errors::PythonResult; use crate::errors::to_py_err; use crate::types::Compression; use crate::types::DecodeOptions; use crate::types::EncodeOptions; use crate::types::Encoding; use crate::types::ListInterpretation; use crate::types::MessageType; use crate::types::StringInterpretation; use crate::types::SymbolInterpretation; use crate::values::core_value_to_python_with_opts; use crate::values::python_to_core_value; /// Maps Python-facing "Interpretation" options to Rust-internal "Projection" options. /// /// The Python API uses "Interpretation" (e.g. `SymbolInterpretation`) as it describes /// how the user wants data to be interpreted. The Rust/Arrow layer uses "Projection" /// (e.g. `SymbolProjection`) as it describes how values are projected into Arrow arrays. /// Both refer to the same concept viewed from different perspectives. 
///
/// Passing `None` yields the projection derived from `DecodeOptions::default()`.
/// The result is wrapped in an [`Arc`] so it can be shared without copying.
pub fn decode_options_to_proj_opts(opts: Option<&DecodeOptions>) -> Arc<ProjectionOptions> {
    let opts = opts.cloned().unwrap_or_default();
    Arc::new(ProjectionOptions {
        symbol: match opts.symbol_interpretation_value() {
            SymbolInterpretation::Utf8 => SymbolProjection::Utf8,
            SymbolInterpretation::LargeUtf8 => SymbolProjection::LargeUtf8,
            SymbolInterpretation::Utf8View => SymbolProjection::Utf8View,
            SymbolInterpretation::Dictionary => SymbolProjection::Dictionary,
            SymbolInterpretation::RawBytes => SymbolProjection::RawBytes,
        },
        string: match opts.string_interpretation_value() {
            StringInterpretation::Utf8 => StringProjection::Utf8,
            StringInterpretation::Binary => StringProjection::Binary,
        },
        list: match opts.list_interpretation_value() {
            ListInterpretation::List => ListProjection::List,
            ListInterpretation::LargeList => ListProjection::LargeList,
            ListInterpretation::ListView => ListProjection::ListView,
        },
        union_mode: match opts.union_mode_value() {
            crate::types::UnionMode::Dense => qroissant_arrow::UnionMode::Dense,
            crate::types::UnionMode::Sparse => qroissant_arrow::UnionMode::Sparse,
        },
        treat_infinity_as_null: opts.treat_infinity_as_null(),
        parallel: opts.parallel_value(),
        assume_symbol_utf8: opts.assume_symbol_utf8_value(),
    })
}

/// Builds the core decoder options from the Python-facing [`DecodeOptions`].
///
/// Only the `parallel` flag is forwarded; every other core option keeps its
/// default value.
fn decode_options_to_core(opts: &DecodeOptions) -> CoreDecodeOptions {
    CoreDecodeOptions {
        parallel: opts.parallel_value(),
        ..CoreDecodeOptions::default()
    }
}

/// Rejects any [`EncodeOptions`] that differ from the defaults.
///
/// # Errors
///
/// Returns [`PythonError::NotImplemented`] when `options` is `Some` and not
/// equal to `EncodeOptions::default()`.
fn ensure_default_encode_options(options: Option<&EncodeOptions>) -> PythonResult<()> {
    if let Some(options) = options
        && options != &EncodeOptions::default()
    {
        return Err(PythonError::NotImplemented(
            "custom encode options are not implemented yet".to_string(),
        ));
    }
    Ok(())
}

/// Decodes a message payload into a core value plus the projection options
/// derived from `options`.
///
/// # Errors
///
/// * [`PythonError::QRuntime`] when `extract_q_error` reports an error message
///   for the payload.
/// * The error produced by `map_transport_error` when `extract_q_error` itself
///   fails.
/// * [`PythonError::Decode`] when `decode_message_with_options` fails.
pub fn decode_core_value(
    payload: bytes::Bytes,
    options: Option<&DecodeOptions>,
) -> PythonResult<(CoreValue, Arc<ProjectionOptions>)> {
    // Check for an error response before attempting a regular decode.
    if let Some(message) =
        extract_q_error(payload.as_ref()).map_err(crate::errors::map_transport_error)?
    {
        return Err(PythonError::QRuntime(message));
    }

    let core_opts = options.map(decode_options_to_core).unwrap_or_default();
    let decoded = decode_message_with_options(payload, &core_opts)
        .map_err(|error| PythonError::Decode(error.to_string()))?;
    let proj_opts = decode_options_to_proj_opts(options);

    // The message header is not needed by callers; only the value is returned.
    let (_header, value) = decoded.into_parts();
    Ok((value, proj_opts))
}

/// Wraps a Python `bytes` object in a [`bytes::Bytes`] without copying.
///
/// CPython `bytes` objects are immutable and their backing memory is never
/// moved, so it is sound to hold a raw pointer into them as long as the
/// `Py<PyBytes>` reference (which increments the CPython refcount) is alive.
struct PinnedPyBytes {
    /// Strong reference that keeps the CPython object (and thus `ptr`) alive.
    _owner: Py<PyBytes>,
    /// Start of the bytes object's data buffer.
    ptr: *const u8,
    /// Length of the data buffer in bytes.
    len: usize,
}

// SAFETY: `Py<PyBytes>` is `Send`, and the pointed-to memory is immutable.
unsafe impl Send for PinnedPyBytes {}

// SAFETY: The data is immutable and the owner keeps it alive.
unsafe impl Sync for PinnedPyBytes {}

impl AsRef<[u8]> for PinnedPyBytes {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: `ptr` is valid for `len` bytes while `_owner` keeps the
        // CPython bytes object alive (refcount > 0, no deallocation possible).
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }
}

/// Minimum payload size for the zero-copy `PinnedPyBytes` path.
///
/// For small payloads the `Arc` allocation inside `Bytes::from_owner` costs
/// more than a plain `memcpy`, so we fall back to copying below this threshold.
const ZERO_COPY_MIN_BYTES: usize = 32 * 1024; // 32 KiB

/// Converts a Python `bytes`-like object into a [`bytes::Bytes`].
///
/// For plain `bytes` objects ≥ [`ZERO_COPY_MIN_BYTES`] the underlying buffer
/// is **borrowed without copying** via [`bytes::Bytes::from_owner`].
/// Smaller payloads and other buffer protocols (bytearray, memoryview) take a
/// single copy — same cost as before.
fn payload_to_bytes(payload: &Bound<'_, PyAny>) -> PyResult { if let Ok(pb) = payload.downcast::() { let data = pb.as_bytes(); if data.len() >= ZERO_COPY_MIN_BYTES { let pinned = PinnedPyBytes { _owner: pb.clone().unbind(), ptr: data.as_ptr(), len: data.len(), }; return Ok(bytes::Bytes::from_owner(pinned)); } return Ok(bytes::Bytes::copy_from_slice(data)); } Ok(bytes::Bytes::from(payload.extract::>()?)) } pub fn encode_core_value_bytes( value: &CoreValue, options: Option<&EncodeOptions>, encoding: Encoding, message_type: MessageType, compression: Compression, ) -> PythonResult> { ensure_default_encode_options(options)?; encode_message( value, encoding.into(), message_type.into(), compression.into(), ) .map_err(|error| PythonError::Protocol(error.to_string())) } #[pyfunction] #[pyo3(signature = (payload, /, *, options=None))] pub fn decode( py: Python<'_>, payload: &Bound<'_, PyAny>, options: Option<&DecodeOptions>, ) -> PyResult> { let bytes = payload_to_bytes(payload)?; let options_clone = options.cloned(); let (value, proj_opts) = py .detach(|| decode_core_value(bytes, options_clone.as_ref())) .map_err(to_py_err)?; core_value_to_python_with_opts(py, value, proj_opts) } #[pyfunction] #[pyo3(signature = (value, /, *, options=None, encoding=Encoding::LittleEndian, message_type=MessageType::Asynchronous, compression=Compression::Uncompressed))] pub fn encode( py: Python<'_>, value: &Bound<'_, PyAny>, options: Option<&EncodeOptions>, encoding: Encoding, message_type: MessageType, compression: Compression, ) -> PyResult> { let value = python_to_core_value(value)?; let options_clone = options.cloned(); let payload = py .detach(|| { encode_core_value_bytes( &value, options_clone.as_ref(), encoding, message_type, compression, ) }) .map_err(to_py_err)?; Ok(PyBytes::new(py, &payload).unbind()) } pub fn register(module: &Bound<'_, PyModule>) -> PyResult<()> { module.add_function(wrap_pyfunction!(decode, module)?)?; module.add_function(wrap_pyfunction!(encode, module)?)?; 
Ok(()) }