Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/bluesky-social/atproto/llms.txt

Use this file to discover all available pages before exploring further.

@atproto/xrpc-server

TypeScript library for implementing atproto HTTP API services with Lexicon schema validation. Provides a robust framework for building XRPC servers with authentication, rate limiting, streaming, and error handling.

Installation

npm install @atproto/xrpc-server

Overview

The @atproto/xrpc-server package provides:
  • XRPC method handlers (queries and procedures)
  • WebSocket subscription support
  • Lexicon schema validation
  • Authentication middleware
  • Rate limiting (memory and Redis)
  • Request/response streaming
  • Error handling and XRPC error types
  • Express.js integration

Quick Start

import { LexiconDoc } from '@atproto/lexicon'
import * as xrpc from '@atproto/xrpc-server'
import express from 'express'

const lexicons: LexiconDoc[] = [
  {
    lexicon: 1,
    id: 'io.example.ping',
    defs: {
      main: {
        type: 'query',
        parameters: {
          type: 'params',
          properties: { message: { type: 'string' } },
        },
        output: {
          encoding: 'application/json',
        },
      },
    },
  },
]

// Create XRPC server
const server = xrpc.createServer(lexicons)

// Add method handler
function ping(ctx: xrpc.HandlerContext) {
  return {
    encoding: 'application/json',
    body: { message: ctx.params.message },
  }
}

server.method('io.example.ping', ping)

// Mount in Express
const app = express()
app.use(server.router)
app.listen(8080)

Main Exports

createServer

Creates a new XRPC server instance.
import { createServer } from '@atproto/xrpc-server'

const server = createServer(lexicons, options)

Options

interface Options {
  validateResponse?: boolean
  catchall?: CatchallHandler
  payload?: RouteOptions
  rateLimits?: RateLimitsConfig
  errorParser?: (err: unknown) => XRPCError
}
validateResponse: Enable response validation (default: true) catchall: Custom catchall handler for unimplemented methods payload: Default payload size limits
interface RouteOptions {
  blobLimit?: number   // Max blob size
  jsonLimit?: number   // Max JSON size
  textLimit?: number   // Max text size
}
rateLimits: Rate limiting configuration
interface RateLimitsConfig {
  creator: RateLimiterCreator
  global?: ServerRateLimitDescription[]
  shared?: ServerRateLimitDescription[]
  bypass?: (ctx: HandlerContext) => boolean
}
errorParser: Custom error conversion to XRPCError

Server Class

The main server class.
class Server {
  router: Express
  lex: Lexicons

  // Add Lexicon schemas
  addLexicon(doc: LexiconDoc): void
  addLexicons(docs: LexiconDoc[]): void

  // Add method handlers
  method(nsid: string, handler: MethodHandler): void
  add(schema: LexiconMethod, config: MethodConfig): void

  // Add streaming handlers
  streamMethod(nsid: string, handler: StreamHandler): void

  // Start listening
  listen(port: number, callback?: () => void): http.Server
}

Method Handlers

Handler Context

All handlers receive a context object:
interface HandlerContext<
  A extends Auth = Auth,
  P extends Params = Params,
  I extends Input = Input,
> {
  auth: A                          // Authentication result
  params: P                        // Query parameters
  input: I                         // Request body
  req: express.Request             // Express request
  res: express.Response            // Express response
  resetRouteRateLimits: () => Promise<void>
}

Query Handler

Handle GET requests:
server.method('com.example.getRecord', async (ctx) => {
  const { did, collection, rkey } = ctx.params

  // Fetch record
  const record = await db.getRecord(did, collection, rkey)

  return {
    encoding: 'application/json',
    body: { record },
  }
})

Procedure Handler

Handle POST requests with input:
server.method('com.example.createRecord', async (ctx) => {
  const { repo, collection } = ctx.params
  const { record } = ctx.input.body

  // Validate authentication
  if (!ctx.auth) {
    throw new AuthRequiredError()
  }

  // Create record
  const uri = await db.createRecord(repo, collection, record)

  return {
    encoding: 'application/json',
    body: { uri },
  }
})

Handler Response Types

Success Response

interface HandlerSuccess {
  encoding: string
  body: unknown | Readable
  headers?: Record<string, string>
}

return {
  encoding: 'application/json',
  body: { data: 'value' },
  headers: { 'Cache-Control': 'max-age=60' },
}

Stream Response

import { Readable } from 'stream'

return {
  encoding: 'application/json',
  stream: Readable.from(asyncGenerator()),
  headers: { 'Content-Type': 'application/json' },
}

Buffer Response

return {
  encoding: 'image/jpeg',
  buffer: imageBuffer,
  headers: { 'Cache-Control': 'public, max-age=31536000' },
}

Authentication

Auth Verifier

Add authentication to methods:
import { AuthRequiredError } from '@atproto/xrpc-server'

interface AuthResult {
  credentials: {
    did: string
    scope: string
  }
}

async function authVerifier(ctx: {
  req: express.Request
  params: Params
}): Promise<AuthResult> {
  const token = ctx.req.headers.authorization?.replace('Bearer ', '')

  if (!token) {
    throw new AuthRequiredError('Missing token')
  }

  const credentials = await verifyToken(token)

  return { credentials }
}

server.method(
  'com.example.protected',
  {
    auth: authVerifier,
    handler: async (ctx) => {
      const did = ctx.auth.credentials.did
      // Handle authenticated request
      return { encoding: 'application/json', body: { did } }
    },
  }
)

Auth Context Types

type Auth = void | AuthResult

interface AuthResult {
  credentials: unknown
  artifacts?: unknown
}

type AuthVerifier<C, A extends AuthResult> =
  | ((ctx: C) => Promise<A | ErrorResult>)
  | ((ctx: C) => Promise<A>)

Rate Limiting

Memory Rate Limiter

In-memory rate limiting:
import { MemoryRateLimiter, MINUTE } from '@atproto/xrpc-server'

const server = createServer(lexicons, {
  rateLimits: {
    creator: (opts) => new MemoryRateLimiter(opts),
    global: [
      {
        name: 'global-ip',
        durationMs: 5 * MINUTE,
        points: 3000,
      },
    ],
  },
})

Redis Rate Limiter

Distributed rate limiting:
import { RedisRateLimiter } from '@atproto/xrpc-server'
import Redis from 'ioredis'

const redis = new Redis()

const server = createServer(lexicons, {
  rateLimits: {
    creator: (opts) => new RedisRateLimiter(redis, opts),
    global: [
      {
        name: 'global-ip',
        durationMs: 5 * MINUTE,
        points: 3000,
      },
    ],
  },
})

Route-Specific Rate Limits

import { HOUR, DAY } from '@atproto/xrpc-server'

server.method(
  'com.example.createPost',
  {
    handler: async (ctx) => {
      // Handler logic
    },
    rateLimit: [
      {
        durationMs: HOUR,
        points: 100,  // 100 requests per hour
      },
      {
        durationMs: DAY,
        points: 1000, // 1000 requests per day
      },
    ],
  }
)

Shared Rate Limiters

Share rate limits across multiple routes:
const server = createServer(lexicons, {
  rateLimits: {
    creator: (opts) => new MemoryRateLimiter(opts),
    shared: [
      {
        name: 'repo-write',
        durationMs: HOUR,
        points: 5000,
      },
    ],
  },
})

server.method('com.example.createRecord', {
  handler: async (ctx) => { /* ... */ },
  rateLimit: { name: 'repo-write' },
})

server.method('com.example.deleteRecord', {
  handler: async (ctx) => { /* ... */ },
  rateLimit: { name: 'repo-write' },
})

Rate Limit Bypass

const server = createServer(lexicons, {
  rateLimits: {
    creator: (opts) => new MemoryRateLimiter(opts),
    bypass: (ctx) => {
      // Bypass for trusted IPs
      const trustedIps = ['10.0.0.1', '10.0.0.2']
      return trustedIps.includes(ctx.req.ip)
    },
  },
})

Custom Rate Limit Keys

server.method('com.example.api', {
  handler: async (ctx) => { /* ... */ },
  rateLimit: {
    durationMs: MINUTE,
    points: 60,
    calcKey: (ctx) => {
      // Rate limit by authenticated user
      return ctx.auth?.credentials.did ?? ctx.req.ip
    },
    calcPoints: (ctx) => {
      // Premium users get more points
      return ctx.auth?.isPremium ? 2 : 1
    },
  },
})

WebSocket Subscriptions

Handle real-time subscriptions:
server.streamMethod(
  'com.example.subscribeRepos',
  async function* (ctx) {
    const { cursor } = ctx.params

    // Stream events
    for await (const event of eventStream(cursor)) {
      yield {
        seq: event.seq,
        time: event.time,
        repo: event.repo,
        ops: event.ops,
      }
    }
  }
)

Stream Context

interface StreamContext<A extends Auth, P extends Params> {
  auth: A
  params: P
  req: IncomingMessage
  signal: AbortSignal  // Triggered on client disconnect
}

Frame Types

import { MessageFrame, ErrorFrame } from '@atproto/xrpc-server'

// Send message frame
yield MessageFrame.fromLexValue(data, nsid)

// Send error frame
yield ErrorFrame.fromError(new Error('Something went wrong'))

Error Handling

XRPC Errors

Built-in error types:
import {
  InvalidRequestError,
  AuthRequiredError,
  ForbiddenError,
  InternalServerError,
  UpstreamFailureError,
  MethodNotImplementedError,
} from '@atproto/xrpc-server'

// Throw errors from handlers
throw new InvalidRequestError('Invalid parameter value')
throw new AuthRequiredError('Authentication required')
throw new ForbiddenError('Insufficient permissions')

XRPCError Class

import { XRPCError, ResponseType } from '@atproto/xrpc-server'

throw new XRPCError(
  ResponseType.InvalidRequest,
  'Custom error message',
  'CustomErrorName'
)

Error Response Format

{
  "error": "InvalidRequest",
  "message": "Invalid parameter value"
}

Custom Error Parser

const server = createServer(lexicons, {
  errorParser: (err) => {
    if (err instanceof MyCustomError) {
      return new XRPCError(
        ResponseType.InvalidRequest,
        err.message,
        'MyCustomError'
      )
    }
    return XRPCError.fromError(err)
  },
})

Lexicon Schema Integration

Type-Safe Handlers with @atproto/lex

Use the @atproto/lex schema types:
import { l } from '@atproto/lex-schema'

const getProfile = l.procedure({
  params: l.object({
    actor: l.string(),
  }),
  output: l.object({
    did: l.string(),
    handle: l.string(),
    displayName: l.optional(l.string()),
  }),
})

server.add(getProfile, {
  handler: async (ctx) => {
    // ctx.params is typed as { actor: string }
    // return type is validated
    return {
      encoding: 'application/json',
      body: {
        did: 'did:plc:abc123',
        handle: 'user.bsky.social',
        displayName: 'User',
      },
    }
  },
})

Validation

Automatic validation of:
  • Query parameters
  • Request body (input)
  • Response body (output)
  • Content types and encodings

Advanced Features

Server Timing

Track performance metrics:
import { ServerTimer } from '@atproto/xrpc-server'

const timer = new ServerTimer()
timer.mark('fetch-data')
// ... fetch data ...
timer.measure('fetch-data')

timer.mark('process-data')
// ... process data ...
timer.measure('process-data')

// Add timing header to response
res.setHeader('Server-Timing', timer.toHeader())

Request Encoding

import { parseReqEncoding } from '@atproto/xrpc-server'

const encoding = parseReqEncoding(req)
// Returns content-type from header

NSID Parsing

import { parseReqNsid } from '@atproto/xrpc-server'

const nsid = parseReqNsid(req)
// Extracts NSID from /xrpc/{nsid} path

Complete Example

import express from 'express'
import {
  createServer,
  AuthRequiredError,
  InvalidRequestError,
  MemoryRateLimiter,
  MINUTE,
  HOUR,
} from '@atproto/xrpc-server'
import { LexiconDoc } from '@atproto/lexicon'

// Define lexicons
const lexicons: LexiconDoc[] = [
  // ... lexicon definitions
]

// Authentication
async function authVerifier(ctx) {
  const token = ctx.req.headers.authorization?.replace('Bearer ', '')
  if (!token) throw new AuthRequiredError()
  const did = await verifyToken(token)
  return { credentials: { did } }
}

// Create server
const server = createServer(lexicons, {
  validateResponse: true,
  payload: {
    jsonLimit: 100 * 1024,
    blobLimit: 5 * 1024 * 1024,
  },
  rateLimits: {
    creator: (opts) => new MemoryRateLimiter(opts),
    global: [
      {
        name: 'global-ip',
        durationMs: 5 * MINUTE,
        points: 3000,
      },
    ],
  },
})

// Add handlers
server.method('com.example.getRecord', async (ctx) => {
  const record = await db.getRecord(ctx.params.uri)
  return { encoding: 'application/json', body: { record } }
})

server.method('com.example.createRecord', {
  auth: authVerifier,
  handler: async (ctx) => {
    const uri = await db.createRecord(
      ctx.auth.credentials.did,
      ctx.input.body
    )
    return { encoding: 'application/json', body: { uri } }
  },
  rateLimit: {
    durationMs: HOUR,
    points: 100,
  },
})

// Mount in Express
const app = express()
app.use(express.json())
app.use(server.router)

app.listen(3000, () => {
  console.log('Server listening on port 3000')
})

Resources

Build docs developers (and LLMs) love