Overview
Receivers are responsible for reading messages from message queues and distributing them to workers for transmission. The current implementation supports Amazon SQS.
SQS Receiver
Long Polling Strategy
The SQS receiver uses long polling to efficiently receive messages without overwhelming the SQS API:
res, err := p.client.ReceiveMessage(p.ctx, &sqs.ReceiveMessageInput{
QueueUrl: &p.queueURL,
MaxNumberOfMessages: p.batchSize,
VisibilityTimeout: p.visibilityTimeout,
WaitTimeSeconds: 20, // Long polling wait time
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeName(SQSAttributeApproxomiteReceiveCount),
types.QueueAttributeName(SQSAttributeApproxomiteFirstReceiveTimestamp),
},
MessageAttributeNames: []string{
SQSMessageAttributeBodyContentType,
},
})
Benefits of long polling:
- Reduces API calls and costs
- Minimizes empty receives
- 20-second wait provides near-real-time message delivery
Batch Processing
Messages are fetched and processed in batches for efficiency:
type ReceiverConfig struct {
LogHandler slog.Handler
SQSClient MessageReadWriter
SQSQueueName string
VisibilityTimeout int
BatchSize int // Number of messages per batch
MaxWorkers int // Concurrent workers
Transmitter Transmitter
Ctx context.Context
}
Configuration:
SQS_BATCH_SIZE: Number of messages to fetch per API call (1-10)
SQS_RECEIVER_WORKERS: Number of concurrent workers per receiver
Message Attributes Handling
The receiver extracts SQS message attributes and converts them to transmit attributes:
SQS Attributes
From receiver/sqs/sqs.go:18-30:
const (
SQSAttributeApproxomiteReceiveCount = "ApproximateReceiveCount"
SQSAttributeApproxomiteFirstReceiveTimestamp = "ApproximateFirstReceiveTimestamp"
SQSMessageAttributeBodyContentType = "Body.ContentType"
TransmitAttributeReceiveCount = "Receive-Count"
TransmitAttributeFirstReceiveTime = "First-Receive-Time"
TransmitAttributeContentType = "Content-Type"
)
Attribute Conversion
From receiver/sqs/sqs.go:135-155:
func (h *handler) generateAttributes(m *message) transmitter.TransmitAttributes {
attributes := make(transmitter.TransmitAttributes)
for k, v := range m.Attributes {
switch k {
case SQSAttributeApproxomiteReceiveCount:
attributes[TransmitAttributeReceiveCount] = v
case SQSAttributeApproxomiteFirstReceiveTimestamp:
attributes[TransmitAttributeFirstReceiveTime] = v
}
}
for k, v := range m.MessageAttributes {
switch k {
case SQSMessageAttributeBodyContentType:
if v.StringValue != nil {
attributes[TransmitAttributeContentType] = *v.StringValue
}
}
}
return attributes
}
These attributes are passed to the transmitter as HTTP headers (with the X-Carrier- prefix).
Interfaces
MessageReader Interface
Defines the read operations for SQS:
type MessageReader interface {
GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
}
MessageWriter Interface
Defines the write operations for SQS:
type MessageWriter interface {
DeleteMessageBatch(context.Context, *sqs.DeleteMessageBatchInput, ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
ChangeMessageVisibilityBatch(context.Context, *sqs.ChangeMessageVisibilityBatchInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error)
}
MessageReadWriter Interface
Combines both interfaces:
type MessageReadWriter interface {
MessageReader
MessageWriter
}
Testing: These interfaces allow for easy mocking of the SQS client in unit tests.
Worker Pool Architecture
Each receiver maintains its own pool of workers:
workers := c.BatchSize
if c.MaxWorkers != 0 {
workers = c.MaxWorkers
}
messages := make(chan *message, c.BatchSize)
results := make(chan *transmitResult, c.BatchSize)
ctx, cancel := context.WithCancel(context.Background())
p := pool.NewPool(&pool.PoolConfig{
Size: workers,
BufferSize: workers,
Ctx: ctx,
})
for range workers {
h := newHandler(&handlerConfig{
Transmitter: c.Transmitter,
Ctx: ctx,
Work: messages,
Results: results,
})
p.Run(h.handleMessages)
}
Message Processing Flow
- Receive batch from SQS
- Distribute messages to worker channel
- Workers transmit messages concurrently
- Collect results from all workers
- Batch delete successful messages
- Batch update visibility for retryable errors
Result Handling
Success Path
From receiver/sqs/sqs.go:263-268:
if r.err == nil {
deleteEntries = append(deleteEntries, types.DeleteMessageBatchRequestEntry{
Id: r.MessageID,
ReceiptHandle: r.ReceiptHandle,
})
}
Retryable Error Path
From receiver/sqs/sqs.go:252-260:
if r.err != nil {
var err *transmitter.TransmitRetryableError
if errors.As(r.err, &err) {
// update visibility timeout on retryable errors
retryEntries = append(retryEntries, types.ChangeMessageVisibilityBatchRequestEntry{
Id: r.MessageID,
ReceiptHandle: r.ReceiptHandle,
VisibilityTimeout: int32(err.RetryAfter.Seconds()),
})
}
}
Non-Retryable Error Path
else {
p.log.Error("failed to transmit message", "error", r.err)
}
Non-retryable errors are logged but the message remains in the queue. It will eventually be reprocessed or moved to a dead-letter queue based on SQS configuration.
Batch Operations
Batch Delete
From receiver/sqs/sqs.go:270-280:
if len(deleteEntries) > 0 {
_, err := p.client.DeleteMessageBatch(p.ctx, &sqs.DeleteMessageBatchInput{
QueueUrl: &p.queueURL,
Entries: deleteEntries,
})
if err != nil {
p.log.Error("failed to delete messages", "error", err)
} else {
p.log.Debug("deleted messages", "count", len(deleteEntries))
}
}
Batch Visibility Update
From receiver/sqs/sqs.go:282-293:
if len(retryEntries) > 0 {
_, err := p.client.ChangeMessageVisibilityBatch(p.ctx, &sqs.ChangeMessageVisibilityBatchInput{
QueueUrl: &p.queueURL,
Entries: retryEntries,
})
if err != nil {
p.log.Error("failed to update message visibility", "error", err)
} else {
p.log.Debug("updated message visibility", "count", len(retryEntries))
}
}
If batch operations fail, messages may be reprocessed. Ensure your webhook endpoints are idempotent.
Event Loop
The main receive loop runs until shutdown:
func (p *Receiver) Rx() {
p.log.Info("starting event loop", "batch_size", p.batchSize, "max_workers", p.maxWorkers)
for {
select {
case <-p.ctx.Done():
p.log.Info("stopping event loop")
p.cancel()
p.pool.Stop(true)
return
default:
// poll for new messages on the queue
res, err := p.client.ReceiveMessage(/* ... */)
if err != nil {
if !errors.Is(err, context.Canceled) {
p.log.Error("failed to receive messages", "error", err)
}
continue
}
p.log.Debug("received messages", "count", len(res.Messages))
p.processMessages(res)
}
}
}
Configuration Reference
| Environment Variable | Description | Default |
|---|
SQS_ENDPOINT | SQS service endpoint | Required |
SQS_QUEUE_NAME | Name of the SQS queue | Required |
SQS_RECEIVERS | Number of concurrent receivers | 1 |
SQS_RECEIVER_WORKERS | Workers per receiver | 1 |
SQS_BATCH_SIZE | Messages per batch | 1 |
For high throughput, increase SQS_RECEIVERS and SQS_RECEIVER_WORKERS. Monitor your webhook endpoint’s capacity to handle concurrent requests.