AWSAILangGraphFastAPI

Building an AI Research Agent with LangGraph, Claude, and AWS

How I built a production AI research agent using LangGraph ReAct, Claude, and AWS. Deep dive into autonomous tool use, SSE streaming, multi-turn conversations, and deploying on ECS Fargate with CDK.

May 3, 2026·10 min read·Sanjay Patoliya·⭐ View on GitHub

What I Built

An AI research agent that answers any question by searching the web, Wikipedia, and academic papers — then streams its reasoning back to the browser in real time.

User flow:

  1. Ask any research question in natural language
  2. Watch the agent decide which tools to call — live, as it happens
  3. Read a structured report: Summary, Key Findings, Academic Research, Sources, Conclusion
  4. Ask follow-up questions — the agent remembers the full conversation

The key difference from a simple chatbot: this agent is autonomous. It decides when to search, what to search for, and which source to use. Claude does the reasoning; LangGraph controls the loop.


Architecture

Browser
  │
  ▼
CloudFront (HTTPS)
  ├── /* ──────────────► S3 (React static files)
  └── /api/v1/* ───────► ALB
                           │
                           ▼
                      ECS Fargate (FastAPI)
                           │
                           ▼
                      LangGraph ReAct Agent
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        Tavily API    Wikipedia API  arXiv API
      (web search)   (background    (academic
                       facts)        papers)
              └────────────┼────────────┘
                           ▼
                     Anthropic Claude
                   (reasoning + answer)
                           │
                           ▼
                   SSE Stream → Browser
               (tokens + tool trace events)
                           │
                           ▼
                       DynamoDB
                  (conversation history)

Same CloudFront pattern as my resume analyzer/api/v1/* proxies to the ALB so the frontend has a single HTTPS endpoint with no CORS issues.


Tech Stack

LayerTechnology
FrontendReact 18 + TypeScript + Vite + TailwindCSS
BackendFastAPI (Python 3.12)
AI AgentLangGraph ReAct + Anthropic Claude (claude-sonnet-4-6)
Web SearchTavily API
KnowledgeWikipedia API
Academic ResearcharXiv API
PersistenceAmazon DynamoDB
StreamingServer-Sent Events (SSE)
HostingECS Fargate + ALB + CloudFront
IaCAWS CDK (Python)

AWS Services Used

ServicePurpose
ECS FargateServerless container hosting for FastAPI
Application Load BalancerRoutes traffic to ECS tasks
CloudFrontCDN + HTTPS termination + SSE proxy
S3React static file hosting
DynamoDBConversation history with GSI
SSM Parameter StoreSecure API key storage (Anthropic + Tavily)
ECRDocker image registry
VPC + NAT GatewayPrivate network with outbound internet for API calls
AWS CDK (Python)Infrastructure as Code

The Core: LangGraph ReAct Agent

The most interesting part of this project is the agent itself. Instead of a fixed pipeline (step 1 → step 2 → step 3), a ReAct agent reasons its way to an answer:

  1. Reason — think about what information is needed
  2. Act — call a tool (web search, Wikipedia, arXiv)
  3. Observe — read the result
  4. Repeat — until it has enough to answer

LangGraph models this as a StateGraph — a directed graph where each node is either the Claude model or a tool executor, and edges define when to call tools vs. when to return the final answer.

# agent/graph.py
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

def build_agent(tools: list) -> CompiledGraph:
    model = ChatAnthropic(model=settings.model_name).bind_tools(tools)

    def call_model(state: AgentState) -> dict:
        messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
        response = model.invoke(messages)
        return {"messages": [response]}

    def should_continue(state: AgentState) -> str:
        last = state["messages"][-1]
        return "tools" if last.tool_calls else END

    graph = StateGraph(AgentState)
    graph.add_node("agent", call_model)
    graph.add_node("tools", ToolNode(tools))
    graph.set_entry_point("agent")
    graph.add_conditional_edges("agent", should_continue)
    graph.add_edge("tools", "agent")
    return graph.compile()

The should_continue function is the key decision point: if Claude's response includes tool calls, route to the tools node; otherwise, the answer is ready and the graph ends.

AgentState and Conversation Memory

AgentState holds the full message history using LangGraph's add_messages reducer — new messages are appended to the list rather than replacing it:

# agent/state.py
from langgraph.graph import add_messages
from typing import Annotated
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

This single field is all that's needed for multi-turn memory. Each conversation starts from DynamoDB history, so follow-up questions have full context of everything said before.


The Three Research Tools

The agent has three tools. Claude autonomously decides which ones to call based on the question.

Tavily — Web Search

Tavily is purpose-built for LLM agents. It returns clean, structured results without the noise of a raw search engine.

# agent/tools.py
from langchain_community.tools.tavily_search import TavilySearchResults

def get_tools() -> list:
    web_search = TavilySearchResults(
        max_results=5,
        description="Search the web for current information, news, and general knowledge."
    )
    ...

Wikipedia — Background Facts

For well-established topics, Wikipedia gives reliable background without burning Tavily quota.

@tool
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for background information on a topic."""
    wiki = wikipediaapi.Wikipedia(language="en", user_agent="ai-research-agent/1.0")
    page = wiki.page(query)
    if not page.exists():
        return f"No Wikipedia article found for: {query}"
    # Truncate to 3000 chars to stay within Claude's context budget
    return page.summary[:3000]

arXiv — Academic Papers

For scientific or technical questions, the agent searches arXiv for peer-reviewed research. This differentiates the output from a standard web search.

@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for academic papers and research on a topic."""
    import arxiv
    results = arxiv.Search(query=query, max_results=3,
                           sort_by=arxiv.SortCriterion.Relevance)
    papers = []
    for r in arxiv.Client().results(results):
        papers.append(f"Title: {r.title}\nAuthors: {', '.join(a.name for a in r.authors)}\n"
                      f"Summary: {r.summary[:500]}\nURL: {r.entry_id}")
    return "\n\n---\n\n".join(papers) if papers else "No papers found."

SSE Streaming — Tokens + Live Agent Trace

This is the part that makes the UI feel alive. Instead of waiting 20 seconds for a complete answer, the browser receives two types of events simultaneously:

  • Trace events — every tool call the agent makes, shown in a live panel
  • Token events — the answer streaming word by word

The SSE event protocol:

data: {"type": "conversation_id", "conversation_id": "abc-123"}

data: {"type": "trace", "step": "tool_start", "tool": "web_search", "input": "LangGraph tutorial"}

data: {"type": "trace", "step": "tool_end", "tool": "web_search", "output": "LangGraph is..."}

data: {"type": "token", "content": "LangGraph is a "}

data: {"type": "token", "content": "framework for building..."}

data: {"type": "done"}

The streaming endpoint uses LangGraph's astream_events — an async generator that yields every event in the agent's execution graph:

# routers/chat.py
async def stream_agent_response(message: str, conversation_id: str):
    history = await dynamodb_service.get_messages(conversation_id)
    state = {"messages": history + [HumanMessage(content=message)]}

    async for event in agent.astream_events(state, version="v2"):
        kind = event["event"]

        if kind == "on_tool_start":
            yield f'data: {{"type":"trace","step":"tool_start","tool":"{event["name"]}","input":"{event["data"].get("input","")[:200]}"}}\n\n'

        elif kind == "on_tool_end":
            yield f'data: {{"type":"trace","step":"tool_end","tool":"{event["name"]}","output":"{str(event["data"].get("output",""))[:300]}"}}\n\n'

        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield f'data: {json.dumps({"type": "token", "content": content})}\n\n'

    yield 'data: {"type": "done"}\n\n'

The FastAPI endpoint sets media_type="text/event-stream" and returns a StreamingResponse — no WebSockets needed.


Multi-turn Conversations

Every conversation is stored in DynamoDB. When the user sends a follow-up question, the full history is loaded and passed back into the agent — so it has full context of everything discussed.

DynamoDB Schema

The table uses a single-table design with a GSI for listing conversations by recency:

KeyValuePurpose
pkCONV#{conversation_id}Partition key
skMETA or MSG#{timestamp}Sort key
entity_typeconversation or messageGSI partition key
updated_atISO timestampGSI sort key

The GSI (entity-type-index) makes it efficient to list all conversations sorted by most recently updated — without scanning the full table.

# services/dynamodb_service.py
async def save_messages(conversation_id: str, messages: list[BaseMessage]) -> None:
    with table.batch_writer() as batch:
        for i, msg in enumerate(messages):
            batch.put_item(Item={
                "pk": f"CONV#{conversation_id}",
                "sk": f"MSG#{i:06d}",
                "entity_type": "message",
                "role": msg.type,
                "content": msg.content,
                "updated_at": datetime.utcnow().isoformat(),
            })

API Endpoints

MethodEndpointDescription
GET/healthHealth check
POST/api/v1/chat/streamStream agent response + tool trace via SSE
GET/api/v1/conversationsList all conversations
GET/api/v1/conversations/{id}Get conversation with full message history
DELETE/api/v1/conversations/{id}Delete a conversation

Infrastructure with AWS CDK

Two stacks — simpler than the resume analyzer because the frontend is a pure static site with no server-side rendering.

BackendStack

VPC with 2 AZs, 1 NAT Gateway, ECS Fargate service, and ALB. Both API keys are injected from SSM Parameter Store at container startup.

# stacks/backend_stack.py (simplified)
fargate_service = ecs_patterns.ApplicationLoadBalancedFargateService(
    self, "BackendService",
    cluster=cluster,
    cpu=512,
    memory_limit_mib=1024,
    task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
        image=ecs.ContainerImage.from_ecr_repository(ecr_repo),
        environment={"DYNAMODB_TABLE": table.table_name},
        secrets={
            "ANTHROPIC_API_KEY": ecs.Secret.from_ssm_parameter(anthropic_key),
            "TAVILY_API_KEY": ecs.Secret.from_ssm_parameter(tavily_key),
        },
    ),
)
# Long agent reasoning chains need more than the default 60s
fargate_service.load_balancer.set_attribute("idle_timeout.timeout_seconds", "120")

FrontendStack

CloudFront with two important timeout settings for SSE:

# stacks/frontend_stack.py (simplified)
api_behaviour = cloudfront.BehaviorOptions(
    origin=alb_origin,
    viewer_protocol_policy=cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
    cache_policy=cloudfront.CachePolicy.CACHING_DISABLED,
    origin_request_policy=cloudfront.OriginRequestPolicy.ALL_VIEWER,
    # SSE streams can run for 60-120s — increase read timeout
    # Default is 30s; streaming connections silently drop without this
)

The CloudFront read timeout (default 30s) must be extended for SSE. Unlike REST endpoints, SSE connections stay open while the agent reasons and writes — they can easily exceed 30 seconds on complex questions.


Testing Strategy

Backend — pytest with mocked astream_events and DynamoDB. No real API calls needed.

cd backend
pytest                          # all tests
pytest tests/agent/             # ReAct agent + tools
pytest tests/routers/           # API endpoints + SSE format
pytest --cov=app                # with coverage report
Test fileWhat it covers
test_state.pyadd_messages appends correctly, preserves history
test_tools.pyWikipedia found/not found/truncation, arXiv search, get_tools returns 3 tools
test_chat.pySSE event format, trace events, token events, done/error, input validation
test_conversations.pyList/get/delete endpoints, 404 handling

Frontend — Vitest + Testing Library.

cd frontend
npm test
npm run test:coverage

Lessons Learned

1. ReAct agents decide — you don't have to hardcode logic A fixed pipeline would need explicit code for "if scientific question → use arXiv". The ReAct loop lets Claude make that call based on context. The agent often combines all three tools on a single question.

2. CloudFront read timeout must be increased for SSE The default CloudFront read timeout is 30 seconds. SSE connections for complex research questions run longer. Without setting this to 120s, streams silently drop mid-response — no error, just a frozen browser. This was the hardest bug to diagnose.

3. ALB idle timeout must also be extended Both CloudFront (read timeout) and ALB (idle timeout) need to be set to 120s. Fixing only one still drops long connections. The ALB's default 60s kicks in before CloudFront's if not changed.

4. NAT Gateway is required for private ECS tasks ECS tasks in a private subnet need a NAT Gateway to reach Anthropic, Tavily, Wikipedia, and arXiv. Without it, all external API calls silently time out — no DNS error, just a hanging request.

5. add_messages is all you need for conversation memory LangGraph's add_messages reducer appends new messages to the state list automatically. Passing the full DynamoDB history into the initial state gives the agent complete context of past turns with zero extra logic.

6. Build Docker images for linux/amd64 on Apple Silicon ECS Fargate runs on x86 by default. Building on an M-series Mac without specifying the platform creates an arm64 image that won't start on Fargate. Always add --platform linux/amd64 to the Docker build, or set it in the Dockerfile.

7. Two SSM parameters — both injected at container startup This project needs two API keys: Anthropic and Tavily. Both are stored as SSM SecureString and injected as environment variables at container startup. Keys never appear in CDK output, CloudFormation templates, or Docker images.


GitHub

The full source code is available on GitHub:

👉 github.com/sanjaypatoliya/ai-research-agent


About the Author

I'm Sanjay Patoliya — AWS Certified Solutions Architect & DevOps Engineer building production-ready AI systems on AWS. If you're looking to build something similar or need a remote AWS + AI engineer, feel free to reach out.