Storage Account - Queues
- Queues are used to decouple components of the application
- Messages in a queue are processed in order and after they have been processed they are deleted
- Applications cannot take the same message at the same time
- Azure Storage Queue
does not guarantee FIFO ordering of the messages
Message
- On
creating a message
in a queue you can specify the expiration time
and the encoding
(base64)
- Message elements:
- UUID
- Body
- Insertion time
- Expiration time
- Dequeue count
- When a message is read and processed, it is then
dequeued
Dequeue
is the action of reading the first element of the queue and then removing it from the queue
- A good practice is to send a message that has repeatedly failed to be processed to a poison queue (dead letter queue)
Dotnet SDK
- Nuget package:
Azure.Storage.Queues
/// <summary>
/// Console sample for the Azure.Storage.Queues client: sends one Base64-encoded
/// message, peeks at up to two messages, then receives and deletes one message.
/// </summary>
class Program
{
    // Sample connection string with a placeholder account key; keep real keys out of source.
    private static string connectionString = "DefaultEndpointsProtocol=https;AccountName=hvitoi;AccountKey=account-key;EndpointSuffix=core.windows.net";
    private static string queueName = "appqueue";

    /// <summary>
    /// Runs the send / peek / receive / delete sequence. Nothing happens if the queue does not exist.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        QueueClient queueClient = new QueueClient(connectionString, queueName);
        if (queueClient.Exists())
        {
            // Send message
            string message = $"This is test message to the queue {queueName}.";
            var messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
            string encodedMessage = System.Convert.ToBase64String(messageBytes); // encode msg to base64
            queueClient.SendMessage(encodedMessage);
            Console.WriteLine("Message sent to queue.");

            // Peek message (read but do not remove from queue)
            PeekedMessage[] peekedMessages = queueClient.PeekMessages(2); // Peek 2 messages
            foreach (PeekedMessage peekedMessage in peekedMessages)
            {
                Console.WriteLine($"Message ID is {peekedMessage.MessageId}");
                Console.WriteLine($"Message was inserted on {peekedMessage.InsertedOn}");
                // The body is printed as received; no decoding is done here.
                Console.WriteLine($"Message body is {peekedMessage.Body.ToString()}");
            }

            // Receive message (read and remove from queue)
            QueueMessage queueMessage = queueClient.ReceiveMessage();
            if (queueMessage != null) // nothing to process if no message was returned
            {
                Console.WriteLine(queueMessage.Body.ToString());
                // The pop receipt obtained by this receive is required to delete the message.
                queueClient.DeleteMessage(queueMessage.MessageId, queueMessage.PopReceipt);
                Console.WriteLine("Message processed and deleted");
            }
        }
    }
}
Connection Queue <--> Table
/// <summary>
/// Queue-triggered function that maps a JSON queue message onto an <c>Order</c>
/// and returns it so that the table binding on the return value stores it.
/// </summary>
public static class GetDetails
{
    /// <summary>
    /// Builds an <c>Order</c> from the "Category", "OrderID", "Quantity" and "UnitPrice"
    /// properties of the queue message.
    /// </summary>
    /// <param name="myQueueItem">Queue message from "appqueue", bound as a JSON object.</param>
    /// <param name="log">Logger used to record that the order was produced.</param>
    /// <returns>The order that the return binding writes to the "Orders" table.</returns>
    [Function("QueueTable")]
    [return: Table("Orders", Connection = "hvitoi_STORAGE")] // output: table receiving the returned entity
    public static Order Run(
        [QueueTrigger("appqueue", Connection = "hvitoi_STORAGE")] JObject myQueueItem, // input: the triggering queue message
        ILogger log)
    {
        // Pull each field out of the message first, in the same order the entity is filled.
        string category = myQueueItem["Category"].ToString();
        string orderId = myQueueItem["OrderID"].ToString();
        int quantity = Convert.ToInt32(myQueueItem["Quantity"]);
        decimal unitPrice = Convert.ToDecimal(myQueueItem["UnitPrice"]);

        var entity = new Order
        {
            PartitionKey = category,
            RowKey = orderId,
            Quantity = quantity,
            UnitPrice = unitPrice
        };

        log.LogInformation("Order written to table");

        // Returning the entity hands it to the table binding declared on the return value.
        return entity;
    }
}
Connection Queue <--> Blob
/// <summary>
/// Queue-triggered function that logs the queue message and the content of the
/// blob at "trigger/{queueTrigger}".
/// </summary>
public static class ProcessMessage
{
    /// <summary>
    /// Logs the received queue message, then reads the bound blob stream to the end and logs its text.
    /// </summary>
    /// <param name="myQueueItem">Queue message from "appqueue"; also fills the {queueTrigger} part of the blob path.</param>
    /// <param name="blob">Read-only stream for the blob at "trigger/{queueTrigger}".</param>
    /// <param name="log">Logger that receives both log entries.</param>
    [Function("SendtoBlob")]
    public static void Run(
        [QueueTrigger("appqueue", Connection = "hvitoi_STORAGE")] string myQueueItem,
        [Blob("trigger/{queueTrigger}", FileAccess.Read, Connection = "hvitoi_STORAGE")] Stream blob,
        ILogger log
    )
    {
        log.LogInformation($"C# Queue trigger function processed: {myQueueItem}");

        // Dispose the reader once the content has been read instead of leaving it open.
        using (StreamReader reader = new StreamReader(blob))
        {
            log.LogInformation(reader.ReadToEnd());
        }
    }
}