Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,33 @@ jobs:
with:
token: ${{secrets.GITHUB_TOKEN}}
deny: warnings
stubtest:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- uses: actions/setup-python@v6
with:
python-version: 3.x
- name: Install uv
uses: astral-sh/setup-uv@v7
- id: setup-venv
name: Setup virtualenv
run: python -m venv .venv
- name: Build lib
uses: PyO3/maturin-action@v1
with:
command: dev --uv
sccache: true
- name: Run stubtest
run: |
set -e
source .venv/bin/activate
stubtest --ignore-disjoint-bases natsrpy
pytest:
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ docs/_build/

# Pyenv
.python-version
.venv/
target/
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-nats = "0.46"
bytes = "1.11.1"
futures-util = "0.3.32"
log = "0.4.29"
pyo3 = { version = "0.28", features = ["abi3"] }
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
pyo3-log = "0.13.3"
thiserror = "2.0.18"
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ email = "s3riussan@gmail.com"
[dependency-groups]
dev = [
"anyio>=4,<5",
"mypy>=1.19.1,<2",
"pytest>=9,<10",
"pytest-xdist>=3,<4",
]
Expand Down Expand Up @@ -110,6 +111,8 @@ ignore = [
"SLF001", # Private member accessed
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
"D101", # Missing docstring in public class
"PLR2004", # Magic value used in comparison
"B017", # Do not assert blind exception
]

[tool.ruff.lint.pydocstyle]
Expand Down
77 changes: 63 additions & 14 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import Any, overload
from typing import Any, final, overload

from natsrpy._natsrpy_rs.js import JetStream
from natsrpy._natsrpy_rs.message import Message
from typing_extensions import Self

from . import js

@final
class Message:
"""
Simple NATS message.

Attributes:
subject: subject where message was published
reply: subject where reply should be sent, if any
payload: message payload
headers: dictionary of message headers,
every value can be a simple value or a list.
status: status is used for reply messages to indicate the status of the reply.
It is None for regular messages.
description: message description is used for reply messages to
provide additional information about the status.
length: a length of the message payload in bytes.
"""

subject: str
reply: str | None
payload: bytes
headers: dict[str, Any]
status: int | None
description: str | None
length: int

@final
class IteratorSubscription:
def __aiter__(self) -> IteratorSubscription: ...
async def __anext__(self) -> Message: ...
async def next(self, timeout: float | timedelta | None = None) -> Message: ...
async def unsubscribe(self, limit: int | None = None) -> None: ...
async def drain(self) -> None: ...

@final
class CallbackSubscription:
async def unsubscribe(self, limit: int | None = None) -> None: ...
async def drain(self) -> None: ...

@final
class Nats:
def __init__(
self,
def __new__(
cls,
/,
addrs: list[str] = ["nats://localhost:4222"],
addrs: list[str] | None = None,
user_and_pass: tuple[str, str] | None = None,
nkey: str | None = None,
token: str | None = None,
custom_inbox_prefix: str | None = None,
read_buffer_capacity: int = 65535,
sender_capacity: int = 128,
read_buffer_capacity: int = ..., # 65535 bytes
sender_capacity: int = ..., # 128 bytes
max_reconnects: int | None = None,
connection_timeout: float | timedelta = ...,
request_timeout: float | timedelta = ...,
) -> None: ...
connection_timeout: float | timedelta = ..., # 5 sec
request_timeout: float | timedelta = ..., # 10 sec
) -> Self: ...
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
async def publish(
Expand All @@ -41,7 +72,15 @@ class Nats:
reply: str | None = None,
err_on_disconnect: bool = False,
) -> None: ...
async def request(self, subject: str, payload: bytes) -> None: ...
async def request(
self,
subject: str,
payload: bytes | str | bytearray | memoryview,
*,
headers: dict[str, Any] | None = None,
inbox: str | None = None,
timeout: float | timedelta | None = None,
) -> None: ...
async def drain(self) -> None: ...
async def flush(self) -> None: ...
@overload
Expand All @@ -56,6 +95,16 @@ class Nats:
subject: str,
callback: None = None,
) -> IteratorSubscription: ...
async def jetstream(self) -> JetStream: ...
async def jetstream(
self,
*,
domain: str | None = None,
api_prefix: str | None = None,
timeout: timedelta | None = None,
ack_timeout: timedelta | None = None,
concurrency_limit: int | None = None,
max_ack_inflight: int | None = None,
backpressure_on_inflight: bool | None = None,
) -> js.JetStream: ...

__all__ = ["CallbackSubscription", "IteratorSubscription", "Message", "Nats"]
__all__ = ["CallbackSubscription", "IteratorSubscription", "Message", "Nats", "js"]
47 changes: 45 additions & 2 deletions python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,25 +1,68 @@
from datetime import datetime, timedelta
from typing import Any
from typing import Any, Literal, final, overload

from . import consumers, kv, managers, object_store, stream
from .managers import KVManager, ObjectStoreManager, StreamsManager

__all__ = [
"JetStream",
"JetStreamMessage",
"Publication",
"consumers",
"kv",
"managers",
"object_store",
"stream",
]

@final
class Publication:
stream: str
sequence: int
domain: str
duplicate: bool
value: str | None

@final
class JetStream:
@overload
async def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
reply: str | None = None,
err_on_disconnect: bool = False,
wait: Literal[True],
) -> Publication: ...
@overload
async def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
err_on_disconnect: bool = False,
wait: Literal[False] = False,
) -> None: ...
@overload
async def publish(
self,
subject: str,
payload: str | bytes | bytearray | memoryview,
*,
headers: dict[str, str] | None = None,
err_on_disconnect: bool = False,
wait: bool = False,
) -> Publication | None: ...
@property
def kv(self) -> KVManager: ...
@property
def streams(self) -> StreamsManager: ...
@property
def object_store(self) -> ObjectStoreManager: ...

@final
class JetStreamMessage:
@property
def subject(self) -> str: ...
Expand Down
35 changes: 29 additions & 6 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
from datetime import timedelta
from typing import final

from natsrpy._natsrpy_rs.js import JetStreamMessage
from typing_extensions import Self

__all__ = [
"AckPolicy",
"DeliverPolicy",
"MessagesIterator",
"PriorityPolicy",
"PullConsumer",
"PullConsumerConfig",
"PushConsumer",
"PushConsumerConfig",
"ReplayPolicy",
]

@final
class DeliverPolicy:
ALL: DeliverPolicy
LAST: DeliverPolicy
Expand All @@ -10,21 +25,25 @@ class DeliverPolicy:
BY_START_TIME: DeliverPolicy
LAST_PER_SUBJECT: DeliverPolicy

@final
class AckPolicy:
EXPLICIT: AckPolicy
NONE: AckPolicy
ALL: AckPolicy

@final
class ReplayPolicy:
INSTANT: ReplayPolicy
ORIGINAL: ReplayPolicy

@final
class PriorityPolicy:
NONE: PriorityPolicy
OVERFLOW: PriorityPolicy
PINNED_CLIENT: PriorityPolicy
PRIORITIZED: PriorityPolicy

@final
class PullConsumerConfig:
name: str | None
durable_name: str | None
Expand Down Expand Up @@ -55,8 +74,8 @@ class PullConsumerConfig:
priority_groups: list[str]
pause_until: int | None

def __init__(
self,
def __new__(
cls,
name: str | None = None,
durable_name: str | None = None,
description: str | None = None,
Expand Down Expand Up @@ -85,8 +104,9 @@ class PullConsumerConfig:
priority_policy: PriorityPolicy | None = None,
priority_groups: list[str] | None = None,
pause_until: int | None = None,
) -> None: ...
) -> Self: ...

@final
class PushConsumerConfig:
deliver_subject: str
name: str | None
Expand Down Expand Up @@ -116,8 +136,8 @@ class PushConsumerConfig:
inactive_threshold: timedelta
pause_until: int | None

def __init__(
self,
def __new__(
cls,
deliver_subject: str,
name: str | None = None,
durable_name: str | None = None,
Expand Down Expand Up @@ -145,8 +165,9 @@ class PushConsumerConfig:
backoff: list[timedelta] | None = None,
inactive_threshold: timedelta | None = None,
pause_until: int | None = None,
) -> None: ...
) -> Self: ...

@final
class MessagesIterator:
def __aiter__(self) -> MessagesIterator: ...
async def __anext__(self) -> JetStreamMessage: ...
Expand All @@ -155,9 +176,11 @@ class MessagesIterator:
timeout: float | timedelta | None = None,
) -> JetStreamMessage: ...

@final
class PushConsumer:
async def messages(self) -> MessagesIterator: ...

@final
class PullConsumer:
async def fetch(
self,
Expand Down
16 changes: 13 additions & 3 deletions python/natsrpy/_natsrpy_rs/js/kv.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
from typing import final

from natsrpy._natsrpy_rs.js.stream import Placement, Republish, Source, StorageType
from typing_extensions import Self

__all__ = [
"KVConfig",
"KeyValue",
]

@final
class KVConfig:
"""
KV bucket config.
Expand All @@ -23,8 +32,8 @@ class KVConfig:
placement: Placement | None
limit_markers: float | None

def __init__(
self,
def __new__(
cls,
bucket: str,
description: str | None = None,
max_value_size: int | None = None,
Expand All @@ -40,8 +49,9 @@ class KVConfig:
compression: bool | None = None,
placement: Placement | None = None,
limit_markers: float | None = None,
) -> None: ...
) -> Self: ...

@final
class KeyValue:
@property
def stream_name(self) -> str: ...
Expand Down
Loading
Loading