When an API returns paginated data, you often need to fetch every page before you can use the full result set. This guide covers practical strategies for doing that from a .NET service.
For paginated external API calls, these are the strategies that work best.
Task.WhenAll is a good fit, but throttle it so you don't overwhelm the external API:
public class DataService
{
private readonly HttpClient _httpClient;
private readonly SemaphoreSlim _semaphore;
public DataService(HttpClient httpClient)
{
_httpClient = httpClient;
_semaphore = new SemaphoreSlim(5); // Limit to 5 concurrent requests
}
public async Task<List<DataItem>> GetAllDataAsync()
{
// First call to get total count
var firstPage = await GetPageAsync(1);
var totalCount = firstPage.TotalCount;
var pageSize = firstPage.Data.Count; // Infer page size from first response
if (pageSize == 0) return firstPage.Data; // Empty first page: nothing more to fetch
var totalPages = (int)Math.Ceiling((double)totalCount / pageSize);
var allData = new ConcurrentBag<DataItem>(firstPage.Data);
if (totalPages > 1)
{
// Create tasks for remaining pages
var tasks = Enumerable.Range(2, totalPages - 1)
.Select(async page =>
{
await _semaphore.WaitAsync();
try
{
var pageData = await GetPageAsync(page);
foreach (var item in pageData.Data)
{
allData.Add(item);
}
}
finally
{
_semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
return allData.ToList();
}
private async Task<PagedResponse> GetPageAsync(int page)
{
var response = await _httpClient.GetAsync($"/api/data?page={page}");
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<PagedResponse>()
    ?? throw new InvalidOperationException($"Empty response body for page {page}");
}
}
public record PagedResponse(List<DataItem> Data, int TotalCount);
public record DataItem(int[] ClientIds, string EmailOrDomain)
{
// Custom equality for deduplication
public virtual bool Equals(DataItem? other)
{
if (other == null) return false;
return ClientIds.SequenceEqual(other.ClientIds) &&
EmailOrDomain == other.EmailOrDomain;
}
public override int GetHashCode()
{
return HashCode.Combine(
ClientIds.Aggregate(0, (acc, id) => acc ^ id.GetHashCode()),
EmailOrDomain
);
}
}
With equality defined on DataItem, deduplication is simple. Option 1: HashSet with custom equality
var uniqueData = new HashSet<DataItem>(allData);
return uniqueData.ToList();
Option 2: LINQ Distinct (if equality is properly implemented)
return allData.Distinct().ToList();
Option 3: Dictionary-based deduplication
var uniqueData = allData
.GroupBy(item => new {
ClientIdsHash = string.Join(",", item.ClientIds.OrderBy(x => x)),
item.EmailOrDomain
})
.Select(g => g.First())
.ToList();
Two alternatives when full concurrency is too aggressive. 1. Sequential with delay (if the API has strict rate limits):
// Sequential, so a plain List<DataItem> works here (ConcurrentBag has no AddRange)
for (int page = 2; page <= totalPages; page++)
{
var pageData = await GetPageAsync(page);
allData.AddRange(pageData.Data);
await Task.Delay(100); // 100ms delay between requests
}
2. Batch processing:
var batches = Enumerable.Range(2, totalPages - 1)
.Chunk(5); // Process 5 pages at a time (Chunk requires .NET 6+)
foreach (var batch in batches)
{
var batchTasks = batch.Select(GetPageAsync);
var results = await Task.WhenAll(batchTasks);
foreach (var result in results)
allData.AddRange(result.Data);
}
Since your HttpClient is already configured with the standard resilience handler (AddStandardResilienceHandler), you get built-in retry logic, a circuit breaker, and timeout handling:
public class DataService
{
private readonly HttpClient _httpClient; // Already configured with the standard resilience handler
private readonly SemaphoreSlim _semaphore;
private readonly ILogger<DataService> _logger;
public DataService(HttpClient httpClient, ILogger<DataService> logger)
{
_httpClient = httpClient;
_logger = logger;
// Reduce concurrency since resilience handler will retry failed requests
_semaphore = new SemaphoreSlim(3); // Lower concurrency to avoid overwhelming
}
public async Task<(List<DataItem> Data, List<string> FailedPages)> GetAllDataWithPartialSuccessAsync()
{
var firstPage = await GetPageAsync(1);
var totalCount = firstPage.TotalCount;
var pageSize = firstPage.Data.Count;
var totalPages = (int)Math.Ceiling((double)totalCount / pageSize);
var allData = new ConcurrentBag<DataItem>(firstPage.Data);
var failedPages = new ConcurrentBag<string>();
if (totalPages > 1)
{
var tasks = Enumerable.Range(2, totalPages - 1)
.Select(async page =>
{
await _semaphore.WaitAsync();
try
{
var pageData = await GetPageAsync(page);
foreach (var item in pageData.Data)
{
allData.Add(item);
}
}
catch (Exception ex)
{
// The resilience pipeline has already exhausted its retries by this point
failedPages.Add($"Page {page}: {ex.Message}");
_logger.LogError(ex, "Page {Page} failed after resilience handling", page);
}
finally
{
_semaphore.Release();
}
});
await Task.WhenAll(tasks); // Won't throw since we handle exceptions
}
return (allData.ToList(), failedPages.ToList());
}
private async Task<PagedResponse> GetPageAsync(int page)
{
var response = await _httpClient.GetAsync($"/api/data?page={page}");
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<PagedResponse>()
    ?? throw new InvalidOperationException($"Empty response body for page {page}");
}
}
A common misconception: awaiting Task.WhenAll does not throw an AggregateException. It rethrows only the first exception; the full set is available on the combined task's Exception property, so keep a reference to that task if you need every failure:
var whenAllTask = Task.WhenAll(tasks);
try
{
    await whenAllTask;
}
catch
{
    // whenAllTask.Exception is an AggregateException holding every failure
    foreach (var innerException in whenAllTask.Exception!.InnerExceptions)
    {
        _logger.LogError(innerException, "Failed to fetch page data");
    }
    throw; // Re-throw or handle appropriately
}
ConcurrentBag is a thread-safe collection designed for scenarios where multiple threads need to add items simultaneously:
- Thread-safe operations: Multiple threads can add items without causing race conditions
- Optimized for producer scenarios: Specifically designed for multiple threads adding items
- Perfect for concurrent API calls: each task can safely call allData.Add(item)
Task.WhenAll with semaphore throttling is usually the best approach (a registration sketch follows this list), as it:
- Maximizes concurrency while respecting API limits
- Ensures no duplicate data through proper deduplication
- Works well with the standard resilience handler
- Provides partial success handling
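Registering the DataService as a typed client might look like this (the base address is a placeholder; the resilience handler is the same one configured later in this guide):
builder.Services.AddHttpClient<DataService>(client =>
{
    client.BaseAddress = new Uri("https://external-api.example.com"); // placeholder
})
.AddStandardResilienceHandler();
With that registered, a FastEndpoints endpoint can expose it: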
public class GetAllDataEndpoint : Endpoint<EmptyRequest, List<DataItem>>
{
private readonly DataService _dataService;
public GetAllDataEndpoint(DataService dataService)
{
_dataService = dataService;
}
public override void Configure()
{
Get("/api/all-data");
AllowAnonymous();
}
public override async Task HandleAsync(EmptyRequest req, CancellationToken ct)
{
var data = await _dataService.GetAllDataAsync();
await SendOkAsync(data, ct);
}
}
public class WhitelistRefreshRequest
{
[QueryParam]
public bool Sync { get; set; } = true; // Default to sync since it's simple
}
public class WhitelistRefreshResponse
{
public string Message { get; set; } = string.Empty;
public int ItemCount { get; set; }
public List<string>? FailedPages { get; set; }
}
public class WhitelistRefreshEndpoint : Endpoint<WhitelistRefreshRequest, WhitelistRefreshResponse>
{
private readonly WhitelistDataService _dataService;
public WhitelistRefreshEndpoint(WhitelistDataService dataService)
{
_dataService = dataService;
}
public override void Configure()
{
Post("/api/whitelist/refresh");
Policies("AdminOnly");
Options(x => x
    .RequireRateLimiting("OncePerDay")
    .WithRequestTimeout(TimeSpan.FromMinutes(5))); // Uses the .NET 8 request-timeouts middleware
Summary(s => {
s.Summary = "Refresh whitelist data from external API";
s.Description = "Fetches all paginated whitelist data from external API and processes it.";
});
}
public override async Task HandleAsync(WhitelistRefreshRequest req, CancellationToken ct)
{
Logger.LogInformation("Starting whitelist refresh");
try
{
// Fetch all data with partial success handling
var (data, failedPages) = await _dataService.GetAllDataWithPartialSuccessAsync();
Logger.LogInformation("Fetched {ItemCount} whitelist items, {FailedPageCount} pages failed",
data.Count, failedPages.Count);
// Process the whitelist data (save to DB, update cache, etc.)
await ProcessWhitelistData(data, ct);
// Determine response message
var message = failedPages.Any()
? $"Whitelist refresh completed with {failedPages.Count} failed pages"
: "Whitelist refresh completed successfully";
Logger.LogInformation("Whitelist refresh completed successfully");
await SendOkAsync(new WhitelistRefreshResponse
{
Message = message,
ItemCount = data.Count,
FailedPages = failedPages.Any() ? failedPages : null
}, ct);
}
catch (Exception ex)
{
Logger.LogError(ex, "Whitelist refresh failed");
ThrowError("Whitelist refresh failed. Check logs for details.");
}
}
private async Task ProcessWhitelistData(List<DataItem> data, CancellationToken ct)
{
Logger.LogDebug("Processing {ItemCount} whitelist items", data.Count);
// Remove duplicates using the equality you defined earlier
var uniqueData = data.Distinct().ToList();
Logger.LogDebug("After deduplication: {UniqueItemCount} unique items", uniqueData.Count);
// Example: Save to database
// await _dbContext.BulkInsertAsync(uniqueData, ct);
// Example: Update cache
// await _cache.SetAsync("whitelist-data", uniqueData, TimeSpan.FromDays(1), ct);
await Task.CompletedTask; // Placeholder for actual processing
}
}
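As a concrete version of ProcessWhitelistData, here is a minimal sketch that fills in the cache example. It assumes an IMemoryCache (Microsoft.Extensions.Caching.Memory) is injected through the constructor; the cache key and expiry are illustrative:
private readonly IMemoryCache _cache; // assumed injected via the constructor

private Task ProcessWhitelistData(List<DataItem> data, CancellationToken ct)
{
    // Deduplicate using the custom equality on DataItem
    var uniqueData = data.Distinct().ToList();
    // Replace the previous snapshot; expiry is illustrative
    _cache.Set("whitelist-data", uniqueData, TimeSpan.FromDays(1));
    return Task.CompletedTask;
}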
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add rate limiting
builder.Services.AddRateLimiter(options =>
{
options.AddFixedWindowLimiter("OncePerDay", limiterOptions =>
{
limiterOptions.PermitLimit = 1;
limiterOptions.Window = TimeSpan.FromDays(1);
limiterOptions.QueueLimit = 0;
});
options.RejectionStatusCode = 429;
});
// Add FastEndpoints
builder.Services.AddFastEndpoints();
// Request-timeout middleware that backs WithRequestTimeout on the endpoint (.NET 8+)
builder.Services.AddRequestTimeouts();
// Add your data service
builder.Services.AddHttpClient<WhitelistDataService>()
.AddStandardResilienceHandler();
var app = builder.Build();
app.UseRateLimiter();
app.UseRequestTimeouts();
app.UseFastEndpoints();
app.Run();
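If the defaults don't fit the external API, AddStandardResilienceHandler accepts an options callback. The values below are purely illustrative:
builder.Services.AddHttpClient<WhitelistDataService>()
    .AddStandardResilienceHandler(options =>
    {
        // Illustrative values: tune to the external API's documented limits
        options.Retry.MaxRetryAttempts = 5;
        options.AttemptTimeout.Timeout = TimeSpan.FromSeconds(30);
        options.TotalRequestTimeout.Timeout = TimeSpan.FromMinutes(2);
    });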
Back in DataService, here's GetAllDataAsync with error handling folded in:
public async Task<List<DataItem>> GetAllDataAsync()
{
try
{
var firstPage = await GetPageAsync(1);
var totalCount = firstPage.TotalCount;
var pageSize = firstPage.Data.Count;
var totalPages = (int)Math.Ceiling((double)totalCount / pageSize);
var allData = new ConcurrentBag<DataItem>(firstPage.Data);
if (totalPages > 1)
{
var tasks = Enumerable.Range(2, totalPages - 1)
.Select(async page =>
{
await _semaphore.WaitAsync();
try
{
var pageData = await GetPageAsync(page);
foreach (var item in pageData.Data)
{
allData.Add(item);
}
}
finally
{
_semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
return allData.ToList();
}
catch (Exception ex)
{
    // Remember: await Task.WhenAll rethrows only the first failure; keep a
    // reference to the Task.WhenAll task (as shown earlier) to inspect them all
    _logger.LogError(ex, "Error fetching all data");
    throw;
}
}
If you want partial success instead of all-or-nothing, collect errors alongside the data:
public async Task<(List<DataItem> Data, List<Exception> Errors)> GetAllDataWithErrorsAsync()
{
var firstPage = await GetPageAsync(1);
var totalCount = firstPage.TotalCount;
var pageSize = firstPage.Data.Count;
var totalPages = (int)Math.Ceiling((double)totalCount / pageSize);
var allData = new ConcurrentBag<DataItem>(firstPage.Data);
var errors = new ConcurrentBag<Exception>();
if (totalPages > 1)
{
var tasks = Enumerable.Range(2, totalPages - 1)
.Select(async page =>
{
await _semaphore.WaitAsync();
try
{
var pageData = await GetPageAsync(page);
foreach (var item in pageData.Data)
{
allData.Add(item);
}
}
catch (Exception ex)
{
errors.Add(new Exception($"Failed to fetch page {page}", ex));
_logger.LogError(ex, "Failed to fetch page {Page}", page);
}
finally
{
_semaphore.Release();
}
});
// This won't throw AggregateException since we handle exceptions inside tasks
await Task.WhenAll(tasks);
}
return (allData.ToList(), errors.ToList());
}
And an endpoint that surfaces partial success with a 206 status:
public class GetAllDataEndpoint : Endpoint<EmptyRequest, GetAllDataResponse>
{
    private readonly DataService _dataService;
    public GetAllDataEndpoint(DataService dataService) => _dataService = dataService;

    public override void Configure()
    {
        Get("/api/all-data");
        AllowAnonymous();
    }
public override async Task HandleAsync(EmptyRequest req, CancellationToken ct)
{
try
{
var result = await _dataService.GetAllDataWithErrorsAsync();
if (result.Errors.Any())
{
await SendAsync(new GetAllDataResponse
{
Data = result.Data,
Errors = result.Errors.Select(e => e.Message).ToList(),
PartialSuccess = true
}, 206, ct); // 206 Partial Content
}
else
{
await SendOkAsync(new GetAllDataResponse { Data = result.Data }, ct);
}
}
catch (Exception ex)
{
    // Only the first-page call can throw here; per-page failures come back as Errors
    Logger.LogError(ex, "Failed to fetch data");
    await SendErrorsAsync(500, ct);
}
}
}
public class GetAllDataResponse
{
    public List<DataItem> Data { get; set; } = new();
    public List<string>? Errors { get; set; }
    public bool PartialSuccess { get; set; }
}
To recap why ConcurrentBag works here: every page task can add items to the shared collection without explicit locking:
var allData = new ConcurrentBag<DataItem>(firstPage.Data);
// Multiple tasks running concurrently
var tasks = Enumerable.Range(2, totalPages - 1)
.Select(async page =>
{
var pageData = await GetPageAsync(page);
foreach (var item in pageData.Data)
{
allData.Add(item); // Thread-safe addition
}
});
If you'd rather not use ConcurrentBag, there are alternatives. Option 1: Regular List with lock
var allData = new List<DataItem>();
var lockObject = new object();
// In each task:
lock (lockObject)
{
allData.AddRange(pageData.Data);
}
Option 2: ConcurrentQueue
var allData = new ConcurrentQueue<DataItem>();
// Then: allData.Enqueue(item)
Option 3: Collect results differently
var tasks = pages.Select(async page =>
{
var pageData = await GetPageAsync(page);
return pageData.Data; // Return the data instead
});
var results = await Task.WhenAll(tasks);
var allData = results.SelectMany(x => x).ToList();
The last approach is often cleaner: each task returns its own data, and you combine everything at the end.
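As a sketch of that cleaner shape, here is a compact variant built on returning page data, using the same GetPageAsync and _semaphore as above (the method name is illustrative):
public async Task<List<DataItem>> GetAllDataFlattenedAsync()
{
    var firstPage = await GetPageAsync(1);
    var pageSize = firstPage.Data.Count;
    if (pageSize == 0) return firstPage.Data;
    var totalPages = (int)Math.Ceiling((double)firstPage.TotalCount / pageSize);

    var tasks = Enumerable.Range(2, totalPages - 1).Select(async page =>
    {
        await _semaphore.WaitAsync();
        try
        {
            return (await GetPageAsync(page)).Data;
        }
        finally
        {
            _semaphore.Release();
        }
    });

    // Each task returns its page; flatten once everything completes
    var pages = await Task.WhenAll(tasks);
    return firstPage.Data.Concat(pages.SelectMany(p => p)).ToList();
}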
Why not just fire-and-forget the refresh with Task.Run? Several pitfalls (an illustration and a safer channel-based alternative follow this list):
- No Proper Error Handling: Exceptions get swallowed unless you explicitly handle them
- Memory Leaks: Tasks can hold references and prevent garbage collection
- App Shutdown Issues: Background tasks may not complete gracefully during shutdown
- Thread Pool Starvation: In high-load scenarios, it can exhaust the thread pool
- No Tracking: You lose visibility into what's running and when it completes
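For contrast, the pattern those bullets warn about looks like this inside a request handler; this is a sketch of what not to do, reusing the service calls from the endpoint above:
// Anti-pattern: unobserved task, captured scoped services, no shutdown coordination
_ = Task.Run(async () =>
{
    var (data, failedPages) = await _dataService.GetAllDataWithPartialSuccessAsync();
    await ProcessWhitelistData(data, CancellationToken.None);
});
// The request returns immediately; any exception above is silently lost
The channel-based BackgroundService below avoids these problems: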
public class WhitelistRefreshService : BackgroundService
{
private readonly Channel<WhitelistRefreshJob> _channel;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<WhitelistRefreshService> _logger;
public WhitelistRefreshService(IServiceScopeFactory scopeFactory, ILogger<WhitelistRefreshService> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
var options = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};
_channel = Channel.CreateBounded<WhitelistRefreshJob>(options);
}
public async Task<string> QueueRefreshAsync()
{
var job = new WhitelistRefreshJob { TrackingId = Guid.NewGuid().ToString() };
await _channel.Writer.WriteAsync(job);
return job.TrackingId;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var job in _channel.Reader.ReadAllAsync(stoppingToken))
{
using var scope = _scopeFactory.CreateScope();
var handler = scope.ServiceProvider.GetRequiredService<RefreshWhitelistHandler>(); // your refresh logic, resolved per job
try
{
_logger.LogInformation("Processing whitelist refresh job {TrackingId}", job.TrackingId);
var result = await handler.ExecuteAsync(new RefreshWhitelistCommand(), stoppingToken);
_logger.LogInformation("Completed whitelist refresh {TrackingId}. Items: {Count}",
job.TrackingId, result.ItemCount);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed whitelist refresh {TrackingId}", job.TrackingId);
}
}
}
}
public record WhitelistRefreshJob
{
public string TrackingId { get; init; } = string.Empty;
}
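To wire this up, here is a sketch of the registration plus a queueing endpoint. The route and the 202 response are illustrative; QueueRefreshAsync is the method defined on the service above:
// Program.cs: one instance serves as both the queue and the hosted service
builder.Services.AddSingleton<WhitelistRefreshService>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<WhitelistRefreshService>());

public class QueueWhitelistRefreshEndpoint : EndpointWithoutRequest<string>
{
    private readonly WhitelistRefreshService _refreshService;
    public QueueWhitelistRefreshEndpoint(WhitelistRefreshService refreshService)
        => _refreshService = refreshService;

    public override void Configure()
    {
        Post("/api/whitelist/refresh-async");
        Policies("AdminOnly");
    }

    public override async Task HandleAsync(CancellationToken ct)
    {
        var trackingId = await _refreshService.QueueRefreshAsync();
        await SendAsync(trackingId, 202, ct); // 202 Accepted: work continues in the background
    }
}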
For endpoints called once per day, a simple synchronous approach is often best:
- Simple: No background services, channels, or Task.Run complexity
- Reliable: Proper error handling and response to caller
- Traceable: Easy to debug and monitor
- Rate Limited: Still protected by once-per-day limit
- Timeout Protected: Can set appropriate request timeout
Why that's acceptable at this call frequency:
- Frequency: Once per day means you're not blocking other requests
- Duration: Even if it takes 2-3 minutes, that's acceptable for this frequency
- Simplicity: Much easier to reason about, test, and debug
- Reliability: Direct feedback to caller about success/failure
- Rate Limiting: Already prevents abuse
- Use Task.WhenAll with semaphore throttling for concurrent API calls
- ConcurrentBag is perfect for thread-safe collection operations
- The standard resilience handler (AddStandardResilienceHandler) provides built-in retry and circuit-breaker logic
- Partial success strategies allow you to return data even when some pages fail
- For low-frequency operations (once per day), synchronous processing is often the best choice
- Rate limiting protects your endpoints from abuse
- Proper error handling and logging are crucial for production systems