Documentation Index
Fetch the complete documentation index at: https://mintlify.com/bluesky-social/atproto/llms.txt
Use this file to discover all available pages before exploring further.
@atproto/xrpc-server
TypeScript library for implementing atproto HTTP API services with Lexicon schema validation. Provides a robust framework for building XRPC servers with authentication, rate limiting, streaming, and error handling.
Installation
npm install @atproto/xrpc-server
Overview
The @atproto/xrpc-server package provides:
- XRPC method handlers (queries and procedures)
- WebSocket subscription support
- Lexicon schema validation
- Authentication middleware
- Rate limiting (memory and Redis)
- Request/response streaming
- Error handling and XRPC error types
- Express.js integration
Quick Start
import { LexiconDoc } from '@atproto/lexicon'
import * as xrpc from '@atproto/xrpc-server'
import express from 'express'
const lexicons: LexiconDoc[] = [
{
lexicon: 1,
id: 'io.example.ping',
defs: {
main: {
type: 'query',
parameters: {
type: 'params',
properties: { message: { type: 'string' } },
},
output: {
encoding: 'application/json',
},
},
},
},
]
// Create XRPC server
const server = xrpc.createServer(lexicons)
// Add method handler
function ping(ctx: xrpc.HandlerContext) {
return {
encoding: 'application/json',
body: { message: ctx.params.message },
}
}
server.method('io.example.ping', ping)
// Mount in Express
const app = express()
app.use(server.router)
app.listen(8080)
Main Exports
createServer
Creates a new XRPC server instance.
import { createServer } from '@atproto/xrpc-server'
const server = createServer(lexicons, options)
Options
interface Options {
validateResponse?: boolean
catchall?: CatchallHandler
payload?: RouteOptions
rateLimits?: RateLimitsConfig
errorParser?: (err: unknown) => XRPCError
}
validateResponse: Enable response validation (default: true)
catchall: Custom catchall handler for unimplemented methods
payload: Default payload size limits
interface RouteOptions {
blobLimit?: number // Max blob size
jsonLimit?: number // Max JSON size
textLimit?: number // Max text size
}
rateLimits: Rate limiting configuration
interface RateLimitsConfig {
creator: RateLimiterCreator
global?: ServerRateLimitDescription[]
shared?: ServerRateLimitDescription[]
bypass?: (ctx: HandlerContext) => boolean
}
errorParser: Custom error conversion to XRPCError
Server Class
The main server class.
class Server {
router: Express
lex: Lexicons
// Add Lexicon schemas
addLexicon(doc: LexiconDoc): void
addLexicons(docs: LexiconDoc[]): void
// Add method handlers
method(nsid: string, handler: MethodHandler): void
add(schema: LexiconMethod, config: MethodConfig): void
// Add streaming handlers
streamMethod(nsid: string, handler: StreamHandler): void
// Start listening
listen(port: number, callback?: () => void): http.Server
}
Method Handlers
Handler Context
All handlers receive a context object:
interface HandlerContext<
A extends Auth = Auth,
P extends Params = Params,
I extends Input = Input,
> {
auth: A // Authentication result
params: P // Query parameters
input: I // Request body
req: express.Request // Express request
res: express.Response // Express response
resetRouteRateLimits: () => Promise<void>
}
Query Handler
Handle GET requests:
server.method('com.example.getRecord', async (ctx) => {
const { did, collection, rkey } = ctx.params
// Fetch record
const record = await db.getRecord(did, collection, rkey)
return {
encoding: 'application/json',
body: { record },
}
})
Procedure Handler
Handle POST requests with input:
server.method('com.example.createRecord', async (ctx) => {
const { repo, collection } = ctx.params
const { record } = ctx.input.body
// Validate authentication
if (!ctx.auth) {
throw new AuthRequiredError()
}
// Create record
const uri = await db.createRecord(repo, collection, record)
return {
encoding: 'application/json',
body: { uri },
}
})
Handler Response Types
Success Response
interface HandlerSuccess {
encoding: string
body: unknown | Readable
headers?: Record<string, string>
}
return {
encoding: 'application/json',
body: { data: 'value' },
headers: { 'Cache-Control': 'max-age=60' },
}
Stream Response
import { Readable } from 'stream'
return {
encoding: 'application/json',
stream: Readable.from(asyncGenerator()),
headers: { 'Content-Type': 'application/json' },
}
Buffer Response
return {
encoding: 'image/jpeg',
buffer: imageBuffer,
headers: { 'Cache-Control': 'public, max-age=31536000' },
}
Authentication
Auth Verifier
Add authentication to methods:
import { AuthRequiredError } from '@atproto/xrpc-server'
interface AuthResult {
credentials: {
did: string
scope: string
}
}
async function authVerifier(ctx: {
req: express.Request
params: Params
}): Promise<AuthResult> {
const token = ctx.req.headers.authorization?.replace('Bearer ', '')
if (!token) {
throw new AuthRequiredError('Missing token')
}
const credentials = await verifyToken(token)
return { credentials }
}
server.method(
'com.example.protected',
{
auth: authVerifier,
handler: async (ctx) => {
const did = ctx.auth.credentials.did
// Handle authenticated request
return { encoding: 'application/json', body: { did } }
},
}
)
Auth Context Types
type Auth = void | AuthResult
interface AuthResult {
credentials: unknown
artifacts?: unknown
}
type AuthVerifier<C, A extends AuthResult> =
| ((ctx: C) => Promise<A | ErrorResult>)
| ((ctx: C) => Promise<A>)
Rate Limiting
Memory Rate Limiter
In-memory rate limiting:
import { MemoryRateLimiter, MINUTE } from '@atproto/xrpc-server'
const server = createServer(lexicons, {
rateLimits: {
creator: (opts) => new MemoryRateLimiter(opts),
global: [
{
name: 'global-ip',
durationMs: 5 * MINUTE,
points: 3000,
},
],
},
})
Redis Rate Limiter
Distributed rate limiting:
import { RedisRateLimiter } from '@atproto/xrpc-server'
import Redis from 'ioredis'
const redis = new Redis()
const server = createServer(lexicons, {
rateLimits: {
creator: (opts) => new RedisRateLimiter(redis, opts),
global: [
{
name: 'global-ip',
durationMs: 5 * MINUTE,
points: 3000,
},
],
},
})
Route-Specific Rate Limits
import { HOUR, DAY } from '@atproto/xrpc-server'
server.method(
'com.example.createPost',
{
handler: async (ctx) => {
// Handler logic
},
rateLimit: [
{
durationMs: HOUR,
points: 100, // 100 requests per hour
},
{
durationMs: DAY,
points: 1000, // 1000 requests per day
},
],
}
)
Shared Rate Limiters
Share rate limits across multiple routes:
const server = createServer(lexicons, {
rateLimits: {
creator: (opts) => new MemoryRateLimiter(opts),
shared: [
{
name: 'repo-write',
durationMs: HOUR,
points: 5000,
},
],
},
})
server.method('com.example.createRecord', {
handler: async (ctx) => { /* ... */ },
rateLimit: { name: 'repo-write' },
})
server.method('com.example.deleteRecord', {
handler: async (ctx) => { /* ... */ },
rateLimit: { name: 'repo-write' },
})
Rate Limit Bypass
const server = createServer(lexicons, {
rateLimits: {
creator: (opts) => new MemoryRateLimiter(opts),
bypass: (ctx) => {
// Bypass for trusted IPs
const trustedIps = ['10.0.0.1', '10.0.0.2']
return trustedIps.includes(ctx.req.ip)
},
},
})
Custom Rate Limit Keys
server.method('com.example.api', {
handler: async (ctx) => { /* ... */ },
rateLimit: {
durationMs: MINUTE,
points: 60,
calcKey: (ctx) => {
// Rate limit by authenticated user
return ctx.auth?.credentials.did ?? ctx.req.ip
},
calcPoints: (ctx) => {
// Premium users get more points
return ctx.auth?.isPremium ? 2 : 1
},
},
})
WebSocket Subscriptions
Handle real-time subscriptions:
server.streamMethod(
'com.example.subscribeRepos',
async function* (ctx) {
const { cursor } = ctx.params
// Stream events
for await (const event of eventStream(cursor)) {
yield {
seq: event.seq,
time: event.time,
repo: event.repo,
ops: event.ops,
}
}
}
)
Stream Context
interface StreamContext<A extends Auth, P extends Params> {
auth: A
params: P
req: IncomingMessage
signal: AbortSignal // Triggered on client disconnect
}
Frame Types
import { MessageFrame, ErrorFrame } from '@atproto/xrpc-server'
// Send message frame
yield MessageFrame.fromLexValue(data, nsid)
// Send error frame
yield ErrorFrame.fromError(new Error('Something went wrong'))
Error Handling
XRPC Errors
Built-in error types:
import {
InvalidRequestError,
AuthRequiredError,
ForbiddenError,
InternalServerError,
UpstreamFailureError,
MethodNotImplementedError,
} from '@atproto/xrpc-server'
// Throw errors from handlers
throw new InvalidRequestError('Invalid parameter value')
throw new AuthRequiredError('Authentication required')
throw new ForbiddenError('Insufficient permissions')
XRPCError Class
import { XRPCError, ResponseType } from '@atproto/xrpc-server'
throw new XRPCError(
ResponseType.InvalidRequest,
'Custom error message',
'CustomErrorName'
)
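Errors are returned to the client as a JSON body with `error` and `message` fields. For example, the `InvalidRequestError('Invalid parameter value')` thrown above is serialized as:
{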
"error": "InvalidRequest",
"message": "Invalid parameter value"
}
Custom Error Parser
const server = createServer(lexicons, {
errorParser: (err) => {
if (err instanceof MyCustomError) {
return new XRPCError(
ResponseType.InvalidRequest,
err.message,
'MyCustomError'
)
}
return XRPCError.fromError(err)
},
})
Lexicon Schema Integration
Type-Safe Handlers with @atproto/lex
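Define the method schema with the `l` builder from @atproto/lex-schema, then register it with `server.add()` so the handler's params and output are typed from the schema: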
import { l } from '@atproto/lex-schema'
const getProfile = l.procedure({
params: l.object({
actor: l.string(),
}),
output: l.object({
did: l.string(),
handle: l.string(),
displayName: l.optional(l.string()),
}),
})
server.add(getProfile, {
handler: async (ctx) => {
// ctx.params is typed as { actor: string }
// return type is validated
return {
encoding: 'application/json',
body: {
did: 'did:plc:abc123',
handle: 'user.bsky.social',
displayName: 'User',
},
}
},
})
Validation
The server automatically validates the following against the registered Lexicon schemas:
- Query parameters
- Request body (input)
- Response body (output), controlled by the `validateResponse` option
- Content types and encodings
Advanced Features
Server Timing
Track performance metrics:
import { ServerTimer } from '@atproto/xrpc-server'
const timer = new ServerTimer()
timer.mark('fetch-data')
// ... fetch data ...
timer.measure('fetch-data')
timer.mark('process-data')
// ... process data ...
timer.measure('process-data')
// Add timing header to response
res.setHeader('Server-Timing', timer.toHeader())
Request Encoding
import { parseReqEncoding } from '@atproto/xrpc-server'
const encoding = parseReqEncoding(req)
// Returns content-type from header
NSID Parsing
import { parseReqNsid } from '@atproto/xrpc-server'
const nsid = parseReqNsid(req)
// Extracts NSID from /xrpc/{nsid} path
Complete Example
import express from 'express'
import {
createServer,
AuthRequiredError,
InvalidRequestError,
MemoryRateLimiter,
MINUTE,
HOUR,
} from '@atproto/xrpc-server'
import { LexiconDoc } from '@atproto/lexicon'
// Define lexicons
const lexicons: LexiconDoc[] = [
// ... lexicon definitions
]
// Authentication
async function authVerifier(ctx) {
const token = ctx.req.headers.authorization?.replace('Bearer ', '')
if (!token) throw new AuthRequiredError()
const did = await verifyToken(token)
return { credentials: { did } }
}
// Create server
const server = createServer(lexicons, {
validateResponse: true,
payload: {
jsonLimit: 100 * 1024,
blobLimit: 5 * 1024 * 1024,
},
rateLimits: {
creator: (opts) => new MemoryRateLimiter(opts),
global: [
{
name: 'global-ip',
durationMs: 5 * MINUTE,
points: 3000,
},
],
},
})
// Add handlers
server.method('com.example.getRecord', async (ctx) => {
const record = await db.getRecord(ctx.params.uri)
return { encoding: 'application/json', body: { record } }
})
server.method('com.example.createRecord', {
auth: authVerifier,
handler: async (ctx) => {
const uri = await db.createRecord(
ctx.auth.credentials.did,
ctx.input.body
)
return { encoding: 'application/json', body: { uri } }
},
rateLimit: {
durationMs: HOUR,
points: 100,
},
})
// Mount in Express
const app = express()
app.use(express.json())
app.use(server.router)
app.listen(3000, () => {
console.log('Server listening on port 3000')
})
Resources