Preventing Dog-Pile with an In-Memory Adaptive Queue in .NET
Learn how to use TPL Dataflow’s ActionBlock to handle concurrency, avoid dog-pile scenarios, and dynamically reconfigure queue parameters at runtime with IOptionsMonitor.
GitHub: https://github.com/ShaharShokrani/DynamicActionBlock
1. The Dog-Pile Problem
A dog-pile (or cache stampede) occurs when multiple requests simultaneously try to fetch or generate the same resource — often overwhelming a service. Even outside caching, high concurrency can saturate resources if many tasks run at once.
In .NET applications, if you naïvely spin up unlimited parallel tasks — like reprocessing missed messages or recalculating expensive data — you risk:
- Resource exhaustion (threads, memory).
- Unnecessary duplication (everyone tries to do the same work).
- Unbounded queue growth that eventually saturates memory.
2. ActionBlock: Your Concurrency Gate
ActionBlock (from System.Threading.Tasks.Dataflow) provides a built-in concurrency limit, acting as a channel for incoming jobs and a task orchestrator to process them. You can configure an ActionBlock to run N tasks in parallel (MaxDegreeOfParallelism) and set an upper bound on capacity to avoid indefinite queue growth.
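As a minimal standalone sketch (the 50 ms job delay and the specific limits are illustrative assumptions, not values from the article), a bounded ActionBlock that throttles work and rejects overflow might look like this:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledBlockDemo
{
    // Posts `total` jobs into a bounded ActionBlock and returns how many were rejected.
    public static async Task<int> RunAsync(int total)
    {
        int processed = 0;
        var block = new ActionBlock<int>(async jobId =>
        {
            await Task.Delay(50);                 // simulate work
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,           // at most 2 jobs run concurrently
            BoundedCapacity = 10                  // at most 10 jobs buffered; extra posts fail
        });

        int rejected = 0;
        for (int i = 0; i < total; i++)
        {
            // Post returns false immediately once the block is at capacity.
            if (!block.Post(i))
                rejected++;
        }

        block.Complete();
        await block.Completion;                   // drain in-flight jobs
        return rejected;
    }
}
```

Posting 50 jobs in a tight loop against a capacity of 10 means most posts fail fast instead of piling up, which is exactly the dog-pile mitigation described above.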
3. The InMemoryAdaptiveQueue Class
Below is a simplified example of how we create a dynamic in-memory queue that:
- Listens to an IOptionsMonitor for AdaptiveQueueOptions changes (e.g., concurrency level).
- Recreates its internal ActionBlock with updated bounded capacity and max concurrency whenever the config changes.
- Posts or sends jobs to the ActionBlock; any job that doesn’t fit (because the queue is full) is rejected.
public class InMemoryAdaptiveQueue<TJob> : IAdaptiveQueue<TJob> where TJob : IJob
{
    private readonly IOptionsMonitor<AdaptiveQueueOptions> _optionsMonitor;
    private readonly ILogger<InMemoryAdaptiveQueue<TJob>> _logger;
    private volatile ActionBlock<TJob> _actionBlock;
    private CancellationToken _cancellationToken;
    private bool _disposed;

    public InMemoryAdaptiveQueue(IOptionsMonitor<AdaptiveQueueOptions> optionsMonitor,
        ILogger<InMemoryAdaptiveQueue<TJob>> logger)
    {
        _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Recreate the ActionBlock any time the options change:
        _optionsMonitor.OnChange(OnOptionsChanged);
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cancellationToken = cancellationToken;
        RecreateActionBlock();

        _logger.LogInformation("Queue started with concurrency={Concurrency}, capacity={Capacity}",
            _optionsMonitor.CurrentValue.MaxDegreeOfParallelism,
            _optionsMonitor.CurrentValue.BoundedCapacity);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Completing the queue.");
        _actionBlock?.Complete();
        return Task.CompletedTask;
    }

    public bool TryEnqueue(TJob job)
    {
        // ...
    }

    public async Task<bool> TryEnqueueAsync(TJob job, CancellationToken ct)
    {
        // ...
    }

    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;

        _logger.LogInformation("Disposing queue...");
        await StopAsync(CancellationToken.None);
    }

    private void OnOptionsChanged(AdaptiveQueueOptions newOptions)
    {
        _logger.LogInformation("Options changed, recreating ActionBlock...");
        RecreateActionBlock();
    }

    private void RecreateActionBlock()
    {
        var options = _optionsMonitor.CurrentValue;

        var dataflowOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = options.MaxDegreeOfParallelism,
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = _cancellationToken
        };

        var newBlock = new ActionBlock<TJob>(async job =>
        {
            try
            {
                job.CancellationToken.ThrowIfCancellationRequested();
                _logger.LogDebug("Processing job {JobId}", job.Id);
                await job.ExecuteAsync(job.CancellationToken);
                job.TaskCompletionSource?.SetResult(true);
            }
            catch (OperationCanceledException)
            {
                job.TaskCompletionSource?.SetCanceled();
                _logger.LogWarning("Job canceled {JobId}", job.Id);
            }
            catch (Exception ex)
            {
                job.TaskCompletionSource?.SetException(ex);
                _logger.LogError(ex, "Error processing {JobId}", job.Id);
            }
        }, dataflowOptions);

        // Swap in the new block, then let the old one drain and complete.
        var oldBlock = _actionBlock;
        _actionBlock = newBlock;
        oldBlock?.Complete();
    }
}
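The TryEnqueue and TryEnqueueAsync bodies are elided above (see the repo for the full versions). In Dataflow terms, the fail-fast path maps to Post and the waiting path to SendAsync; a standalone sketch of that difference (the delay and capacity values here are illustrative assumptions):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnqueueSemantics
{
    // Demonstrates fail-fast Post vs. asynchronously waiting SendAsync on a full block.
    public static async Task<(bool posted, bool sent)> DemoAsync()
    {
        var block = new ActionBlock<int>(
            async _ => await Task.Delay(200),     // slow consumer keeps the slot occupied
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

        block.Post(1);                            // fills the single slot
        bool posted = block.Post(2);              // fail-fast: false, the block is full
        bool sent = await block.SendAsync(3);     // waits until the slot frees, then true

        block.Complete();
        await block.Completion;
        return (posted, sent);
    }
}
```

A queue like InMemoryAdaptiveQueue can expose both semantics and let the caller pick: reject immediately under load, or apply back-pressure by awaiting.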
Key Points
- BoundedCapacity: Prevents the queue from growing endlessly. Once capacity is reached, new calls to TryEnqueue (via Post) fail fast, while SendAsync waits asynchronously for space.
- MaxDegreeOfParallelism: Caps how many items run in parallel. This helps avoid dog-piles (or at least mitigates their effect).
- Options Changes: If your config changes concurrency from 2 to 4 (for example), OnOptionsChanged re-creates the block on the fly. New jobs use the new concurrency level.
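For OnChange to fire, the options must be bound to a reloadable configuration source. A typical registration might look like the sketch below; the "AdaptiveQueue" section name and MyJob type are assumptions for illustration, not names from the repo:

```csharp
// Program.cs (hypothetical registration)
// appsettings.json is a reloadable source by default (reloadOnChange: true), e.g.:
//   { "AdaptiveQueue": { "MaxDegreeOfParallelism": 2, "BoundedCapacity": 100 } }
builder.Services.Configure<AdaptiveQueueOptions>(
    builder.Configuration.GetSection("AdaptiveQueue"));
builder.Services.AddSingleton<IAdaptiveQueue<MyJob>, InMemoryAdaptiveQueue<MyJob>>();
```

Editing the JSON file at runtime then triggers IOptionsMonitor.OnChange, which recreates the ActionBlock with the new limits.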
4. Handling Dog-Piles
- Limit concurrency: Even if thousands of incoming requests want to do the same heavy-lifting, only N at a time actually run.
- Enforce capacity: Once the queue is at capacity, you can gracefully reject new tasks or push them to an alternative strategy (e.g., rate-limiting or exponential back-off).
- Graceful shutdown: StopAsync plus Complete ensure the queue finishes in-flight tasks but doesn’t accept new ones, which helps when your app or service is re-deployed.
5. Conclusion
Using TPL Dataflow’s ActionBlock is a powerful yet streamlined way to:
- Throttle concurrency and avoid dog-pile scenarios.
- Dynamically reconfigure concurrency or capacity at runtime (via IOptionsMonitor).
- Provide a Channel-like flow with a built-in task orchestrator.
If you’re looking to handle bursts of traffic or CPU-intensive jobs without meltdown, InMemoryAdaptiveQueue and ActionBlock may be exactly what you need.
Buy me a coffee: https://buymeacoffee.com/shaharshokrani