Agentic Techniques Deep Dive: What I Learned from Reading Claude Code
I spent a few days reading through openclaude — the open-source TypeScript codebase behind Claude Code. My goal was selfish: extract every production-grade agentic technique I could find and understand them well enough to build them in Python.
This post is the result. Nine techniques, Python-ready pseudocode for each, and a build-order recommendation at the end. If you are building AI agents and want to go beyond the basic “call model → get response” loop, this is for you.
1. The Agent Loop (Core of Everything)
Every AI agent reduces to this pattern:
loop:
  1. Build context (messages + tools + system prompt)
  2. Call model → stream response
  3. If the model emitted no tool_use, or a terminal condition is hit → break
  4. Execute the requested tools (in parallel where safe)
  5. Append tool results to messages
  6. Go back to step 1
The key implementation detail: use async generators so every token flows to the UI in real time while the agent continues working.
# stream_from_model, execute_tools, AssistantMessage and UserMessage are placeholders
async def query_loop(messages, tools, model):
    while True:
        pending_tools = []   # tool_use events collected during this turn
        stream_content = []  # every event of the assistant turn, in order
        async for event in stream_from_model(messages, tools, model):
            yield event  # stream tokens to the UI in real time
            stream_content.append(event)
            if event.type == "tool_use":
                pending_tools.append(event)
            if event.type == "message_stop":
                break
        if not pending_tools:
            break  # no tools called = agent is done
        tool_results = await execute_tools(pending_tools)
        messages.append(AssistantMessage(content=stream_content))
        messages.append(UserMessage(tool_results=tool_results))
Key insight: In the normal case, the loop ends when the model completes a turn without calling any tools. Everything else is about deciding when to stop early and what to yield.
The codebase tracks six terminal conditions:
| Condition | Meaning |
|---|---|
| completed | Normal finish |
| max_turns | Hit turn limit |
| tool_failure_loop | Same tool keeps failing |
| prompt_too_long | Context window exceeded |
| model_error | API error after retries |
| blocking_limit | Permission denied, needs human |
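One way to make these conditions explicit in Python is an enum plus a single decision function that the loop calls once per turn. This is a sketch of how the pieces could be wired together, not a port of the codebase: `TurnState`, its field names, and the order in which conditions are checked are all assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class TerminalReason(str, Enum):
    COMPLETED = "completed"
    MAX_TURNS = "max_turns"
    TOOL_FAILURE_LOOP = "tool_failure_loop"
    PROMPT_TOO_LONG = "prompt_too_long"
    MODEL_ERROR = "model_error"
    BLOCKING_LIMIT = "blocking_limit"


@dataclass
class TurnState:
    """Hypothetical per-turn snapshot; field names are illustrative."""
    turn: int
    max_turns: int
    tool_calls: int                   # tool_use blocks emitted this turn
    guard_tripped: bool = False       # the failure-loop guard wants to abort
    prompt_too_long: bool = False     # compaction could not recover
    model_error: bool = False         # API error persisted after retries
    permission_blocked: bool = False  # a tool needs human approval


def terminal_reason(state: TurnState) -> Optional[TerminalReason]:
    """Return why the loop should stop, or None to keep going."""
    if state.model_error:
        return TerminalReason.MODEL_ERROR
    if state.prompt_too_long:
        return TerminalReason.PROMPT_TOO_LONG
    if state.permission_blocked:
        return TerminalReason.BLOCKING_LIMIT
    if state.guard_tripped:
        return TerminalReason.TOOL_FAILURE_LOOP
    if state.tool_calls == 0:
        return TerminalReason.COMPLETED
    if state.turn >= state.max_turns:
        return TerminalReason.MAX_TURNS
    return None
```

Returning a named reason instead of a bare `break` makes it easy to tell the user why the agent stopped.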
2. Streaming Tool Execution with Concurrency
This is the most underrated technique. Instead of waiting for the full model response before running tools, you can run tools as the model is still streaming:
Model streaming: [text...] [tool_A_start] [text...] [tool_B_start] [stop]
Execution: [tool_A runs immediately] [tool_B runs immediately]
Results: [tool_A done] [tool_B done]
Two categories of tools:
- Concurrency-safe (read-only): Read, Glob, Grep — run in parallel
- Exclusive tools: Bash, Edit, Write — acquire a lock, run one at a time
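import asyncio

async def run_exclusive(lock, tool, tool_input):
    # tool and tool_input are bound per call, so a later loop iteration
    # cannot swap them out before the lock is acquired
    async with lock:
        return await tool.execute(tool_input)

async def streaming_tool_executor(model_stream, tools):
    # get_tool and ToolResult are placeholders for your own registry and result type
    lock = asyncio.Lock()  # serializes Bash/Edit/Write-style tools
    pending = []
    async for event in model_stream:
        if event.type == "tool_use":
            tool = get_tool(event.name)
            if tool.is_concurrency_safe:
                coro = tool.execute(event.input)
            else:
                coro = run_exclusive(lock, tool, event.input)
            # Start immediately, while the model is still streaming
            pending.append((event.id, asyncio.create_task(coro)))
    # Collect results in the order the model requested them
    results = []
    for tool_id, task in pending:
        result = await task
        results.append(ToolResult(id=tool_id, content=result))
    return results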
On a turn that calls 5 read-only tools, the tool phase takes roughly as long as the slowest call rather than the sum of all five, and much of that time overlaps with the model still writing its response.
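To see the two tool categories behave differently, here is a small self-contained simulation. It is only a sketch: `run_batch` and `fake_tool` are hypothetical names, and the sleep stands in for real file or network I/O. It records the peak number of calls in flight at once.

```python
import asyncio


async def run_batch(n_calls: int, exclusive: bool) -> int:
    """Run n simulated tool calls and return the peak number in flight."""
    lock = asyncio.Lock()
    in_flight = 0
    peak = 0

    async def fake_tool() -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for file or network I/O
        in_flight -= 1

    async def call() -> None:
        if exclusive:
            async with lock:  # one at a time, like Bash/Edit/Write
                await fake_tool()
        else:
            await fake_tool()  # overlaps freely, like Read/Glob/Grep

    await asyncio.gather(*(call() for _ in range(n_calls)))
    return peak
```

With five calls, `asyncio.run(run_batch(5, exclusive=False))` reports a peak of 5 and `asyncio.run(run_batch(5, exclusive=True))` reports a peak of 1, which is the difference between paying for the slowest call and paying for all of them.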
3. Tool Failure Loop Guard
A common failure mode: the model keeps calling the same tool that keeps failing. Without a guard, you burn tokens forever.
class ToolFailureLoopGuard:
    def __init__(self, max_consecutive=3):
        self.consecutive_failures = 0
        self.last_failed_tool = None
        self.max = max_consecutive

    def record(self, tool_name, success):
        if not success:
            if tool_name == self.last_failed_tool:
                self.consecutive_failures += 1
            else:
                self.consecutive_failures = 1
                self.last_failed_tool = tool_name
        else:
            self.consecutive_failures = 0
            self.last_failed_tool = None

    def should_abort(self):
        return self.consecutive_failures >= self.max
Simple but essential. This small class prevents a stuck agent from running indefinitely and surfaces the real error to the user.
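A short trace helps pin down what "same tool keeps failing" means here. The snippet restates the guard compactly so it runs on its own; `first_abort_index` and the three call sequences are hypothetical test inputs. Note the last case: as written, the guard does not trip when two different tools fail alternately, which may or may not be what you want.

```python
class ToolFailureLoopGuard:
    """Compact restatement of the guard so this snippet runs alone."""

    def __init__(self, max_consecutive=3):
        self.consecutive_failures = 0
        self.last_failed_tool = None
        self.max = max_consecutive

    def record(self, tool_name, success):
        if success:
            self.consecutive_failures, self.last_failed_tool = 0, None
        elif tool_name == self.last_failed_tool:
            self.consecutive_failures += 1
        else:
            self.consecutive_failures, self.last_failed_tool = 1, tool_name

    def should_abort(self):
        return self.consecutive_failures >= self.max


def first_abort_index(calls, max_consecutive=3):
    """Return the index of the call that trips the guard, or None."""
    guard = ToolFailureLoopGuard(max_consecutive)
    for i, (tool_name, success) in enumerate(calls):
        guard.record(tool_name, success)
        if guard.should_abort():
            return i
    return None


stuck = [("Bash", False), ("Bash", False), ("Bash", False)]
recovering = [("Bash", False), ("Bash", False), ("Bash", True), ("Bash", False)]
alternating = [("Bash", False), ("Edit", False)] * 3

print(first_abort_index(stuck))        # 2: third consecutive Bash failure
print(first_abort_index(recovering))   # None: the success reset the streak
print(first_abort_index(alternating))  # None: the failing tool keeps changing
```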
4. Token Budget with Diminishing Returns Detection
When an agent is deep in a long loop, it may keep “continuing” but making less and less progress. This pattern detects that by watching how many new tokens are added each turn:
class TokenBudgetTracker:
    THRESHOLD = 0.90       # 90% of context used
    MIN_DELTA = 500        # meaningful progress threshold
    DIMINISHING_RUNS = 3   # how many bad runs before giving up

    def __init__(self, context_window):
        self.context_window = context_window
        self.continuation_count = 0
        self.bad_run_count = 0

    def check(self, tokens_used, tokens_added_this_turn):
        usage_pct = tokens_used / self.context_window
        if usage_pct < self.THRESHOLD:
            return {"action": "continue"}
        if tokens_added_this_turn < self.MIN_DELTA:
            self.bad_run_count += 1
        else:
            self.bad_run_count = 0
        self.continuation_count += 1
        if self.bad_run_count >= self.DIMINISHING_RUNS:
            return {"action": "stop", "reason": "diminishing_returns"}
        return {
            "action": "warn_user",
            "message": f"Context {usage_pct:.0%} full. Continuing..."
        }
5. Context Compaction (Keeping Context Fresh)
When conversation history gets too long, you can’t just truncate — you lose context. The answer is semantic compaction: summarize old messages while keeping recent ones verbatim.
Three modes in the codebase:
| Mode | Trigger | Use case |
|---|---|---|
| Auto-compact | tokens_used > threshold at turn start | Proactive |
| Reactive-compact | prompt_too_long API error mid-stream | Recovery |
| Micro-compact | Lightweight, within a single turn | Fine-grained |
async def compact_messages(messages, model, keep_recent_n=10):
    # model.complete is a placeholder for your own summarization call
    if len(messages) <= keep_recent_n:
        return messages
    # Caveat: choose a cut that does not separate a tool_use from its tool_result
    old_messages = messages[:-keep_recent_n]
    recent_messages = messages[-keep_recent_n:]
    summary = await model.complete(
        system="You are a context compaction assistant.",
        messages=[
            *old_messages,
            {"role": "user", "content": "Summarize the above conversation "
             "preserving all key decisions, facts, and current task state."}
        ]
    )
    return [
        {"role": "user", "content": f"[Previous context summary]: {summary}"},
        *recent_messages
    ]
The reactive variant is what makes this production-grade: if the API rejects a request mid-turn because the prompt is too long (the prompt_too_long case), you compact right then and retry without losing the user’s current request.
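The reactive path can be sketched as a small retry wrapper. Everything named here is an assumption for illustration: `PromptTooLongError` stands in for whatever context-length error your API client raises, and `call_model` and `compact` are caller-supplied coroutines. The wrapper returns the compacted history along with the response so the caller can keep using it.

```python
import asyncio


class PromptTooLongError(Exception):
    """Hypothetical stand-in for the API's context-length error."""


async def call_with_reactive_compaction(call_model, compact, messages,
                                        max_compactions=2):
    """Call the model; on a context-length error, compact and retry.

    Returns (response, messages), where messages may have been compacted.
    """
    for attempt in range(max_compactions + 1):
        try:
            return await call_model(messages), messages
        except PromptTooLongError:
            if attempt == max_compactions:
                raise  # give up; this is the prompt_too_long terminal case
            messages = await compact(messages)
```

Capping the number of compactions matters: if summarizing does not shrink the prompt enough, retrying forever would just burn tokens.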
6. Prefetch While Streaming (Latency Hiding)
While the model is generating output, you can be doing useful work in the background. The codebase does this for loading relevant memories, discovering skill files, and prefetching MCP resources.
import asyncio

async def query_with_prefetch(messages, tools, model, memory_store):
    # Start memory search BEFORE awaiting the model
    memory_task = asyncio.create_task(
        memory_store.find_relevant(messages[-1])
    )
    full_response = ""
    async for token in model.stream(messages, tools):
        full_response += token
        yield token  # stream to UI immediately
    # By the time model finishes, memory search is likely done
    relevant_memories = await memory_task
    if relevant_memories:
        messages.append(attach_memories(relevant_memories))
asyncio.create_task() is your friend here. Start the work early, keep a reference to the task, and await the result when you need it.
7. Multi-Agent Orchestration
This is what makes it a multi-agent system rather than a single agent. The model can spawn child agents with isolated query loops. There are five sub-patterns worth knowing:
a) Agent routing — different agents on different models
AGENT_ROUTING = {
    "Explore": "claude-haiku-4-5",        # fast, cheap for search
    "Plan": "claude-opus-4-8",            # slow, smart for planning
    "code-reviewer": "claude-sonnet-4-6",
}

def get_model_for_agent(agent_type, default_model):
    return AGENT_ROUTING.get(agent_type, default_model)
b) Worktree isolation — parallel agents that write files don’t conflict
# Each file-writing agent gets its own git worktree
git worktree add /tmp/agent-xyz-worktree HEAD
# Agent runs in isolation; changes merged back or discarded on completion
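From Python, the same idea can be wrapped in a context manager so the worktree is always cleaned up. This is a minimal sketch under assumptions: `agent_worktree` and its parameters are hypothetical names, the worktree is created with a detached HEAD, and `run` is injectable so the function can be exercised without a real repository.

```python
import subprocess
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def agent_worktree(repo: Path, agent_id: str, base_dir: Path, run=subprocess.run):
    """Create a detached worktree for one agent and remove it afterwards."""
    path = base_dir / f"agent-{agent_id}-worktree"
    run(["git", "-C", str(repo), "worktree", "add", "--detach", str(path), "HEAD"],
        check=True)
    try:
        yield path
    finally:
        # --force discards uncommitted changes, so merge or copy them out
        # inside the with-block before it exits.
        run(["git", "-C", str(repo), "worktree", "remove", "--force", str(path)],
            check=True)
```

An agent would then run with `path` as its working directory, for example `with agent_worktree(Path("."), "xyz", Path("/tmp")) as wt: ...`.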
c) Structured output — force parseable responses from subagents
Instead of parsing free-form text, instruct the subagent to call a StructuredOutput tool with a JSON schema. The schema is validated at the tool-call layer, so the model retries on mismatch automatically.
BUGS_SCHEMA = {
    "type": "object",
    "required": ["bugs"],  # without this, a reply with no "bugs" key would still validate
    "properties": {
        "bugs": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "file": {"type": "string"},
                    "line": {"type": "integer"},
                    "description": {"type": "string"},
                    "severity": {"type": "string", "enum": ["low", "medium", "high"]}
                }
            }
        }
    }
}

result = await run_agent(prompt, output_schema=BUGS_SCHEMA)
bugs = result["bugs"]  # guaranteed to match schema
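The validate-then-retry step can be sketched without any dependencies. The validator below is an assumption-laden stand-in that handles only the schema features used in this post (type, properties, required, items, enum); a real project would more likely use a full JSON Schema library. `ask_model` is a hypothetical callable that returns the StructuredOutput tool input as a dict, and the sketch is synchronous for brevity.

```python
def validate(value, schema, path="$"):
    """Check value against a small JSON Schema subset.

    Returns a list of error strings; an empty list means valid.
    """
    errors = []
    expected = schema.get("type")
    py_types = {"object": dict, "array": list, "string": str, "integer": int}
    if expected in py_types:
        ok = isinstance(value, py_types[expected])
        if expected == "integer" and isinstance(value, bool):
            ok = False  # bool is a subclass of int in Python
        if not ok:
            return [f"{path}: expected {expected}"]
    if "enum" in schema and value not in schema["enum"]:
        errors.append(f"{path}: must be one of {schema['enum']}")
    if isinstance(value, dict):
        for key in schema.get("required", []):
            if key not in value:
                errors.append(f"{path}.{key}: missing required property")
        for key, sub in schema.get("properties", {}).items():
            if key in value:
                errors.extend(validate(value[key], sub, f"{path}.{key}"))
    if isinstance(value, list) and "items" in schema:
        for i, item in enumerate(value):
            errors.extend(validate(item, schema["items"], f"{path}[{i}]"))
    return errors


def run_with_schema(ask_model, prompt, schema, max_attempts=3):
    """Ask until the payload validates or attempts run out."""
    feedback = None
    for _ in range(max_attempts):
        payload = ask_model(prompt, feedback)  # feedback carries prior errors
        errors = validate(payload, schema)
        if not errors:
            return payload
        feedback = "Schema validation failed: " + "; ".join(errors)
    raise ValueError(feedback)
```

Feeding the exact validation errors back to the model is what lets it correct itself on the next attempt instead of guessing.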
d) Background execution — auto-background long-running agents
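import asyncio

async def run_agent_with_auto_background(prompt, timeout_s=120):
    # run_agent, generate_id, background_tasks and TaskHandle are placeholders
    task = asyncio.create_task(run_agent(prompt))
    try:
        # shield() stops the timeout from cancelling the agent task itself
        result = await asyncio.wait_for(asyncio.shield(task), timeout=timeout_s)
        return result
    except asyncio.TimeoutError:
        # Still running: register it and hand back a handle instead of a result
        task_id = generate_id()
        background_tasks[task_id] = task
        return TaskHandle(id=task_id, status="running")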
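The other half of this pattern is collecting the result later. Below is a runnable sketch of both halves with a simulated agent. The names `run_or_background`, `collect`, and the module-level `background_tasks` registry are assumptions for illustration, and the timings are shortened so the demo finishes quickly.

```python
import asyncio

background_tasks: dict[str, asyncio.Task] = {}


async def run_or_background(coro, task_id: str, timeout_s: float):
    """Return ("done", result), or ("running", task_id) after timeout_s."""
    task = asyncio.create_task(coro)
    try:
        # shield() keeps wait_for's timeout from cancelling the task itself
        return "done", await asyncio.wait_for(asyncio.shield(task), timeout_s)
    except asyncio.TimeoutError:
        background_tasks[task_id] = task  # keep a reference so it stays alive
        return "running", task_id


async def collect(task_id: str):
    """Await a backgrounded task and drop it from the registry."""
    return await background_tasks.pop(task_id)


async def demo():
    async def slow_agent():
        await asyncio.sleep(0.05)  # simulated long-running agent
        return "report"

    status, handle = await run_or_background(slow_agent(), "t1", timeout_s=0.01)
    result = await collect(handle)
    return status, result
```

Here `asyncio.run(demo())` first gets a "running" status because the agent outlives the timeout, then receives "report" once the backgrounded task finishes.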
e) Pipeline orchestration — no barrier between stages
import asyncio

async def pipeline(items, *stages):
    """Run each item through all stages with no barrier between stages.
    Item A can be in stage 3 while item B is still in stage 1."""
    tasks = [run_item_through_stages(item, stages) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Failed items are dropped silently here; log them in real code
    return [r for r in results if not isinstance(r, Exception)]

async def run_item_through_stages(item, stages):
    result = item
    for stage in stages:
        result = await stage(result)
    return result

# Review code across multiple dimensions in parallel
# (FINDINGS_SCHEMA and VERDICT_SCHEMA are schemas you define, like BUGS_SCHEMA)
DIMENSIONS = ["bugs", "security", "performance"]

async def review_pr():
    return await pipeline(
        DIMENSIONS,
        lambda dim: run_agent(f"Find {dim} issues in this PR", output_schema=FINDINGS_SCHEMA),
        lambda findings: run_agent(f"Verify these findings: {findings}", output_schema=VERDICT_SCHEMA),
    )
8. The Hooks System (Extensible Lifecycle Events)
Hooks let external processes intercept agent behavior at lifecycle points without touching the agent code.
UserPromptSubmit → PreToolUse → PostToolUse → Stop → SessionEnd
Each hook is a subprocess. You configure a shell command; the agent serializes the event data as JSON, pipes it to the command's stdin, and reads the response from its stdout.
import asyncio
import json

async def fire_hook(hook_type, event_data, registered_hooks):
    hooks_for_type = registered_hooks.get(hook_type, [])
    results = []
    for hook_cmd in hooks_for_type:
        proc = await asyncio.create_subprocess_shell(
            hook_cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        # Send the event as JSON on stdin and read the hook's reply from stdout
        stdout, _ = await proc.communicate(
            input=json.dumps(event_data).encode()
        )
        if stdout.strip():  # a hook may print nothing at all
            results.append(json.loads(stdout))
    return results

async def can_use_tool(tool_name, tool_input, registered_hooks):
    results = await fire_hook("PreToolUse", {
        "tool_name": tool_name,
        "tool_input": tool_input
    }, registered_hooks)
    for r in results:
        if r.get("action") == "block":
            return False, r.get("reason")
    return True, None
Configuration in settings.json:
{
  "hooks": {
    "PreToolUse": ["python3 my_security_check.py"],
    "PostToolUse": ["./log_tool_usage.sh"],
    "Stop": ["python3 notify_slack.py"]
  }
}
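For illustration, a PreToolUse hook like the `my_security_check.py` named in the configuration could look roughly like this. It is a sketch under several assumptions: the blocked substrings are a made-up policy, the Bash tool's input is assumed to carry a `command` field, and `"allow"` is an invented non-blocking action (the agent code above only looks for `"block"`).

```python
import json
import sys

# Hypothetical policy: substrings that should never appear in a Bash command.
BLOCKED_SUBSTRINGS = ("rm -rf /", "mkfs", ":(){")


def decide(event: dict) -> dict:
    """Return the hook response for one PreToolUse event."""
    if event.get("tool_name") == "Bash":
        # Assumes tool_input is a dict with a "command" string.
        command = str(event.get("tool_input", {}).get("command", ""))
        for needle in BLOCKED_SUBSTRINGS:
            if needle in command:
                return {"action": "block",
                        "reason": f"command contains {needle!r}"}
    return {"action": "allow"}


def main() -> None:
    raw = sys.stdin.read()  # the agent pipes the event in as JSON
    if not raw.strip():
        return  # nothing piped in; print nothing
    json.dump(decide(json.loads(raw)), sys.stdout)  # verdict goes to stdout


if __name__ == "__main__":
    main()
```

Keeping the policy in a pure `decide` function makes the hook easy to unit test without spawning a subprocess.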
9. MCP Integration (Model Context Protocol)
MCP is a standard for connecting external tools to agents as a plugin system. Instead of hardcoding tools, you connect to MCP servers that expose tools dynamically.
Agent ←→ MCP Client ←→ MCP Server (subprocess or HTTP)
                            ↓
                  tools, resources, prompts
| Transport | Description |
|---|---|
| stdio | Subprocess communicating over stdin/stdout (most common) |
| sse | HTTP Server-Sent Events |
| http | HTTP with streaming |
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def connect_mcp_server(command, args):
    server_params = StdioServerParameters(command=command, args=args)
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Discover tools dynamically
            tools = await session.list_tools()
            # "my_tool" is a placeholder; use a name found in the listing above
            result = await session.call_tool("my_tool", {"arg": "value"})
            return result
Key insight: Tools from MCP servers are surfaced to the model exactly like built-in tools. The model cannot tell the difference. This means you can add capabilities at runtime without changing your agent code.
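Making MCP tools look like built-in ones mostly comes down to reshaping their descriptors. MCP tool objects carry `name`, `description`, and `inputSchema`, while the Anthropic Messages API expects `name`, `description`, and `input_schema`. The sketch below does that mapping; the function name and the `mcp__<server>__<tool>` prefix are assumptions chosen here to avoid name clashes between servers.

```python
from types import SimpleNamespace


def mcp_tools_to_model_tools(mcp_tools, server_name):
    """Convert MCP tool descriptors into Messages-API-style tool definitions."""
    return [
        {
            "name": f"mcp__{server_name}__{tool.name}",
            "description": tool.description or "",
            "input_schema": tool.inputSchema,
        }
        for tool in mcp_tools
    ]


# Stand-in for one entry of (await session.list_tools()).tools
example_tool = SimpleNamespace(
    name="search_issues",
    description="Search the issue tracker",
    inputSchema={"type": "object", "properties": {"query": {"type": "string"}}},
)
model_tools = mcp_tools_to_model_tools([example_tool], server_name="tracker")
```

When the model later calls `mcp__tracker__search_issues`, the agent strips the prefix and forwards the call to the matching session with `session.call_tool`.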
How It All Fits Together
User Input
    │
    ▼
REPL (terminal UI)
    │
    ▼
query() ─────────────── prefetch memories (asyncio.create_task)
    │                        │
    │  Build context:        │ (runs in parallel with model call)
    │    system prompt       │
    │    + memories ◄────────┘
    │    + messages
    │
    ▼
Model API (streaming)
    │
    ├── token ──────────────────► yield to UI (real-time display)
    │
    ├── tool_use ───────────────► StreamingToolExecutor
    │                               ├── concurrent-safe → asyncio.gather()
    │                               └── exclusive → serialized with Lock
    │
    ▼
Tool results → append to messages → loop back to model
    │
    ├── ToolFailureLoopGuard checks → abort if stuck
    ├── TokenBudgetTracker checks → compact or stop if full
    │
    ▼ (no tools called OR terminal condition)
Final response
    │
    ▼
PostTurn hooks → done
What to Build First (Priority Order)
| # | Technique | Python libs | Effort | Impact |
|---|---|---|---|---|
| 1 | Agent loop with streaming | anthropic, asyncio | Low | High |
| 2 | Parallel tool execution | asyncio.gather() | Low | High |
| 3 | Tool failure loop guard | Pure Python | Low | Medium |
| 4 | Token budget tracker | Pure Python | Low | Medium |
| 5 | Context compaction | anthropic | Medium | High |
| 6 | Prefetch while streaming | asyncio.create_task | Low | Medium |
| 7 | Multi-agent routing | anthropic | Medium | High |
| 8 | Hooks system | subprocess, asyncio | Medium | Medium |
| 9 | MCP integration | mcp (official SDK) | Medium | High |
Recommended build order:
- Phase 1 (single agent): Items 1 + 2 + 3 + 4 — production-quality single agent
- Phase 2 (multi-agent): Add items 5 + 7 — full multi-agent behavior
- Phase 3 (extensible): Add items 6 + 8 + 9 — plugin system and latency optimization
All four Phase 1 items are low-effort, and the first two are high-impact. If you only take one thing from this post: implement the streaming tool executor. The latency savings on tool-heavy turns are real and immediately visible.