The Aspire.Azure.Messaging.ServiceBus component registers a ServiceBusClient in your dependency injection container for connecting to Azure Service Bus. It automatically enables logging and distributed tracing.
/// <summary>
/// Publishes <see cref="Order"/> instances as JSON messages to the "orders" entity
/// through the <see cref="ServiceBusClient"/> supplied by dependency injection.
/// </summary>
public class OrderService
{
    private readonly ServiceBusClient _serviceBusClient;

    /// <summary>Creates the service around an injected client.</summary>
    /// <param name="serviceBusClient">Client used to create senders for the "orders" entity.</param>
    public OrderService(ServiceBusClient serviceBusClient)
    {
        _serviceBusClient = serviceBusClient;
    }

    /// <summary>Serializes a single order to JSON and sends it as one message.</summary>
    /// <param name="order">Order to send; its <c>Id</c> becomes the message id.</param>
    public async Task SendOrderAsync(Order order)
    {
        await using var sender = _serviceBusClient.CreateSender("orders");

        var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
        {
            ContentType = "application/json",
            MessageId = order.Id.ToString(),
            Subject = "NewOrder"
        };

        await sender.SendMessageAsync(message);
    }

    /// <summary>
    /// Serializes every order to JSON and sends them in as few batches as possible.
    /// </summary>
    /// <remarks>
    /// All batches are built before anything is sent, so a message that cannot fit
    /// even in an empty batch fails the call before any message leaves the process.
    /// </remarks>
    /// <param name="orders">Orders to send.</param>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A single serialized order does not fit in an empty batch.
    /// </exception>
    public async Task SendBatchAsync(List<Order> orders)
    {
        ArgumentNullException.ThrowIfNull(orders);

        await using var sender = _serviceBusClient.CreateSender("orders");

        var batches = new List<ServiceBusMessageBatch>();
        try
        {
            var current = await sender.CreateMessageBatchAsync();
            batches.Add(current);

            foreach (var order in orders)
            {
                var message = new ServiceBusMessage(JsonSerializer.Serialize(order));
                if (current.TryAddMessage(message))
                {
                    continue;
                }

                // A rejected add on a non-empty batch only means the batch is full:
                // roll over to a fresh batch instead of failing the whole call.
                if (current.Count > 0)
                {
                    current = await sender.CreateMessageBatchAsync();
                    batches.Add(current);
                    if (current.TryAddMessage(message))
                    {
                        continue;
                    }
                }

                // Rejected by an empty batch: the message itself is too large.
                throw new InvalidOperationException("Message too large for batch");
            }

            foreach (var batch in batches)
            {
                if (batch.Count > 0)
                {
                    await sender.SendMessagesAsync(batch);
                }
            }
        }
        finally
        {
            foreach (var batch in batches)
            {
                batch.Dispose();
            }
        }
    }
}
/// <summary>
/// Sends an order to the "orders" entity with its enqueue time pushed into the future.
/// </summary>
/// <param name="order">Order serialized to JSON as the message body.</param>
/// <param name="delay">Offset from the current UTC time at which the message should be enqueued.</param>
public async Task ScheduleMessageAsync(Order order, TimeSpan delay)
{
    await using var sender = _serviceBusClient.CreateSender("orders");

    var scheduled = new ServiceBusMessage(JsonSerializer.Serialize(order));
    scheduled.ScheduledEnqueueTime = DateTimeOffset.UtcNow + delay;

    await sender.SendMessageAsync(scheduled);
}
/// <summary>
/// Reads messages from the dead-letter sub-queue of "orders", writes their body,
/// dead-letter reason and error description to the console, and completes each one.
/// </summary>
/// <remarks>
/// NOTE(review): the receive loop has no cancellation or exit condition of its own;
/// presumably it keeps waiting for messages until the receiver is closed — confirm
/// before using outside of a sample.
/// </remarks>
public async Task ProcessDeadLetterQueueAsync()
{
    await using var receiver = _serviceBusClient.CreateReceiver(
        "orders",
        new ServiceBusReceiverOptions
        {
            SubQueue = SubQueue.DeadLetter
        });

    await foreach (var message in receiver.ReceiveMessagesAsync())
    {
        Console.WriteLine($"Dead letter: {message.Body}");
        Console.WriteLine($"Reason: {message.DeadLetterReason}");
        Console.WriteLine($"Error: {message.DeadLetterErrorDescription}");

        // Process or log the dead letter message
        await receiver.CompleteMessageAsync(message);
    }
}
/// <summary>
/// Sends an order to the "orders" entity using the order's id as the message id.
/// </summary>
/// <param name="order">Order serialized to JSON as the message body.</param>
/// <remarks>
/// NOTE(review): presumably duplicate detection must be enabled on the target
/// entity for the message id to have any deduplication effect — confirm.
/// </remarks>
public async Task SendWithDeduplicationAsync(Order order)
{
    await using var sender = _serviceBusClient.CreateSender("orders");

    var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
    {
        // Used for deduplication
        MessageId = order.Id.ToString()
    };

    await sender.SendMessageAsync(message);
}
In production, always authenticate with a managed identity (for example, through DefaultAzureCredential) instead of connection strings that contain shared access keys.
Implement proper error handling
Handle different failure scenarios appropriately:
// Settle the message according to how processing ended:
// success -> complete, transient failure -> abandon, permanent failure -> dead-letter.
try
{
    await ProcessMessageAsync(message);
    await args.CompleteMessageAsync(message);
}
catch (TemporaryException)
{
    // Retry later
    await args.AbandonMessageAsync(message);
}
catch (PermanentException ex)
{
    // Move to dead letter
    await args.DeadLetterMessageAsync(
        message,
        "ProcessingFailed",
        ex.Message);
}
Set appropriate prefetch count
Configure prefetch for better performance:
// Processor for "orders" with a prefetch count of 10 and up to 5 concurrent calls.
var processorOptions = new ServiceBusProcessorOptions
{
    PrefetchCount = 10,
    MaxConcurrentCalls = 5
};
var processor = _serviceBusClient.CreateProcessor("orders", processorOptions);
Use batching for high throughput
Send messages in batches when possible:
// Send all orders, starting a new batch whenever the current one is full.
// The batch variable is reassigned on rollover, so it is disposed manually
// rather than through a using declaration.
var messageBatch = await sender.CreateMessageBatchAsync();
try
{
    foreach (var order in orders)
    {
        var message = new ServiceBusMessage(JsonSerializer.Serialize(order));
        if (messageBatch.TryAddMessage(message))
        {
            continue;
        }

        // Send current batch and create new one
        await sender.SendMessagesAsync(messageBatch);
        messageBatch.Dispose();
        messageBatch = await sender.CreateMessageBatchAsync();

        // A message rejected by an empty batch can never be sent; fail loudly
        // instead of silently dropping it.
        if (!messageBatch.TryAddMessage(message))
        {
            throw new InvalidOperationException("Message too large for batch");
        }
    }

    if (messageBatch.Count > 0)
    {
        await sender.SendMessagesAsync(messageBatch);
    }
}
finally
{
    messageBatch.Dispose();
}
Monitor queue length and dead letters
Regularly monitor queue metrics to detect issues:
Active message count
Dead letter message count
Scheduled message count
Set message time-to-live
Set a time-to-live so that unprocessed messages expire instead of accumulating:
// Message whose time-to-live is set to 24 hours.
var message = new ServiceBusMessage(body);
message.TimeToLive = TimeSpan.FromHours(24);
Use topics for pub/sub patterns
Use topics when multiple subscribers need the same message: