Back
BY
Bonnie
and
Nathan Tarbert
August 8, 2025

In this guide, you will learn how to integrate CrewAI agents with AG-UI protocol. Also, we will cover how to integrate the AG-UI + CrewAI agents with CopilotKit in order to chat with the agent and stream its responses in the frontend.

Before we jump in, here is what we will cover:

  • What is AG-UI protocol?
  • Integrating CrewAI agents with AG-UI protocol
  • Integrating a frontend to the AG-UI + CrewAI agent using CopilotKit

Here’s a preview of what we will be building:

What is AG-UI protocol?

The Agent User Interaction Protocol (AG-UI), developed by CopilotKit, is an open-source, lightweight, event-based protocol that facilitates rich, real-time interactions between the frontend and AI agents.

The AG-UI protocol enables event-driven communication, state management, tool usage, and streaming AI agent responses.

Star AG-UI ⭐️

To send information between the frontend and your AI agent, AG-UI uses events such as:

  • Lifecycle events: These events mark the start or end of an agent’s task execution. Lifecycle events include RUN_STARTED and RUN_FINISHED events.
  • Text message events: These events handle streaming agent responses to the frontend. Text message events include TEXT_MESSAGE_START, TEXT_MESSAGE_CONTENT, and TEXT_MESSAGE_END events.
  • Tool call events: These events manage the agent’s tool executions. Tool call events include TOOL_CALL_START, TOOL_CALL_ARGS, and TOOL_CALL_END events.
  • State management events: These events keep the frontend and the AI agent state in sync. State management events include STATE_SNAPSHOT and STATE_DELTA events.

You can learn more about the AG-UI protocol and its architecture here on AG-UI docs.

Image from Notion

Now that we have learned what the AG-UI protocol is, let us see how to integrate it with the CrewAI agent framework.

Let’s get started!

Prerequisites

To fully understand this tutorial, you need to have a basic understanding of React or Next.js.

We'll also make use of the following:

  • Python - a popular programming language for building AI agents with LangGraph; make sure it is installed on your computer.
  • Crew AI - a lean, lightning-fast Python framework that enables you to build AI agent teams that work together to tackle complex tasks.
  • OpenAI API Key - an API key to enable us to perform various tasks using the GPT models; for this tutorial, ensure you have access to the GPT-4 model.
  • CopilotKit - an open-source copilot framework for building custom AI chatbots, in-app AI agents, and text areas.

Integrating CrewAI agents with AG-UI protocol

To get started, clone the Open AG UI Demo repository that consists of a Python-based backend (agent) and a Next.js frontend (frontend).

Next, navigate to the backend directory:

cd agent

Then install the dependencies using Poetry:

poetry install

After that, create a .env file with OpenAI API Key API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then run the agent using the command below:

poetry run python main.py

To test the AG-UI + CrewAI integration, run the curl command below on https://reqbin.com/curl.

curl -X POST "http://localhost:8000/crewai-agent" \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "test_thread_123",
    "run_id": "test_run_456",
    "messages": [
      {
        "id": "msg_1",
        "role": "user",
        "content": "Analyze AAPL stock with a $10000 investment from 2023-01-01"
      }
    ],
    "tools": [],
    "context": [],
    "forwarded_props": {},
    "state": {}
  }'

Lets now see how to integrate AG-UI protocol with CrewAI agents.

Step 1: Create your CrewAI agent workflow

Before integrating AG-UI protocol with a CrewAI agent, create your CrewAI agent workflow, as shown in the agent/stock_analysis.py file.

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    @start()
    def start(self):
        """
        Step 1: Initialize the workflow
        - Replace placeholder in system prompt with actual portfolio data
        - This sets up the AI assistant with context about the current portfolio
        """
        # Inject current portfolio data into the system prompt template
        self.state['state']["messages"][0].content = system_prompt.replace(
            "{PORTFOLIO_DATA_PLACEHOLDER}", json.dumps(self.state["investment_portfolio"])
        )
        return self.state

    // ...

    @listen(or_("chat", "insights"))
    def end(self):
        """
        Step 6: Final step - return the complete state
        - This method is called from either 'chat' (if no investment data) 
          or 'insights' (after successful analysis)
        - Returns the final state with all analysis results
        """
        return self.state

Step 2: Create an endpoint with FastAPI

Once you have defined your CrewAI agent workflow, create a FastAPI endpoint and import the CrewAI agent workflow as shown in the agent/main.py file.

# ===============================================================================
# IMPORTS AND SETUP
# ===============================================================================

# FastAPI framework for building the web API
from fastapi import FastAPI
from fastapi.responses import StreamingResponse  # For streaming real-time responses to the client

# Standard Python libraries
import uuid  # For generating unique identifiers
from typing import Any  # Type hints for flexible data types
import os  # For environment variables
import uvicorn  # ASGI server for running FastAPI
import asyncio  # For asynchronous programming

# AG UI core components for agent communication and event handling
from ag_ui.core import (
    RunAgentInput,           # Input data structure for agent runs
    StateSnapshotEvent,      # Event for sending state snapshots
    EventType,               # Enumeration of event types
    RunStartedEvent,         # Event emitted when the agent run starts
    RunFinishedEvent,        # Event emitted when the agent run completes
    TextMessageStartEvent,   # Event for starting text message streaming
    TextMessageEndEvent,     # Event for ending text message streaming
    TextMessageContentEvent, # Event for streaming text message content
    ToolCallStartEvent,      # Event for starting tool call execution
    ToolCallEndEvent,        # Event for ending tool call execution
    ToolCallArgsEvent,       # Event for streaming tool call arguments
    StateDeltaEvent          # Event for updating specific parts of state
)
from ag_ui.encoder import EventEncoder  # Encoder for converting events to streamable format

# Import our custom stock analysis workflow
from stock_analysis import StockAnalysisFlow

# CopilotKit state management
from copilotkit import CopilotKitState

# ===============================================================================
# APPLICATION SETUP
# ===============================================================================

# Initialize FastAPI application instance
app = FastAPI()

# ===============================================================================
# MAIN API ENDPOINT
# ===============================================================================

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """

    // ...

# ===============================================================================
# SERVER STARTUP AND CONFIGURATION
# ===============================================================================

def main():
    """
    Main function to start the uvicorn server.

    This function:
    - Reads the port from environment variables (defaults to 8000)
    - Configures uvicorn server settings
    - Starts the server with hot reload enabled for development
    """
    # Get port from environment variable or use default
    port = int(os.getenv("PORT", "8000"))

    # Start uvicorn server with configuration
    uvicorn.run(
        "main:app",           # Module and app instance
        host="0.0.0.0",      # Listen on all network interfaces
        port=port,           # Port number
        reload=True,         # Enable hot reload for development
    )

if __name__ == "__main__":
    """
    Entry point when script is run directly.
    Starts the FastAPI server using uvicorn.
    """
    main()

Step 3: Define an event generator

After creating a FastAPI endpoint, define an event generator that produces a stream of AG-UI protocol events, initialize the event encoder, and return the streaming response to the client or the frontend, as shown in the agent/main.py file.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            """
            An asynchronous generator that streams events to the client in real-time.

            This function orchestrates the entire stock analysis workflow:
            1. Sets up event streaming infrastructure
            2. Emits initial state and run started events
            3. Launches the StockAnalysisFlow workflow
            4. Streams progress events as they occur
            5. Handles final results (tool calls or text messages)
            6. Emits run completion events

            Yields:
                Encoded events for Server-Sent Events (SSE) streaming
            """
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format
            event_queue = asyncio.Queue()  # Queue for events from the workflow

            def emit_event(event):
                """Callback function for the workflow to emit events"""
                event_queue.put_nowait(event)

            # Generate a unique identifier for text messages
            # Generate unique identifier for text messages
            message_id = str(uuid.uuid4())

            // ...

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

Step 4: Configure AG-UI protocol lifecycle events

Once you have defined an event generator, define the AG-UI protocol lifecycle events that represent the lifecycle of an AG-UI + CrewAI agent workflow run as shown in the agent/main.py file.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            // ...

            # Step 2: Emit run started event to notify client that processing has begun
            yield encoder.encode(
                RunStartedEvent(
                    type=EventType.RUN_STARTED,
                    thread_id=input_data.thread_id,  # Conversation thread identifier
                    run_id=input_data.run_id,        # Unique run identifier
                )
            )

            // ...

            # Step 9: Emit run finished event to signal completion
            yield encoder.encode(
                RunFinishedEvent(
                    type=EventType.RUN_FINISHED,
                    thread_id=input_data.thread_id,  # Same thread ID from start
                    run_id=input_data.run_id,        # Same run ID from start
                )
            )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

Step 5: Configure AG-UI protocol state management events

After defining AG-UI protocol lifecycle events, integrate AG-UI protocol state management events into your CrewAI agent workflow, as shown in the stock analysis CrewAI workflow in the agent/stock_analysis.py file.

# AG UI types for message handling and state management
from ag_ui.core.types import AssistantMessage, ToolMessage
from ag_ui.core.events import StateDeltaEvent, EventType

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    // ...

    @listen("start")
    async def chat(self):
        """
        Step 2: Parse user input and extract investment parameters
        - Create a tool log entry to show progress to the user
        - Use OpenAI to analyze the user's message and extract structured data
        - Return the next step based on whether structured data was extracted
        """
        try:
            // ...

            # Step 2.2: Emit state change event to update UI with new tool log
            self.state.get("emit_event")(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "add",
                            "path": "/tool_logs/-",
                            "value": {
                                "message": "Analyzing user query",
                                "status": "processing",
                                "id": tool_log_id,
                            },
                        }
                    ],
                )
            )
            await asyncio.sleep(0)  # Allow other tasks to run

            // ...

            # Step 2.4: Update tool log status to completed
            index = len(self.state['state']["tool_logs"]) - 1
            self.state.get("emit_event")(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "replace",
                            "path": f"/tool_logs/{index}/status",
                            "value": "completed",
                        }
                    ],
                )
            )
            await asyncio.sleep(0)

            // ...

        except Exception as e:
            # Step 2.6: Handle any errors during processing
            print(e)
            a_message = AssistantMessage(id=response.id, content="", role="assistant")
            self.state['state']["messages"].append(a_message)
            return "end"

Then in the FastAPI endpoint, initialize your CrewAI agent workflow state using STATE_SNAPSHOT AG-UI protocol state management event, as shown below.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            // ...

            # Step 3: Send initial state snapshot to client
            # This provides the client with the current financial state before analysis
            yield encoder.encode(
                StateSnapshotEvent(
                    type=EventType.STATE_SNAPSHOT, 
                    snapshot={
                        "available_cash": input_data.state["available_cash"],        # Current cash balance
                        "investment_summary": input_data.state["investment_summary"], # Previous analysis results
                        "investment_portfolio": input_data.state["investment_portfolio"], # Current holdings
                        "tool_logs": []  # Reset tool logs for new analysis
                    }
                )
            )

            // ...

Step 6: Configure your CrewAI agent workflow with AG-UI protocol

Once you have initialized the CrewAI agent workflow state, integrate your CrewAI agent workflow with AG-UI protocol, as in the agent/main.py file.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            // ...

            # Step 4: Initialize agent state with input data
            state = AgentState(
                tools=input_data.tools,                                      # Available tools
                messages=input_data.messages,                                # Conversation history
                be_stock_data=None,                                         # Will be populated during analysis
                be_arguments=None,                                          # Will be populated during analysis
                available_cash=input_data.state["available_cash"],          # Current cash
                investment_portfolio=input_data.state["investment_portfolio"], # Current portfolio
                tool_logs=[]                                                # Progress tracking
            )

            # Step 5: Launch the stock analysis workflow asynchronously
            # This creates a task that runs the StockAnalysisFlow in the background
            agent_task = asyncio.create_task(
                StockAnalysisFlow().kickoff_async(inputs={
                    "state": state,                                          # Agent state
                    "emit_event": emit_event,                               # Event emission callback
                    "investment_portfolio": input_data.state["investment_portfolio"]  # Portfolio data
                })
            )

            # Step 6: Event streaming loop - relay events from workflow to client
            while True:
                try:
                    # Try to get an event from the queue with a short timeout
                    event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
                    yield encoder.encode(event)  # Stream the event to the client
                except asyncio.TimeoutError:
                    # No events in queue - check if workflow is complete
                    if agent_task.done():
                        break  # Exit loop when workflow finishes

            # Step 7: Clear tool logs after workflow completion
            # This prevents old progress logs from cluttering the UI
            yield encoder.encode(
            StateDeltaEvent(
                type=EventType.STATE_DELTA,
                delta=[
                    {
                        "op": "replace",
                        "path": "/tool_logs",
                        "value": []
                    }
                ]
            )
            )

            // ...

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

Step 7: Configure AG-UI protocol tool events to handle Human-in-the-Loop breakpoint

After integrating your CrewAI agent workflow with AG-UI protocol, append a tool call message with a tool call name to the state, as shown in the cash allocation step in the agent/stock_analysis.py file.

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    // ...    

    @listen("simulation")
    async def allocation(self):
        """
        Step 4: Calculate portfolio allocation and performance simulation
        - Simulate buying stocks based on investment strategy (single-shot vs DCA)
        - Calculate returns, allocation percentages, and performance metrics
        - Compare portfolio performance against SPY (S&P 500) benchmark
        - Generate performance data for charting
        """
        # Step 4.1: Ensure we have tool calls with investment data
        if self.state['state']['messages'][-1].tool_calls is None:
            return "end"

           // ...

        # Step 4.18: Add a tool message indicating data extraction is complete
        self.state['state']["messages"].append(
            ToolMessage(
                role="tool",
                id=str(uuid.uuid4()),
                content="The relevant details had been extracted",
                tool_call_id=self.state['state']["messages"][-1].tool_calls[0].id,
            )
        )

        # Step 4.19: Add assistant message with chart rendering tool call
        self.state['state']["messages"].append(
            AssistantMessage(
                role="assistant",
                tool_calls=[
                    {
                        "id": str(uuid.uuid4()),
                        "type": "function",
                        "function": {
                            "name": "render_standard_charts_and_table",
                            "arguments": json.dumps(
                                {"investment_summary": self.state['state']["investment_summary"]}
                            ),
                        },
                    }
                ],
                id=str(uuid.uuid4()),
            )
        )

       // ...

Then, define AG-UI protocol tool call events that an agent can use to trigger frontend actions by calling the frontend action using a tool name to request user feedback, as shown in the agent/main.py file.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            // ...

            # Step 8: Handle workflow results based on the final message type
            # Check if the last message is from the assistant (AI agent)
            if state["messages"][-1].role == "assistant":

                # Step 8.1: Handle tool call results (charts, data visualizations)
                if state["messages"][-1].tool_calls:
                    # The workflow generated a tool call (e.g., render charts)
                    # Stream tool call events to trigger UI rendering

                    yield encoder.encode(
                        ToolCallStartEvent(
                            type=EventType.TOOL_CALL_START,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                            toolCallName=state["messages"][-1]
                            .tool_calls[0]
                            .function.name,
                        )
                    )

                    # Stream the tool call arguments (contains analysis results)
                    yield encoder.encode(
                        ToolCallArgsEvent(
                            type=EventType.TOOL_CALL_ARGS,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                            delta=state["messages"][-1]
                            .tool_calls[0]
                            .function.arguments,  # Contains investment summary and insights
                        )
                    )

                    # Signal end of tool call
                    yield encoder.encode(
                        ToolCallEndEvent(
                            type=EventType.TOOL_CALL_END,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                        )
                    )
                else:
                    # Step 8.2: Handle text message results (when no analysis was performed)
                    yield encoder.encode(
                        TextMessageStartEvent(
                            type=EventType.TEXT_MESSAGE_START,
                            message_id=message_id,
                            role="assistant",
                        )
                    )

                    # Step 8.2.1: Stream text content if available
                    if state["messages"][-1].content:
                        content = state["messages"][-1].content

                        # Split content into chunks for smooth streaming effect
                        n_parts = 100  # Number of chunks for streaming
                        part_length = max(1, len(content) // n_parts)
                        parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]

                        # Ensure we don't exceed the target number of parts
                        if len(parts) > n_parts:
                            parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]

                        # Stream each part with a small delay for smooth typing effect
                        for part in parts:
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=message_id,
                                    delta=part,  # Chunk of text content
                                )
                            )
                            await asyncio.sleep(0.05)  # Small delay for typing effect
                    else:
                        # Step 8.2.2: Handle case where no content is available (error scenario)
                        yield encoder.encode(
                            TextMessageContentEvent(
                                type=EventType.TEXT_MESSAGE_CONTENT,
                                message_id=message_id,
                                delta="Something went wrong! Please try again.",
                            )
                        )

                    # Step 8.2.3: Signal end of text message
                    yield encoder.encode(
                        TextMessageEndEvent(
                            type=EventType.TEXT_MESSAGE_END,
                            message_id=message_id,
                        )
                    )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

Step 8: Configure AG-UI protocol text message events

Once you have configured AG-UI protocol tool events, define AG-UI protocol text message events to handle streaming agent responses to the frontend, as shown in the agent/main.py file.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            // ...

            # Step 8: Handle workflow results based on the final message type
            # Check if the last message is from the assistant (AI agent)
            if state["messages"][-1].role == "assistant":

                // ...

                else:
                    # Step 8.2: Handle text message results (when no analysis was performed)
                    yield encoder.encode(
                        TextMessageStartEvent(
                            type=EventType.TEXT_MESSAGE_START,
                            message_id=message_id,
                            role="assistant",
                        )
                    )

                    # Step 8.2.1: Stream text content if available
                    if state["messages"][-1].content:
                        content = state["messages"][-1].content

                        # Split content into chunks for smooth streaming effect
                        n_parts = 100  # Number of chunks for streaming
                        part_length = max(1, len(content) // n_parts)
                        parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]

                        # Ensure we don't exceed the target number of parts
                        if len(parts) > n_parts:
                            parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]

                        # Stream each part with a small delay for smooth typing effect
                        for part in parts:
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=message_id,
                                    delta=part,  # Chunk of text content
                                )
                            )
                            await asyncio.sleep(0.05)  # Small delay for typing effect
                    else:
                        # Step 8.2.2: Handle case where no content is available (error scenario)
                        yield encoder.encode(
                            TextMessageContentEvent(
                                type=EventType.TEXT_MESSAGE_CONTENT,
                                message_id=message_id,
                                delta="Something went wrong! Please try again.",
                            )
                        )

                    # Step 8.2.3: Signal end of text message
                    yield encoder.encode(
                        TextMessageEndEvent(
                            type=EventType.TEXT_MESSAGE_END,
                            message_id=message_id,
                        )
                    )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

Congratulations! You have integrated a CrewAI agent workflow with AG-UI protocol. Let’s now see how to add a frontend to the AG-UI + CrewAI agent workflow.

Integrating a frontend to the AG-UI + CrewAI agent workflow using CopilotKit

In this section, you will learn how to create a connection between your AG-UI + CrewAI agent workflow and a frontend using CopilotKit.

Let’s get started.

First, navigate to the frontend directory:

cd frontend

Next create a .env file with OpenAI API Key API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then install the dependencies:

pnpm install

After that, start the development server:

pnpm run dev

Navigate to http://localhost:3000, and you should see the AG-UI + CrewAI agent frontend up and running.

Image from Notion

Let’s now see how to build the frontend UI for the AG-UI + CrewAI agent using CopilotKit.

Step 1: Create an HttpAgent instance

Before creating an HttpAgent instance, let’s first understand what an HttpAgent is.

HttpAgent is a client from the AG-UI Library that bridges your frontend application with any AG-UI-compatible AI agent’s server.

To create an HttpAgent instance, define it in an API route as shown in the src/app/api/copilotkit/route.ts file.

// ===============================================================================
// IMPORTS AND DEPENDENCIES
// ===============================================================================

// CopilotKit framework imports for AI assistant integration
import {
  CopilotRuntime, // Core runtime for managing AI agents
  copilotRuntimeNextJSAppRouterEndpoint, // Next.js App Router integration
  OpenAIAdapter, // Adapter for OpenAI API compatibility
} from "@copilotkit/runtime";

// Next.js framework imports
import { NextRequest } from "next/server"; // Type for Next.js request objects

// AG UI client for communicating with external agents
import { HttpAgent } from "@ag-ui/client"; // HTTP client for agent communication

// ===============================================================================
// AGENT CONFIGURATION
// ===============================================================================

/**
 * Step 1: Configure the CrewAI stock analysis agent
 *
 * This HttpAgent connects to our FastAPI backend running the stock analysis workflow.
 * It acts as a bridge between the CopilotKit frontend and the Python-based agent.
 */
const crewaiAgent = new HttpAgent({
  // Development URL (commented out for reference)
  // url: "http://0.0.0.0:8000/crewai-agent",

  // Production-ready URL configuration with environment variable fallback
  // Uses NEXT_PUBLIC_CREWAI_URL if set, otherwise defaults to local development URL
  url: process.env.NEXT_PUBLIC_CREWAI_URL || "http://0.0.0.0:8000/crewai-agent",
});

// ===============================================================================
// COPILOT RUNTIME SETUP
// ===============================================================================

/**
 * Step 2: Configure the OpenAI service adapter
 *
 * This adapter provides OpenAI API compatibility for the CopilotKit runtime.
 * It handles communication with OpenAI's language models for natural language processing.
 */
const serviceAdapter = new OpenAIAdapter();

/**
 * Step 3: Initialize the CopilotKit runtime
 *
 * The runtime orchestrates communication between:
 * - Frontend UI components
 * - Language models (via OpenAI adapter)
 * - External agents (our stock analysis agent)
 */
const runtime = new CopilotRuntime({
  agents: {
    // Register our stock analysis agent with the runtime
    // @ts-ignore - Suppressing TypeScript warning for agent type compatibility
    crewaiAgent: crewaiAgent,
  },
});

// Alternative simple runtime configuration (commented out)
// const runtime = new CopilotRuntime()

// ===============================================================================
// API ROUTE HANDLER
// ===============================================================================

/**
 * Step 4: Define the POST endpoint handler
 *
 * This Next.js API route handles incoming requests from the CopilotKit frontend.
 * It processes user interactions, manages agent communication, and streams responses.
 *
 * @param req - Next.js request object containing user input and session data
 * @returns Promise<Response> - Streaming response with agent results
 */
export const POST = async (req: NextRequest) => {
  /**
   * Step 4.1: Create the request handler using CopilotKit's Next.js integration
   *
   * This function:
   * - Extracts user messages and state from the request
   * - Routes requests to appropriate agents
   * - Manages streaming responses back to the frontend
   * - Handles error scenarios gracefully
   */
  const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
    runtime, // The configured CopilotKit runtime with our agents
    serviceAdapter, // OpenAI adapter for language model integration
    endpoint: "/api/copilotkit", // The API endpoint path for this handler
  });

  /**
   * Step 4.2: Process the incoming request
   *
   * The handleRequest function:
   * - Parses the request body for user input
   * - Determines which agent should handle the request
   * - Forwards the request to the appropriate agent (crewaiAgent)
   * - Streams the agent's response back to the client
   */
  return handleRequest(req);
};

Step 2: Set up CopilotKit provider

To set up the CopilotKit Provider, the [<CopilotKit>](https://docs.copilotkit.ai/reference/components/CopilotKit) component must wrap the Copilot-aware parts of your application.

For most use cases, it's appropriate to wrap the CopilotKit provider around the entire app, e.g., in your layout.tsx, as shown below in the src/app/layout.tsx file.

// Next.js imports for metadata and font handling
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
// Global styles for the application
import "./globals.css";
// CopilotKit UI styles for AI components
import "@copilotkit/react-ui/styles.css";
// CopilotKit core component for AI functionality
import { CopilotKit } from "@copilotkit/react-core";

// Configure Geist Sans font with CSS variables for consistent typography
const geistSans = Geist({
  variable: "--font-geist-sans",
  subsets: ["latin"],
});

// Configure Geist Mono font for code and monospace text
const geistMono = Geist_Mono({
  variable: "--font-geist-mono",
  subsets: ["latin"],
});

// Metadata configuration for SEO and page information
export const metadata: Metadata = {
  title: "AI Stock Portfolio",
  description: "AI Stock Portfolio",
};

// Root layout component that wraps all pages in the application
export default function RootLayout({
  children,
}: Readonly<{
  children: React.ReactNode;
}>) {
  return (
    <html lang="en">
      <body
        className={`${geistSans.variable} ${geistMono.variable} antialiased`}>
        {/* CopilotKit wrapper that enables AI functionality throughout the app */}
        {/* runtimeUrl points to the API endpoint for AI backend communication */}
        {/* agent specifies which AI agent to use (stockAgent for stock analysis) */}
        <CopilotKit runtimeUrl="/api/copilotkit" agent="crewaiAgent">
          {children}
        </CopilotKit>
      </body>
    </html>
  );
}

Step 3: Set up a Copilot chat component

CopilotKit ships with a number of built-in chat components, which include CopilotPopup, CopilotSidebar, and CopilotChat.

To set up a Copilot chat component, define it as shown in the src/app/components/prompt-panel.tsx file.

// Client-side component directive for Next.js
"use client";

import type React from "react";
// CopilotKit chat component for AI interactions
import { CopilotChat } from "@copilotkit/react-ui";

// Props interface for the PromptPanel component
interface PromptPanelProps {
  // Amount of available cash for investment, displayed in the panel
  availableCash: number;
}

// Main component for the AI chat interface panel
export function PromptPanel({ availableCash }: PromptPanelProps) {
  // Utility function to format numbers as USD currency
  // Removes decimal places for cleaner display of large amounts
  const formatCurrency = (amount: number) => {
    return new Intl.NumberFormat("en-US", {
      style: "currency",
      currency: "USD",
      minimumFractionDigits: 0,
      maximumFractionDigits: 0,
    }).format(amount);
  };

  return (
    // Main container with full height and white background
    <div className="h-full flex flex-col bg-white">
      {/* Header section with title, description, and cash display */}
      <div className="p-4 border-b border-[#D8D8E5] bg-[#FAFCFA]">
        {/* Title section with icon and branding */}
        <div className="flex items-center gap-2 mb-2">
          <span className="text-xl">🪁</span>
          <div>
            <h1 className="text-lg font-semibold text-[#030507] font-['Roobert']">
              Portfolio Chat
            </h1>
            {/* Pro badge indicator */}
            <div className="inline-block px-2 py-0.5 bg-[#BEC9FF] text-[#030507] text-xs font-semibold uppercase rounded">
              PRO
            </div>
          </div>
        </div>
        {/* Description of the AI agent's capabilities */}
        <p className="text-xs text-[#575758]">
          Interact with the LangGraph-powered AI agent for portfolio
          visualization and analysis
        </p>

        {/* Available Cash Display section */}
        <div className="mt-3 p-2 bg-[#86ECE4]/10 rounded-lg">
          <div className="text-xs text-[#575758] font-medium">
            Available Cash
          </div>
          <div className="text-sm font-semibold text-[#030507] font-['Roobert']">
            {formatCurrency(availableCash)}
          </div>
        </div>
      </div>
      {/* CopilotKit chat interface with custom styling and initial message */}
      {/* Takes up majority of the panel height for conversation */}
      <CopilotChat
        className="h-[78vh] p-2"
        labels={{
          // Initial welcome message explaining the AI agent's capabilities and limitations
          initial: `I am a Crew AI agent designed to analyze investment opportunities and track stock performance over time. How can I help you with your investment query? For example, you can ask me to analyze a stock like "Invest in Apple with 10k dollars since Jan 2023". \n\nNote: The AI agent has access to stock data from the past 4 years only`,
        }}
      />
    </div>
  );
}

Step 4: Sync AG-UI + CrewAI agent state with the frontend using CopilotKit hooks

In CopilotKit, CoAgents maintain a shared state that seamlessly connects your frontend UI with the agent's execution. This shared state system allows you to:

  • Display the agent's current progress and intermediate results
  • Update the agent's state through UI interactions
  • React to state changes in real-time across your application

You can learn more about CoAgents’ shared state here on the CopilotKit docs.

Image from Notion

To sync your AG-UI + CrewAI agent state with the frontend, use the CopilotKit useCoAgent hook to share the AG-UI + CrewAI agent state with your frontend, as shown in the src/app/page.tsx file.

"use client";

import {
  useCoAgent,
} from "@copilotkit/react-core";

// ...

export interface SandBoxPortfolioState {
  performanceData: Array<{
    date: string;
    portfolio: number;
    spy: number;
  }>;
}
export interface InvestmentPortfolio {
  ticker: string;
  amount: number;
}

export default function OpenStocksCanvas() {

  // ...

  const [totalCash, setTotalCash] = useState(1000000);

  const { state, setState } = useCoAgent({
    name: "crewaiAgent",
    initialState: {
      available_cash: totalCash,
      investment_summary: {} as any,
      investment_portfolio: [] as InvestmentPortfolio[],
    },
  });

    // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
       {/* ... */}
    </div>
  );
}

Then render the AG-UI + CrewAI agent's state in the chat UI, which is useful for informing the user about the agent's state in a more in-context way.

To render the AG-UI + CrewAI agent's state in the chat UI, you can use the useCoAgentStateRender hook, as shown in the src/app/page.tsx file.

"use client";

import {
  useCoAgentStateRender,
} from "@copilotkit/react-core";

import { ToolLogs } from "./components/tool-logs";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCoAgentStateRender({
    name: "crewaiAgent",
    render: ({ state }) => <ToolLogs logs={state.tool_logs} />,
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}

If you execute a query in the chat, you should see the AG-UI + CrewAI agent’s state task execution rendered in the chat UI, as shown below.

Image from Notion

Step 5: Implementing Human-in-the-Loop (HITL) in the frontend

Human-in-the-loop (HITL) allows agents to request human input or approval during execution, making AI systems more reliable and trustworthy. This pattern is essential when building AI applications that need to handle complex decisions or actions that require human judgment.

You can learn more about Human in the Loop here on CopilotKit docs.

To implement Human-in-the-Loop (HITL) in the frontend, you need to use the CopilotKit useCopilotKitAction hook with the renderAndWaitForResponse method, which allows returning values asynchronously from the render function, as shown in the src/app/page.tsx file.

"use client";

import {
  useCopilotAction,
} from "@copilotkit/react-core";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCopilotAction({
    name: "render_standard_charts_and_table",
    description:
      "This is an action to render a standard chart and table. The chart can be a bar chart or a line chart. The table can be a table of data.",
    renderAndWaitForResponse: ({ args, respond, status }) => {
      useEffect(() => {
        console.log(args, "argsargsargsargsargsaaa");
      }, [args]);
      return (
        <>
          {args?.investment_summary?.percent_allocation_per_stock &&
            args?.investment_summary?.percent_return_per_stock &&
            args?.investment_summary?.performanceData && (
              <>
                <div className="flex flex-col gap-4">
                  <LineChartComponent
                    data={args?.investment_summary?.performanceData}
                    size="small"
                  />
                  <BarChartComponent
                    data={Object.entries(
                      args?.investment_summary?.percent_return_per_stock
                    ).map(([ticker, return1]) => ({
                      ticker,
                      return: return1 as number,
                    }))}
                    size="small"
                  />
                  <AllocationTableComponent
                    allocations={Object.entries(
                      args?.investment_summary?.percent_allocation_per_stock
                    ).map(([ticker, allocation]) => ({
                      ticker,
                      allocation: allocation as a number,
                      currentValue:
                        args?.investment_summary.final_prices[ticker] *
                        args?.investment_summary.holdings[ticker],
                      totalReturn:
                        args?.investment_summary.percent_return_per_stock[
                          ticker
                        ],
                    }))}
                    size="small"
                  />
                </div>

                <button
                  hidden={status == "complete"}
                  className="mt-4 rounded-full px-6 py-2 bg-green-50 text-green-700 border border-green-200 shadow-sm hover:bg-green-100 transition-colors font-semibold text-sm"
                  onClick={() => {
                    debugger;
                    if (respond) {
                      setTotalCash(args?.investment_summary?.cash);
                      setCurrentState({
                        ...currentState,
                        returnsData: Object.entries(
                          args?.investment_summary?.percent_return_per_stock
                        ).map(([ticker, return1]) => ({
                          ticker,
                          return: return1 as number,
                        })),
                        allocations: Object.entries(
                          args?.investment_summary?.percent_allocation_per_stock
                        ).map(([ticker, allocation]) => ({
                          ticker,
                          allocation: allocation as a number,
                          currentValue:
                            args?.investment_summary?.final_prices[ticker] *
                            args?.investment_summary?.holdings[ticker],
                          totalReturn:
                            args?.investment_summary?.percent_return_per_stock[
                              ticker
                            ],
                        })),
                        performanceData:
                          args?.investment_summary?.performanceData,
                        bullInsights: args?.insights?.bullInsights || [],
                        bearInsights: args?.insights?.bearInsights || [],
                        currentPortfolioValue:
                          args?.investment_summary?.total_value,
                        totalReturns: (
                          Object.values(
                            args?.investment_summary?.returns
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0),
                      });
                      setInvestedAmount(
                        (
                          Object.values(
                            args?.investment_summary?.total_invested_per_stock
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0)
                      );
                      setState({
                        ...state,
                        available_cash: totalCash,
                      });
                      respond(
                        "Data rendered successfully. Provide a summary of the investments by not making any tool calls."
                      );
                    }
                  }}>
                  Accept
                </button>
                <button
                  hidden={status == "complete"}
                  className="rounded-full px-6 py-2 bg-red-50 text-red-700 border border-red-200 shadow-sm hover:bg-red-100 transition-colors font-semibold text-sm ml-2"
                  onClick={() => {
                    debugger;
                    if (respond) {
                      respond(
                        "Data rendering rejected. Just give a summary of the rejected investments by not making any tool calls."
                      );
                    }
                  }}>
                  Reject
                </button>
              </>
            )}
        </>
      );
    },
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}

When an agent triggers frontend actions by tool/action name to request human input or feedback during execution, the end-user is prompted with a choice (rendered inside the chat UI). Then the user can choose by pressing a button in the chat UI, as shown below.

Image from Notion

Step 6: Streaming AG-UI + CrewAI agent responses in the frontend

To stream your AG-UI + CrewAI agent responses or results in the frontend, pass the agent’s state field values to the frontend components, as shown in the src/app/page.tsx file.

"use client";

import { useEffect, useState } from "react";
import { PromptPanel } from "./components/prompt-panel";
import { GenerativeCanvas } from "./components/generative-canvas";
import { ComponentTree } from "./components/component-tree";
import { CashPanel } from "./components/cash-panel";

// ...

export default function OpenStocksCanvas() {
  const [currentState, setCurrentState] = useState<PortfolioState>({
    id: "",
    trigger: "",
    performanceData: [],
    allocations: [],
    returnsData: [],
    bullInsights: [],
    bearInsights: [],
    currentPortfolioValue: 0,
    totalReturns: 0,
  });
  const [sandBoxPortfolio, setSandBoxPortfolio] = useState<
    SandBoxPortfolioState[]
  >([]);
  const [selectedStock, setSelectedStock] = useState<string | null>(null);


  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* Left Panel - Prompt Input */}
      <div className="w-85 border-r border-[#D8D8E5] bg-white flex-shrink-0">
        <PromptPanel availableCash={totalCash} />
      </div>

      {/* Center Panel - Generative Canvas */}
      <div className="flex-1 relative min-w-0">
        {/* Top Bar with Cash Info */}
        <div className="absolute top-0 left-0 right-0 bg-white border-b border-[#D8D8E5] p-4 z-10">
          <CashPanel
            totalCash={totalCash}
            investedAmount={investedAmount}
            currentPortfolioValue={
              totalCash + investedAmount + currentState.totalReturns || 0
            }
            onTotalCashChange={setTotalCash}
            onStateCashChange={setState}
          />
        </div>

        <div className="pt-20 h-full">
          <GenerativeCanvas
            setSelectedStock={setSelectedStock}
            portfolioState={currentState}
            sandBoxPortfolio={sandBoxPortfolio}
            setSandBoxPortfolio={setSandBoxPortfolio}
          />
        </div>
      </div>

      {/* Right Panel - Component Tree (Optional) */}
      {showComponentTree && (
        <div className="w-64 border-l border-[#D8D8E5] bg-white flex-shrink-0">
          <ComponentTree portfolioState={currentState} />
        </div>
      )}
    </div>
  );
}

If you query your agent and approve its feedback request, you should see the agent’s response or results streaming in the UI, as shown below.

Conclusion

In this guide, we have walked through the steps of integrating CrewAI agents with AG-UI protocol and then adding a frontend to the agents using CopilotKit.

While we’ve explored a couple of features, we have barely scratched the surface of the countless use cases for CopilotKit, ranging from building interactive AI chatbots to building agentic solutions—in essence, CopilotKit lets you add a ton of useful AI capabilities to your products in minutes.

Hopefully, this guide makes it easier for you to integrate AI-powered Copilots into your existing application.

Follow CopilotKit on Twitter and say hi, and if you'd like to build something cool, join the Discord community.

Subscribe to the newsletter

Get notified of the latest news and updates.