Source code for app.core.nats

"""
NATS client singleton for the application
"""

import nats
from nats.js import JetStreamContext
from typing import Optional
import structlog
from app.core.config import settings

logger = structlog.get_logger()


[docs] class NATSClient: """NATS client wrapper with connection management"""
[docs] def __init__(self): self.nc: Optional[nats.NATS] = None self.js: Optional[JetStreamContext] = None
[docs] async def connect(self): """Connect to NATS server""" try: self.nc = await nats.connect( servers=[settings.NATS_URL], user=settings.NATS_USER, password=settings.NATS_PASSWORD, name="tafy-hub-api", reconnect_time_wait=2, max_reconnect_attempts=60, ) # Enable JetStream if available try: self.js = self.nc.jetstream() logger.info("JetStream enabled") except Exception as e: logger.warning("JetStream not available", error=str(e)) except Exception as e: logger.error("Failed to connect to NATS", error=str(e)) raise
[docs] async def close(self): """Close NATS connection""" if self.nc: await self.nc.close()
@property def is_connected(self) -> bool: """Check if connected to NATS""" return self.nc is not None and self.nc.is_connected
[docs] async def publish(self, subject: str, data: bytes): """Publish message to subject""" if not self.is_connected: raise RuntimeError("NATS not connected") await self.nc.publish(subject, data)
[docs] async def subscribe(self, subject: str, callback): """Subscribe to subject with callback""" if not self.is_connected: raise RuntimeError("NATS not connected") return await self.nc.subscribe(subject, cb=callback)
# Global NATS client instance nats_client = NATSClient()