Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any

from .managers import KVManager, ObjectStoreManager, StreamsManager
Expand Down Expand Up @@ -29,6 +29,26 @@ class JetStreamMessage:
def payload(self) -> bytes: ...
@property
def headers(self) -> dict[str, Any]: ...
@property
def domain(self) -> str | None: ...
@property
def acc_hash(self) -> str | None: ...
@property
def stream(self) -> str: ...
@property
def consumer(self) -> str: ...
@property
def stream_sequence(self) -> int: ...
@property
def consumer_sequence(self) -> int: ...
@property
def delivered(self) -> int: ...
@property
def pending(self) -> int: ...
@property
def published(self) -> datetime: ...
@property
def token(self) -> str | None: ...
async def ack(self, double: bool = False) -> None:
"""
Acknowledge that a message was handled.
Expand Down
92 changes: 90 additions & 2 deletions src/js/message.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,65 @@
use pyo3::{
Bound, Py, PyAny, Python,
types::{PyBytes, PyDict},
types::{PyBytes, PyDateTime, PyDict},
};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
utils::{natsrpy_future, py_types::TimeValue},
utils::{
natsrpy_future,
py_types::{TimeValue, ToPyDate},
},
};

#[derive(Debug, Clone)]
pub struct JSInfo {
pub domain: Option<String>,
pub acc_hash: Option<String>,
pub stream: String,
pub consumer: String,
pub stream_sequence: u64,
pub consumer_sequence: u64,
pub delivered: i64,
pub pending: u64,
pub published: time::OffsetDateTime,
pub token: Option<String>,
}

impl From<async_nats::jetstream::message::Info<'_>> for JSInfo {
fn from(value: async_nats::jetstream::message::Info) -> Self {
Self {
domain: value.domain.map(ToString::to_string),
acc_hash: value.acc_hash.map(ToString::to_string),
stream: value.stream.to_string(),
consumer: value.consumer.to_string(),
stream_sequence: value.stream_sequence,
consumer_sequence: value.consumer_sequence,
delivered: value.delivered,
pending: value.pending,
published: value.published,
token: value.token.map(ToString::to_string),
}
}
}

#[pyo3::pyclass]
pub struct JetStreamMessage {
message: crate::message::Message,
info: JSInfo,
acker: Arc<RwLock<async_nats::jetstream::message::Acker>>,
}

impl TryFrom<async_nats::jetstream::Message> for JetStreamMessage {
type Error = NatsrpyError;

fn try_from(value: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
let js_info = JSInfo::from(value.info()?);
let (message, acker) = value.split();
Ok(Self {
message: message.try_into()?,
info: js_info,
acker: Arc::new(RwLock::new(acker)),
})
}
Expand Down Expand Up @@ -69,6 +106,57 @@ impl JetStreamMessage {
&self.message.headers
}

#[getter]
pub const fn domain(&mut self) -> &Option<String> {
&self.info.domain
}

#[getter]
#[must_use]
pub const fn acc_hash(&self) -> &Option<String> {
&self.info.acc_hash
}
#[getter]
#[must_use]
pub const fn stream(&self) -> &str {
self.info.stream.as_str()
}
#[getter]
#[must_use]
pub const fn consumer(&self) -> &str {
self.info.consumer.as_str()
}
#[getter]
#[must_use]
pub const fn stream_sequence(&self) -> u64 {
self.info.stream_sequence
}
#[getter]
#[must_use]
pub const fn consumer_sequence(&self) -> u64 {
self.info.consumer_sequence
}
#[getter]
#[must_use]
pub const fn delivered(&self) -> i64 {
self.info.delivered
}
#[getter]
#[must_use]
pub const fn pending(&self) -> u64 {
self.info.pending
}
#[getter]
pub fn published<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyDateTime>> {
Ok(self.info.published.to_py_date(py)?)
}

#[getter]
#[must_use]
pub const fn token(&self) -> &Option<String> {
&self.info.token
}

#[pyo3(signature=(double=false))]
pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double)
Expand Down
19 changes: 3 additions & 16 deletions src/js/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use pyo3::{
Py,
types::{PyBytes, PyDateTime, PyDict, PyTzInfo},
types::{PyBytes, PyDateTime, PyDict},
};
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
js::managers::consumers::ConsumersManager,
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::ToPyDate},
};
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -801,25 +801,12 @@ impl StreamMessage {
py: Python,
msg: &async_nats::jetstream::message::StreamMessage,
) -> NatsrpyResult<Self> {
let time = msg.time.to_utc();
let tz_info = PyTzInfo::utc(py)?;
let time = PyDateTime::new(
py,
time.year(),
time.month().into(),
time.day(),
time.hour(),
time.minute(),
time.second(),
time.microsecond(),
Some(&*tz_info),
)?;
Ok(Self {
subject: msg.subject.to_string(),
payload: PyBytes::new(py, &msg.payload).unbind(),
headers: msg.headers.to_pydict(py)?.unbind(),
sequence: msg.sequence,
time: time.unbind(),
time: msg.time.to_py_date(py)?.unbind(),
})
}
}
Expand Down
26 changes: 24 additions & 2 deletions src/utils/py_types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::time::Duration;

use pyo3::{
FromPyObject,
types::{PyBytes, PyBytesMethods},
Bound, FromPyObject, PyResult, Python,
types::{PyBytes, PyBytesMethods, PyDateTime, PyTzInfo},
};

use crate::exceptions::rust_err::NatsrpyError;
Expand Down Expand Up @@ -74,3 +74,25 @@ impl<'py> FromPyObject<'_, 'py> for TimeValue {
}
}
}

pub trait ToPyDate {
fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDateTime>>;
}

impl ToPyDate for time::OffsetDateTime {
fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDateTime>> {
let time = self.to_utc();
let tz_info = PyTzInfo::utc(py)?;
PyDateTime::new(
py,
time.year(),
time.month().into(),
time.day(),
time.hour(),
time.minute(),
time.second(),
time.microsecond(),
Some(&*tz_info),
)
}
}
Loading