Queue
- A queue can hold between 1 GB and 80 GB of messages (if you need to store more than 80 GB in a queue, Azure Storage Queue is the alternative)
- Under Service Bus Explorer you can send, receive, or peek messages
Consume methods
- Peek and Lock
  - Gets a message and locks it for the message lock duration
  - Prevents other apps from picking up the same message during this time
  - If the app crashes before processing the message, it becomes available again in the queue after the lock period
  - The app must "complete" the message to remove it from the queue; a message that is never completed reappears once the lock expires
- Receive and Delete
  - Gets a message and deletes it from the queue right away
  - If the application crashes before the message is processed, the message is lost
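The Peek and Lock flow above can be sketched as follows. This is a minimal illustration, not a definitive implementation: the `handle` callback is a hypothetical stand-in for the application's processing logic, and the 5-second wait is an arbitrary choice.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class PeekLockExample
{
    // Receives one message in Peek and Lock mode and settles it explicitly.
    // `handle` is a hypothetical callback standing in for real processing logic.
    public static async Task ProcessOneAsync(
        ServiceBusClient client,
        string queueName,
        Func<ServiceBusReceivedMessage, Task> handle)
    {
        await using ServiceBusReceiver receiver = client.CreateReceiver(
            queueName,
            new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock });

        // ReceiveMessageAsync returns null when nothing arrives within the wait time.
        ServiceBusReceivedMessage message =
            await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        if (message == null)
        {
            return;
        }

        try
        {
            await handle(message);
            // Success: completing the message removes it from the queue.
            await receiver.CompleteMessageAsync(message);
        }
        catch (Exception)
        {
            // Failure: abandoning releases the lock immediately instead of
            // waiting for the lock duration to elapse.
            await receiver.AbandonMessageAsync(message);
            throw;
        }
    }
}
```

If the process dies between receiving and completing, neither branch runs and the message simply reappears when its lock expires, which is the crash behavior described above.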
Sessions
- To guarantee FIFO delivery of messages, you can enable sessions for the queue
- This is not achievable in Azure Storage Queue
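A rough sketch of how sessions are used from the .NET SDK, assuming the queue was created with sessions enabled. The session ID `order-42` and the step names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SessionExample
{
    // Messages that share a SessionId are delivered in order to one receiver.
    public static ServiceBusMessage BuildSessionMessage(string body, string sessionId)
    {
        return new ServiceBusMessage(body) { SessionId = sessionId };
    }

    public static async Task SendAndReceiveAsync(ServiceBusClient client, string queueName)
    {
        const string sessionId = "order-42"; // hypothetical session identifier

        await using ServiceBusSender sender = client.CreateSender(queueName);
        foreach (string step in new[] { "created", "paid", "shipped" })
        {
            await sender.SendMessageAsync(BuildSessionMessage(step, sessionId));
        }

        // Lock the session; no other receiver can read from it while the lock is held.
        await using ServiceBusSessionReceiver receiver =
            await client.AcceptSessionAsync(queueName, sessionId);

        ServiceBusReceivedMessage message;
        while ((message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5))) != null)
        {
            Console.WriteLine($"{receiver.SessionId}: {message.Body}");
            await receiver.CompleteMessageAsync(message);
        }
    }
}
```

`AcceptNextSessionAsync(queueName)` can be used instead of `AcceptSessionAsync` when the receiver should take whichever session is available next rather than a specific one.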
Time to Live
- Maximum message lifetime
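- If a message is not consumed within that period, it expires and is removed from the queue (or moved to the dead letter queue when dead lettering on expiration is enabled)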
Dead letter queue
- Messages that couldn't be processed or reached the TTL fall into the dead letter queue
- Dead lettering on message expiration can be enabled on the queue overview
- Messages already in the dead letter queue do not expire
- To refer to the dead letter queue -> queue-name/$DeadLetterQueue
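As an illustration of working with the dead letter queue from code, the sketch below reads it through the SDK's `SubQueue` option rather than by spelling out the `$DeadLetterQueue` path. The reason and description strings passed to `DeadLetterMessageAsync` are invented examples.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeadLetterExample
{
    // Receiver options that target the queue's dead letter sub-queue.
    public static ServiceBusReceiverOptions BuildDeadLetterOptions()
    {
        return new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter };
    }

    // Explicitly moves a locked message to the dead letter queue.
    public static Task RejectAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message)
    {
        // Reason and description are free-form; these values are illustrative.
        return receiver.DeadLetterMessageAsync(
            message, "ValidationFailed", "Payload could not be parsed");
    }

    // Prints and removes every message currently in the dead letter queue.
    public static async Task DrainAsync(ServiceBusClient client, string queueName)
    {
        await using ServiceBusReceiver deadLetterReceiver =
            client.CreateReceiver(queueName, BuildDeadLetterOptions());

        ServiceBusReceivedMessage message;
        while ((message = await deadLetterReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5))) != null)
        {
            Console.WriteLine(
                $"{message.MessageId}: {message.DeadLetterReason} ({message.DeadLetterErrorDescription})");
            await deadLetterReceiver.CompleteMessageAsync(message);
        }
    }
}
```

Because dead-lettered messages do not expire, something like `DrainAsync` (or a manual review in Service Bus Explorer) is needed to keep the dead letter queue from growing indefinitely.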
Duplicate message detection
- Duplicate detection can be enabled only upon creation of the queue
- Prevents the scenario where a sender sends a message, crashes, and then resends the same message
- A duplicate time window can be configured. No two messages can have the same MessageId within this period. Default of 10 minutes
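Since duplicate detection has to be chosen when the queue is created, one way to do it from code is the administration client, sketched below. This assumes the connection string belongs to a policy with the Manage claim; the 10-minute window mirrors the default mentioned above.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class QueueSetup
{
    // Queue definition with duplicate detection switched on.
    public static CreateQueueOptions BuildOptions(string queueName)
    {
        return new CreateQueueOptions(queueName)
        {
            RequiresDuplicateDetection = true,
            // Messages with a MessageId already seen inside this window are dropped.
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
        };
    }

    // Creates the queue only if it does not exist yet.
    public static async Task EnsureQueueAsync(string connectionString, string queueName)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);
        bool exists = (await admin.QueueExistsAsync(queueName)).Value;
        if (!exists)
        {
            await admin.CreateQueueAsync(BuildOptions(queueName));
        }
    }
}
```

The same `CreateQueueOptions` object also carries the other queue-level settings from these notes, such as `RequiresSession`, `DefaultMessageTimeToLive`, and `DeadLetteringOnMessageExpiration`.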
Message properties
- Broker/system defined properties
  - Body: message payload
  - ContentType: application/json, application/xml, etc.
  - CorrelationId: reference to the original MessageId (for example, when sending a response message)
  - MessageId: used for duplicate message detection
  - PartitionKey
  - ReplyTo: queue to send the response to (e.g., a message acknowledgment). The CorrelationId of the response must reference the MessageId of the original message!
message.ContentType = "application/json";
message.TimeToLive = TimeSpan.FromSeconds(30);
message.MessageId = "1";
  - To have a proper audit trail, you can either:
    - Use the CorrelationId (destination) to map to an existing MessageId (source)
    - Use the ReplyToSessionId (destination) property to map to the SessionId (source)
- User defined properties
  - Custom properties
message.ApplicationProperties.Add("Department", "HR");
message.ApplicationProperties.Add("School", "USP");
foreach (var key in message.ApplicationProperties.Keys)
{
Console.WriteLine(key.ToString());
Console.WriteLine(message.ApplicationProperties[key].ToString());
}
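The ReplyTo and CorrelationId pairing described under the broker properties can be sketched as below. It assumes the original sender set `ReplyTo` to the name of a queue it listens on; the `"ack"` body is an arbitrary example.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ReplyExample
{
    // Builds a response whose CorrelationId points back at the request's MessageId,
    // which is what lets the original sender match the reply to its request.
    public static ServiceBusMessage BuildReply(ServiceBusReceivedMessage request, string body)
    {
        return new ServiceBusMessage(body)
        {
            CorrelationId = request.MessageId,
        };
    }

    // Sends the response to the queue named in the request's ReplyTo property.
    public static async Task SendReplyAsync(ServiceBusClient client, ServiceBusReceivedMessage request)
    {
        await using ServiceBusSender sender = client.CreateSender(request.ReplyTo);
        await sender.SendMessageAsync(BuildReply(request, "ack"));
    }
}
```

On the requesting side, the consumer of the reply queue would compare each reply's `CorrelationId` against the `MessageId` values it has sent.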
Dotnet SDK
- NuGet package: Azure.Messaging.ServiceBus
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Minimal Order type assumed by this sample (it is not defined elsewhere in these notes)
class Order
{
    public string OrderID { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
}

class Program
{
    // The shared access policy must allow both Send and Listen to run the whole sample
    private static string connectionString = "Endpoint=sb://hvitoi.servicebus.windows.net/;SharedAccessKeyName=Send;SharedAccessKey=shared-access-key;EntityPath=appqueue";
    private static string queueName = "appqueue";

    static async Task Main(string[] args)
    {
        // Service Bus Client
        await using ServiceBusClient client = new ServiceBusClient(connectionString);

        // Send messages to Queue
        ServiceBusSender sender = client.CreateSender(queueName);
        List<Order> orders = new List<Order>()
        {
            new Order() { OrderID = "O1", Quantity = 10, UnitPrice = 9.99m },
            new Order() { OrderID = "O2", Quantity = 15, UnitPrice = 10.99m },
            new Order() { OrderID = "O3", Quantity = 20, UnitPrice = 11.99m },
            new Order() { OrderID = "O4", Quantity = 25, UnitPrice = 12.99m },
            new Order() { OrderID = "O5", Quantity = 30, UnitPrice = 13.99m }
        };
        foreach (Order order in orders)
        {
            // Serialize the order so the body matches the JSON content type
            ServiceBusMessage message = new ServiceBusMessage(JsonSerializer.Serialize(order));
            message.ContentType = "application/json";
            message.TimeToLive = TimeSpan.FromSeconds(30); // override the default TTL of the queue
            message.MessageId = order.OrderID; // explicit, unique MessageId (reusing "1" for every order would make duplicate detection drop all but the first)
            message.ApplicationProperties.Add("Department", "HR"); // custom key-value property
            await sender.SendMessageAsync(message);
        }
        Console.WriteLine("All of the messages have been sent");

        // Peek and Lock
        // Note: ReceiveMessageAsync returns null if no message arrives within the wait time
        ServiceBusReceiver lockReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock });
        ServiceBusReceivedMessage lockedMessage = await lockReceiver.ReceiveMessageAsync(); // use ReceiveMessagesAsync(3) to fetch a batch (returns a list)
        Console.WriteLine(lockedMessage.Body);
        Console.WriteLine($"The Sequence number is {lockedMessage.SequenceNumber}");
        await lockReceiver.CompleteMessageAsync(lockedMessage); // remove the message from the queue once it has been processed

        // Receive and Delete
        ServiceBusReceiver deleteReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });
        ServiceBusReceivedMessage deletedMessage = await deleteReceiver.ReceiveMessageAsync();
        Console.WriteLine(deletedMessage.Body);
        Console.WriteLine($"The Sequence number is {deletedMessage.SequenceNumber}");

        // Get Application properties
        ServiceBusReceiver propertiesReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock });
        ServiceBusReceivedMessage propertiesMessage = await propertiesReceiver.ReceiveMessageAsync();
        Console.WriteLine(propertiesMessage.Body);
        foreach (var key in propertiesMessage.ApplicationProperties.Keys)
        {
            Console.WriteLine(key);
            Console.WriteLine(propertiesMessage.ApplicationProperties[key].ToString());
        }
    }
}