Source code for app.services.flow_service

"""
Flow management service for Node-RED flows
"""

from typing import List, Optional, Dict, Any
from datetime import datetime
import uuid
import structlog

from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse
from app.core.nats import nats_client

# Module-level structlog logger; FlowService methods emit their events through it.
logger = structlog.get_logger()


class FlowService:
    """Service for managing Node-RED flows.

    Flows are kept in an in-memory dictionary keyed by flow ID. Deploy and
    undeploy commands are published per node through ``nats_client`` on the
    subjects ``node.<node>.flow.deploy`` and ``node.<node>.flow.undeploy``.
    """

    def __init__(self):
        # Flow records keyed by flow ID; each record is the keyword-argument
        # dict used to build a FlowResponse.
        self._flows: Dict[str, Dict[str, Any]] = {}
        # Nodes that have actually been sent a deploy command for each flow
        # and have not yet been sent an undeploy. Kept outside the flow
        # record so that no extra key is passed to FlowResponse.
        self._deployed_nodes: Dict[str, List[str]] = {}

    async def create_flow(self, flow_data: FlowCreate) -> FlowResponse:
        """Create a new flow.

        Args:
            flow_data: Name, description, config, target nodes and metadata
                of the flow to create.

        Returns:
            The stored flow, with a freshly generated UUID, ``version`` 1
            and ``deployed`` set to False.
        """
        flow_id = str(uuid.uuid4())
        # One timestamp so created_at and updated_at are exactly equal.
        now = datetime.utcnow()
        flow = {
            "id": flow_id,
            "name": flow_data.name,
            "description": flow_data.description,
            "config": flow_data.config,
            "target_nodes": flow_data.target_nodes,
            "flow_metadata": flow_data.flow_metadata,
            "deployed": False,
            "deployed_at": None,
            "version": 1,
            "created_at": now,
            "updated_at": now,
        }

        self._flows[flow_id] = flow
        logger.info("Flow created", flow_id=flow_id, name=flow_data.name)

        return FlowResponse(**flow)

    async def get_flow(self, flow_id: str) -> Optional[FlowResponse]:
        """Get flow by ID.

        Returns:
            The flow, or None when no flow with that ID is stored.
        """
        flow = self._flows.get(flow_id)
        if flow:
            return FlowResponse(**flow)
        return None

    async def update_flow(
        self, flow_id: str, update_data: FlowUpdate
    ) -> Optional[FlowResponse]:
        """Update flow.

        Only fields explicitly set on ``update_data`` are applied. Any
        applied change bumps ``version`` and ``updated_at``.

        Returns:
            The flow after the update, or None when the flow does not exist.
        """
        flow = self._flows.get(flow_id)
        if not flow:
            return None

        # Update fields
        update_dict = update_data.model_dump(exclude_unset=True)
        if update_dict:
            flow.update(update_dict)
            flow["updated_at"] = datetime.utcnow()
            flow["version"] += 1

            # If config changed and flow is deployed, mark as needing redeploy
            if "config" in update_dict and flow["deployed"]:
                flow["deployed"] = False
                flow["deployed_at"] = None

        logger.info("Flow updated", flow_id=flow_id, version=flow["version"])
        return FlowResponse(**flow)

    async def list_flows(self, deployed_only: bool = False) -> List[FlowResponse]:
        """List all flows.

        Args:
            deployed_only: When True, return only flows currently marked
                as deployed.
        """
        flows = list(self._flows.values())

        if deployed_only:
            flows = [f for f in flows if f["deployed"]]

        return [FlowResponse(**f) for f in flows]

    async def deploy_flow(
        self, flow_id: str, target_nodes: Optional[List[str]] = None
    ) -> Optional[FlowResponse]:
        """Deploy flow to nodes.

        Args:
            flow_id: ID of the flow to deploy.
            target_nodes: Nodes to deploy to. When omitted or empty, the
                flow's own ``target_nodes`` are used.

        Returns:
            The flow marked as deployed, or None when the flow does not
            exist or there are no nodes to deploy to.
        """
        flow = self._flows.get(flow_id)
        if not flow:
            return None

        # Use specified nodes or flow's default targets
        nodes = target_nodes or flow["target_nodes"]
        if not nodes:
            logger.error("No target nodes specified for deployment", flow_id=flow_id)
            return None

        # Publish deployment command
        deployment = {
            "flow_id": flow_id,
            "config": flow["config"],
            "version": flow["version"],
            "timestamp": datetime.utcnow().isoformat(),
        }

        # Record each node right after its publish succeeds, so that a later
        # undeploy reaches every node that received the flow, including
        # nodes given via the ``target_nodes`` override and nodes reached
        # before a publish failure.
        reached = self._deployed_nodes.setdefault(flow_id, [])
        for node in nodes:
            subject = f"node.{node}.flow.deploy"
            await nats_client.publish(subject, deployment)
            if node not in reached:
                reached.append(node)

        # Update flow status (single timestamp keeps both fields equal)
        now = datetime.utcnow()
        flow["deployed"] = True
        flow["deployed_at"] = now
        flow["updated_at"] = now

        logger.info("Flow deployed", flow_id=flow_id, nodes=nodes)
        return FlowResponse(**flow)

    async def undeploy_flow(self, flow_id: str) -> Optional[FlowResponse]:
        """Undeploy flow from nodes.

        The undeploy command goes to the nodes recorded by ``deploy_flow``.
        If nothing was recorded, the flow's ``target_nodes`` are used.

        Returns:
            The flow marked as not deployed, or None when the flow does not
            exist or is not currently deployed.
        """
        flow = self._flows.get(flow_id)
        if not flow or not flow["deployed"]:
            return None

        # Publish undeploy command
        undeploy_cmd = {
            "flow_id": flow_id,
            "timestamp": datetime.utcnow().isoformat(),
        }

        # Copy the list so the iteration is unaffected by later bookkeeping;
        # the trailing ``or []`` guards against a missing target_nodes value.
        nodes = list(self._deployed_nodes.get(flow_id) or flow["target_nodes"] or [])
        for node in nodes:
            subject = f"node.{node}.flow.undeploy"
            await nats_client.publish(subject, undeploy_cmd)

        # Forget the recorded nodes only after every publish succeeded.
        self._deployed_nodes.pop(flow_id, None)

        # Update flow status
        flow["deployed"] = False
        flow["deployed_at"] = None
        flow["updated_at"] = datetime.utcnow()

        logger.info("Flow undeployed", flow_id=flow_id)
        return FlowResponse(**flow)
# Singleton instance
flow_service = FlowService()