Implementing Azure Functions with Storage Queue

Case Study + Tutorial

Shahar Shokrani
Jun 23, 2021

The Problem:

We were trying to implement a solution that runs periodically (once a week). It calls an external API endpoint, "{{domain}}/items", that returns metadata for 1,500,000 items. It then determines, for each item, whether it needs to be updated or inserted into the database according to some custom business logic.

The catch with an item's metadata is that we have to call the external API again, at "{{domain}}/items/{{id}}", to retrieve the item's full details before updating the DB.

[Figure: Original sequence diagram]

The problem was that this process took a long time (~4 hours, even after parallelizing the calls). The Azure Functions Premium plan only guarantees execution for up to 60 minutes.
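The following back-of-the-envelope check uses only the numbers above. It assumes the ~4 hours is spent mostly on the per-item detail calls, and it shows how far the single run was from the 60-minute guarantee:

```latex
\begin{align*}
\text{HTTP calls per run} &= 1\ (\text{list}) + 1{,}500{,}000\ (\text{details}) = 1{,}500{,}001 \\
\text{observed throughput} &\approx \frac{1{,}500{,}000\ \text{items}}{4 \times 3600\ \text{s}} \approx 104\ \text{items/s} \\
\text{needed to finish in 60 min} &= \frac{1{,}500{,}000\ \text{items}}{3600\ \text{s}} \approx 417\ \text{items/s} \approx 4 \times 104
\end{align*}
```

The single run would have to become roughly four times faster just to fit the guarantee, with no headroom for latency spikes on the external API.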

The Solution:

The solution was to split the one long-running Azure Function into two Azure Functions. The first one only calls the external API to get the items' metadata, then enqueues each item into an Azure Storage queue:

[Figure: Enqueue Azure Function]

The second Azure Function is bound to the storage queue. It dequeues one item at a time and only then calls the external API for that item's details:

[Figure: Dequeue Azure Function]

Now the solution meets the Azure Functions timeout requirement, because each invocation of the dequeue function processes a single item instead of all 1,500,000.
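To see why the split helps, here is a minimal in-memory sketch of the same two-stage flow. It is not the Azure code. It assumes a plain `Queue<T>` as a stand-in for the Azure Storage queue, and a hypothetical `FetchDetails` function in place of the external API's "{{domain}}/items/{{id}}" call. Stage 1 only enqueues metadata. Stage 2 handles exactly one message per "invocation", so the work done by any single invocation no longer grows with the number of items:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the "{{domain}}/items" response: (Id, Version) pairs.
var metaData = Enumerable.Range(1, 5).Select(id => (Id: id, Version: 1)).ToList();

// Hypothetical stand-in for the per-item "{{domain}}/items/{{id}}" call.
string FetchDetails(int id) => $"details-{id}";

// Stage 1 (like EnqueueItemsMetaData): one pass that only enqueues metadata.
var queue = new Queue<(int Id, int Version)>();
foreach (var m in metaData)
    queue.Enqueue(m);

// Stage 2 (like DequeueItemMetaData): each loop iteration models one short
// function invocation that handles exactly one message.
var processed = new List<string>();
int invocations = 0;
while (queue.TryDequeue(out var message))
{
    invocations++;
    processed.Add(FetchDetails(message.Id));
}

Console.WriteLine($"{invocations} invocations, {processed.Count} items processed");
```

In the real setup, the Functions runtime plays the role of the `while` loop. It invokes the queue-triggered function for each message and may run several invocations in parallel.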

The code for EnqueueItemsMetaData:

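using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;

[FunctionName("EnqueueItemsMetaData")]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
    [Queue("myqueue")] IAsyncCollector<ItemMetaData> queue)
{
    // A single call to "{{domain}}/items" returns the metadata of all items.
    List<ItemMetaData> itemsMetaData = await _externalApi.GetItemsMetaData();

    // One queue message per item; DequeueItemMetaData picks each one up separately.
    foreach (ItemMetaData itemMetaData in itemsMetaData)
    {
        await queue.AddAsync(itemMetaData);
    }

    return new OkObjectResult("Done Enqueue items meta data");
}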

The code for DequeueItemMetaData:

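using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

[FunctionName("DequeueItemMetaData")]
public async Task Run(
    [QueueTrigger("myqueue", Connection = "AzureWebJobsStorage")] ItemMetaData itemMetaData)
{
    // Current DB row; may be null when the item is new (the insert case).
    Item item = await _repository.GetById(itemMetaData.Id);

    // Full details via "{{domain}}/items/{{id}}", using the Id from the queue message.
    VendorItem vendorItem = await _externalApi.GetById(itemMetaData.Id);

    // Skip the write when the stored item is already up to date.
    if (!IsShouldBeUpdated(item, vendorItem))
        return;

    // Assumes Upsert is async and persists the vendor's data (map it to Item first if needed).
    await _repository.Upsert(vendorItem);
}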

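Note that `AzureWebJobsStorage` is the name of the app setting that holds the Azure Storage connection string. For local development you can set it to `UseDevelopmentStorage=true` in `local.settings.json`, which points to the local storage emulator (Azurite). For production, see the Azure documentation on configuring Azure Storage connection strings: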

{
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    ...
  }
}
