Automated Workflow System Architecture

📅 2026-06-12 ✍️ 以观其妙书院

Version: v1.0 | Priority: P0 | Date: 2026-04-15
Core capability: scheduled and event-driven task scheduling

1. Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                 Automated Workflow System v1.0                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │               Scheduling Engine (Scheduler)               │  │
│  │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐  │  │
│  │ │ Cron Scheduler │ │ Event Listener │ │ Orchestrator   │  │  │
│  │ │ (Cron)         │ │ (Event Bus)    │ │ (DAG Executor) │  │  │
│  │ └────────────────┘ └────────────────┘ └────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                        Task Queue                         │  │
│  │             Redis / SQLite / In-Memory Queue              │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │               Executor Cluster (Executors)                │  │
│  │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌────────────┐  │  │
│  │ │ Local     │ │ Remote    │ │ API call  │ │ Sub-Agent  │  │  │
│  │ │ execution │ │ SSH       │ │ HTTP      │ │ delegation │  │  │
│  │ └───────────┘ └───────────┘ └───────────┘ └────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                        State Store                        │  │
│  │       SQLite / JSON / Obsidian / IMA three-way sync       │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
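
The layers in the diagram can be sketched end to end with an in-memory queue. This is only an illustrative sketch: the function names (`scheduler`, `executor`, `run_pipeline`), the task dictionaries, and the use of a plain dict as the state store are assumptions, not part of the design.

```python
import asyncio


async def scheduler(queue: asyncio.Queue, tasks: list) -> None:
    """Stand-in for the scheduling engine: enqueue the tasks that are due."""
    for task in tasks:
        await queue.put(task)


async def executor(queue: asyncio.Queue, state: dict) -> None:
    """Stand-in for one executor: pull tasks and record their results."""
    while True:
        task = await queue.get()
        try:
            # Dispatch on the task type, as the executor layer would
            if task["type"] == "local":
                result = f"ran {task['id']} locally"
            else:
                result = f"unsupported type {task['type']}"
            state[task["id"]] = result  # state store: a plain dict here
        finally:
            queue.task_done()


async def run_pipeline(tasks: list, workers: int = 2) -> dict:
    """Wire scheduler -> queue -> executors -> state store together."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    state: dict = {}
    pool = [asyncio.create_task(executor(queue, state)) for _ in range(workers)]
    await scheduler(queue, tasks)
    await queue.join()      # wait until every queued task has been processed
    for worker in pool:
        worker.cancel()     # executors loop forever, so stop them explicitly
    await asyncio.gather(*pool, return_exceptions=True)
    return state


if __name__ == "__main__":
    demo = [{"id": "backup", "type": "local"}, {"id": "sync", "type": "api"}]
    print(asyncio.run(run_pipeline(demo)))
```

A bounded queue (`maxsize`) gives natural back-pressure: the scheduler waits when the executors fall behind, which is one plausible reading of the `queue_size` setting in the DSL.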

2. Core Component Design

2.1 Workflow Definition DSL

# ============================================
# Workflow definition DSL (Domain-Specific Language)
# File: ~/.workbuddy/workflows/示例工作流.yaml
# ============================================

workflow:
  name: "每日知识沉淀"            # "Daily Knowledge Consolidation"
  version: "1.0"
  description: "Automatically back up and summarize the knowledge base every day"

  # Trigger configuration
  triggers:
    # Scheduled trigger
    - type: "schedule"
      cron: "0 23 * * *"          # every day at 23:00
      timezone: "Asia/Shanghai"
    # Event trigger
    - type: "event"
      event: "session.end"        # when a session ends
      condition: "session.duration > 30min"
    # Manual trigger
    - type: "manual"
      command: "执行每日沉淀"      # "run the daily consolidation"

  # Variable definitions
  variables:
    backup_dir: "{{env.OBSIDIAN_VAULT}}/备份/{{date.Y-m-d}}"
    summary_file: "{{env.WORKBUDDY_MEMORY}}/{{date.Y-m-d}}.md"

  # Task DAG (directed acyclic graph)
  tasks:
    # Task 1: back up Obsidian
    - id: "backup_obsidian"
      name: "Back up the Obsidian vault"
      type: "local"
      # Note: robocopy returns exit codes 1-7 for successful copies, so the
      # executor must not treat every non-zero exit code as a failure.
      command: |
        robocopy "{{env.OBSIDIAN_VAULT}}" "{{backup_dir}}" /E /COPYALL
      retry:
        max_attempts: 3
        backoff: "exponential"

    # Task 2: generate the daily report
    - id: "generate_summary"
      name: "Generate today's summary"
      type: "subagent"
      agent: "知识总结助手"        # "knowledge summary assistant"
      input:
        memory_file: "{{summary_file}}"
        format: "markdown"
      depends_on: ["backup_obsidian"]

    # Task 3: sync to IMA
    - id: "sync_ima"
      name: "Sync to IMA notes"
      type: "api"
      endpoint: "https://ima.qq.com/api/notes"
      method: "POST"
      headers:
        Authorization: "Bearer {{secrets.IMA_API_KEY}}"
      body:
        title: "{{date.Y-m-d}} daily summary"
        content: "{{tasks.generate_summary.output}}"
      depends_on: ["generate_summary"]

    # Task 4: send a notification
    - id: "notify"
      name: "Send completion notice"
      type: "notification"
      channel: "system"
      message: "✅ Daily knowledge consolidation finished\nBackup: {{backup_dir}}\nSummary: {{summary_file}}"
      depends_on: ["sync_ima"]

  # Error handling
  on_error:
    strategy: "continue"          # continue / stop / retry
    notify: true
    fallback_task: "error_handler"

  # Concurrency control
  concurrency:
    max_parallel: 3
    queue_size: 10
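
Before a definition like the one above is registered, it helps to check that its task references are consistent. The sketch below works on an already parsed definition (a plain dict); the function name `validate_workflow` and the choice of which keys count as required are assumptions made for illustration.

```python
from typing import Any, Dict, List


def validate_workflow(definition: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a parsed workflow definition."""
    problems: List[str] = []
    workflow = definition.get("workflow", {})

    # Assumed minimum set of keys; the design does not spell this out
    for key in ("name", "version", "triggers", "tasks"):
        if key not in workflow:
            problems.append(f"missing required key: {key}")

    tasks = workflow.get("tasks", [])
    ids = [t.get("id") for t in tasks]
    duplicates = {i for i in ids if i is not None and ids.count(i) > 1}
    for task_id in sorted(duplicates):
        problems.append(f"duplicate task id: {task_id}")

    # Every depends_on entry must name a task defined in this workflow
    known = set(ids)
    for task in tasks:
        for dep in task.get("depends_on", []):
            if dep not in known:
                problems.append(f"task {task.get('id')} depends on unknown task {dep}")

    fallback = workflow.get("on_error", {}).get("fallback_task")
    if fallback and fallback not in known:
        problems.append(f"fallback_task {fallback} is not defined")
    return problems


example = {
    "workflow": {
        "name": "每日知识沉淀",
        "version": "1.0",
        "triggers": [{"type": "manual"}],
        "tasks": [
            {"id": "backup_obsidian"},
            {"id": "generate_summary", "depends_on": ["backup_obsidian"]},
        ],
        "on_error": {"fallback_task": "error_handler"},
    }
}
print(validate_workflow(example))
# ['fallback_task error_handler is not defined']
```

Run against the example workflow, a check of this kind would flag that `error_handler` is referenced by `on_error.fallback_task` but never defined as a task.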

2.2 Core Python Classes

# ============================================
# Automated workflow system: core Python implementation
# File: ~/.workbuddy/skills/自动化工作流/workflow_engine.py
# ============================================

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
from datetime import datetime, timedelta
import asyncio
import json
import hashlib

# Third-party package; import the class itself, since the module is not callable
from croniter import croniter

class TriggerType(Enum):
    """Trigger types."""
    SCHEDULE = "schedule"  # time-based trigger
    EVENT = "event"        # event trigger
    MANUAL = "manual"      # manual trigger
    WEBHOOK = "webhook"    # webhook trigger

class TaskStatus(Enum):
    """Task states."""
    PENDING = "pending"      # waiting
    RUNNING = "running"      # running
    SUCCESS = "success"      # succeeded
    FAILED = "failed"        # failed
    RETRYING = "retrying"    # retrying
    CANCELLED = "cancelled"  # cancelled

class TaskType(Enum):
    """Task types."""
    LOCAL = "local"                # local command
    REMOTE = "remote"              # remote SSH
    API = "api"                    # API call
    SUBAGENT = "subagent"          # sub-agent
    NOTIFICATION = "notification"  # notification
    CONDITION = "condition"        # conditional branch
    LOOP = "loop"                  # loop

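@dataclass
class Trigger:
    """Trigger definition."""
    type: TriggerType
    config: Dict[str, Any]  # trigger configuration
    enabled: bool = True

    def should_trigger(self, context: Dict) -> bool:
        """Return True if this trigger should fire for the given context."""
        if not self.enabled:
            return False

        if self.type == TriggerType.SCHEDULE:
            # Fire once the first cron slot after the previous run has passed.
            # Without a recorded last_run the base is "now", so the next slot
            # is always in the future and the trigger does not fire.
            base = context.get("last_run") or datetime.now()
            schedule = croniter(self.config.get("cron"), base)  # avoids shadowing the builtin iter
            next_run = schedule.get_next(datetime)
            return datetime.now() >= next_run

        if self.type == TriggerType.EVENT:
            # The event name must match before the optional condition is checked
            if context.get("event") != self.config.get("event"):
                return False
            condition = self.config.get("condition")
            if condition:
                # Caution: eval() is not a sandbox even with empty builtins, and
                # DSL conditions such as "session.duration > 30min" are not valid
                # Python; a dedicated expression parser is needed here.
                return bool(eval(condition, {"__builtins__": {}}, context))
            return True

        if self.type == TriggerType.MANUAL:
            # Manual trigger: the issued command must match the configured one
            return context.get("command") == self.config.get("command")

        return False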

@dataclass
class Task:
    """Task definition."""
    id: str
    name: str
    type: TaskType
    config: Dict[str, Any]  # task configuration
    depends_on: List[str] = field(default_factory=list)
    retry: Dict[str, Any] = field(default_factory=dict)
    timeout: int = 300  # default timeout in seconds (5 minutes)

    # Runtime state
    status: TaskStatus = TaskStatus.PENDING
    output: Any = None
    error: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    attempt_count: int = 0

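@dataclass
class Workflow:
    """Workflow definition."""
    name: str
    version: str
    description: str
    triggers: List[Trigger]
    tasks: List[Task]
    variables: Dict[str, Any] = field(default_factory=dict)
    on_error: Dict[str, Any] = field(default_factory=dict)
    concurrency: Dict[str, Any] = field(default_factory=dict)

    # Runtime state
    instance_id: Optional[str] = None
    status: str = "idle"
    current_task: Optional[str] = None
    task_states: Dict[str, TaskStatus] = field(default_factory=dict)

    @property
    def workflow_id(self) -> str:
        """Stable identifier derived from the name, version and task IDs."""
        # Hash only the static task IDs; serializing whole Task objects would
        # include runtime state and change the ID while the workflow runs.
        task_ids = ",".join(t.id for t in self.tasks)
        content = f"{self.name}:{self.version}:{task_ids}"
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def get_execution_order(self) -> List[List[str]]:
        """
        Return the task execution order (topological sort) as a list of levels.
        Tasks within the same level can run in parallel.
        """
        # Dependency graph: task ID -> IDs it is still waiting for
        graph = {t.id: set(t.depends_on) for t in self.tasks}
        unknown = {d for deps in graph.values() for d in deps} - set(graph)
        if unknown:
            raise ValueError(f"Unknown dependencies: {sorted(unknown)}")

        levels = []
        remaining = set(graph)
        while remaining:
            # Tasks whose dependencies have all been scheduled
            ready = {t for t in remaining if not graph[t]}
            if not ready:
                raise ValueError("Circular dependency detected")
            levels.append(sorted(ready))  # sorted for a deterministic order
            remaining -= ready
            # Drop the scheduled tasks from the remaining dependency sets
            for t in remaining:
                graph[t] -= ready
        return levels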

class WorkflowEngine:
    """
    Workflow engine.

    Core responsibilities:
    1. Workflow registration and management
    2. Trigger monitoring and scheduling
    3. Task execution and state management
    4. Error handling and retries
    """

    def __init__(self):
        self.workflows: Dict[str, Workflow] = {}
        self.running_instances: Dict[str, Workflow] = {}
        self.last_runs: Dict[str, datetime] = {}  # workflow_id -> last trigger time
        self.task_executors: Dict[TaskType, Callable] = {}
        self._setup_default_executors()

    def _setup_default_executors(self):
        """Register the default task executors."""
        self.task_executors[TaskType.LOCAL] = self._execute_local
        self.task_executors[TaskType.REMOTE] = self._execute_remote
        self.task_executors[TaskType.API] = self._execute_api
        self.task_executors[TaskType.SUBAGENT] = self._execute_subagent
        self.task_executors[TaskType.NOTIFICATION] = self._execute_notification

    def register_workflow(self, workflow: Workflow):
        """Register a workflow."""
        self.workflows[workflow.workflow_id] = workflow
        # Use the registration time as the cron baseline so schedules can fire
        self.last_runs[workflow.workflow_id] = datetime.now()
        print(f"✅ Workflow registered: {workflow.name} (v{workflow.version})")

    def unregister_workflow(self, workflow_id: str):
        """Unregister a workflow."""
        self.workflows.pop(workflow_id, None)
        self.last_runs.pop(workflow_id, None)

    async def start_scheduler(self):
        """Start the scheduler loop."""
        print("🚀 Workflow scheduler started")
        while True:
            await self._check_triggers()
            await asyncio.sleep(1)  # check once per second

    async def _check_triggers(self):
        """Check every trigger of every registered workflow."""
        for workflow in list(self.workflows.values()):
            for trigger in workflow.triggers:
                context = self._build_context(workflow)
                if trigger.should_trigger(context):
                    await self._trigger_workflow(workflow, trigger)

    async def _trigger_workflow(self, workflow: Workflow, trigger: Trigger):
        """Create a workflow instance and run it."""
        from copy import deepcopy

        now = datetime.now()
        # Record the run first so the same cron slot does not fire again
        self.last_runs[workflow.workflow_id] = now
        instance_id = f"{workflow.workflow_id}_{now.strftime('%Y%m%d%H%M%S')}"

        # Create the workflow instance. Tasks are deep-copied so runtime state
        # never leaks back into the registered definition; on_error and
        # concurrency are carried over so the "stop" strategy is honored.
        instance = Workflow(
            name=workflow.name,
            version=workflow.version,
            description=workflow.description,
            triggers=workflow.triggers,
            tasks=deepcopy(workflow.tasks),
            variables=workflow.variables.copy(),
            on_error=workflow.on_error.copy(),
            concurrency=workflow.concurrency.copy(),
            instance_id=instance_id,
            status="running",
        )
        self.running_instances[instance_id] = instance
        print(f"▶️ Workflow started: {workflow.name} (instance: {instance_id})")

        try:
            await self._execute_workflow(instance)
        except Exception as e:
            print(f"❌ Workflow execution failed: {e}")
            instance.status = "failed"
        finally:
            if instance.status == "running":
                instance.status = "completed"
            del self.running_instances[instance_id]

    async def _execute_workflow(self, instance: Workflow):
        """Run the workflow level by level."""
        for level_tasks in instance.get_execution_order():
            # Tasks within a level run in parallel
            tasks = [t for t in instance.tasks if t.id in level_tasks]
            await asyncio.gather(*(self._execute_task(instance, task) for task in tasks))

            # Abort if a task failed and the error strategy is "stop"
            failed_tasks = [t for t in tasks if t.status == TaskStatus.FAILED]
            if failed_tasks and instance.on_error.get("strategy") == "stop":
                raise RuntimeError(f"Task failed: {failed_tasks[0].id}")

    async def _execute_task(self, instance: Workflow, task: Task):
        """Run a single task, retrying according to its retry policy."""
        max_attempts = task.retry.get("max_attempts", 1)
        backoff = task.retry.get("backoff", "fixed")
        executor = self.task_executors.get(task.type)

        while True:
            task.status = TaskStatus.RUNNING
            task.start_time = datetime.now()
            task.attempt_count += 1
            print(f"  📝 Running task: {task.name}")
            try:
                if not executor:
                    raise ValueError(f"Unknown task type: {task.type}")
                # Render template variables in the task configuration
                config = self._render_template(task.config, instance)
                # Run the task and enforce the per-task timeout
                task.output = await asyncio.wait_for(executor(config), timeout=task.timeout)
                task.status = TaskStatus.SUCCESS
                task.error = None
                print(f"  ✅ Task finished: {task.name}")
                return
            except Exception as e:
                task.error = str(e) or type(e).__name__
                if task.attempt_count >= max_attempts:
                    task.status = TaskStatus.FAILED
                    print(f"  ❌ Task failed: {task.name} - {task.error}")
                    return
                # Retry after the configured backoff delay
                task.status = TaskStatus.RETRYING
                delay = self._calculate_backoff(task.attempt_count, backoff)
                print(f"  🔄 Retrying task ({task.attempt_count}/{max_attempts}): {task.name}")
                await asyncio.sleep(delay)
            finally:
                task.end_time = datetime.now()

    async def _execute_local(self, config: Dict) -> Any:
        """Run a local shell command."""
        # Note: on Windows the shell is cmd.exe, so the PowerShell snippets in
        # the preset workflows need an explicit "powershell -Command" wrapper.
        process = await asyncio.create_subprocess_shell(
            config.get("command"),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=config.get("cwd"),
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            raise RuntimeError(f"Command failed: {stderr.decode(errors='replace')}")
        return stdout.decode(errors="replace")

    async def _execute_remote(self, config: Dict) -> Any:
        """Run a remote command over SSH."""
        # To be implemented with paramiko or asyncssh; fail loudly until then
        raise NotImplementedError("Remote SSH execution is not implemented yet")

    async def _execute_api(self, config: Dict) -> Any:
        """Perform an API call."""
        import aiohttp  # third-party

        async with aiohttp.ClientSession() as session:
            async with session.request(
                config.get("method", "GET"),
                config.get("endpoint"),
                headers=config.get("headers", {}),
                json=config.get("body"),
            ) as resp:
                resp.raise_for_status()  # treat HTTP errors as task failures
                return await resp.json()

    async def _execute_subagent(self, config: Dict) -> Any:
        """Run a sub-agent task."""
        # To call the WorkBuddy sub-agent API; fail loudly until implemented
        raise NotImplementedError("Sub-agent execution is not implemented yet")

    async def _execute_notification(self, config: Dict) -> Any:
        """Send a notification."""
        channel = config.get("channel")
        message = config.get("message")
        if channel == "system":
            print(f"🔔 System notification: {message}")
        elif channel == "discord":
            # Sending to Discord is not implemented yet
            raise NotImplementedError("Discord notifications are not implemented yet")
        return {"sent": True, "channel": channel}

    def _render_template(self, template: Dict, instance: Workflow) -> Dict:
        """Render {{...}} template variables in a task configuration."""
        import re

        template_str = json.dumps(template, ensure_ascii=False)

        def replace_var(match):
            # Supports nested access such as {{tasks.task1.output}}
            value = self._resolve_variable(match.group(1).strip(), instance)
            if value is None:
                return match.group(0)  # leave unresolved placeholders untouched
            # Escape the value so quotes or newlines cannot break the JSON text
            return json.dumps(str(value), ensure_ascii=False)[1:-1]

        rendered = re.sub(r'\{\{(.*?)\}\}', replace_var, template_str)
        return json.loads(rendered)

    def _resolve_variable(self, path: str, instance: Workflow) -> Any:
        """Resolve a dotted variable path against the instance scope."""
        import os

        # Workflow variables sit at the top level; tasks, env and date are
        # exposed under the namespaces used by the DSL examples. Placeholders
        # nested inside variable values are not expanded recursively.
        scope = {
            **instance.variables,
            "tasks": {t.id: {"output": t.output} for t in instance.tasks},
            "env": dict(os.environ),
            "date": {"Y-m-d": datetime.now().strftime("%Y-%m-%d")},
        }
        value = scope
        for part in path.split("."):
            if not isinstance(value, dict):
                return None
            value = value.get(part)
        return value

    def _calculate_backoff(self, attempt: int, strategy: str) -> float:
        """Compute the retry delay in seconds."""
        if strategy == "linear":
            return 5.0 * attempt
        if strategy == "exponential":
            return 5.0 * (2 ** (attempt - 1))
        return 5.0  # "fixed" and any unknown strategy

    def _build_context(self, workflow: Workflow) -> Dict:
        """Build the trigger context."""
        return {
            "workflow": workflow,
            "last_run": self.last_runs.get(workflow.workflow_id),  # ideally read from the state store
            "env": {},  # environment variables
            "date": datetime.now(),
        }
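
The DSL declares `concurrency.max_parallel`, while `_execute_workflow` hands a whole level to `asyncio.gather` at once. One way to respect the limit is a semaphore around each task. The sketch below is a standalone illustration; `run_level` and `demo` are hypothetical names, not part of the engine.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_level(
    jobs: List[Callable[[], Awaitable[str]]], max_parallel: int = 3
) -> List[str]:
    """Run one DAG level with at most max_parallel jobs in flight."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:  # blocks while max_parallel jobs are running
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo() -> int:
    """Run 8 jobs with a limit of 3 and report the peak concurrency seen."""
    running = 0
    peak = 0

    async def job() -> str:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated work
        running -= 1
        return "done"

    results = await run_level([job] * 8, max_parallel=3)
    assert results == ["done"] * 8
    return peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `asyncio.gather` preserves argument order, results still line up with the task list even though the jobs finish in batches.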


3. Preset Workflows

3.1 Heartbeat Inspection Workflow

# ~/.workbuddy/workflows/心跳巡检.yaml

workflow:
  name: "心跳巡检"                # "Heartbeat Inspection"
  version: "1.0"
  description: "Check system health every hour"

  triggers:
    - type: "schedule"
      cron: "0 * * * *"           # every hour

  tasks:
    - id: "check_obsidian"
      name: "Check the Obsidian vault"
      type: "local"
      command: |
        if (Test-Path "{{env.OBSIDIAN_VAULT}}/.obsidian") {
          Write-Output "healthy"
        } else {
          Write-Output "unhealthy"
        }

    - id: "check_workbuddy"
      name: "Check WorkBuddy Brain"
      type: "local"
      command: |
        $brain_path = "{{env.WORKBUDDY_BRAIN}}"
        $file_count = (Get-ChildItem $brain_path -Recurse -File | Measure-Object).Count
        Write-Output "Files: $file_count"

    - id: "check_memory"
      name: "Check the memory system"
      type: "subagent"
      agent: "系统诊断助手"        # "system diagnostics assistant"
      input:
        check_items: ["MEMORY.md", "daily_logs", "context_snapshots"]

    - id: "generate_report"
      name: "Generate the inspection report"
      type: "subagent"
      agent: "报告生成助手"        # "report generation assistant"
      input:
        template: "heartbeat_report"
        data_sources:
          - "{{tasks.check_obsidian.output}}"
          - "{{tasks.check_workbuddy.output}}"
          - "{{tasks.check_memory.output}}"
      depends_on: ["check_obsidian", "check_workbuddy", "check_memory"]

    - id: "notify_if_issue"
      name: "Alert on anomaly"
      type: "condition"
      condition: "{{tasks.check_obsidian.output}} == 'unhealthy'"
      true_task:
        type: "notification"
        channel: "system"
        message: "⚠️ The Obsidian vault is unhealthy, please check!"
      depends_on: ["check_obsidian"]   # the condition reads this task's output
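
The three checks above have no dependencies and can run together, while the report waits for all of them. The levelled ordering that `get_execution_order` computes can be cross-checked with the standard library's `graphlib` (Python 3.9+). This is a sketch for comparison only; `execution_levels` is a hypothetical helper, not the engine's own method.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_levels(depends_on: Dict[str, List[str]]) -> List[List[str]]:
    """Group task IDs into levels; tasks within one level can run in parallel."""
    sorter = TopologicalSorter(depends_on)  # maps each task to its predecessors
    sorter.prepare()                        # raises CycleError if there is a cycle
    levels: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose predecessors are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


heartbeat = {
    "check_obsidian": [],
    "check_workbuddy": [],
    "check_memory": [],
    "generate_report": ["check_obsidian", "check_workbuddy", "check_memory"],
}
print(execution_levels(heartbeat))
# [['check_memory', 'check_obsidian', 'check_workbuddy'], ['generate_report']]
```

A cyclic definition such as `{"a": ["b"], "b": ["a"]}` raises `CycleError` from `prepare()`, which corresponds to the circular-dependency error in the engine.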

3.2 知行合一沉淀 (Unity of Knowledge and Action Consolidation) Workflow

# ~/.workbuddy/workflows/知行合一沉淀.yaml

workflow:
  name: "知行合一沉淀"            # "Unity of Knowledge and Action Consolidation"
  version: "1.0"
  description: "Run the consolidation automatically after a deep conversation"

  triggers:
    - type: "event"
      event: "session.end"
      condition: "session.depth_level >= 'S2'"

  tasks:
    - id: "analyze_session"
      name: "Analyze session data"
      type: "subagent"
      agent: "会话分析助手"        # "session analysis assistant"
      input:
        session_id: "{{event.session_id}}"
        extract: ["engines_used", "key_insights", "decisions_made"]

    - id: "compress_insight"
      name: "Compress the core insight"
      type: "subagent"
      agent: "知识压缩助手"        # "knowledge compression assistant"
      input:
        raw_insights: "{{tasks.analyze_session.output.insights}}"
        format: "one_sentence + symbol"
      depends_on: ["analyze_session"]

    - id: "generalize_scenarios"
      name: "Generalize to application scenarios"
      type: "subagent"
      agent: "场景泛化助手"        # "scenario generalization assistant"
      input:
        core_insight: "{{tasks.compress_insight.output}}"
        num_scenarios: 3
      depends_on: ["compress_insight"]

    - id: "save_to_memory"
      name: "Save to the memory system"
      type: "local"
      command: |
        $content = @"
        ## {{date.Y-m-d}} Unity of Knowledge and Action Consolidation

        Core insight: {{tasks.compress_insight.output}}

        Generalized scenarios: {{tasks.generalize_scenarios.output}}

        System evolution: {{tasks.analyze_session.output.evolution_suggestions}}
        "@
        $content | Out-File -FilePath "{{env.MEMORY_PATH}}/{{date.Y-m-d}}.md" -Append
      depends_on: ["generalize_scenarios"]

    - id: "sync_to_obsidian"
      name: "Sync to Obsidian"
      type: "local"
      command: |
        Copy-Item "{{env.MEMORY_PATH}}/{{date.Y-m-d}}.md" `
          "{{env.OBSIDIAN_VAULT}}/99-每日沉淀/{{date.Y-m-d}}.md"
      depends_on: ["save_to_memory"]
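
Trigger conditions such as `session.depth_level >= 'S2'` are plain comparison strings. Passing them to `eval` is risky, so a restricted evaluator built on the standard `ast` module is one alternative. The sketch below is an assumption-laden illustration: `evaluate_condition` is a hypothetical helper, it accepts only a single comparison, and it expects the session to be a nested dict with durations already expressed as numbers (for example minutes).

```python
import ast
import operator
from typing import Any, Dict

# Only these comparison operators are accepted
_COMPARATORS = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.Lt: operator.lt, ast.LtE: operator.le,
    ast.Gt: operator.gt, ast.GtE: operator.ge,
}


def _value(node: ast.AST, context: Dict[str, Any]) -> Any:
    """Resolve a literal, a name, or a dotted name such as session.duration."""
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return context[node.id]
    if isinstance(node, ast.Attribute):
        base = _value(node.value, context)
        if not isinstance(base, dict):
            raise ValueError("dotted access is only allowed on dicts")
        return base[node.attr]
    raise ValueError(f"unsupported expression: {type(node).__name__}")


def evaluate_condition(expression: str, context: Dict[str, Any]) -> bool:
    """Evaluate one comparison such as "session.duration > 30"."""
    tree = ast.parse(expression, mode="eval").body
    if not isinstance(tree, ast.Compare) or len(tree.ops) != 1:
        raise ValueError("only a single comparison is supported")
    compare = _COMPARATORS.get(type(tree.ops[0]))
    if compare is None:
        raise ValueError("unsupported comparison operator")
    left = _value(tree.left, context)
    right = _value(tree.comparators[0], context)
    return bool(compare(left, right))


context = {"session": {"duration": 45, "depth_level": "S3"}}
print(evaluate_condition("session.depth_level >= 'S2'", context))  # True
print(evaluate_condition("session.duration > 30", context))        # True
```

Function calls, attribute access on arbitrary objects, and chained expressions are rejected with `ValueError`, so a malformed or hostile condition fails instead of executing.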


4. Integration with 龙心OS

4.1 Workflow Trigger Scenarios

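# 龙心OS scene-to-workflow mapping
# File: ~/.workbuddy/skills/龙心OS/references/workflow-triggers.yaml

scene_workflow_mapping:
  # S2 deep understanding -> consolidate after knowledge learning
  S2:
    - workflow: "知行合一沉淀"
      trigger: "session.end"
      condition: "session.engines_used contains '知识学习'"   # "knowledge learning" engine

  # S3 creativity and innovation -> archive the innovation proposal
  S3:
    - workflow: "创新方案归档"     # "innovation proposal archiving"
      trigger: "session.end"
      condition: "session.engines_used contains '象思维'"     # "image thinking" engine

  # S5 major decision -> decision record and review
  S5:
    - workflow: "决策记录与复盘"   # "decision record and review"
      trigger: "session.end"
      condition: "session.scene == 'S5'"

  # S7 system planning -> update architecture documents
  S7:
    - workflow: "架构文档自动更新" # "automatic architecture document update"
      trigger: "session.end"
      condition: "session.scene == 'S7'"

  # S9 system upgrade -> automatic Skill evolution
  S9:
    - workflow: "GEPA Skill进化"   # "GEPA Skill evolution"
      trigger: "session.end"
      condition: "session.scene == 'S9'"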
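
A lookup over this mapping could work as sketched below. The sketch assumes the mapping is keyed by the session's scene and that conditions use only the two forms seen above (`contains` and `==`); `workflows_for` and `_matches` are hypothetical names, and only two scenes are copied into the example data.

```python
from typing import Any, Dict, List

# Subset of the mapping above, as it might look after parsing the YAML
SCENE_WORKFLOW_MAPPING = {
    "S2": [{"workflow": "知行合一沉淀", "trigger": "session.end",
            "condition": "session.engines_used contains '知识学习'"}],
    "S5": [{"workflow": "决策记录与复盘", "trigger": "session.end",
            "condition": "session.scene == 'S5'"}],
}


def _matches(condition: str, session: Dict[str, Any]) -> bool:
    """Evaluate "session.<field> contains '<v>'" or "session.<field> == '<v>'"."""
    left, op, right = condition.split(" ", 2)
    field = left.removeprefix("session.")  # requires Python 3.9+
    expected = right.strip("'")
    actual = session.get(field)
    if op == "contains":
        return actual is not None and expected in actual
    if op == "==":
        return actual == expected
    raise ValueError(f"unsupported operator: {op}")


def workflows_for(event: str, session: Dict[str, Any]) -> List[str]:
    """Return the workflows to start for this event and session."""
    rules = SCENE_WORKFLOW_MAPPING.get(session.get("scene"), [])
    return [
        rule["workflow"]
        for rule in rules
        if rule["trigger"] == event and _matches(rule["condition"], session)
    ]


print(workflows_for("session.end", {"scene": "S2", "engines_used": ["知识学习"]}))
# ['知行合一沉淀']
```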

4.2 CLI Commands

# Workflow management CLI

# List all workflows
workbuddy workflow list

# Show workflow details
workbuddy workflow show

# Trigger a workflow manually
workbuddy workflow run [--vars key=value]

# Show workflow execution history
workbuddy workflow logs [--limit 10]

# Pause / resume a workflow
workbuddy workflow pause
workbuddy workflow resume

# Register a new workflow
workbuddy workflow register

# Show running instances
workbuddy workflow instances

# Cancel a workflow instance
workbuddy workflow cancel
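
The command set above maps naturally onto nested `argparse` subcommands. The following is a sketch under stated assumptions, not the actual WorkBuddy CLI: the positional argument `target` (a workflow, file, or instance identifier) is hypothetical because the commands above do not name it, and `build_parser` and `parse_vars` are illustrative helpers.

```python
import argparse
from typing import Dict, List


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for `workbuddy workflow <command> ...`."""
    parser = argparse.ArgumentParser(prog="workbuddy")
    groups = parser.add_subparsers(dest="group", required=True)
    workflow = groups.add_parser("workflow", help="manage workflows")
    commands = workflow.add_subparsers(dest="command", required=True)

    commands.add_parser("list", help="list all workflows")
    commands.add_parser("instances", help="show running instances")

    # Hypothetical positional argument: what each command operates on
    for name in ("show", "pause", "resume", "cancel", "register"):
        commands.add_parser(name).add_argument("target")

    run = commands.add_parser("run", help="trigger a workflow manually")
    run.add_argument("target")
    run.add_argument("--vars", action="append", default=[], metavar="key=value")

    logs = commands.add_parser("logs", help="show execution history")
    logs.add_argument("target")
    logs.add_argument("--limit", type=int, default=10)
    return parser


def parse_vars(pairs: List[str]) -> Dict[str, str]:
    """Turn ["a=1", "b=2"] into {"a": "1", "b": "2"}."""
    return dict(pair.split("=", 1) for pair in pairs)


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args)
```

Repeating `--vars` collects several `key=value` pairs, which `parse_vars` converts into the variable overrides passed to a manual run.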

5. Monitoring Dashboard

# Workflow monitoring dashboard
# File: ~/.workbuddy/skills/自动化工作流/dashboard.py

from typing import Dict


class WorkflowDashboard:
    """Workflow monitoring dashboard."""

    def __init__(self, engine):
        self.engine = engine  # the WorkflowEngine being monitored

    # Note: _count_executions, _calc_success_rate, _calc_avg_duration and
    # _get_pending_tasks are referenced below but not defined in this design;
    # they are expected to query the state store.

    def get_stats(self) -> Dict:
        """Collect summary statistics."""
        return {
            "workflows": {
                "total": len(self.engine.workflows),
                "active": len([w for w in self.engine.workflows.values()
                               if any(t.enabled for t in w.triggers)]),
            },
            "executions": {
                "today": self._count_executions(since="today"),
                "success_rate": self._calc_success_rate(),
                "avg_duration": self._calc_avg_duration(),
            },
            "queue": {
                "pending": len(self._get_pending_tasks()),
                "running": len(self.engine.running_instances),
            },
        }

    def render_console(self):
        """Render the dashboard to the console."""
        stats = self.get_stats()
        width = 50  # inner width of the box
        rows = [
            f"Workflows: {stats['workflows']['active']}/{stats['workflows']['total']} active",
            f"Runs today: {stats['executions']['today']}",
            f"Success rate: {stats['executions']['success_rate']:.1%}",
            f"Average duration: {stats['executions']['avg_duration']:.1f}s",
            f"Queue: {stats['queue']['pending']} pending / {stats['queue']['running']} running",
        ]
        print("╔" + "═" * width + "╗")
        print("║" + "Automated Workflow Dashboard v1.0".center(width) + "║")
        print("╠" + "═" * width + "╣")
        for row in rows:
            print("║ " + row.ljust(width - 1) + "║")  # pad so the right border lines up
        print("╚" + "═" * width + "╝")
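
The execution statistics shown on the dashboard could be derived from stored run records. The sketch below assumes a hypothetical record shape (`ExecutionRecord`) and standalone helper functions; the design itself does not specify how the state store keeps execution history.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class ExecutionRecord:
    """Hypothetical shape of one stored workflow run."""
    workflow: str
    started: datetime
    finished: datetime
    success: bool


def success_rate(records: List[ExecutionRecord]) -> float:
    """Fraction of successful runs, or 0.0 when there is no history."""
    if not records:
        return 0.0
    return sum(r.success for r in records) / len(records)


def avg_duration(records: List[ExecutionRecord]) -> float:
    """Mean run time in seconds, or 0.0 when there is no history."""
    if not records:
        return 0.0
    total = sum((r.finished - r.started).total_seconds() for r in records)
    return total / len(records)


def count_since(records: List[ExecutionRecord], since: datetime) -> int:
    """Number of runs that started at or after the given time."""
    return sum(1 for r in records if r.started >= since)


t0 = datetime(2026, 4, 15, 23, 0, 0)
history = [
    ExecutionRecord("每日知识沉淀", t0, t0 + timedelta(seconds=10), True),
    ExecutionRecord("心跳巡检", t0 + timedelta(hours=1), t0 + timedelta(hours=1, seconds=20), False),
    ExecutionRecord("心跳巡检", t0 - timedelta(days=1), t0 - timedelta(days=1) + timedelta(seconds=30), True),
]
print(f"{success_rate(history):.1%}, {avg_duration(history):.1f}s, {count_since(history, t0)} runs")
# 66.7%, 20.0s, 2 runs
```

Returning 0.0 for an empty history keeps the `:.1%` and `:.1f` format specifiers in the console rendering from failing on a fresh installation.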


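6. Implementation Plan

Phase 1: Core engine (3 days)
├── Day 1: Workflow DSL + parser
├── Day 2: Scheduling engine + task queue
└── Day 3: Executor implementation

Phase 2: Trigger system (2 days)
├── Day 4: Scheduled triggers + cron parsing
└── Day 5: Event listening + webhooks

Phase 3: Preset workflows (2 days)
├── Day 6: Heartbeat inspection workflow
└── Day 7: 知行合一沉淀 workflow

Phase 4: Integration testing (2 days)
├── Day 8: Integration with 龙心OS
└── Day 9: Monitoring dashboard + documentation

Total: 9 days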


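7. Key Quotes

"Automation does not replace people; it frees them to do more creative work."
"The best workflow is 'set and forget', yet it is never truly forgotten."
"Scheduled tasks are like a heartbeat, keeping the system alive; event-driven triggers are like nerves, responding to changes in the environment."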

Document version: v1.0-draft | Status: design complete, pending implementation | Next step: begin Phase 1