Preventing Dog-Pile with an In-Memory Adaptive Queue in .NET

Shahar Shokrani
3 min read · Dec 30, 2024

Learn how to use TPL Dataflow’s ActionBlock to handle concurrency, avoid dog-pile scenarios, and dynamically reconfigure queue parameters at runtime with IOptionsMonitor.

GitHub: https://github.com/ShaharShokrani/DynamicActionBlock

1. The Dog-Pile Problem

A dog-pile (or cache stampede) occurs when multiple requests simultaneously try to fetch or generate the same resource — often overwhelming a service. Even outside caching, high concurrency can saturate resources if many tasks run at once.

In .NET applications, if you naïvely spin up unlimited parallel tasks — like reprocessing missed messages or recalculating expensive data — you risk:

  • Resource exhaustion (threads, memory).
  • Unnecessary duplication (everyone tries to do the same work).
  • An unbounded queue that keeps growing until memory runs out.

2. ActionBlock: Your Concurrency Gate

  • ActionBlock (from System.Threading.Tasks.Dataflow) provides a built-in concurrency limit, acting as a channel for incoming jobs and a task orchestrator to process them.
  • You can configure an ActionBlock to run N tasks in parallel (MaxDegreeOfParallelism) and set an upper capacity to avoid indefinite queue growth.
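
To make the two settings concrete, here is a minimal sketch that is separate from the queue class in this post. The job body (a 50 ms delay), the numbers 2, 4 and 20, and all variable names are assumptions chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var sync = new object();
int running = 0;      // jobs executing right now
int peak = 0;         // highest value 'running' ever reached
int processed = 0;    // jobs that ran to completion

var gate = new ActionBlock<int>(async jobId =>
{
    lock (sync) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(50);                 // stand-in for real work
    lock (sync) { running--; processed++; }
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,   // at most 2 jobs execute at once
    BoundedCapacity = 4           // at most 4 jobs held (queued or running)
});

int accepted = 0, rejected = 0;
for (int i = 0; i < 20; i++)
{
    // Post never waits: it returns false when the block is full.
    if (gate.Post(i)) accepted++; else rejected++;
}

gate.Complete();          // signal that no more jobs will arrive
await gate.Completion;    // wait for the accepted jobs to finish

Console.WriteLine($"accepted={accepted}, rejected={rejected}, peak={peak}");
```

Because the loop posts far faster than the jobs complete, most of the 20 posts are typically rejected, and the observed peak never exceeds the configured parallelism.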

3. The InMemoryAdaptiveQueue Class

Below is a simplified example of how we create a dynamic in-memory queue that:

  1. Listens to an IOptionsMonitor for AdaptiveQueueOptions changes (e.g., concurrency level).
  2. Recreates its internal ActionBlock with updated bounded capacity and max concurrency whenever the config changes.
  3. Posts or sends jobs to the ActionBlock. A job that doesn’t fit because the queue is full is rejected.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class InMemoryAdaptiveQueue<TJob> : IAdaptiveQueue<TJob> where TJob : IJob
{
    private readonly IOptionsMonitor<AdaptiveQueueOptions> _optionsMonitor;
    private readonly ILogger<InMemoryAdaptiveQueue<TJob>> _logger;
    // OnChange returns a subscription; disposing it stops the change callbacks.
    private readonly IDisposable _optionsSubscription;

    // volatile: the block is swapped from the options-change callback thread.
    private volatile ActionBlock<TJob> _actionBlock;
    private CancellationToken _cancellationToken;
    private bool _disposed;

    public InMemoryAdaptiveQueue(IOptionsMonitor<AdaptiveQueueOptions> optionsMonitor,
        ILogger<InMemoryAdaptiveQueue<TJob>> logger)
    {
        _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Recreate the ActionBlock any time the options change:
        _optionsSubscription = _optionsMonitor.OnChange(OnOptionsChanged);
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cancellationToken = cancellationToken;
        RecreateActionBlock();
        _logger.LogInformation("Queue started with concurrency={Concurrency}, capacity={Capacity}",
            _optionsMonitor.CurrentValue.MaxDegreeOfParallelism,
            _optionsMonitor.CurrentValue.BoundedCapacity);
        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Completing the queue.");
        var block = _actionBlock;
        if (block is null) return;

        // Complete() only stops intake; awaiting Completion waits for queued jobs to drain.
        block.Complete();
        try
        {
            await block.Completion;
        }
        catch (OperationCanceledException)
        {
            // The block was canceled through the token passed to StartAsync.
        }
    }

    public bool TryEnqueue(TJob job)
    {
        // ...
    }

    public async Task<bool> TryEnqueueAsync(TJob job, CancellationToken ct)
    {
        // ...
    }

    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;

        _logger.LogInformation("Disposing queue...");
        _optionsSubscription?.Dispose();
        await StopAsync(CancellationToken.None);
    }

    private void OnOptionsChanged(AdaptiveQueueOptions newOptions)
    {
        _logger.LogInformation("Options changed, recreating ActionBlock...");
        RecreateActionBlock();
    }

    private void RecreateActionBlock()
    {
        var options = _optionsMonitor.CurrentValue;
        var dataflowOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = options.MaxDegreeOfParallelism,
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = _cancellationToken
        };

        var newBlock = new ActionBlock<TJob>(async job =>
        {
            try
            {
                job.CancellationToken.ThrowIfCancellationRequested();
                _logger.LogDebug("Processing job {JobId}", job.Id);
                await job.ExecuteAsync(job.CancellationToken);
                // TrySet* avoids throwing if the caller already completed the source.
                job.TaskCompletionSource?.TrySetResult(true);
            }
            catch (OperationCanceledException)
            {
                job.TaskCompletionSource?.TrySetCanceled();
                _logger.LogWarning("Job canceled {JobId}", job.Id);
            }
            catch (Exception ex)
            {
                job.TaskCompletionSource?.TrySetException(ex);
                _logger.LogError(ex, "Error processing {JobId}", job.Id);
            }
        }, dataflowOptions);

        // Swap first, then complete the old block so it drains what it already holds.
        var oldBlock = _actionBlock;
        _actionBlock = newBlock;
        oldBlock?.Complete();
    }
}
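
The swap at the end of RecreateActionBlock can be tried in isolation. The sketch below leaves out IOptionsMonitor and logging entirely; CreateBlock, the "old"/"new" tags, and the job numbers are hypothetical names used only to show which block ends up running which job.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var log = new ConcurrentQueue<string>();

// Hypothetical helper: builds a block tagged with a name so the log shows who ran each job.
ActionBlock<int> CreateBlock(string name, int parallelism, int capacity) =>
    new ActionBlock<int>(async job =>
    {
        await Task.Delay(20);             // stand-in for real work
        log.Enqueue($"{name}:{job}");
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = parallelism,
        BoundedCapacity = capacity
    });

var current = CreateBlock("old", 1, 10);
current.Post(1);
current.Post(2);

// Simulated configuration change: publish the new block, then complete the old one.
var old = current;
current = CreateBlock("new", 4, 100);
old.Complete();

// A producer that still holds a reference to the old block is turned away.
bool lateOldPost = old.Post(3);
current.Post(4);

await old.Completion;        // jobs 1 and 2 still finish on the old block
current.Complete();
await current.Completion;

Console.WriteLine($"late post accepted: {lateOldPost}; log: {string.Join(", ", log)}");
```

Two things stand out: jobs that were already queued are not lost when the block is replaced, and a Post that races with the swap can return false, so callers should treat a rejected job as something to retry or report.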

Key Points

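  • BoundedCapacity: Prevents the queue from growing without limit. Once capacity is reached, a synchronous Post (the natural basis for TryEnqueue) returns false immediately, while SendAsync waits asynchronously until space frees up or the call is canceled.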
  • MaxDegreeOfParallelism: Caps how many items run in parallel. This helps avoid dog-piles (or at least mitigates the effect).
  • Options Changes: If your config changes concurrency from 2 to 4 (for example), OnOptionsChanged re-creates the block on the fly. New jobs use the new concurrency level, while jobs already queued in the old block finish under the old settings.
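
The difference between failing and waiting on a full queue is easiest to see side by side. This is a minimal sketch using Post and SendAsync directly on a one-slot block; the release gate, the 100 ms timeout, and the job names are assumptions made for the demonstration, not part of the queue class.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Gate that keeps the first job busy so the block stays full.
var release = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
int processed = 0;

var block = new ActionBlock<string>(async job =>
{
    await release.Task;
    Interlocked.Increment(ref processed);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

bool first = block.Post("a");    // takes the only slot
bool second = block.Post("b");   // block is full, so Post declines immediately

// SendAsync does not fail; it stays pending until the block has room.
Task<bool> pending = block.SendAsync("c");
bool waiting = !pending.IsCompleted;

// A caller can bound the wait with a cancellation token.
bool timedOut = false;
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
try
{
    await block.SendAsync("d", cts.Token);
}
catch (OperationCanceledException)
{
    timedOut = true;
}

release.SetResult(true);         // let job "a" finish, freeing the slot
bool third = await pending;      // "c" is now accepted

block.Complete();
await block.Completion;

Console.WriteLine($"first={first}, second={second}, waiting={waiting}, timedOut={timedOut}, third={third}, processed={processed}");
```

Post suits callers that prefer to shed load right away, while SendAsync with a token suits callers that can afford to wait a bounded amount of time.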

4. Handling Dog-Piles

  1. Limit concurrency: Even if thousands of incoming requests want to do the same heavy lifting, only N at a time actually run.
  2. Enforce capacity: Once the queue is at capacity, you can gracefully reject new tasks or push them to an alternative strategy (e.g., rate-limiting or exponential back-off).
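  3. Graceful shutdown: Calling Complete (as StopAsync does) stops the block from accepting new jobs while the ones it already holds keep running. Await the block’s Completion task to know when that drain has finished, which is helpful when your app or service is re-deployed.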
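
A shutdown sequence along these lines might look like the sketch below. The five-second budget and the job body are assumptions; a real service would choose its own limits and decide what to do if the drain does not finish in time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int finished = 0;
var block = new ActionBlock<int>(async job =>
{
    await Task.Delay(30);                 // stand-in for real work
    Interlocked.Increment(ref finished);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 10 });

for (int i = 0; i < 5; i++) block.Post(i);

// Step 1: stop intake. Jobs already held by the block keep running.
block.Complete();
bool acceptedAfterComplete = block.Post(99);

// Step 2: wait for the drain, but only up to a (hypothetical) shutdown budget.
Task drained = block.Completion;
Task winner = await Task.WhenAny(drained, Task.Delay(TimeSpan.FromSeconds(5)));
bool drainedInTime = winner == drained;

Console.WriteLine($"acceptedAfterComplete={acceptedAfterComplete}, drainedInTime={drainedInTime}, finished={finished}");
```

Waiting with a budget keeps a redeploy from hanging on a stuck job while still giving normal jobs the chance to finish.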

5. Conclusion

Using TPL Dataflow’s ActionBlock is a powerful yet streamlined way to:

  • Throttle concurrency and avoid dog-pile scenarios.
  • Dynamically reconfigure concurrency or capacity at runtime (via IOptionsMonitor).
  • Provide a Channel-like flow with a built-in Task Orchestrator.

If you’re looking to handle bursts of traffic or CPU-intensive jobs without meltdown, InMemoryAdaptiveQueue and ActionBlock may be exactly what you need.

buy me a coffee: https://buymeacoffee.com/shaharshokrani
