CrewAI is a Python framework for orchestrating teams of autonomous AI agents. Each agent has a role, goal, backstory, and a set of tools. Agents are assigned Tasks and assembled into a Crew that runs them in sequential or hierarchical order. CrewAI abstracts away the LLM-loop complexity so you can focus on workflow design. For data engineers, it excels at parallelisable research, automated reporting, and pipeline triage workflows where multiple specialised agents collaborate.
Python 3.10+ · crewai ≥ 0.28 · Multi-Agent · Sequential / Hierarchical · Memory · Tool Use
The fundamental actor in CrewAI. An agent wraps an LLM with a persona: role (job title), goal (what it wants to achieve), and backstory (context shaping its reasoning). Agents can be given a list of tools, a specific llm, and flags like allow_delegation (can pass work to other agents) and memory=True.
A unit of work assigned to an agent. Key fields: description (detailed natural-language instructions), agent (assigned agent), expected_output (contract for the result), and optional context (list of prior tasks whose output is passed in). Tasks are the composable building blocks; the same agent can handle many tasks.
The orchestration container that holds agents, tasks, and a process type. Calling crew.kickoff() starts execution and returns the final task's output. Optional params: verbose, memory, full_output (returns all task outputs), max_rpm (rate-limit LLM calls).
| Process | Execution Order | Manager Agent | Best For |
|---|---|---|---|
| Process.sequential | Tasks run in list order; each gets previous output as context | No | Linear pipelines, chained summarisation |
| Process.hierarchical | A manager agent decomposes the goal, delegates sub-tasks, and aggregates results | Required | Complex goals with flexible sub-task allocation |
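A pure-Python sketch of what sequential execution does conceptually (this is an illustration, not CrewAI's actual internals): each task's prompt is its description plus the outputs of its context tasks, and outputs flow forward in list order.

```python
# Conceptual sketch of sequential task chaining (NOT CrewAI internals):
# each task receives the outputs of its context tasks appended to its prompt.

def run_sequential(tasks, run_agent):
    """tasks: list of dicts with 'description' and optional 'context' (indices of prior tasks)."""
    outputs = []
    for task in tasks:
        context = "\n".join(outputs[i] for i in task.get("context", []))
        prompt = task["description"] + ("\n\nContext:\n" + context if context else "")
        outputs.append(run_agent(prompt))
    return outputs

# Stub "agent" that just reports prompt length, to show context accumulating
outs = run_sequential(
    [
        {"description": "research topic"},
        {"description": "analyse findings", "context": [0]},
    ],
    run_agent=lambda prompt: f"handled {len(prompt)} chars",
)
```

The second task's prompt is longer because the first task's output was appended, which is exactly why long upstream outputs can fill the context window in real crews.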
CrewAI provides three memory scopes that persist context across agent interactions: short-term memory (recent interactions within a single run), long-term memory (persisted across kickoff calls, e.g. to SQLite), and entity memory (facts about people, systems, and concepts the agents encounter).
Agents extend their capabilities with tools. Built-in tools include SerperDevTool (web search), FileReadTool, DirectoryReadTool, and CodeInterpreterTool. Custom tools are created by subclassing BaseTool or using the @tool decorator from LangChain (CrewAI is LangChain-compatible).
A Researcher agent crawls the web for domain data, a Data Analyst agent cleans and structures findings, and a Report Writer agent formats them into a Markdown or PDF document. Replaces hours of manual research with an on-demand pipeline.
A Log Analyst agent parses Airflow/dbt failure logs, a Schema Inspector agent queries the data catalog, and an RCA Synthesiser agent combines findings into a structured incident report — all triggered automatically on pipeline failure via webhook.
A Researcher writes first-draft documentation from code, a Reviewer critiques it for accuracy, and a Formatter applies style guidelines. Keeps technical docs in sync with codebase changes at low cost.
In a SaaS context, a Data Extractor agent pulls customer usage metrics from a warehouse, an Insight Agent identifies at-risk accounts, and a Communication Agent drafts personalised outreach email templates — all from a single kickoff() call.
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
search = SerperDevTool()

# --- Agents ---
researcher = Agent(
    role="Data Engineering Researcher",
    goal="Find current best practices and tools for real-time data pipelines",
    backstory="Senior DE with 10 years' experience across Kafka, Flink, and cloud platforms",
    tools=[search],
    llm=llm,
    verbose=True,
)

analyst = Agent(
    role="Data Analyst",
    goal="Distil research into actionable insights with concrete recommendations",
    backstory="Analyst specialising in evaluating trade-offs across DE architectures",
    llm=llm,
    verbose=True,
)

writer = Agent(
    role="Technical Writer",
    goal="Produce clear, well-structured Markdown reports for engineering teams",
    backstory="Technical writer with a background in open-source documentation",
    llm=llm,
    verbose=True,
)

# --- Tasks ---
research_task = Task(
    description=(
        "Research the top 5 real-time streaming frameworks in 2025. "
        "Include adoption, performance benchmarks, and cloud integration support."
    ),
    agent=researcher,
    expected_output="A structured summary of 5 frameworks with pros, cons, and benchmark data",
)

analysis_task = Task(
    description=(
        "Analyse the research findings and recommend the best framework for a "
        "fintech company processing 1M events/sec on AWS."
    ),
    agent=analyst,
    expected_output="A recommendation with justification, risk factors, and migration considerations",
    context=[research_task],  # receives research_task output
)

report_task = Task(
    description=(
        "Write a 2-page engineering decision record (EDR) in Markdown format "
        "summarising the research and recommendation."
    ),
    agent=writer,
    expected_output="Markdown EDR with sections: Background, Options Considered, Decision, Consequences",
    context=[research_task, analysis_task],
)

# --- Crew ---
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff()
print(result)
```
```python
from crewai import Agent, Task, Crew, Process

manager = Agent(
    role="Data Platform Lead",
    goal="Coordinate the team to deliver a complete data quality audit report",
    backstory="Experienced data platform manager who delegates effectively",
    allow_delegation=True,
    llm=llm,
)

schema_inspector = Agent(
    role="Schema Inspector",
    goal="Analyse table schemas for type inconsistencies and missing constraints",
    backstory="DBA with expertise in data modelling and schema design",
    tools=[run_sql_tool],  # custom tool (see the Custom Tool example)
    llm=llm,
)

profiler = Agent(
    role="Data Profiler",
    goal="Compute null rates, cardinality, and outlier statistics for each column",
    backstory="Data quality engineer specialised in profiling large datasets",
    tools=[run_sql_tool],
    llm=llm,
)

audit_task = Task(
    description="Perform a full data quality audit of the 'orders' table in the production warehouse.",
    expected_output="Comprehensive DQ report with schema issues, null rates, outliers, and severity ratings",
    # No agent assigned: the manager decomposes the goal and delegates sub-tasks
)

crew = Crew(
    agents=[schema_inspector, profiler],  # worker agents only; the manager is passed separately
    tasks=[audit_task],
    process=Process.hierarchical,
    manager_agent=manager,  # or manager_llm=llm to auto-create a default manager
    verbose=True,
)

result = crew.kickoff()
```
```python
from crewai import Agent, Task, Crew, Process
from crewai.memory import LongTermMemory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage

# Long-term memory persisted to local SQLite
ltm = LongTermMemory(
    storage=LTMSQLiteStorage(db_path="./crew_memory.db")
)

incident_agent = Agent(
    role="Incident Analyst",
    goal="Diagnose pipeline failures using historical incident knowledge",
    backstory="SRE who has handled hundreds of data pipeline incidents",
    memory=True,
    llm=llm,
)

triage_task = Task(
    description=(
        "Analyse the following Airflow task failure log and suggest a fix "
        "based on past incidents:\n\n{log}"  # {log} is filled from kickoff(inputs=...)
    ),
    agent=incident_agent,
    expected_output="Root cause, similar past incidents, and recommended fix with confidence score",
)

crew = Crew(
    agents=[incident_agent],
    tasks=[triage_task],
    memory=True,
    long_term_memory=ltm,
    verbose=True,
)

with open("airflow_error.log") as f:
    result = crew.kickoff(inputs={"log": f.read()})
```
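kickoff(inputs=...) interpolates the input values into task descriptions wherever matching {placeholders} appear. Conceptually this behaves like Python string formatting; a simplified sketch, not CrewAI's actual implementation:

```python
# Simplified sketch of how kickoff(inputs=...) fills task-description placeholders.
def interpolate(description: str, inputs: dict) -> str:
    """Substitute {name} placeholders in a task description with input values."""
    return description.format(**inputs)

desc = "Analyse the following failure log:\n\n{log}"
filled = interpolate(desc, {"log": "KeyError: 'order_id' in transform_orders"})
```

This is why the task description should contain a {log} placeholder when you pass inputs={"log": ...}: without it, the log text never reaches the agent's prompt.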
```python
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import duckdb

class SQLInput(BaseModel):
    query: str = Field(description="READ-ONLY SQL query to execute on the warehouse")

class WarehouseTool(BaseTool):
    name: str = "warehouse_query"
    description: str = "Execute a read-only SQL query against the DuckDB warehouse and return results."
    args_schema: type[BaseModel] = SQLInput

    def _run(self, query: str) -> str:
        # Crude guard: reject obviously destructive statements
        if any(kw in query.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
            raise ValueError("Only SELECT queries are permitted")
        conn = duckdb.connect("warehouse.ddb", read_only=True)
        try:
            df = conn.execute(query).fetchdf()
        finally:
            conn.close()
        return df.to_string(max_rows=30)

run_sql_tool = WarehouseTool()
```
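The substring keyword check above is deliberately simple and can misfire: a query touching a column named updated_at contains the substring "UPDATE" and would be wrongly rejected. A word-boundary variant is slightly safer; a standalone sketch (is_read_only is a hypothetical helper, not a CrewAI API):

```python
import re

# Destructive statement keywords to block (extend as needed)
FORBIDDEN = {"DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE"}

def is_read_only(query: str) -> bool:
    """Reject queries containing destructive keywords as whole words only."""
    tokens = set(re.findall(r"[A-Za-z_]+", query.upper()))
    return not (tokens & FORBIDDEN)
```

Because the tokeniser keeps underscores inside identifiers, UPDATED_AT no longer matches UPDATE, while a bare DROP or DELETE is still caught. For production use, a proper SQL parser or database-level read-only credentials remain the stronger guard.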
| Framework | Multi-Agent | Execution Model | Memory | Learning Curve | Best For |
|---|---|---|---|---|---|
| CrewAI | Native | Sequential / Hierarchical | Short + Long + Entity | Low | Role-based team workflows |
| LangChain Agents | Basic (single agent loops) | ReAct loop | Pluggable | Medium | Tool-heavy single-agent tasks |
| AutoGen | Native | Conversational (message passing) | Limited | Medium | Debate / critique agent patterns |
| Google ADK | Native | Event-driven orchestration | Full (short + long) | Medium | GCP-integrated autonomous agents |
| LangGraph | Via graph nodes | Stateful DAG / cyclic graph | State channels | High | Complex conditional branching |
Gotchas:
- With allow_delegation=True across multiple agents and a hierarchical process, it's possible for agents to delegate in circles. Always test with verbose=True and set max_iterations on agent executors.
- The context=[task_a, task_b] param appends the full output of each dependency into the prompt. Long upstream outputs can fill the context window. Summarise task outputs or cap expected_output length explicitly in the task description.
- When tools touch production systems, enforce an args_schema (mandatory input validation) and log all tool calls via callbacks.
- A crew with long-term memory learns across kickoff calls: for example, it can remember which frameworks a user's team has already evaluated and skip re-recommending them in subsequent research sessions.

FAQ:
- What is the difference between Process.sequential and Process.hierarchical? Sequential runs tasks in list order, passing each output forward as context; hierarchical adds a manager agent that decomposes the goal and delegates sub-tasks, and requires a manager_llm (or a manager agent).
- What does allow_delegation=True do on an agent? It allows the agent to pass work to other agents in the crew.
- How does the context parameter on a Task work? The output of each task listed in context is appended to the task's prompt.
- Why define an args_schema (Pydantic model) when defining a custom BaseTool? It validates the agent's tool inputs before execution, catching malformed calls early.
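One way to handle the context-window gotcha above is to cap each task's output before it is chained forward. A hypothetical helper, not a CrewAI API:

```python
def cap_output(text: str, max_chars: int = 2000) -> str:
    """Truncate a task output so chained context stays within a character budget."""
    if len(text) <= max_chars:
        return text
    return text[:max_chars] + "\n[...truncated...]"
```

In practice you would apply this (or an LLM summarisation step) to upstream outputs before feeding them into downstream tasks via context.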