The pingora-limits crate provides a Rate struct that counts events per key within fixed time windows. A single Rate instance can track many logical client keys (such as the value of an appid request header): each request is recorded under its key, and the returned count tells you how many requests that client has sent in the current window, so you can enforce a ceiling before those requests reach your upstream. The counter is lock-free and safe to share across threads, which makes it well suited to a global static.
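To make the per-key counting concrete, here is a simplified, std-only model of the behavior described above. It is not pingora-limits' implementation (which is lock-free and estimator-based); it uses a `Mutex<HashMap>` for clarity and takes the current time as an explicit millisecond value so the window behavior is easy to follow. `WindowCounter` and `observe_at` are hypothetical names chosen for this sketch.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical std-only model of a per-key windowed event counter.
/// Assumption: this only mirrors the observable behavior of
/// `Rate::observe` (add events for a key, get the key's count in the
/// current window back); it is not how pingora-limits implements it.
pub struct WindowCounter {
    interval_ms: u64,
    // (start of the current window in ms, per-key counts for that window)
    state: Mutex<(u64, HashMap<String, isize>)>,
}

impl WindowCounter {
    pub fn new(interval: Duration) -> Self {
        WindowCounter {
            interval_ms: interval.as_millis() as u64,
            state: Mutex::new((0, HashMap::new())),
        }
    }

    /// Records `events` for `key` at `now_ms` (milliseconds since an
    /// arbitrary epoch) and returns the key's total in the current window.
    pub fn observe_at(&self, key: &str, events: isize, now_ms: u64) -> isize {
        let mut guard = self.state.lock().unwrap();
        let (window_start, counts) = &mut *guard;
        // Once a full interval has elapsed, start a fresh window for all keys.
        if now_ms.saturating_sub(*window_start) >= self.interval_ms {
            *window_start = now_ms;
            counts.clear();
        }
        let count = counts.entry(key.to_string()).or_insert(0);
        *count += events;
        *count
    }
}
```

With a one-second window, two requests from appid "1" at 0 ms and 500 ms yield counts 1 and 2, a request from appid "2" is counted separately, and appid "1" starts again at 1 once 1000 ms have passed.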
Dependencies
Add the following to your Cargo.toml:
```toml
[dependencies]
async-trait = "0.1"
pingora = { version = "0.8", features = ["lb", "openssl"] }
pingora-limits = "0.8.0"
once_cell = "1.19.0"
```
Implementation
Declare the global rate limiter
Create a process-wide Rate instance with the desired window length. Rate::new(Duration::from_secs(1)) creates a one-second window. Because the instance is wrapped in once_cell's Lazy, it is initialized on first access and then lives for the lifetime of the process.

```rust
use once_cell::sync::Lazy;
use pingora_limits::rate::Rate;
use std::time::Duration;

// Rate limiter: one-second window, shared by all requests in the process
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
```
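On Rust 1.80 and later, `std::sync::LazyLock` provides the same deferred, one-time initialization as once_cell's `Lazy`, so the once_cell dependency is optional; with pingora-limits you would write `static RATE_LIMITER: LazyLock<Rate> = LazyLock::new(|| Rate::new(Duration::from_secs(1)));`. The std-only sketch below uses a `Duration` as a stand-in value and a hypothetical `INIT_CALLS` counter to show that the initializer runs exactly once even when several threads touch the static concurrently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::LazyLock;
use std::thread;
use std::time::Duration;

// Counts how many times the initializer has run (demonstration only).
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

// Stand-in for the rate limiter: any Sync value is handled the same way.
// The closure runs on first access only, even with concurrent callers.
static WINDOW: LazyLock<Duration> = LazyLock::new(|| {
    INIT_CALLS.fetch_add(1, Ordering::SeqCst);
    Duration::from_secs(1)
});

/// Reads the lazily initialized global from `threads` threads and returns
/// how many times the initializer ran in total.
pub fn init_count_after_concurrent_access(threads: usize) -> usize {
    let handles: Vec<_> = (0..threads).map(|_| thread::spawn(|| *WINDOW)).collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), Duration::from_secs(1));
    }
    INIT_CALLS.load(Ordering::SeqCst)
}
```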
Define the per-client request ceiling
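A simple constant is enough. This example allows at most one request per second per appid. The constant is an isize because that is the type Rate::observe returns:

```rust
// Maximum requests per one-second window per client
static MAX_REQ_PER_SEC: isize = 1;
```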
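Because observe returns the count *after* recording the current request, the check `count > MAX_REQ_PER_SEC` admits exactly the first MAX_REQ_PER_SEC requests in each window and rejects the rest; using `>=` would admit one request too few. The two helper functions below are hypothetical and only isolate that comparison so it can be checked on its own.

```rust
/// Mirrors the check in `request_filter`: `count` is the value returned by
/// `observe` after the current request has been recorded.
pub fn is_rate_limited(count: isize, max_per_window: isize) -> bool {
    count > max_per_window
}

/// For the first `n` requests of a single window, reports whether each
/// request would be admitted (true) or answered with 429 (false).
pub fn admitted_in_window(n: isize, max_per_window: isize) -> Vec<bool> {
    (1..=n)
        .map(|count| !is_rate_limited(count, max_per_window))
        .collect()
}
```

With the limit of 1 used here, the first request in a window passes and every later one in the same window is rejected.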
Extract the client identifier
Add a helper on your proxy struct that reads the appid header from the incoming request. It returns None when the header is missing or its value is not valid visible ASCII, and such requests bypass rate limiting:

```rust
impl LB {
    /// Returns the `appid` request header as a String, or None if the
    /// header is absent or cannot be read as visible ASCII.
    pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
        session
            .req_header()
            .headers
            .get("appid")
            .and_then(|v| v.to_str().ok())
            .map(|v| v.to_string())
    }
}
```
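The `to_str()` call on the header value only succeeds when every byte is visible ASCII (or a tab), so an appid carrying other bytes, such as UTF-8 text, is treated exactly like a missing header and is not rate limited. The std-only sketch below models that rule on raw bytes; `header_appid` and the `HashMap` header model (lowercased names, byte values) are hypothetical stand-ins for pingora's request header type.

```rust
use std::collections::HashMap;

/// Visible ASCII (0x20..=0x7E) or horizontal tab: the byte range the
/// `http` crate's `HeaderValue::to_str` accepts.
fn is_visible_ascii(b: u8) -> bool {
    (0x20_u8..0x7f).contains(&b) || b == b'\t'
}

/// Hypothetical model of `get_request_appid`: header names are stored
/// lowercased (HTTP header names are case-insensitive), values as bytes.
pub fn header_appid(headers: &HashMap<String, Vec<u8>>) -> Option<String> {
    headers
        .get("appid")
        .filter(|v| v.iter().all(|&b| is_visible_ascii(b)))
        // Every byte is ASCII at this point, so UTF-8 conversion cannot fail.
        .map(|v| String::from_utf8(v.clone()).expect("ASCII is valid UTF-8"))
}
```

If non-ASCII identifiers should still be limited, one option is to key on the raw bytes instead of a String.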
Enforce the limit in request_filter
Override request_filter in your ProxyHttp implementation. Call RATE_LIMITER.observe(&appid, 1) to record the request and get back that key's count for the current window. If the count exceeds the ceiling, write a 429 response directly and return Ok(true), which tells Pingora the response has already been sent and stops further processing of the request. Returning Ok(false) lets the request continue through the proxy normally.

```rust
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
where
    Self::CTX: Send + Sync,
{
    let appid = match self.get_request_appid(session) {
        None => return Ok(false), // no appid found, skip rate limiting
        Some(id) => id,
    };

    // Record this request and retrieve the key's count for the current window
    let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
    if curr_window_requests > MAX_REQ_PER_SEC {
        // Rate limited: reply 429 with conventional (non-standardized) rate-limit headers
        let mut header = ResponseHeader::build(429, None)?;
        header.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())?;
        header.insert_header("X-Rate-Limit-Remaining", "0")?;
        header.insert_header("X-Rate-Limit-Reset", "1")?;
        // Close the connection after this response
        session.set_keepalive(None);
        session
            .write_response_header(Box::new(header), true)
            .await?;
        return Ok(true); // response already sent; do not proxy upstream
    }
    Ok(false)
}
```
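The handler above hard-codes Remaining as "0" and Reset as "1", which is correct only for this one-second, limit-of-1 setup. If you change the limit or window, a small helper can derive the values instead. The sketch below is hypothetical: `rate_limit_headers` is not part of Pingora, and it reports Reset as the full window length rounded up to whole seconds (an upper bound), since this sketch does not assume access to the exact window start.

```rust
use std::time::Duration;

/// Hypothetical helper: values for the X-Rate-Limit-* headers given the
/// count returned by `observe`, the per-window limit, and the window length.
pub fn rate_limit_headers(
    count: isize,
    limit: isize,
    window: Duration,
) -> [(&'static str, String); 3] {
    // Never advertise a negative number of remaining requests.
    let remaining = (limit - count).max(0);
    // Round up so a sub-second window never advertises a reset of "0".
    let reset_secs = window.as_millis().div_ceil(1000);
    [
        ("X-Rate-Limit-Limit", limit.to_string()),
        ("X-Rate-Limit-Remaining", remaining.to_string()),
        ("X-Rate-Limit-Reset", reset_secs.to_string()),
    ]
}
```

Inside request_filter you could then loop over the result and call `header.insert_header(name, value)?` for each pair, in place of the three fixed calls.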
Complete Example
The following is the full working example combining all the pieces above, including the server bootstrap and load-balancer setup:
```rust
use async_trait::async_trait;
use once_cell::sync::Lazy;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora_limits::rate::Rate;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut server = Server::new(Some(Opt::default())).unwrap();
    server.bootstrap();

    let mut upstreams =
        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();

    // Set health check
    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));

    // Run health checks as a background service
    let background = background_service("health check", upstreams);
    let upstreams = background.task();

    // Set up the load-balancing proxy
    let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    server.add_service(background);
    server.add_service(lb);
    server.run_forever();
}

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

impl LB {
    /// Returns the `appid` request header as a String, or None if the
    /// header is absent or cannot be read as visible ASCII.
    pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
        session
            .req_header()
            .headers
            .get("appid")
            .and_then(|v| v.to_str().ok())
            .map(|v| v.to_string())
    }
}

// Rate limiter: one-second window, shared by all requests in the process
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));

// Maximum requests per one-second window per client
static MAX_REQ_PER_SEC: isize = 1;

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();
    fn new_ctx(&self) {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let upstream = self.0.select(b"", 256).unwrap();
        // Set SNI
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        upstream_request.insert_header("Host", "one.one.one.one")?;
        Ok(())
    }

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        let appid = match self.get_request_appid(session) {
            None => return Ok(false), // no client appid found, skip rate limiting
            Some(id) => id,
        };

        // Record this request and retrieve the key's count for the current window
        let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
        if curr_window_requests > MAX_REQ_PER_SEC {
            // Rate limited: reply 429 and close the connection
            let mut header = ResponseHeader::build(429, None)?;
            header.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())?;
            header.insert_header("X-Rate-Limit-Remaining", "0")?;
            header.insert_header("X-Rate-Limit-Reset", "1")?;
            session.set_keepalive(None);
            session
                .write_response_header(Box::new(header), true)
                .await?;
            return Ok(true); // response already sent; do not proxy upstream
        }
        Ok(false)
    }
}
```
Testing
Send requests with an appid header
```shell
curl localhost:6188 -H "appid:1" -v
```

The first request with a given appid in each one-second window is proxied to the upstream. Any further request with that appid that arrives before the window resets receives a 429 Too Many Requests response, for example:

```text
* Trying 127.0.0.1:6188...
* Connected to localhost (127.0.0.1) port 6188 (#0)
> GET / HTTP/1.1
> Host: localhost:6188
> User-Agent: curl/7.88.1
> Accept: */*
> appid:1
>
< HTTP/1.1 429 Too Many Requests
< X-Rate-Limit-Limit: 1
< X-Rate-Limit-Remaining: 0
< X-Rate-Limit-Reset: 1
< Date: Sun, 14 Jul 2024 20:29:02 GMT
< Connection: close
<
* Closing connection 0
```
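Rate::observe returns the number of events recorded for the key in the current window, not a normalized per-second rate. The window resets once every interval passed to Rate::new; in this example that is one second, so MAX_REQ_PER_SEC acts as a per-second limit. The per-key counts come from a probabilistic (count-min sketch) estimator, so they are estimates: unrelated keys that collide in the estimator can make a count slightly higher than the true value, but never lower.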
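Because the counter resets at the end of each window instead of sliding, a client can get up to roughly twice the limit through in a short span that straddles a reset. The std-only model below shows this for a single key with a limit of 2. `FixedWindow` and `admit` are hypothetical, and the model assumes a new window starts at the first request after the previous one expires; it illustrates fixed-window counting in general rather than reproducing pingora-limits' exact timing.

```rust
/// Hypothetical single-key fixed-window limiter with an injected clock.
pub struct FixedWindow {
    interval_ms: u64,
    limit: u32,
    window_start_ms: Option<u64>,
    count: u32,
}

impl FixedWindow {
    pub fn new(interval_ms: u64, limit: u32) -> Self {
        FixedWindow { interval_ms, limit, window_start_ms: None, count: 0 }
    }

    /// Records one request at `now_ms`; returns true if it is admitted.
    pub fn admit(&mut self, now_ms: u64) -> bool {
        let expired = match self.window_start_ms {
            None => true,
            Some(start) => now_ms.saturating_sub(start) >= self.interval_ms,
        };
        if expired {
            // Start a new window at this request; earlier events are forgotten.
            self.window_start_ms = Some(now_ms);
            self.count = 0;
        }
        self.count += 1;
        self.count <= self.limit
    }
}
```

With a 1000 ms window and a limit of 2, requests at 0, 999, 1000 and 1001 ms are all admitted (four within about one second) and the request at 1002 ms is rejected. If such bursts matter for your upstream, a shorter window with a proportionally smaller limit narrows them.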