What I Built
An AI research agent that answers any question by searching the web, Wikipedia, and academic papers — then streams its reasoning back to the browser in real time.
User flow:
- Ask any research question in natural language
- Watch the agent decide which tools to call — live, as it happens
- Read a structured report: Summary, Key Findings, Academic Research, Sources, Conclusion
- Ask follow-up questions — the agent remembers the full conversation
The key difference from a simple chatbot: this agent is autonomous. It decides when to search, what to search for, and which source to use. Claude does the reasoning; LangGraph controls the loop.
Architecture
Browser
   │
   ▼
CloudFront (HTTPS)
   ├── /* ──────────────► S3 (React static files)
   └── /api/v1/* ───────► ALB
                           │
                           ▼
                  ECS Fargate (FastAPI)
                           │
                           ▼
                  LangGraph ReAct Agent
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
         Tavily API   Wikipedia API  arXiv API
         (web search) (background    (academic
                       facts)         papers)
              └────────────┼────────────┘
                           ▼
                   Anthropic Claude
                  (reasoning + answer)
                           │
                           ▼
                  SSE Stream → Browser
              (tokens + tool trace events)
                           │
                           ▼
                       DynamoDB
                 (conversation history)
Same CloudFront pattern as my resume analyzer — /api/v1/* proxies to the ALB so the frontend has a single HTTPS endpoint with no CORS issues.
Tech Stack
| Layer | Technology |
|---|---|
| Frontend | React 18 + TypeScript + Vite + TailwindCSS |
| Backend | FastAPI (Python 3.12) |
| AI Agent | LangGraph ReAct + Anthropic Claude (claude-sonnet-4-6) |
| Web Search | Tavily API |
| Knowledge | Wikipedia API |
| Academic Research | arXiv API |
| Persistence | Amazon DynamoDB |
| Streaming | Server-Sent Events (SSE) |
| Hosting | ECS Fargate + ALB + CloudFront |
| IaC | AWS CDK (Python) |
AWS Services Used
| Service | Purpose |
|---|---|
| ECS Fargate | Serverless container hosting for FastAPI |
| Application Load Balancer | Routes traffic to ECS tasks |
| CloudFront | CDN + HTTPS termination + SSE proxy |
| S3 | React static file hosting |
| DynamoDB | Conversation history with GSI |
| SSM Parameter Store | Secure API key storage (Anthropic + Tavily) |
| ECR | Docker image registry |
| VPC + NAT Gateway | Private network with outbound internet for API calls |
| AWS CDK (Python) | Infrastructure as Code |
The Core: LangGraph ReAct Agent
The most interesting part of this project is the agent itself. Instead of a fixed pipeline (step 1 → step 2 → step 3), a ReAct agent reasons its way to an answer:
- Reason — think about what information is needed
- Act — call a tool (web search, Wikipedia, arXiv)
- Observe — read the result
- Repeat — until it has enough to answer
LangGraph models this as a StateGraph — a directed graph where each node is either the Claude model or a tool executor, and edges define when to call tools vs. when to return the final answer.
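# agent/graph.py
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode

# settings, SYSTEM_PROMPT, and AgentState come from the project's own modules.

def build_agent(tools: list) -> CompiledStateGraph:
    model = ChatAnthropic(model=settings.model_name).bind_tools(tools)

    def call_model(state: AgentState) -> dict:
        # Prepend the system prompt on every turn; it is not stored in state.
        messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
        response = model.invoke(messages)
        return {"messages": [response]}

    def should_continue(state: AgentState) -> str:
        # Tool calls present: run them. Otherwise the answer is final.
        last = state["messages"][-1]
        return "tools" if last.tool_calls else END

    graph = StateGraph(AgentState)
    graph.add_node("agent", call_model)
    graph.add_node("tools", ToolNode(tools))
    graph.set_entry_point("agent")
    graph.add_conditional_edges("agent", should_continue)
    graph.add_edge("tools", "agent")  # tool results always go back to the model
    return graph.compile()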
The should_continue function is the key decision point: if Claude's response includes tool calls, route to the tools node; otherwise, the answer is ready and the graph ends.
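To make the routing concrete, here is a minimal sketch of the agent → tools → agent cycle in plain Python, with no LangGraph or Claude involved. The `FakeAIMessage` class, the `run_loop` helper, and the scripted replies are hypothetical stand-ins for illustration; only the `should_continue` rule mirrors the graph code.

```python
from dataclasses import dataclass, field

END = "__end__"  # stand-in for LangGraph's END sentinel (assumption for this sketch)


@dataclass
class FakeAIMessage:
    """Hypothetical stand-in for an AI message; only the fields the router reads."""
    content: str
    tool_calls: list = field(default_factory=list)


def should_continue(state: dict) -> str:
    # Same rule as in the graph: tool calls present -> run tools, else finish.
    last = state["messages"][-1]
    return "tools" if last.tool_calls else END


def run_loop(scripted_replies: list, max_steps: int = 10) -> list:
    """Walk the agent -> tools -> agent cycle using scripted model replies."""
    state = {"messages": []}
    visited = []
    replies = iter(scripted_replies)
    for _ in range(max_steps):
        state["messages"].append(next(replies))  # "agent" node
        visited.append("agent")
        if should_continue(state) == END:
            break
        # "tools" node: a real tool executor would run each call and append results
        state["messages"].append(FakeAIMessage(content="tool result"))
        visited.append("tools")
    return visited


trace = run_loop([
    FakeAIMessage("", tool_calls=[{"name": "web_search", "args": {"query": "LangGraph"}}]),
    FakeAIMessage("", tool_calls=[{"name": "arxiv_search", "args": {"query": "ReAct"}}]),
    FakeAIMessage("Final report"),
])
print(trace)  # ['agent', 'tools', 'agent', 'tools', 'agent']
```

The `max_steps` cap is an addition for the sketch: it keeps a misbehaving model from looping forever.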
AgentState and Conversation Memory
AgentState holds the full message history using LangGraph's add_messages reducer — new messages are appended to the list rather than replacing it:
# agent/state.py
from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages

class AgentState(TypedDict):
    # add_messages merges updates into the list instead of overwriting it
    messages: Annotated[list[BaseMessage], add_messages]
This single field is all that's needed for multi-turn memory. Each request seeds the state with the conversation's history loaded from DynamoDB, so follow-up questions have full context of everything said before.
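The reducer idea can be illustrated without LangGraph. The sketch below is a simplified stand-in, not the library's implementation: `Msg` and `append_messages` are hypothetical names, and the behaviour shown (append new messages, replace one whose id already exists) is an approximation of what `add_messages` does.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical message type for this sketch."""
    id: str
    role: str
    content: str


def append_messages(existing: list, new: list) -> list:
    """Simplified stand-in for a reducer: append, or replace on a matching id."""
    merged = list(existing)  # copy so the previous state is left untouched
    index_by_id = {m.id: i for i, m in enumerate(merged)}
    for msg in new:
        if msg.id in index_by_id:
            merged[index_by_id[msg.id]] = msg
        else:
            index_by_id[msg.id] = len(merged)
            merged.append(msg)
    return merged


history = [Msg("1", "human", "What is LangGraph?"), Msg("2", "ai", "A graph framework.")]
state = append_messages(history, [Msg("3", "human", "How does it handle memory?")])
print([m.id for m in state])  # ['1', '2', '3']
```

Because a node returns only its new messages and the reducer does the merging, the model node can return `{"messages": [response]}` without touching the earlier turns.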
The Three Research Tools
The agent has three tools. Claude autonomously decides which ones to call based on the question.
Tavily — Web Search
Tavily is purpose-built for LLM agents. It returns clean, structured results without the noise of a raw search engine.
# agent/tools.py
from langchain_community.tools.tavily_search import TavilySearchResults

def get_tools() -> list:
    web_search = TavilySearchResults(
        max_results=5,
        description="Search the web for current information, news, and general knowledge."
    )
    ...
Wikipedia — Background Facts
For well-established topics, Wikipedia gives reliable background without burning Tavily quota.
import wikipediaapi
from langchain_core.tools import tool

@tool
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for background information on a topic."""
    wiki = wikipediaapi.Wikipedia(language="en", user_agent="ai-research-agent/1.0")
    page = wiki.page(query)
    if not page.exists():
        return f"No Wikipedia article found for: {query}"
    # Truncate to 3000 chars to stay within Claude's context budget
    return page.summary[:3000]
arXiv — Academic Papers
For scientific or technical questions, the agent searches arXiv for academic papers. Note that arXiv hosts preprints, so not every result has been peer-reviewed. This differentiates the output from a standard web search.
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for academic papers and research on a topic."""
    import arxiv
    results = arxiv.Search(query=query, max_results=3,
                           sort_by=arxiv.SortCriterion.Relevance)
    papers = []
    for r in arxiv.Client().results(results):
        papers.append(f"Title: {r.title}\nAuthors: {', '.join(a.name for a in r.authors)}\n"
                      f"Summary: {r.summary[:500]}\nURL: {r.entry_id}")
    return "\n\n---\n\n".join(papers) if papers else "No papers found."
SSE Streaming — Tokens + Live Agent Trace
This is the part that makes the UI feel alive. Instead of waiting 20 seconds for a complete answer, the browser receives two kinds of events interleaved on a single stream:
- Trace events — every tool call the agent makes, shown in a live panel
- Token events — the answer streaming word by word
The SSE event protocol:
data: {"type": "conversation_id", "conversation_id": "abc-123"}
data: {"type": "trace", "step": "tool_start", "tool": "web_search", "input": "LangGraph tutorial"}
data: {"type": "trace", "step": "tool_end", "tool": "web_search", "output": "LangGraph is..."}
data: {"type": "token", "content": "LangGraph is a "}
data: {"type": "token", "content": "framework for building..."}
data: {"type": "done"}
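As a rough illustration of this protocol, the sketch below builds frames with `json.dumps` and decodes them again. The helper names `format_sse` and `parse_sse` are hypothetical, and the parser assumes the whole stream is already buffered, which a real browser client would not do.

```python
import json


def format_sse(payload: dict) -> str:
    """Serialize one event as an SSE frame: a data line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(raw: str) -> list:
    """Split a buffered stream into frames and decode each data line."""
    events = []
    for frame in raw.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events


stream = "".join([
    format_sse({"type": "trace", "step": "tool_start", "tool": "web_search",
                "input": 'He said "hi"'}),
    format_sse({"type": "token", "content": "line one\nline two"}),
    format_sse({"type": "done"}),
])
events = parse_sse(stream)
print([e["type"] for e in events])  # ['trace', 'token', 'done']
```

Serializing through `json.dumps` matters here: quotes and newlines inside tool input or model output are escaped, so they cannot end a frame early or produce invalid JSON.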
The streaming endpoint uses LangGraph's astream_events — an async generator that yields every event in the agent's execution graph:
# routers/chat.py
import json

from langchain_core.messages import HumanMessage

def sse_event(payload: dict) -> str:
    # json.dumps escapes quotes and newlines, so tool output cannot break the frame
    return f"data: {json.dumps(payload)}\n\n"

async def stream_agent_response(message: str, conversation_id: str):
    history = await dynamodb_service.get_messages(conversation_id)
    state = {"messages": history + [HumanMessage(content=message)]}
    async for event in agent.astream_events(state, version="v2"):
        kind = event["event"]
        if kind == "on_tool_start":
            # Tool input is usually a dict, so stringify it before truncating
            yield sse_event({"type": "trace", "step": "tool_start", "tool": event["name"],
                             "input": str(event["data"].get("input", ""))[:200]})
        elif kind == "on_tool_end":
            yield sse_event({"type": "trace", "step": "tool_end", "tool": event["name"],
                             "output": str(event["data"].get("output", ""))[:300]})
        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield sse_event({"type": "token", "content": content})
    yield sse_event({"type": "done"})
The FastAPI endpoint sets media_type="text/event-stream" and returns a StreamingResponse — no WebSockets needed.
Multi-turn Conversations
Every conversation is stored in DynamoDB. When the user sends a follow-up question, the full history is loaded and passed back into the agent — so it has full context of everything discussed.
DynamoDB Schema
The table uses a single-table design with a GSI for listing conversations by recency:
| Key | Value | Purpose |
|---|---|---|
| pk | CONV#{conversation_id} | Partition key |
| sk | META or MSG#{timestamp} | Sort key |
| entity_type | conversation or message | GSI partition key |
| updated_at | ISO timestamp | GSI sort key |
The GSI (entity-type-index) makes it efficient to list all conversations sorted by most recently updated — without scanning the full table.
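A minimal sketch of how these keys and the GSI query could be assembled follows. The helper names are hypothetical, the message sort key follows the zero-padded index used in `save_messages`, and the returned dictionary is only shaped like the keyword arguments of a boto3 `Table.query` call; nothing here talks to DynamoDB.

```python
def conversation_keys(conversation_id: str) -> dict:
    """Primary key of the metadata item for one conversation."""
    return {"pk": f"CONV#{conversation_id}", "sk": "META"}


def message_keys(conversation_id: str, index: int) -> dict:
    """Primary key of one message; zero-padding keeps string order numeric."""
    return {"pk": f"CONV#{conversation_id}", "sk": f"MSG#{index:06d}"}


def list_conversations_query(limit: int = 20) -> dict:
    """Keyword arguments for a GSI query returning newest conversations first."""
    return {
        "IndexName": "entity-type-index",
        "KeyConditionExpression": "entity_type = :t",
        "ExpressionAttributeValues": {":t": "conversation"},
        "ScanIndexForward": False,  # descending by the GSI sort key, updated_at
        "Limit": limit,
    }


print(message_keys("abc-123", 2)["sk"])   # MSG#000002
print(message_keys("abc-123", 10)["sk"])  # MSG#000010
```

Since the partition key of the index is `entity_type`, a single query on `conversation` touches only metadata items and skips every stored message.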
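# services/dynamodb_service.py
from datetime import datetime, timezone

async def save_messages(conversation_id: str, messages: list[BaseMessage]) -> None:
    # Note: boto3's batch_writer is synchronous and blocks the event loop while it runs
    with table.batch_writer() as batch:
        for i, msg in enumerate(messages):
            batch.put_item(Item={
                "pk": f"CONV#{conversation_id}",
                "sk": f"MSG#{i:06d}",  # zero-padded so string order matches message order
                "entity_type": "message",
                "role": msg.type,
                "content": msg.content,
                # datetime.utcnow() is deprecated in Python 3.12
                "updated_at": datetime.now(timezone.utc).isoformat(),
            })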
API Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Health check |
| POST | /api/v1/chat/stream | Stream agent response + tool trace via SSE |
| GET | /api/v1/conversations | List all conversations |
| GET | /api/v1/conversations/{id} | Get conversation with full message history |
| DELETE | /api/v1/conversations/{id} | Delete a conversation |
Infrastructure with AWS CDK
Two stacks — simpler than the resume analyzer because the frontend is a pure static site with no server-side rendering.
BackendStack
VPC with 2 AZs, 1 NAT Gateway, ECS Fargate service, and ALB. Both API keys are injected from SSM Parameter Store at container startup.
# stacks/backend_stack.py (simplified)
fargate_service = ecs_patterns.ApplicationLoadBalancedFargateService(
    self, "BackendService",
    cluster=cluster,
    cpu=512,
    memory_limit_mib=1024,
    task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
        image=ecs.ContainerImage.from_ecr_repository(ecr_repo),
        environment={"DYNAMODB_TABLE": table.table_name},
        secrets={
            "ANTHROPIC_API_KEY": ecs.Secret.from_ssm_parameter(anthropic_key),
            "TAVILY_API_KEY": ecs.Secret.from_ssm_parameter(tavily_key),
        },
    ),
)

# Long agent reasoning chains need more than the default 60s
fargate_service.load_balancer.set_attribute("idle_timeout.timeout_seconds", "120")
FrontendStack
CloudFront with two important timeout settings for SSE:
# stacks/frontend_stack.py (simplified)
api_behaviour = cloudfront.BehaviorOptions(
    origin=alb_origin,
    viewer_protocol_policy=cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
    cache_policy=cloudfront.CachePolicy.CACHING_DISABLED,
    origin_request_policy=cloudfront.OriginRequestPolicy.ALL_VIEWER,
    # SSE streams can run for 60-120s, so the origin read timeout must be raised.
    # The default is 30s; streaming connections silently drop without this.
    # (The read timeout is a property of the origin, alb_origin, not of the behaviour.)
)
The CloudFront read timeout (default 30s) must be extended for SSE. It limits how long CloudFront waits for the next packet from the origin, and an SSE stream goes quiet whenever the agent is busy with a tool call or a long reasoning step. On complex questions those quiet gaps can easily exceed 30 seconds.
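The interaction between the two timeouts comes down to a minimum: a quiet connection is cut by whichever hop has the shortest limit. The sketch below only does that arithmetic with the default values quoted in this post; the function and key names are hypothetical.

```python
def effective_idle_limit(timeouts: dict) -> tuple:
    """Return (hop, seconds) for the hop that cuts a silent connection first."""
    hop = min(timeouts, key=timeouts.get)
    return hop, timeouts[hop]


defaults = {"cloudfront_read": 30, "alb_idle": 60}

print(effective_idle_limit(defaults))                              # ('cloudfront_read', 30)
print(effective_idle_limit({**defaults, "cloudfront_read": 120}))  # ('alb_idle', 60)
print(effective_idle_limit({"cloudfront_read": 120, "alb_idle": 120}))  # ('cloudfront_read', 120)
```

Raising only the CloudFront value moves the limit from 30s to the ALB's 60s, which is why both settings have to change together.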
Testing Strategy
Backend — pytest with mocked astream_events and DynamoDB. No real API calls needed.
cd backend
pytest # all tests
pytest tests/agent/ # ReAct agent + tools
pytest tests/routers/ # API endpoints + SSE format
pytest --cov=app # with coverage report
| Test file | What it covers |
|---|---|
| test_state.py | add_messages appends correctly, preserves history |
| test_tools.py | Wikipedia found/not found/truncation, arXiv search, get_tools returns 3 tools |
| test_chat.py | SSE event format, trace events, token events, done/error, input validation |
| test_conversations.py | List/get/delete endpoints, 404 handling |
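To show what mocking `astream_events` can look like, here is a self-contained sketch that uses only the standard library. It is not the project's test code: `fake_astream_events`, `stream_sse`, and the canned events are hypothetical, and the event shapes are simplified from what the agent emits.

```python
import asyncio
import json
from types import SimpleNamespace


async def fake_astream_events(state, version="v2"):
    """Hypothetical stand-in for agent.astream_events with canned events."""
    yield {"event": "on_tool_start", "name": "web_search",
           "data": {"input": {"query": "LangGraph tutorial"}}}
    yield {"event": "on_tool_end", "name": "web_search",
           "data": {"output": "LangGraph is..."}}
    yield {"event": "on_chat_model_stream", "name": "model",
           "data": {"chunk": SimpleNamespace(content="LangGraph is a ")}}


async def stream_sse(events):
    """Map agent events to SSE frames in the protocol described earlier."""
    async for event in events:
        kind, data = event["event"], event["data"]
        if kind == "on_tool_start":
            payload = {"type": "trace", "step": "tool_start", "tool": event["name"],
                       "input": str(data.get("input", ""))[:200]}
        elif kind == "on_tool_end":
            payload = {"type": "trace", "step": "tool_end", "tool": event["name"],
                       "output": str(data.get("output", ""))[:300]}
        elif kind == "on_chat_model_stream" and data["chunk"].content:
            payload = {"type": "token", "content": data["chunk"].content}
        else:
            continue  # ignore event kinds the UI does not display
        yield f"data: {json.dumps(payload)}\n\n"
    yield f"data: {json.dumps({'type': 'done'})}\n\n"


async def collect_frames():
    return [frame async for frame in stream_sse(fake_astream_events({}))]


frames = asyncio.run(collect_frames())
decoded = [json.loads(frame[len("data: "):]) for frame in frames]
print([d["type"] for d in decoded])  # ['trace', 'trace', 'token', 'done']
```

Because the fake generator yields plain dictionaries, the assertions run in milliseconds and need neither an Anthropic key nor network access.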
Frontend — Vitest + Testing Library.
cd frontend
npm test
npm run test:coverage
Lessons Learned
1. ReAct agents decide: you don't have to hardcode logic
A fixed pipeline would need explicit code for "if scientific question → use arXiv". The ReAct loop lets Claude make that call based on context. The agent often combines all three tools on a single question.
2. CloudFront read timeout must be increased for SSE
The default CloudFront read timeout is 30 seconds. SSE connections for complex research questions run longer. Without setting this to 120s, streams silently drop mid-response: no error, just a frozen browser. This was the hardest bug to diagnose.
3. ALB idle timeout must also be extended
Both CloudFront (read timeout) and ALB (idle timeout) need to be set to 120s. Fixing only one still drops long connections: if only the CloudFront timeout is raised, the ALB's default 60s idle timeout becomes the new limit.
4. NAT Gateway is required for private ECS tasks
ECS tasks in a private subnet need a NAT Gateway to reach Anthropic, Tavily, Wikipedia, and arXiv. Without it, all external API calls silently time out: no DNS error, just a hanging request.
5. add_messages is all you need for conversation memory
LangGraph's add_messages reducer appends new messages to the state list automatically. Passing the full DynamoDB history into the initial state gives the agent complete context of past turns with zero extra logic.
6. Build Docker images for linux/amd64 on Apple Silicon
ECS Fargate runs on x86 by default. Building on an M-series Mac without specifying the platform creates an arm64 image that won't start on Fargate. Always add --platform linux/amd64 to the Docker build, or set it in the Dockerfile.
7. Two SSM parameters, both injected at container startup
This project needs two API keys: Anthropic and Tavily. Both are stored as SSM SecureString and injected as environment variables at container startup. Keys never appear in CDK output, CloudFormation templates, or Docker images.
GitHub
The full source code is available on GitHub:
👉 github.com/sanjaypatoliya/ai-research-agent
About the Author
I'm Sanjay Patoliya — AWS Certified Solutions Architect & DevOps Engineer building production-ready AI systems on AWS. If you're looking to build something similar or need a remote AWS + AI engineer, feel free to reach out.
- Portfolio: sanjaypatoliya.com
- LinkedIn: linkedin.com/in/sanjaykumar-patoliya-b234a287
- Email: sbpatoliya@gmail.com