Source code for app.services.device_service

"""
Device management service
"""

from typing import List, Optional, Dict, Any
from datetime import datetime
import structlog

from app.schemas.device import DeviceCreate, DeviceUpdate, DeviceResponse, DeviceStatus
from app.core.nats import nats_client

logger = structlog.get_logger()


[docs] class DeviceService: """Service for managing devices"""
[docs] def __init__(self): self._devices: Dict[str, Dict[str, Any]] = {}
[docs] async def create_device(self, device_data: DeviceCreate) -> DeviceResponse: """Create a new device""" device = { "id": device_data.id, "name": device_data.name, "type": device_data.type, "status": DeviceStatus.discovered, "claimed": False, "capabilities": device_data.capabilities, "device_metadata": device_data.device_metadata, "ip_address": device_data.ip_address, "mac_address": device_data.mac_address, "last_seen": datetime.utcnow(), "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), } self._devices[device_data.id] = device # Publish device discovery event await self._publish_device_event("device.discovered", device) logger.info("Device created", device_id=device_data.id) return DeviceResponse(**device)
[docs] async def get_device(self, device_id: str) -> Optional[DeviceResponse]: """Get device by ID""" device = self._devices.get(device_id) if device: return DeviceResponse(**device) return None
[docs] async def update_device(self, device_id: str, update_data: DeviceUpdate) -> Optional[DeviceResponse]: """Update device""" device = self._devices.get(device_id) if not device: return None # Update fields update_dict = update_data.model_dump(exclude_unset=True) device.update(update_dict) device["updated_at"] = datetime.utcnow() # Update last seen if status is online if update_data.status == DeviceStatus.online: device["last_seen"] = datetime.utcnow() # Publish update event await self._publish_device_event("device.updated", device) logger.info("Device updated", device_id=device_id) return DeviceResponse(**device)
[docs] async def list_devices(self, status: Optional[DeviceStatus] = None) -> List[DeviceResponse]: """List all devices""" devices = list(self._devices.values()) if status: devices = [d for d in devices if d["status"] == status] return [DeviceResponse(**d) for d in devices]
[docs] async def claim_device(self, device_id: str) -> Optional[DeviceResponse]: """Claim a device""" device = self._devices.get(device_id) if not device: return None device["claimed"] = True device["status"] = DeviceStatus.claimed device["updated_at"] = datetime.utcnow() # Publish claim event await self._publish_device_event("device.claimed", device) logger.info("Device claimed", device_id=device_id) return DeviceResponse(**device)
[docs] async def send_command(self, device_id: str, command: Dict[str, Any]) -> bool: """Send command to device""" device = self._devices.get(device_id) if not device: return False # Publish command to device-specific subject subject = f"device.{device_id}.command" await nats_client.publish(subject, command) logger.info("Command sent to device", device_id=device_id, command=command.get("type")) return True
async def _publish_device_event(self, event_type: str, device: Dict[str, Any]): """Publish device event to NATS""" event = { "type": event_type, "device_id": device["id"], "timestamp": datetime.utcnow().isoformat(), "data": device, } await nats_client.publish("hub.events.devices", event)
# Singleton instance device_service = DeviceService()