本文介绍 CrewAI 框架的核心概念 Agent/Task/Crew 以及多 Agent 协作的基本用法。通过实际代码展示如何定义专业 Agent、配置任务依赖、设置执行流程(顺序/并行),以及如何启动 Crew 并获取执行结果。
CrewAI 是一个用于编排自主 AI Agent 的 Python 框架,允许你像组建虚拟团队一样为每个 Agent 分配特定角色和目标,让它们协同处理复杂任务。
Agent 是具备特定角色的 AI 执行单元:
from crewai import Agent
researcher = Agent(
role="研究分析师",
goal="提供准确、深入的研究分析",
backstory="你是一位资深研究分析师,擅长从多角度分析问题。",
verbose=True # 启用详细输出
)
Task 是分配给 Agent 的具体工作:
from crewai import Task
research_task = Task(
description="分析人工智能在医疗领域的应用趋势",
agent=researcher, # 分配给 researcher
expected_output="一份结构化的分析报告"
)
Crew 协调多个 Agent 完成任务:
from crewai import Crew
crew = Crew(
agents=[researcher, writer], # 团队成员
tasks=[research_task, write_task], # 任务列表
process="sequential" # 或 "parallel"
)
from crewai import Agent, Task, Crew
from crewai.process import Process
# 1. 定义 Agent
researcher = Agent(
role="市场研究员",
goal="收集并分析市场数据",
backstory="你是一位专业市场分析师,擅长数据分析。"
)
writer = Agent(
role="内容撰写师",
goal="撰写清晰、专业的市场报告",
backstory="你是一位资深内容撰写师,擅长将复杂信息转化为易读文章。"
)
# 2. 定义 Task
research_task = Task(
description="收集 2024 年 AI 行业市场规模和增长趋势数据",
agent=researcher,
expected_output="市场规模数据、增长率、关键玩家列表"
)
write_task = Task(
description="基于研究数据撰写市场分析报告",
agent=writer,
expected_output="一份完整的市场分析报告,包含执行摘要和结论"
)
write_task.context = [research_task] # 依赖 research_task
# 3. 创建 Crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential # 顺序执行
)
# 4. 启动执行
result = crew.kickoff()
print(result)
crew = Crew(
agents=[agent1, agent2, agent3],
tasks=[task1, task2, task3],
process=Process.sequential
)
# 任务按定义顺序依次执行
crew = Crew(
agents=[researcher1, researcher2, researcher3],
tasks=[task1, task2, task3],
process=Process.hierarchical # 层级协作
)
from crewai import Agent
from crewai_tools import SerperDevTool, WebsiteSearchTool
agent = Agent(
role="数据分析师",
goal="从多个来源收集数据",
backstory="你擅长数据收集和分析。",
tools=[
SerperDevTool(), # 搜索工具
WebsiteSearchTool() # 网站搜索
]
)
Q1: Task 的 context 参数有什么用?
Q2: sequential 和 hierarchical 流程有什么区别?
Q3: 如何让 Agent 协同工作?
示例代码逻辑完整,可正常导入执行
代码结构符合 CrewAI 框架规范