MCP (Model Context Protocol) is the open standard that lets AI assistants like Claude talk to external tools: databases, file systems, APIs, internal services. But there’s a catch: out of the box, MCP servers run locally on each developer’s machine as a child process.
That’s fine for a single person. For a team, it’s a headache.
Every developer installs and maintains their own copy of every MCP server. Tool updates get rolled out one laptop at a time. Access to internal resources (like your production database or a shared file store) means exposing them to every dev machine. There’s no audit trail, no centralized config, no way to revoke someone’s access without touching their machine.
This post walks through building a central MCP Gateway: one shared service your whole team points Claude Code at. It handles auth, routes requests to the right backend, and keeps your internal resources inside your network.
Everything runs in Docker Compose, so you can spin the whole setup up on any machine with a single command.
How it works Link to heading
The core insight is that MCP supports two transports:
- stdio — the server is a child process; your AI client talks to it over stdin/stdout. Simple, but local-only.
- HTTP + SSE — the server is a remote HTTP service. The client opens a long-lived Server-Sent Events stream, and POSTs JSON-RPC messages to it. This is what enables shared, remote access.
The gateway uses both. It speaks HTTP/SSE to Claude Code clients (remote transport), while maintaining persistent SSE connections to backend MCP servers running inside Docker. When a tool call comes in, the gateway validates the API key, checks permissions, and proxies the call to the right backend.
The MCP servers and the database are on an internal Docker network — not reachable from outside. Only the gateway has a published port.
Project layout Link to heading
mcp-gateway/
├── docker-compose.yml
├── gateway/ ← the Python gateway service
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── config/
│ │ ├── routes.yaml ← maps server names to backend URLs
│ │ └── api-keys.yaml ← maps API keys to users and permissions
│ └── src/
│ ├── main.py ← FastAPI app, SSE + messages endpoints
│ ├── auth.py ← API key validation
│ ├── aggregator.py ← connects to backends, routes tool calls
│ └── config.py ← loads YAML config files
├── servers/
│ ├── everything/ ← demo tools: echo, time, math
│ ├── filesystem/ ← read/write files in a shared /data volume
│ └── postgres/ ← read-only SQL queries against Postgres
├── data/
│ └── sample-files/ ← volume mounted into the filesystem server
└── postgres/
└── init.sql ← seed schema and data
Phase 1 — Docker Compose skeleton Link to heading
The Compose file defines five services and wires them together with health checks. The key rule: the gateway must not start until all three backend servers are healthy, and the postgres server must not start until the database is ready.
# docker-compose.yml
services:
mcp-gateway:
build: ./gateway
ports:
- "8080:8080" # the only published port
environment:
ROUTES_FILE: /app/config/routes.yaml
API_KEYS_FILE: /app/config/api-keys.yaml
volumes:
- ./gateway/config:/app/config:ro
depends_on:
mcp-server-everything:
condition: service_healthy
mcp-server-filesystem:
condition: service_healthy
mcp-server-postgres:
condition: service_healthy
networks:
- mcp-internal
mcp-server-everything:
build: ./servers/everything
networks:
- mcp-internal
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:8000/health"]
interval: 5s
retries: 5
start_period: 10s
mcp-server-filesystem:
build: ./servers/filesystem
volumes:
- ./data:/data # shared file storage
networks:
- mcp-internal
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:8000/health"]
interval: 5s
retries: 5
start_period: 10s
mcp-server-postgres:
build: ./servers/postgres
environment:
DATABASE_URL: postgresql://lab:lab@postgres:5432/labdb
depends_on:
postgres:
condition: service_healthy
networks:
- mcp-internal
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:8000/health"]
interval: 5s
retries: 5
start_period: 15s
postgres:
image: postgres:16
environment:
POSTGRES_USER: lab
POSTGRES_PASSWORD: lab
POSTGRES_DB: labdb
volumes:
- ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
networks:
- mcp-internal
healthcheck:
test: ["CMD-SHELL", "pg_isready -U lab -d labdb"]
interval: 5s
retries: 10
networks:
mcp-internal:
driver: bridge
Notice the networks block: all five services live on mcp-internal, an internal bridge
network. Only mcp-gateway has a ports entry, so only it is reachable from your machine
or the internet. The MCP servers and Postgres are invisible from outside Docker.
Phase 2 — Backend MCP servers Link to heading
Each backend is a standalone Python MCP server that speaks HTTP/SSE. They all follow the same structure: define tools with the MCP SDK, wrap the server in a Starlette app, serve it with uvicorn.
All three share the same requirements.txt and Dockerfile:
# servers/everything|filesystem|postgres/requirements.txt
mcp>=1.0.0
uvicorn[standard]>=0.30.0
starlette>=0.41.0
asyncpg>=0.29.0 # postgres only
# servers/everything|filesystem|postgres/Dockerfile
FROM python:3.12-slim
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
CMD ["python", "server.py"]
The “everything” server (demo tools) Link to heading
This server exists purely to verify the gateway works. It has three tools: echo,
get_time, and add.
# servers/everything/server.py
import datetime
import mcp.types as types
import uvicorn
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
server = Server("everything")
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="echo",
description="Echo a message back",
inputSchema={
"type": "object",
"properties": {"message": {"type": "string"}},
"required": ["message"],
},
),
types.Tool(
name="get_time",
description="Return the current UTC time",
inputSchema={"type": "object", "properties": {}},
),
types.Tool(
name="add",
description="Add two numbers",
inputSchema={
"type": "object",
"properties": {
"a": {"type": "number"},
"b": {"type": "number"},
},
"required": ["a", "b"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
if name == "echo":
return [types.TextContent(type="text", text=arguments["message"])]
if name == "get_time":
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
return [types.TextContent(type="text", text=now)]
if name == "add":
return [types.TextContent(type="text", text=str(arguments["a"] + arguments["b"]))]
raise ValueError(f"Unknown tool: {name}")
# --- HTTP/SSE serving boilerplate (same for every backend) ---
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(request.scope, request.receive, request._send) as (r, w):
await server.run(r, w, server.create_initialization_options())
app = Starlette(routes=[
Route("/health", lambda _: JSONResponse({"status": "ok"})),
Route("/sse", handle_sse),
Mount("/messages/", app=sse.handle_post_message),
])
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Three things to call out:
SseServerTransport("/messages/")— this tells the transport what path to advertise to clients for posting messages. When a client opens the SSE stream, it receives anendpointevent pointing here./healthroute — Docker’sHEALTHCHECKhits this. Keeps it dead simple: if the app is up, return 200.The
handle_ssecoroutine — passes the raw ASGIscope/receive/sendto the transport. This is necessary because the SSE response needs to write directly to the underlying connection, not through FastAPI’s response abstraction.
The filesystem server Link to heading
The filesystem server exposes a shared /data directory with three tools: list_files,
read_file, and write_file. The important detail is path traversal protection — users
must not be able to escape the /data boundary.
# servers/filesystem/server.py (tools only — serving boilerplate is identical)
from pathlib import Path
import mcp.types as types
from mcp.server import Server
DATA_ROOT = Path("/data").resolve()
server = Server("filesystem")
def safe_path(relative: str) -> Path:
path = (DATA_ROOT / relative).resolve()
if not path.is_relative_to(DATA_ROOT):
raise ValueError("Path escapes /data boundary")
return path
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(name="list_files", description="List files and directories inside /data",
inputSchema={"type": "object", "properties": {
"path": {"type": "string", "default": "."}}}),
types.Tool(name="read_file", description="Read a text file from /data",
inputSchema={"type": "object", "properties": {
"path": {"type": "string"}}, "required": ["path"]}),
types.Tool(name="write_file", description="Write a text file to /data",
inputSchema={"type": "object", "properties": {
"path": {"type": "string"}, "content": {"type": "string"}},
"required": ["path", "content"]}),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
if name == "list_files":
path = safe_path(arguments.get("path", "."))
entries = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))
lines = [f"{'[DIR] ' if e.is_dir() else ' '}{e.name}" for e in entries]
return [types.TextContent(type="text", text="\n".join(lines) or "(empty)")]
if name == "read_file":
path = safe_path(arguments["path"])
return [types.TextContent(type="text", text=path.read_text())]
if name == "write_file":
path = safe_path(arguments["path"])
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(arguments["content"])
return [types.TextContent(type="text", text=f"Wrote to {path.relative_to(DATA_ROOT)}")]
raise ValueError(f"Unknown tool: {name}")
The Postgres server Link to heading
The Postgres server uses asyncpg for async database access and exposes three tools:
list_tables, describe_table, and query. The query tool enforces read-only access
by rejecting anything that doesn’t start with SELECT.
# servers/postgres/server.py (tools only — serving boilerplate is identical)
import os
import asyncpg
import mcp.types as types
from mcp.server import Server
DATABASE_URL = os.environ["DATABASE_URL"]
server = Server("postgres")
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(name="list_tables", description="List all tables",
inputSchema={"type": "object", "properties": {}}),
types.Tool(name="describe_table", description="Show columns for a table",
inputSchema={"type": "object", "properties": {
"table": {"type": "string"}}, "required": ["table"]}),
types.Tool(name="query", description="Run a read-only SELECT query",
inputSchema={"type": "object", "properties": {
"sql": {"type": "string"}}, "required": ["sql"]}),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
conn = await asyncpg.connect(DATABASE_URL)
try:
if name == "list_tables":
rows = await conn.fetch(
"SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'public' ORDER BY table_name")
return [types.TextContent(type="text",
text="\n".join(r["table_name"] for r in rows))]
if name == "describe_table":
rows = await conn.fetch(
"SELECT column_name, data_type, is_nullable "
"FROM information_schema.columns "
"WHERE table_schema='public' AND table_name=$1 "
"ORDER BY ordinal_position", arguments["table"])
lines = [f"{r['column_name']} ({r['data_type']})" for r in rows]
return [types.TextContent(type="text", text="\n".join(lines))]
if name == "query":
sql = arguments["sql"].strip()
if not sql.upper().lstrip("(").startswith("SELECT"):
return [types.TextContent(type="text",
text="Error: only SELECT queries are permitted")]
rows = await conn.fetch(sql)
if not rows:
return [types.TextContent(type="text", text="(no rows)")]
cols = list(rows[0].keys())
header = " | ".join(cols)
body = "\n".join(" | ".join(str(r[c]) for c in cols) for r in rows)
return [types.TextContent(type="text", text=f"{header}\n{'-'*len(header)}\n{body}")]
raise ValueError(f"Unknown tool: {name}")
finally:
await conn.close()
The database is seeded via postgres/init.sql, bind-mounted into the Postgres container at startup. It creates the three tables the smoke tests reference and populates them with sample data:
-- postgres/init.sql
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50),
salary NUMERIC(10, 2),
joined_at DATE
);
CREATE TABLE projects (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
team VARCHAR(50),
started_at DATE
);
CREATE TABLE project_members (
project_id INT REFERENCES projects(id),
employee_id INT REFERENCES employees(id),
role VARCHAR(50),
PRIMARY KEY (project_id, employee_id)
);
INSERT INTO employees (name, department, salary, joined_at) VALUES
('Alice Chen', 'Platform', 95000, '2022-03-15'),
('Bob Smith', 'Data', 88000, '2021-07-01'),
('Carol White', 'Platform', 102000, '2020-11-20'),
('David Lee', 'Data', 91000, '2023-01-10'),
('Eve Johnson', 'Security', 97000, '2022-06-05');
INSERT INTO projects (name, status, team, started_at) VALUES
('MCP Gateway', 'active', 'Platform', '2024-01-01'),
('Data Pipeline v2', 'active', 'Data', '2024-03-01'),
('Auth Overhaul', 'planning', 'Security', '2024-06-01');
INSERT INTO project_members (project_id, employee_id, role) VALUES
(1, 1, 'lead'),
(1, 3, 'engineer'),
(2, 2, 'lead'),
(2, 4, 'engineer'),
(3, 5, 'lead');
Phase 3 — The gateway: config and auth Link to heading
The gateway has two YAML files that control everything without touching code.
gateway/config/routes.yaml maps server names to backend URLs:
servers:
everything:
url: http://mcp-server-everything:8000/sse
description: Demo tools — echo, current time, math
filesystem:
url: http://mcp-server-filesystem:8000/sse
description: Shared file storage mounted at /data
postgres:
url: http://mcp-server-postgres:8000/sse
description: Company database — read-only SELECT queries
gateway/config/api-keys.yaml maps API keys to users and their allowed servers:
keys:
- key: alice-key-abc123
user: alice
team: platform
allowed_servers: [everything, filesystem, postgres]
- key: bob-key-xyz789
user: bob
team: data
allowed_servers: [postgres, everything]
Bob cannot access the filesystem tools. If he tries, the gateway rejects the call — the backend server never sees the request.
The auth middleware is intentionally thin:
# gateway/src/auth.py
from fastapi import HTTPException, Request
def authenticate(request: Request, api_keys: dict[str, dict]) -> dict:
# Accept key from Authorization header or ?api_key= query param.
# Query param support is needed because some Claude Code versions don't
# forward custom headers to SSE MCP servers.
key = request.query_params.get("api_key", "")
if not key:
header = request.headers.get("Authorization", "")
if header.startswith("Bearer "):
key = header.removeprefix("Bearer ").strip()
if not key:
raise HTTPException(status_code=401, detail="Missing API key")
user = api_keys.get(key)
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
return user
The config loader reads both files at startup and builds a lookup dict:
# gateway/src/config.py
import os, yaml
def load_routes() -> dict:
path = os.getenv("ROUTES_FILE", "/app/config/routes.yaml")
with open(path) as f:
return yaml.safe_load(f)["servers"]
def load_api_keys() -> dict[str, dict]:
path = os.getenv("API_KEYS_FILE", "/app/config/api-keys.yaml")
with open(path) as f:
entries = yaml.safe_load(f)["keys"]
return {entry["key"]: entry for entry in entries}
Phase 4 — The gateway: tool aggregation Link to heading
This is the heart of the gateway. The Aggregator class connects to every backend over
SSE at startup and holds those connections open for the lifetime of the process.
# gateway/src/aggregator.py
from contextlib import AsyncExitStack
import mcp.types as types
from mcp import ClientSession
from mcp.client.sse import sse_client
class Aggregator:
def __init__(self, routes: dict) -> None:
self.routes = routes
self.sessions: dict[str, ClientSession] = {}
self.tool_server_map: dict[str, str] = {}
self._stack = AsyncExitStack()
async def start(self) -> None:
await self._stack.__aenter__()
for name, cfg in self.routes.items():
# Open an SSE connection to the backend and keep it alive
read, write = await self._stack.enter_async_context(
sse_client(cfg["url"])
)
session = await self._stack.enter_async_context(
ClientSession(read, write)
)
await session.initialize()
self.sessions[name] = session
# Build a tool → server reverse map for routing
result = await session.list_tools()
for tool in result.tools:
self.tool_server_map[tool.name] = name
async def stop(self) -> None:
await self._stack.aclose()
async def list_tools(self, allowed: list[str]) -> list[types.Tool]:
tools = []
for name, session in self.sessions.items():
if name in allowed: # per-user filtering
result = await session.list_tools()
tools.extend(result.tools)
return tools
async def call_tool(self, tool_name: str, arguments: dict, allowed: list[str]):
server_name = self.tool_server_map.get(tool_name)
if not server_name:
raise ValueError(f"Unknown tool: {tool_name}")
if server_name not in allowed:
raise PermissionError(f"Access denied to tool: {tool_name}")
result = await self.sessions[server_name].call_tool(tool_name, arguments)
return result.content
The AsyncExitStack is a clean way to manage multiple async context managers that need
to stay open indefinitely. Each enter_async_context call stacks another context on top;
when aclose() is called on shutdown, they all close in reverse order.
The tool_server_map is built once at startup: it records which backend owns each tool
name. When a tool call comes in, the gateway looks up the owner and routes there without
scanning every backend.
Phase 5 — The gateway: FastAPI app Link to heading
The main app wires everything together. Two endpoints handle the MCP protocol; one handles health checks.
# gateway/src/main.py
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Any
import mcp.types as types
from fastapi import FastAPI, Request
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from .aggregator import Aggregator
from .auth import authenticate
from .config import load_api_keys, load_routes
_current_user: ContextVar[dict] = ContextVar("current_user")
routes = load_routes()
api_keys = load_api_keys()
aggregator = Aggregator(routes)
mcp_server = Server("mcp-gateway")
sse_transport = SseServerTransport("/messages/")
@mcp_server.list_tools()
async def list_tools() -> list[types.Tool]:
user = _current_user.get()
return await aggregator.list_tools(user["allowed_servers"])
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict | None) -> list[Any]:
user = _current_user.get()
return await aggregator.call_tool(name, arguments or {}, user["allowed_servers"])
@asynccontextmanager
async def lifespan(app: FastAPI):
await aggregator.start() # connect to all backends at startup
yield
await aggregator.stop() # cleanly close all SSE connections on shutdown
app = FastAPI(lifespan=lifespan, redirect_slashes=False)
@app.get("/sse")
async def handle_sse(request: Request) -> None:
user = authenticate(request, api_keys) # 401 if key is bad
_current_user.set(user) # store user in async context
async with sse_transport.connect_sse(
request.scope, request.receive, request._send
) as (read, write):
await mcp_server.run(read, write, mcp_server.create_initialization_options())
@app.post("/messages/")
async def handle_messages(request: Request) -> None:
await sse_transport.handle_post_message(
request.scope, request.receive, request._send
)
@app.get("/health")
async def health() -> dict:
return {"status": "ok", "backends": list(aggregator.sessions.keys())}
The _current_user context variable is the key to per-user authorization. Python’s
contextvars are async-aware — each async task has its own copy. When an SSE connection
is opened, _current_user.set(user) writes into the context of that specific connection’s
coroutine. When list_tools or call_tool later runs in the same coroutine chain, it
reads the same value.
This means user A and user B can be connected simultaneously, and their tool lists are filtered independently — no shared mutable state, no locks.
The gateway’s Dockerfile and requirements.txt differ from the backends. It uses FastAPI
instead of Starlette, adds PyYAML for config loading, and copies src/ rather than a single
file:
# gateway/Dockerfile
FROM python:3.12-slim
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8080"]
# gateway/requirements.txt
mcp>=1.0.0
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
pyyaml>=6.0.0
Phase 6 — Running and connecting Claude Code Link to heading
Start everything:
docker compose up --build
Verify the gateway is up:
curl http://localhost:8080/health
# {"status":"ok","backends":["everything","filesystem","postgres"]}
Connecting Claude Code Link to heading
Add Alice’s config to your Claude Code MCP settings (~/.claude.json or the project-level .mcp.json):
{
"mcpServers": {
"company": {
"type": "sse",
"url": "http://localhost:8080/sse?api_key=alice-key-abc123"
}
}
}
That’s it. One URL, one API key. Claude Code now has access to all nine tools across the three backend servers — filtered by what Alice’s key is allowed to use.
To simulate a second team member (Bob, who can’t access filesystem tools), use his key:
{
"mcpServers": {
"company": {
"type": "sse",
"url": "http://localhost:8080/sse?api_key=bob-key-xyz789"
}
}
}
Bob’s Claude session will see list_tables, describe_table, query, echo, get_time,
and add — but not list_files, read_file, or write_file.
Quick smoke tests Link to heading
Alice’s session ask Claude:
❯ list the tables in the database
Called company (ctrl+o to expand)
⏺ The database has 3 tables:
- employees
- project_members
- projects
❯ describe the employees table
Called company (ctrl+o to expand)
⏺ The employees table has 5 columns:
┌────────────┬─────────┬──────────┐
│ Column │ Type │ Nullable │
├────────────┼─────────┼──────────┤
│ id │ integer │ not null │
├────────────┼─────────┼──────────┤
│ name │ varchar │ not null │
├────────────┼─────────┼──────────┤
│ department │ varchar │ nullable │
├────────────┼─────────┼──────────┤
│ salary │ numeric │ nullable │
├────────────┼─────────┼──────────┤
│ joined_at │ date │ nullable │
└────────────┴─────────┴──────────┘
❯ read the file sample-files/readme.txt
Called company (ctrl+o to expand)
⏺ Here's the contents of sample-files/readme.txt:
It's a lab overview file describing the shared file storage. Key notes:
- The directory is mounted at /data in the filesystem MCP server
- Also contains a report.csv with sample regional sales data
- Available tools: list_files, read_file, and write_file
❯ write 'hello from the lab' to sample-files/notes.txt
Called company (ctrl+o to expand)
⏺ Done. Written 18 bytes to sample-files/notes.txt.
Bob’s session ask Claude:
❯ read the file sample-files/readme.txt
Read 1 file (ctrl+o to expand)
⏺ The file sample-files/readme.txt doesn't exist in /Users/adamvu/delete-me. Let me check what's in the directory.
Listed 1 directory (ctrl+o to expand)
⏺ The directory appears to be empty — there are no files or subdirectories present, including sample-files/readme.txt.
Note that Bob doesn’t have permission to read the file
Going further Link to heading
This lab uses static API keys in YAML. A production version would swap that for OAuth or company’s SSO — the auth middleware is the only thing that changes.
Adding a new MCP server to the fleet is two steps:
- Add a service to
docker-compose.yml. - Add an entry to
routes.yaml.
No gateway code changes. No developer machine changes. The next time users reconnect, they see the new tools.