Skip to content

Storage Account - Queues

  • Queues are used to decouple components of the application
  • Messages in a queue are processed in order and after they have been processed they are deleted
  • Applications cannot take the same message at the same time
  • Azure Storage Queue do not guarantee FIFO and the order of the messages

Message

  • On creating a message in a queue you can specify the expiration time and the encoding (base64)
  • Message elements:
  • UUID
  • Body
  • Insertion time
  • Expiration time
  • Dequeue count
  • When a message is read and processed, it is then dequeued
  • Dequeue is the action of reading the first element of the queue and then removing it from the queue
  • A good pratice is to sent a message that couldn't be processed multiple times to a poison queue (dead letter queue)

Dotnet SDK

  • Nuget package: Azure.Storage.Queues
class Program
{
  private static string connectionString = "DefaultEndpointsProtocol=https;AccountName=hvitoi;AccountKey=account-key;EndpointSuffix=core.windows.net";
  private static string queueName = "appqueue";

  static void Main(string[] args)
  {
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
      // Send message
      string message = $"This is test message to the queue {queueName}.";
      var messageBytes = System.Text.Encoding.UTF8.GetBytes(message);
      encodedMessage = System.Convert.ToBase64String(messageBytes); // encode msg to base64
      queueClient.SendMessage(encodedMessage);
      Console.WriteLine("Message sent to queue.");

      // Peek message (read but do not remove from queue)
      PeekedMessage[] peekedMessages = queueClient.PeekMessages(2); // Peek 2 messages
      foreach(PeekedMessage peekedMessage in peekedMessages)
      {
        Console.WriteLine($"Message ID is {peekedMessage.MessageId}");
        Console.WriteLine($"Message was inserted on {peekedMessage.InsertedOn}");
        Console.WriteLine($"Message body is {peekedMessage.Body.ToString()}");
      }

      // Receive message (read and remove from queue)
      QueueMessage queueMessage = queueClient.ReceiveMessage();
      Console.WriteLine(queueMessage.Body.ToString());
      queueClient.DeleteMessage(queueMessage.MessageId, queueMessage.PopReceipt);
      Console.WriteLine("Message processed and deleted");
    }
  }
}

Connection Queue <--> Table

public static class GetDetails
{
  [Function("QueueTable")]
  [return: Table("Orders", Connection = "hvitoi_STORAGE")] // connection to the table
  public static Order Run(
    [QueueTrigger("appqueue", Connection = "hvitoi_STORAGE")] JObject myQueueItem, // connection to the queue
    ILogger log
  )
  {
    Order order = new Order()
    {
      PartitionKey = myQueueItem["Category"].ToString(),
      RowKey = myQueueItem["OrderID"].ToString(),
      Quantity = Convert.ToInt32(myQueueItem["Quantity"]),
      UnitPrice = Convert.ToDecimal(myQueueItem["UnitPrice"])
    };

    log.LogInformation("Order written to table");

    return order; // The return object will then be saved to the table
  }
}

Connection Queue <--> Blob

public static class ProcessMessage
{
  [Function("SendtoBlob")]
  public static void Run(
    [QueueTrigger("appqueue", Connection = "hvitoi_STORAGE")] string myQueueItem,
    [Blob("trigger/{queueTrigger}", FileAccess.Read, Connection = "hvitoi_STORAGE")] Stream blob,
    ILogger log
  )
  {
    log.LogInformation($"C# Queue trigger function processed: {myQueueItem}");
    StreamReader reader = new StreamReader(blob);
    log.LogInformation(reader.ReadToEnd());
  }
}