Skip to main content

Overview

The Aspire.Azure.Messaging.ServiceBus component registers a ServiceBusClient in your dependency injection container for connecting to Azure Service Bus. It automatically enables logging and distributed tracing.

Installation

Install the component using the .NET CLI:
dotnet add package Aspire.Azure.Messaging.ServiceBus

Prerequisites

Usage

Register the component

In your service’s Program.cs file, call the AddAzureServiceBusClient extension method:
builder.AddAzureServiceBusClient("servicebus");

Send messages to a queue

Inject and use the client to send messages:
public class OrderService
{
    private readonly ServiceBusClient _serviceBusClient;

    public OrderService(ServiceBusClient serviceBusClient)
    {
        _serviceBusClient = serviceBusClient;
    }

    public async Task SendOrderAsync(Order order)
    {
        await using var sender = _serviceBusClient.CreateSender("orders");
        
        var message = new ServiceBusMessage(
            JsonSerializer.Serialize(order))
        {
            ContentType = "application/json",
            MessageId = order.Id.ToString(),
            Subject = "NewOrder"
        };

        await sender.SendMessageAsync(message);
    }

    public async Task SendBatchAsync(List<Order> orders)
    {
        await using var sender = _serviceBusClient.CreateSender("orders");
        
        using var messageBatch = await sender.CreateMessageBatchAsync();

        foreach (var order in orders)
        {
            var message = new ServiceBusMessage(
                JsonSerializer.Serialize(order));

            if (!messageBatch.TryAddMessage(message))
            {
                throw new Exception($"Message too large for batch");
            }
        }

        await sender.SendMessagesAsync(messageBatch);
    }
}

Receive messages from a queue

public class OrderProcessor : BackgroundService
{
    private readonly ServiceBusClient _serviceBusClient;
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(
        ServiceBusClient serviceBusClient,
        ILogger<OrderProcessor> logger)
    {
        _serviceBusClient = serviceBusClient;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await using var processor = _serviceBusClient.CreateProcessor(
            "orders",
            new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 10,
                AutoCompleteMessages = false
            });

        processor.ProcessMessageAsync += MessageHandler;
        processor.ProcessErrorAsync += ErrorHandler;

        await processor.StartProcessingAsync(stoppingToken);

        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        finally
        {
            await processor.StopProcessingAsync();
        }
    }

    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        var body = args.Message.Body.ToString();
        _logger.LogInformation("Received message: {MessageId}", args.Message.MessageId);

        try
        {
            var order = JsonSerializer.Deserialize<Order>(body);
            await ProcessOrderAsync(order);

            // Complete the message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
            
            // Abandon the message to retry
            await args.AbandonMessageAsync(args.Message);
        }
    }

    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Error in message processing");
        return Task.CompletedTask;
    }

    private async Task ProcessOrderAsync(Order order)
    {
        // Process the order
        await Task.CompletedTask;
    }
}

Work with topics and subscriptions

public class EventPublisher
{
    private readonly ServiceBusClient _serviceBusClient;

    public EventPublisher(ServiceBusClient serviceBusClient)
    {
        _serviceBusClient = serviceBusClient;
    }

    public async Task PublishEventAsync(string eventType, object eventData)
    {
        await using var sender = _serviceBusClient.CreateSender("events");
        
        var message = new ServiceBusMessage(
            JsonSerializer.Serialize(eventData))
        {
            Subject = eventType,
            ApplicationProperties =
            {
                { "EventType", eventType },
                { "Timestamp", DateTime.UtcNow }
            }
        };

        await sender.SendMessageAsync(message);
    }
}

public class EventSubscriber : BackgroundService
{
    private readonly ServiceBusClient _serviceBusClient;

    public EventSubscriber(ServiceBusClient serviceBusClient)
    {
        _serviceBusClient = serviceBusClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await using var processor = _serviceBusClient.CreateProcessor(
            topicName: "events",
            subscriptionName: "order-events");

        processor.ProcessMessageAsync += async args =>
        {
            var eventType = args.Message.Subject;
            var body = args.Message.Body.ToString();
            
            // Handle event based on type
            await HandleEventAsync(eventType, body);
            await args.CompleteMessageAsync(args.Message);
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(stoppingToken);
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    private async Task HandleEventAsync(string eventType, string body)
    {
        // Handle event
        await Task.CompletedTask;
    }
}

Configuration

The component provides multiple configuration options based on your project requirements. Provide a fully qualified namespace in your appsettings.json:
{
  "ConnectionStrings": {
    "servicebus": "mynamespace.servicebus.windows.net"
  }
}
This approach works with DefaultAzureCredential for authentication.

Connection string

Alternatively, use a connection string:
{
  "ConnectionStrings": {
    "servicebus": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=mykey"
  }
}

Configuration providers

Configure component settings using the Aspire:Azure:Messaging:ServiceBus configuration section:
{
  "Aspire": {
    "Azure": {
      "Messaging": {
        "ServiceBus": {
          "HealthCheckQueueName": "health",
          "DisableTracing": false,
          "ClientOptions": {
            "Identifier": "myapp"
          }
        }
      }
    }
  }
}

Inline configuration

Configure settings directly in code using a delegate:
builder.AddAzureServiceBusClient("servicebus", settings => 
{
    settings.HealthCheckQueueName = "health";
});
You can also configure the ServiceBusClientOptions:
builder.AddAzureServiceBusClient("servicebus",
    configureClientBuilder: clientBuilder => 
    {
        clientBuilder.ConfigureOptions(options => 
        {
            options.Identifier = "myapp";
            options.RetryOptions.MaxRetries = 5;
        });
    });

AppHost integration

In your AppHost project, install the Azure Service Bus hosting package:
dotnet add package Aspire.Hosting.Azure.ServiceBus
Register an Azure Service Bus namespace:
var serviceBus = builder.AddAzureServiceBus("servicebus")
                        .AddQueue("orders")
                        .AddTopic("events")
                        .AddSubscription("events", "order-events");

var myService = builder.AddProject<Projects.MyService>()
                       .WithReference(serviceBus);
For local development, you can use a connection string:
var serviceBus = builder.ExecutionContext.IsPublishMode
    ? builder.AddAzureServiceBus("servicebus")
    : builder.AddConnectionString("servicebus");
Consume in your service’s Program.cs:
builder.AddAzureServiceBusClient("servicebus");
The connection name passed to WithReference must match the name used in AddAzureServiceBusClient.

Configuration options

The following settings are available:
SettingDescriptionDefault
HealthCheckQueueNameQueue name for health checksnull
DisableTracingDisable OpenTelemetry tracingfalse

Health checks

The component automatically registers a health check when HealthCheckQueueName is specified. The health check:
  • Verifies connectivity to the Service Bus namespace
  • Checks the specified queue exists
  • Reports the status in the /health endpoint

Observability

Logging

The component integrates with .NET logging to provide:
  • Message send and receive events
  • Connection information
  • Error details

Tracing

Distributed tracing captures:
  • Message send operations
  • Message receive operations
  • Processing time
Traces appear in the Aspire dashboard and integrate with OpenTelemetry collectors.

Common scenarios

Scheduled messages

Schedule messages for future delivery:
public async Task ScheduleMessageAsync(Order order, TimeSpan delay)
{
    await using var sender = _serviceBusClient.CreateSender("orders");
    
    var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
    {
        ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(delay)
    };

    await sender.SendMessageAsync(message);
}

Dead letter queue handling

Handle messages that fail processing:
public async Task ProcessDeadLetterQueueAsync()
{
    await using var receiver = _serviceBusClient.CreateReceiver(
        "orders",
        new ServiceBusReceiverOptions
        {
            SubQueue = SubQueue.DeadLetter
        });

    await foreach (var message in receiver.ReceiveMessagesAsync())
    {
        Console.WriteLine($"Dead letter: {message.Body}");
        Console.WriteLine($"Reason: {message.DeadLetterReason}");
        Console.WriteLine($"Error: {message.DeadLetterErrorDescription}");
        
        // Process or log the dead letter message
        await receiver.CompleteMessageAsync(message);
    }
}

Session-enabled queues

Process messages in order using sessions:
public async Task SendSessionMessageAsync(string sessionId, Order order)
{
    await using var sender = _serviceBusClient.CreateSender("orders");
    
    var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
    {
        SessionId = sessionId
    };

    await sender.SendMessageAsync(message);
}

public async Task ProcessSessionMessagesAsync()
{
    await using var processor = _serviceBusClient.CreateSessionProcessor(
        "orders",
        new ServiceBusSessionProcessorOptions
        {
            MaxConcurrentSessions = 5
        });

    processor.ProcessMessageAsync += async args =>
    {
        Console.WriteLine($"Session: {args.SessionId}");
        var body = args.Message.Body.ToString();
        await ProcessOrderAsync(body);
        await args.CompleteMessageAsync(args.Message);
    };

    processor.ProcessErrorAsync += args =>
    {
        Console.WriteLine(args.Exception);
        return Task.CompletedTask;
    };

    await processor.StartProcessingAsync();
}

Message filtering

Use correlation filters for topic subscriptions:
// Create subscription with filter (using Azure SDK or portal)
var rule = new CreateRuleOptions
{
    Name = "OrderFilter",
    Filter = new CorrelationRuleFilter
    {
        Subject = "Order",
        ApplicationProperties =
        {
            { "Priority", "High" }
        }
    }
};

Duplicate detection

Enable duplicate detection when sending:
public async Task SendWithDeduplicationAsync(Order order)
{
    await using var sender = _serviceBusClient.CreateSender("orders");
    
    var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
    {
        MessageId = order.Id.ToString()  // Used for deduplication
    };

    await sender.SendMessageAsync(message);
}

Best practices

Always use managed identity or DefaultAzureCredential in production instead of connection strings with shared access keys.
Handle different failure scenarios appropriately:
try
{
    await ProcessMessageAsync(message);
    await args.CompleteMessageAsync(message);
}
catch (TemporaryException ex)
{
    // Retry later
    await args.AbandonMessageAsync(message);
}
catch (PermanentException ex)
{
    // Move to dead letter
    await args.DeadLetterMessageAsync(
        message,
        "ProcessingFailed",
        ex.Message);
}
Configure prefetch for better performance:
var processor = _serviceBusClient.CreateProcessor(
    "orders",
    new ServiceBusProcessorOptions
    {
        PrefetchCount = 10,
        MaxConcurrentCalls = 5
    });
Send messages in batches when possible:
using var messageBatch = await sender.CreateMessageBatchAsync();

foreach (var order in orders)
{
    var message = new ServiceBusMessage(
        JsonSerializer.Serialize(order));
    
    if (!messageBatch.TryAddMessage(message))
    {
        // Send current batch and create new one
        await sender.SendMessagesAsync(messageBatch);
        messageBatch.Clear();
        messageBatch.TryAddMessage(message);
    }
}

if (messageBatch.Count > 0)
{
    await sender.SendMessagesAsync(messageBatch);
}
Regularly monitor queue metrics to detect issues:
  • Active message count
  • Dead letter message count
  • Scheduled message count
Prevent message accumulation:
var message = new ServiceBusMessage(body)
{
    TimeToLive = TimeSpan.FromHours(24)
};
Use topics when multiple subscribers need the same message:
  • Queues: Point-to-point (single consumer)
  • Topics: Publish-subscribe (multiple subscribers)

Additional resources

RabbitMQ

Connect to RabbitMQ brokers

Apache Kafka

Work with Apache Kafka

Azure Storage Queues

Use Azure Storage Queues

Service Bus Hosting

Learn about the Service Bus hosting integration

Build docs developers (and LLMs) love