Agent-to-Agent Communication Protocols: Choosing the Right One
TL;DR: Five options dominate agent-to-agent communication in 2025. Shared state and typed messages cover in-process work, MCP standardizes tool exposure, message brokers handle async durability, and HTTP-with-schemas is the cross-organization fallback.
The question “how do my agents talk to each other?” sounds trivial until you actually have to answer it across a real team. The choices are different at different scales. Two agents in one Python process is a function call. Twenty agents across three services in two languages is a distributed systems problem. The frameworks paper over this, but the abstractions leak the moment you push past their happy path.
This post is the comparison I do on a whiteboard when a team is choosing how their agents will communicate. I’ve used every option below in production. None of them is universally correct, and the wrong one will cost you weeks of debugging or a refactor.
I’ll walk through five protocols, when each fits, and what they look like in working Python. The code targets March 2025 baselines: Python 3.12, LangGraph 0.2.74, AutoGen 0.4.5, MCP 1.2.0, and Redis 5.
The decision space
There are three independent axes that determine which protocol you want.
Locality. Are the agents in one process, multiple processes on one host, or across machines? Crossing a process boundary forces serialization. Crossing a network boundary adds latency and failure modes.
Coupling. Do the agents know each other by type, by name, or only by capability? Tight coupling lets you push more state across. Loose coupling makes the system harder to break, but also harder to reason about.
Durability. Does a message survive a process restart? If yes, you need persistence. If no, you can keep things in memory.
            in-process        same host         cross-host
          +-------------+   +-------------+   +-------------+
ephemeral | shared      |   | typed       |   | HTTP        |
          | state       |   | messages    |   | + schema    |
          +-------------+   +-------------+   +-------------+
durable   | checkpointed|   | broker      |   | broker /    |
          | state       |   | (Redis)     |   | event bus   |
          +-------------+   +-------------+   +-------------+
That grid is the cheat sheet. Pick the cell, then pick the implementation.
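The grid can also be written down as a lookup table, which is handy in a design doc or an architecture test. This is only a sketch: the names GRID and pick_protocol are made up for illustration, and the coupling axis is left out because the grid itself only encodes locality and durability.

```python
from typing import Literal

Locality = Literal["in-process", "same-host", "cross-host"]

# The grid as a lookup table: (locality, durable) -> protocol.
# GRID and pick_protocol are illustrative names, not part of any framework.
GRID: dict[tuple[str, bool], str] = {
    ("in-process", False): "shared state",
    ("same-host", False): "typed messages",
    ("cross-host", False): "HTTP + schema",
    ("in-process", True): "checkpointed state",
    ("same-host", True): "broker (Redis)",
    ("cross-host", True): "broker / event bus",
}


def pick_protocol(locality: Locality, durable: bool) -> str:
    """Return the grid cell for a locality and a durability requirement."""
    try:
        return GRID[(locality, durable)]
    except KeyError:
        raise ValueError(f"unknown locality: {locality!r}") from None


print(pick_protocol("same-host", durable=True))
```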
1. Shared state, the in-process default
When agents live in one process, shared state via a typed dict is the simplest thing that works. LangGraph, CrewAI, and AutoGen 0.4 all support it. State is a dict, and agents are functions that take the dict and return updates.
# langgraph shared state
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    research_notes: str
    draft: str


def researcher(state: State) -> dict:
    return {"research_notes": "8 facts about postgres 17"}


def writer(state: State) -> dict:
    return {
        "draft": f"Draft using: {state['research_notes']}",
        "messages": [AIMessage(content="draft ready")],
    }


g = StateGraph(State)
g.add_node("research", researcher)
g.add_node("write", writer)
g.set_entry_point("research")
g.add_edge("research", "write")
g.add_edge("write", END)
app = g.compile()
result = app.invoke({
    "messages": [HumanMessage(content="postgres 17")],
    "research_notes": "",
    "draft": "",
})
The contract between agents is the state schema. If writer expects research_notes and researcher doesn’t produce it, you find out at runtime. A TypedDict checked with mypy or pyright moves key and type mistakes to development time, and Pydantic models validate the values when the state is built.
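Where a static type checker is not in the loop, a small runtime guard can still catch a misspelled key on the first run rather than three nodes later. The sketch below is framework-independent and uses only the standard library; the checked decorator is a hypothetical helper, not a LangGraph API, and the State here is a trimmed copy of the one above.

```python
import functools
from typing import Callable, TypedDict, get_type_hints


class State(TypedDict):
    research_notes: str
    draft: str


def checked(schema: type) -> Callable:
    """Wrap a node so its update may only contain keys declared in `schema`."""
    allowed = set(get_type_hints(schema))

    def decorate(node: Callable[[dict], dict]) -> Callable[[dict], dict]:
        @functools.wraps(node)
        def wrapper(state: dict) -> dict:
            update = node(state)
            unknown = set(update) - allowed
            if unknown:
                raise ValueError(
                    f"{node.__name__} wrote undeclared keys: {sorted(unknown)}"
                )
            return update
        return wrapper
    return decorate


@checked(State)
def researcher(state: dict) -> dict:
    # Typo on purpose: the schema declares "research_notes".
    return {"research_note": "8 facts about postgres 17"}


@checked(State)
def writer(state: dict) -> dict:
    return {"draft": f"Draft using: {state['research_notes']}"}
```

Calling researcher raises a ValueError that names the offending key, while writer passes through unchanged.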
When shared state breaks
Two agents trying to update the same key concurrently. The framework either picks one winner silently or merges via a reducer. If you don’t specify a reducer, expect to lose data, because the later write simply replaces the earlier one. LangGraph’s add_messages is the canonical example of a safe reducer: it appends new messages and only replaces an existing one when the IDs match. For numeric counters, use operator.add as the reducer.
from typing import Annotated, TypedDict
import operator


class CounterState(TypedDict):
    # operator.add is the reducer: updates are summed instead of overwritten
    count: Annotated[int, operator.add]
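To make the difference concrete, here is a toy merge loop that folds two updates into a state, using the reducer from the Annotated metadata when there is one. It is a simplified model of the idea, not how LangGraph is implemented; apply_updates and the status field are invented for the example, and the last-write-wins branch stands in for the "no reducer" case.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class CounterState(TypedDict):
    count: Annotated[int, operator.add]  # merged with a reducer
    status: str                          # no reducer: last write wins


def apply_updates(schema: type, state: dict, updates: list[dict]) -> dict:
    """Fold a batch of updates into state, using Annotated reducers if present."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            metadata = getattr(hints[key], "__metadata__", ())
            reducer = metadata[0] if metadata else None
            merged[key] = reducer(merged[key], value) if reducer else value
    return merged


start = {"count": 0, "status": "new"}
# Two agents report in the same step.
result = apply_updates(CounterState, start, [
    {"count": 2, "status": "agent-a done"},
    {"count": 3, "status": "agent-b done"},
])
print(result)  # {'count': 5, 'status': 'agent-b done'}
```

Both counts survive, but agent-a's status is gone, which is the data loss described above.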
2. Typed messages between actors
When agents need to talk outside a fixed graph, or run concurrently with their own lifecycles, the actor model gives you typed message passing. This is AutoGen 0.4’s primitive, and you can implement it yourself with asyncio.Queue or any actor library.
# autogen-core, typed messages
import asyncio
from dataclasses import dataclass
from autogen_core import (
    AgentId, MessageContext, RoutedAgent,
    SingleThreadedAgentRuntime, message_handler,
)


@dataclass
class ResearchRequest:
    topic: str
    request_id: str


@dataclass
class ResearchResult:
    request_id: str
    facts: list[str]


class Researcher(RoutedAgent):
    def __init__(self):
        super().__init__("researcher")

    @message_handler
    async def on_request(self, msg: ResearchRequest, ctx: MessageContext) -> ResearchResult:
        return ResearchResult(request_id=msg.request_id, facts=[f"about {msg.topic}"])


async def main():
    rt = SingleThreadedAgentRuntime()
    await Researcher.register(rt, "researcher", lambda: Researcher())
    rt.start()
    result = await rt.send_message(
        ResearchRequest(topic="postgres", request_id="r1"),
        AgentId("researcher", "default"),
    )
    print(result)
    await rt.stop()


asyncio.run(main())
The contract here is the dataclass. Adding a field is backwards compatible if the receiver tolerates missing fields; removing a field is a breaking change. Versioning lives in the type system, not in the message body.
I use this when I have long-lived agents that handle multiple concurrent requests, where treating each as a queue consumer is cleaner than threading state through a graph.
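For comparison, the same request/result pair can be handled without any framework, using asyncio.Queue as the mailbox. This is a minimal sketch of the pattern, not a replacement for a real runtime: ResearcherActor and its ask method are illustrative names, and there is no supervision, backpressure, or shutdown protocol.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class ResearchRequest:
    topic: str
    request_id: str


@dataclass(frozen=True)
class ResearchResult:
    request_id: str
    facts: list[str]


class ResearcherActor:
    """Long-lived agent that owns a mailbox and answers typed requests."""

    def __init__(self) -> None:
        self._mailbox: asyncio.Queue = asyncio.Queue()

    async def ask(self, msg: ResearchRequest) -> ResearchResult:
        # Each request carries its own future so replies reach the right caller.
        reply: asyncio.Future = asyncio.get_running_loop().create_future()
        await self._mailbox.put((msg, reply))
        return await reply

    async def run(self) -> None:
        while True:
            msg, reply = await self._mailbox.get()
            if not isinstance(msg, ResearchRequest):
                reply.set_exception(TypeError(f"unsupported message: {type(msg).__name__}"))
                continue
            reply.set_result(ResearchResult(msg.request_id, [f"about {msg.topic}"]))


async def main() -> list[ResearchResult]:
    actor = ResearcherActor()
    worker = asyncio.create_task(actor.run())
    try:
        # Two concurrent callers share one actor.
        return await asyncio.gather(
            actor.ask(ResearchRequest("postgres", "r1")),
            actor.ask(ResearchRequest("redis", "r2")),
        )
    finally:
        worker.cancel()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The per-request future is what keeps replies from crossing when several callers are waiting on the same actor.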
3. MCP, the standard for tool exposure
If the communication is “agent calls a tool exposed by some external service”, MCP is now the right answer. It separates protocol from framework, so an MCP server you write in Python can be consumed by a LangGraph agent, a Claude Desktop session, and the Cursor IDE at the same time.
# mcp server, the tool side
from mcp.server.fastmcp import FastMCP
import httpx

# In MCP 1.2.0 the port is a FastMCP setting; run() only takes the transport.
mcp = FastMCP("invoices", port=8765)


@mcp.tool()
async def get_invoice(invoice_id: str) -> dict:
    """Return invoice by ID with amount and status."""
    async with httpx.AsyncClient() as c:
        r = await c.get(f"https://api.example.com/invoices/{invoice_id}")
        r.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return r.json()


if __name__ == "__main__":
    mcp.run(transport="sse")
# consuming from any python client
from mcp import ClientSession
from mcp.client.sse import sse_client


async def use_invoice_tool():
    async with sse_client("http://localhost:8765/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            result = await session.call_tool("get_invoice", {"invoice_id": "1234"})
            return result
I covered MCP in depth in the MCP server tutorial. The short version: if it’s a tool, expose it via MCP. The integration cost is small and the optionality is large.
4. Message brokers for async durability
When work must survive a process restart, when you need horizontal scaling of workers, or when agents are in different services, you need a broker. Redis Streams is my default because the operational overhead is low and it’s good enough until you genuinely need Kafka.
# producer side
import json
import redis

r = redis.Redis(decode_responses=True)


def enqueue_research(topic: str, trace_id: str):
    r.xadd("tasks.research", {
        "data": json.dumps({"topic": topic, "trace_id": trace_id})
    })
# consumer side, a research worker
import json
import uuid
import redis
from openai import OpenAI

r = redis.Redis(decode_responses=True)
client = OpenAI()
GROUP = "researchers"
# Unique per process; a timestamp can collide when workers start together.
CONSUMER = f"worker-{uuid.uuid4().hex[:8]}"


def ensure_group():
    try:
        r.xgroup_create("tasks.research", GROUP, id="0", mkstream=True)
    except redis.ResponseError as exc:
        # Only "group already exists" is safe to ignore.
        if "BUSYGROUP" not in str(exc):
            raise


def consume_loop():
    ensure_group()
    while True:
        # ">" asks for entries never delivered to this group.
        msgs = r.xreadgroup(GROUP, CONSUMER, {"tasks.research": ">"}, count=1, block=5000)
        if not msgs:
            continue
        for _, entries in msgs:
            for msg_id, fields in entries:
                payload = json.loads(fields["data"])
                resp = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": f"Research: {payload['topic']}"}],
                )
                r.xadd("tasks.write", {"data": json.dumps({
                    "trace_id": payload["trace_id"],
                    "facts": resp.choices[0].message.content,
                })})
                # Ack only after the result is published, so a crash
                # leaves the entry pending instead of losing it.
                r.xack("tasks.research", GROUP, msg_id)


if __name__ == "__main__":
    consume_loop()
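Consumer groups give you at-least-once delivery and horizontal scaling. Add a worker by starting another process pointing at the same group. Redis tracks who got what in the group’s pending entries list. Entries left pending by a crashed worker are only processed again if another consumer claims them, for example by polling XPENDING and calling XCLAIM.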
For retention longer than days, or for per-key ordering spread across many partitions at high throughput, move to Kafka. The protocol shape stays the same; you accept more operational complexity in exchange for throughput.
The schema problem
Brokers don’t enforce message schemas. You will eventually ship a producer that adds a field, a consumer that doesn’t handle it, and a 3am page. Two ways to handle this.
Schema registry, Confluent’s or Apicurio’s. Heavyweight but bulletproof. Producers register schemas, consumers fetch them, and the registry’s compatibility checks reject an incompatible schema before messages using it reach consumers.
Lightweight: put a schema_version field in every message, and require consumers to handle the versions they care about. I’ve shipped both. Lightweight is fine for under five services. Beyond that, the registry pays for itself.
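The lightweight option is small enough to sketch in full. The version numbers, the field names, and the encode/decode helpers below are all assumptions made for illustration; the point is only that the consumer dispatches on schema_version and fails loudly on a version it does not know, rather than guessing.

```python
import json
from typing import Callable


def _parse_v1(body: dict) -> dict:
    # Hypothetical v1 carried only the topic.
    return {"topic": body["topic"], "max_facts": 10}


def _parse_v2(body: dict) -> dict:
    # Hypothetical v2 added an optional max_facts field.
    return {"topic": body["topic"], "max_facts": body.get("max_facts", 10)}


# One parser per version this consumer agrees to handle.
PARSERS: dict[int, Callable[[dict], dict]] = {1: _parse_v1, 2: _parse_v2}


def encode(body: dict, version: int) -> str:
    """Producer side: stamp every message with its schema version."""
    return json.dumps({"schema_version": version, **body})


def decode(raw: str) -> dict:
    """Consumer side: parse a message, refusing versions it does not know."""
    message = json.loads(raw)
    version = message.get("schema_version")
    parser = PARSERS.get(version)
    if parser is None:
        raise ValueError(f"unsupported schema_version: {version!r}")
    return parser(message)


print(decode(encode({"topic": "postgres", "max_facts": 3}, version=2)))
```

A message rejected this way is a good candidate for a dead-letter stream, so that a new producer version shows up as a metric instead of a silent drop.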
5. HTTP with schemas, the cross-org fallback
When agents live in different organizations or technologies, HTTP with a documented schema is the protocol of last resort. It’s painful for chat-style turn-taking but fine for request-response.
# fastapi service exposing an agent capability
from fastapi import FastAPI
from pydantic import BaseModel

api = FastAPI()


class ResearchRequest(BaseModel):
    topic: str
    max_facts: int = 10


class ResearchResponse(BaseModel):
    topic: str
    facts: list[str]


@api.post("/v1/research", response_model=ResearchResponse)
async def research(req: ResearchRequest) -> ResearchResponse:
    # call your model, return facts
    return ResearchResponse(topic=req.topic, facts=[f"fact about {req.topic}"])
Pair it with an OpenAPI spec, version your endpoints under /v1, /v2, and you have a contract any HTTP client can speak. The downside is latency: every interaction is a full request-response cycle. Don’t use HTTP for high-frequency intra-system agent chat; use it at the boundaries.
For streaming agent output across HTTP, SSE is the answer, as I covered in the LangGraph production tutorial.
How to pick
A short flowchart.
Same process, ephemeral work      -> shared state (graph framework)
Same process, long-lived agents   -> typed messages (actor model)
Cross-service, sync, internal     -> typed messages over gRPC or another RPC layer
Cross-service, async, internal    -> message broker (Redis/Kafka)
Exposing tools to multiple LLMs   -> MCP
Cross-organization                -> HTTP + OpenAPI
I’d add one heuristic. Default to the simplest cell that meets your durability requirement. The temptation to start with Kafka is real and almost always wrong. Build with shared state, move to a broker when you hit a real reason, not a hypothetical one.
Common Pitfalls
The recurring ones.
- Treating shared state as a database. State is for the current run’s data. Long-term knowledge belongs in a real store, Postgres or a vector DB. Stuff it all into agent state and your context window dies.
- No schema discipline on broker messages. Ship without versioning and the first breaking change is a production incident. Add schema_version from day one.
- Using MCP for things that aren’t tools. MCP isn’t a general agent-to-agent bus. It’s for exposing capabilities to a model. Don’t use it for fan-out work distribution, use a broker.
- HTTP between agents in the same process. I’ve seen teams write FastAPI services for two agents that share a Python file. The latency tax is real and unjustified. Use function calls or shared state.
Troubleshooting
Three real failure modes.
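Messages get duplicated in the broker setup. The consumer group ack is missing, or it arrives after the message has already been redelivered. Check that XACK is called as soon as processing succeeds (not in a finally block, which would also ack failed messages and drop them), and that your consumer can handle the same message twice idempotently. Idempotency keys set on the producer side help here.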
Two agents in shared state see each other’s stale data. Concurrent updates with no reducer. Either serialize the work, or add explicit reducers for every contested key. In LangGraph, two nodes writing to the same reducer-less key in one step raise an InvalidUpdateError, which is the signal to add a reducer.
MCP tool calls are slow only over the HTTP transport. Stdio is faster because it skips the network. Profile the server first, but the usual cause is the client setting up a new connection per call. Keep one SSE connection and ClientSession open and reuse it across calls instead of reconnecting per request.
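For the duplicated-message case, the consumer-side half of the fix looks roughly like this. It is a sketch under two assumptions: that trace_id is unique per task and can serve as the idempotency key, and that an in-memory set is an acceptable stand-in for a store shared by all workers. IdempotentHandler is an illustrative name.

```python
from typing import Callable


class IdempotentHandler:
    """Run a handler at most once per idempotency key, even if redelivered."""

    def __init__(self, handler: Callable[[dict], None]) -> None:
        self._handler = handler
        # In-memory stand-in; a real worker pool would need a shared store.
        self._seen: set[str] = set()

    def handle(self, payload: dict) -> bool:
        """Return True if the work ran, False if it was a duplicate."""
        key = payload["trace_id"]  # assumed unique per task
        if key in self._seen:
            return False           # duplicate delivery: skip the work
        self._handler(payload)     # may raise; the key is not recorded on failure
        self._seen.add(key)
        return True


processed: list[dict] = []
worker = IdempotentHandler(processed.append)
first = worker.handle({"trace_id": "t1", "topic": "postgres"})
second = worker.handle({"trace_id": "t1", "topic": "postgres"})  # redelivered
print(first, second, len(processed))  # True False 1
```

Either return value is safe to follow with an ack, because in both cases the work for that key has completed once; a raised exception is the only path that should leave the message unacked.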
Wrapping Up
The protocol you pick is more about coupling and durability than about taste. Start with the tightest, simplest fit (shared state in one process) and only step outward when you have a concrete reason. Each step outward, to typed actors, to a broker, to HTTP, adds operational surface area you’ll pay for in on-call rotations.
MCP is the new entry in the toolkit and deserves its place. For tool exposure across multiple frameworks and clients, nothing else gives you the same reach for the same effort. For everything else, the patterns above haven’t changed much from how we build distributed systems generally, just adapted to LLM-shaped failure modes.
The MCP specification is the best reference for the standardized protocol. For broker patterns, the Redis Streams docs and the Confluent design pages are both worth your time before committing to either.