Skip to content

Service Bus Topic

  • The receiver has to subscribe to a topic in order to receive its messages
  • The receiver can apply filters to decide which messages to get

Subscription

  • Similar to a Consumer Group in Kafka
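  • The max delivery count defines the maximum number of times a message can be delivered to a receiver (subscriber) without being settled; once it is exceeded, the message is moved to the subscription's dead-letter queue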
  • Each subscription receives all the messages (if no filter is applied). When the message is read and deleted, it's deleted for that subscription only!

Filters

  • Filter the messages the receiver in a subscription wants to receive
  • Filters are configured per subscription!

  • SQL Filters

  • SQL-like conditions for the evaluation against system (sys prefix) or custom properties

  • Boolean Filters are a kind of SQL filter (TrueFilter or FalseFilter)
1=1 -- accept all messages
1=0 -- do not receive any message
sys.messageid='3' -- accept only messages with id 3
  • Correlation Filters

  • Match messages by equality against a set of system or custom properties; a message is accepted only if all of the specified properties match

  • Faster than SQL
# System properties
contentType=application/json

# Custom properties
department=HR

Dotnet SDK

  • Nuget Package: Azure.Messaging.ServiceBus
/// <summary>
/// Sample console program that sends a handful of orders to a Service Bus topic
/// and then reads one message back from a subscription on that topic.
/// </summary>
class Program
{
  // NOTE(review): placeholder credentials; load the real connection string from
  // configuration or a secret store rather than committing it to source.
  // NOTE(review): the shared access policy is named "Send" — receiving from the
  // subscription presumably needs a policy with the Listen claim; confirm.
  private static readonly string connectionString = "Endpoint=sb://hvitoi.servicebus.windows.net/;SharedAccessKeyName=Send;SharedAccessKey=shared-access-key;EntityPath=apptopic";
  private static readonly string topicName = "apptopic";
  private static readonly string subscriptionName = "SubscriptionA";

  /// <summary>
  /// Sends five orders to the topic, then receives and prints a single message
  /// from the subscription.
  /// </summary>
  /// <param name="args">Command-line arguments (unused).</param>
  static async System.Threading.Tasks.Task Main(string[] args)
  {
    // Service Bus Client. The client, sender and receiver own network
    // resources, so each is disposed asynchronously when Main exits.
    await using ServiceBusClient client = new ServiceBusClient(connectionString);

    // Send messages to Topic
    await using ServiceBusSender sender = client.CreateSender(topicName);
    List<Order> orders = new List<Order>()
    {
      new Order() {OrderID="O1",Quantity=10,UnitPrice=9.99m},
      new Order() {OrderID="O2",Quantity=15,UnitPrice=10.99m },
      new Order() {OrderID="O3",Quantity=20,UnitPrice=11.99m},
      new Order() {OrderID="O4",Quantity=25,UnitPrice=12.99m},
      new Order() {OrderID="O5",Quantity=30,UnitPrice=13.99m }
    };
    foreach (Order order in orders)
    {
      // Named differently from the received message below: C# does not allow a
      // nested-scope local to share a name with a local of the enclosing scope.
      ServiceBusMessage outgoing = new ServiceBusMessage(order.ToString());
      await sender.SendMessageAsync(outgoing);
    }
    Console.WriteLine("Messages sent to topic");

    // Receive message from topic (in a subscription)
    ServiceBusReceiverOptions receiverOptions = new ServiceBusReceiverOptions()
    {
      ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete
    };
    await using ServiceBusReceiver receiver = client.CreateReceiver(topicName, subscriptionName, receiverOptions);

    // ReceiveMessageAsync yields null when nothing arrives within the wait time.
    ServiceBusReceivedMessage received = await receiver.ReceiveMessageAsync();
    if (received == null)
    {
      Console.WriteLine("No message was available on the subscription");
      return;
    }

    Console.WriteLine(received.Body);
    Console.WriteLine($"The Sequence number is {received.SequenceNumber}");

    // No CompleteMessageAsync call here: in ReceiveAndDelete mode the message
    // is already settled on receipt, and completing is only valid for
    // messages received in PeekLock mode.
  }
}