Created
April 14, 2025 12:07
-
-
Save xsahil03x/9a099ef228e297e5d81966e263870279 to your computer and use it in GitHub Desktop.
Attachment Uploader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'dart:collection'; | |
import 'package:collection/collection.dart'; | |
import 'package:dio/dio.dart'; | |
import 'package:equatable/equatable.dart'; | |
import 'package:logging/logging.dart'; | |
import 'package:rate_limiter/rate_limiter.dart'; | |
import 'package:stream_chat/src/client/channel.dart'; | |
import 'package:stream_chat/src/client/retry_policy.dart'; | |
import 'package:stream_chat/src/core/api/responses.dart'; | |
import 'package:stream_chat/src/core/error/stream_chat_error.dart'; | |
import 'package:stream_chat/src/core/models/attachment.dart'; | |
/// Settings that control how attachments are uploaded.
class AttachmentUploadConfig {
  /// Creates an upload configuration.
  ///
  /// [maxConcurrentUploads] must be greater than zero. This is enforced with
  /// an `assert`, so it is only checked when assertions are enabled.
  const AttachmentUploadConfig({
    this.maxConcurrentUploads = 3,
    this.progressUpdateInterval = const Duration(milliseconds: 500),
    required this.retryPolicy,
  }) : assert(maxConcurrentUploads >= 1, 'maxConcurrentUploads must be > 0');

  /// The upper bound on uploads that may run at the same time.
  ///
  /// Defaults to 3.
  final int maxConcurrentUploads;

  /// The interval at which upload progress should be reported.
  ///
  /// Defaults to 500 milliseconds.
  final Duration progressUpdateInterval;

  /// The policy consulted when an upload fails.
  ///
  /// It decides whether a failed upload is attempted again and how long to
  /// wait before doing so.
  final RetryPolicy retryPolicy;
}
/// Signature of the callback invoked when the upload of a single
/// [attachment] belonging to the batch [batchId] begins.
typedef OnAttachmentUploadStarted = void Function(
  String batchId,
  Attachment attachment,
);

/// Signature of the callback invoked with the upload progress of a single
/// [attachment] belonging to the batch [batchId].
///
/// [sent] and [total] are forwarded from the underlying send-progress
/// callback of the upload request.
typedef OnAttachmentUploadProgress = void Function(
  String batchId,
  Attachment attachment,
  int sent,
  int total,
);

/// Signature of the callback invoked when a single [attachment] belonging to
/// the batch [batchId] has been uploaded, together with the [response]
/// returned by the upload request.
typedef OnAttachmentUploadSuccess = void Function(
  String batchId,
  Attachment attachment,
  SendAttachmentResponse response,
);

/// Signature of the callback invoked when uploading a single [attachment]
/// belonging to the batch [batchId] failed with [error].
typedef OnAttachmentUploadFailed = void Function(
  String batchId,
  Attachment attachment,
  Object? error,
  StackTrace stackTrace,
);

/// Signature of the callback invoked when the batch [batchId] starts being
/// processed with the given [attachments].
typedef OnBatchUploadStarted = void Function(
  String batchId,
  List<Attachment> attachments,
);

/// Signature of the callback invoked when the [attachments] of the batch
/// [batchId] have all been uploaded.
typedef OnBatchUploadSuccess = void Function(
  String batchId,
  List<Attachment> attachments,
);

/// Signature of the callback invoked when the batch [batchId] is considered
/// failed; [attachments] are the attachments that failed to upload and
/// [error] is the error that triggered the notification.
typedef OnBatchUploadFailed = void Function(
  String batchId,
  List<Attachment> attachments,
  Object? error,
  StackTrace stackTrace,
);

/// Signature of a callback for a batch that has finished processing.
///
/// NOTE(review): this typedef is not referenced by the code visible in this
/// file; presumably it is meant to fire regardless of the outcome of the
/// batch - confirm against its users.
typedef OnBatchUploadCompleted = void Function(
  String batchId,
  List<Attachment> attachments,
);
/// Uploads the attachments of batches through a [Channel].
///
/// Attachments are queued per batch. At most
/// [AttachmentUploadConfig.maxConcurrentUploads] uploads run at the same
/// time, and a failing upload request is retried with back-off as configured
/// by [AttachmentUploadConfig.retryPolicy].
///
/// A batch stays tracked until either all of its attachments are uploaded
/// ([onBatchUploadSuccess]) or all of them failed ([onBatchUploadFailed]).
/// While it is tracked, failed attachments can be retried with
/// [retryAttachment].
///
/// NOTE: [AttachmentUploadConfig.progressUpdateInterval] is not applied by
/// this class; progress events are forwarded to [onAttachmentUploadProgress]
/// as they are reported by the upload request.
class AttachmentUploadManager {
  /// Creates a manager that uploads attachments through [channel].
  AttachmentUploadManager({
    required this.channel,
    required this.config,
    this.onAttachmentUploadStarted,
    this.onAttachmentUploadProgress,
    this.onAttachmentUploadSuccess,
    this.onAttachmentUploadFailed,
    this.onBatchUploadStarted,
    this.onBatchUploadSuccess,
    this.onBatchUploadFailed,
    this.logger,
  });

  /// The channel whose `sendImage` / `sendFile` calls perform the uploads.
  final Channel channel;

  /// Configuration for the attachment uploader.
  final AttachmentUploadConfig config;

  /// Called right before an attachment starts uploading.
  final OnAttachmentUploadStarted? onAttachmentUploadStarted;

  /// Called with the send progress of an attachment upload.
  final OnAttachmentUploadProgress? onAttachmentUploadProgress;

  /// Called when an attachment has been uploaded.
  final OnAttachmentUploadSuccess? onAttachmentUploadSuccess;

  /// Called when an attachment failed to upload.
  final OnAttachmentUploadFailed? onAttachmentUploadFailed;

  /// Called when a batch has been queued and is about to be processed.
  final OnBatchUploadStarted? onBatchUploadStarted;

  /// Called when every attachment of a batch has been uploaded.
  final OnBatchUploadSuccess? onBatchUploadSuccess;

  /// Called when every attachment of a batch failed to upload.
  final OnBatchUploadFailed? onBatchUploadFailed;

  /// The logger associated to this queue.
  final Logger? logger;

  /// Batches that are currently tracked, keyed by batch id.
  final _batches = <String /* BatchId */, _UploadBatch>{};

  /// Tasks waiting for a free upload slot, in FIFO order.
  final _pendingTasks = Queue<_UploadTask>();

  /// Tasks whose upload is currently in flight.
  final _activeTasks = <_UploadTask>[];

  /// Queues [attachments] for upload as the batch identified by [batchId].
  ///
  /// Nothing is queued when a batch with the same [batchId] is already
  /// tracked, when [attachments] is empty, or when none of the attachments
  /// has a file. Attachments without a file are skipped.
  void enqueue({
    required String batchId,
    required List<Attachment> attachments,
  }) {
    if (_batches.containsKey(batchId)) {
      logger?.info('Batch $batchId already exists, skipping upload');
      return;
    }

    if (attachments.isEmpty) {
      logger?.warning('No attachments to upload in batch $batchId');
      return;
    }

    // Filter eagerly: a lazy `where` would be evaluated once for the
    // emptiness check and once more when copied, logging every skipped
    // attachment twice.
    final uploadableAttachments = <Attachment>[];
    for (final attachment in attachments) {
      if (attachment.file == null) {
        logger?.warning('Skipping attachment without file: ${attachment.id}');
        continue;
      }
      uploadableAttachments.add(attachment);
    }

    if (uploadableAttachments.isEmpty) {
      logger?.warning('No valid attachments to upload in batch $batchId');
      return;
    }

    // Create a new batch for the attachments.
    _batches[batchId] = _UploadBatch(
      id: batchId,
      attachments: uploadableAttachments,
    );

    _processBatch(batchId);
  }

  /// Queues one upload task per not-yet-finished attachment of the batch
  /// [batchId], notifies [onBatchUploadStarted] and kicks the queue.
  void _processBatch(String batchId) {
    final batch = _batches[batchId];
    if (batch == null) {
      logger?.warning('Cannot process batch $batchId: batch not found');
      return;
    }

    logger?.info(
      'Processing batch $batchId with ${batch.attachments.length} attachments',
    );

    final attachments = batch.attachments;
    final finishedIds = {
      for (final it in batch.failedAttachments) it.id,
      for (final it in batch.uploadedAttachments) it.id,
    };

    for (final attachment in attachments) {
      // Skip if already uploaded or failed.
      if (finishedIds.contains(attachment.id)) continue;
      // Skip if already queued or in flight.
      if (_findTask(batchId, attachment.id) != null) continue;

      _pendingTasks.add(
        _UploadTask(
          batchId: batchId,
          attachment: attachment,
          cancelToken: CancelToken(),
        ),
      );
    }

    // Notify that the batch upload is starting.
    onBatchUploadStarted?.call(batchId, attachments);

    // Start processing the queue
    _processQueue();
  }

  /// Returns the queued or in-flight task for [attachmentId] in [batchId],
  /// or `null` when there is none.
  _UploadTask? _findTask(String batchId, String attachmentId) {
    return _activeTasks.followedBy(_pendingTasks).firstWhereOrNull((it) {
      return it.batchId == batchId && it.attachment.id == attachmentId;
    });
  }

  /// Starts pending tasks until [AttachmentUploadConfig.maxConcurrentUploads]
  /// uploads are in flight or the queue is empty.
  void _processQueue() {
    if (_pendingTasks.isEmpty) {
      logger?.info('No pending tasks to process');
      return;
    }

    logger?.info('Started processing upload queue');

    while (_pendingTasks.isNotEmpty &&
        _activeTasks.length < config.maxConcurrentUploads) {
      final task = _pendingTasks.removeFirst();
      _activeTasks.add(task);
      // Fire-and-forget: the task frees its slot itself when it is done.
      unawaited(_runTask(task));
    }
  }

  /// Runs [task] and afterwards frees its slot and resumes the queue, even
  /// when the task (or one of the callbacks it invokes) throws.
  Future<void> _runTask(_UploadTask task) async {
    try {
      await _processTask(task);
    } finally {
      // Remove by identity: tasks compare equal by batch/attachment id.
      _activeTasks.removeWhere((it) => identical(it, task));

      if (_pendingTasks.isNotEmpty) {
        _processQueue();
      } else if (_activeTasks.isEmpty) {
        logger?.info('Finished processing upload queue');
      }
    }
  }

  /// Uploads the attachment of [task] and records the outcome in its batch.
  Future<void> _processTask(_UploadTask task) async {
    final batchId = task.batchId;
    final attachment = task.attachment;

    logger?.info('Uploading attachment ${attachment.id} in batch $batchId');
    onAttachmentUploadStarted?.call(batchId, attachment);

    final attachmentFile = attachment.file;
    if (attachmentFile == null) {
      logger?.warning('Failed to upload attachment ${attachment.id}: no file');
      _handleFailure(
        task,
        const StreamChatError('Attachment has no file to upload'),
        StackTrace.current,
      );
      return;
    }

    try {
      final response = await backOff(
        () => switch (attachment.type) {
          AttachmentType.image => channel.sendImage,
          _ => channel.sendFile,
        }(
          attachmentFile,
          cancelToken: task.cancelToken,
          extraData: attachment.extraData,
          onSendProgress: (sent, total) {
            onAttachmentUploadProgress?.call(batchId, attachment, sent, total);
          },
        ),
        delayFactor: config.retryPolicy.delayFactor,
        randomizationFactor: config.retryPolicy.randomizationFactor,
        maxDelay: config.retryPolicy.maxDelay,
        maxAttempts: config.retryPolicy.maxRetryAttempts,
        retryIf: (error, attempt) {
          // An upload that was cancelled on purpose must not be retried.
          if (task.cancelToken.isCancelled) return false;
          if (error is! StreamChatError) return false;
          return config.retryPolicy.shouldRetry(channel.client, attempt, error);
        },
      );

      logger?.info('Attachment ${attachment.id} uploaded successfully');
      onAttachmentUploadSuccess?.call(batchId, attachment, response);

      // Re-read the batch: it may have changed while the upload was running.
      if (_batches[batchId] case final batch?) {
        final uploaded = [...batch.uploadedAttachments, attachment];

        _batches[batchId] = _UploadBatch(
          id: batchId,
          attachments: batch.attachments,
          uploadedAttachments: uploaded,
          failedAttachments: batch.failedAttachments,
        );

        // Check if all attachments for this batch are uploaded successfully.
        if (batch.attachments.length == uploaded.length) {
          logger?.info('Batch $batchId uploaded successfully');
          onBatchUploadSuccess?.call(batchId, uploaded);
          _batches.remove(batchId);
        }
      }
    } catch (e, stk) {
      logger?.severe('Error uploading attachment ${attachment.id}', e, stk);
      _handleFailure(task, e, stk);
    }
  }

  /// Reports the failure of [task] and records it in the task's batch.
  ///
  /// When every attachment of the batch has failed, [onBatchUploadFailed] is
  /// notified and the batch is dropped.
  void _handleFailure(_UploadTask task, Object error, StackTrace stackTrace) {
    final batchId = task.batchId;
    final attachment = task.attachment;

    onAttachmentUploadFailed?.call(batchId, attachment, error, stackTrace);

    final batch = _batches[batchId];
    if (batch == null) return;

    // Keep a single entry per attachment so that repeated failures of the
    // same attachment cannot make the batch look completely failed.
    final failed = [
      ...batch.failedAttachments.where((it) => it.id != attachment.id),
      attachment,
    ];

    _batches[batchId] = _UploadBatch(
      id: batchId,
      attachments: batch.attachments,
      uploadedAttachments: batch.uploadedAttachments,
      failedAttachments: failed,
    );

    // Check if all attachments for this batch are failed.
    if (batch.attachments.length == failed.length) {
      logger?.info('Batch $batchId failed to upload');
      onBatchUploadFailed?.call(batchId, failed, error, stackTrace);
      _batches.remove(batchId);
    }
  }

  /// Cancels the upload of a specific attachment in a batch.
  ///
  /// Both queued and in-flight uploads can be cancelled. The optional
  /// [reason] is passed on to the task's cancel token.
  ///
  /// Returns `true` if the attachment was found and cancellation was
  /// initiated, `false` otherwise.
  bool cancelAttachment({
    required String batchId,
    required String attachmentId,
    String? reason,
  }) {
    logger?.info('Cancelling attachment $attachmentId in batch $batchId');

    final batch = _batches[batchId];
    if (batch == null) {
      logger?.warning('Batch $batchId not found, cannot cancel');
      return false;
    }

    // Find the attachment in the batch
    final attachment = batch.attachments.firstWhereOrNull(
      (it) => it.id == attachmentId,
    );
    if (attachment == null) {
      logger?.warning('Attachment $attachmentId not found, cannot cancel');
      return false;
    }

    // If already completed or failed, do nothing
    if (batch.uploadedAttachments.any((it) => it.id == attachmentId)) {
      logger?.info('Attachment $attachmentId already uploaded, cannot cancel');
      return false;
    }
    if (batch.failedAttachments.any((it) => it.id == attachmentId)) {
      logger?.info('Attachment $attachmentId already failed, cannot cancel');
      return false;
    }

    // Find the task for this attachment
    final task = _findTask(batchId, attachmentId);
    if (task == null) {
      logger?.info('Attachment $attachmentId task not found, cannot cancel');
      return false;
    }

    final cancelToken = task.cancelToken;
    if (cancelToken.isCancelled) {
      logger?.info('Attachment $attachmentId already cancelled');
      return false;
    }

    // Cancel the task
    cancelToken.cancel(reason);
    logger?.info('Successfully cancelled attachment $attachmentId');
    return true;
  }

  /// Retries the upload of a failed attachment.
  ///
  /// The attachment is removed from the batch's failed attachments and
  /// queued again with a fresh cancel token.
  ///
  /// Returns `true` if the attachment was found and retry was initiated,
  /// `false` otherwise.
  bool retryAttachment({
    required String batchId,
    required String attachmentId,
  }) {
    logger?.info(
      'Retrying upload of attachment $attachmentId in batch $batchId',
    );

    final batch = _batches[batchId];
    if (batch == null) {
      logger?.warning('Batch $batchId not found: cannot retry');
      return false;
    }

    // Find the failed attachment
    final failedAttachment = batch.failedAttachments.firstWhereOrNull((it) {
      return it.id == attachmentId;
    });
    if (failedAttachment == null) {
      logger?.warning('Attachment $attachmentId: not found: cannot retry');
      return false;
    }

    if (_findTask(batchId, attachmentId) != null) {
      logger?.info('Attachment $attachmentId: already in pending tasks');
      return false;
    }

    // The attachment is in progress again, so it no longer counts as failed.
    _batches[batchId] = _UploadBatch(
      id: batchId,
      attachments: batch.attachments,
      uploadedAttachments: batch.uploadedAttachments,
      failedAttachments: [
        for (final it in batch.failedAttachments)
          if (it.id != attachmentId) it,
      ],
    );

    _pendingTasks.add(
      _UploadTask(
        batchId: batchId,
        attachment: failedAttachment,
        cancelToken: CancelToken(),
      ),
    );
    _processQueue();

    logger?.info('Retry initiated for attachment $attachmentId');
    return true;
  }
}
/// Immutable snapshot of the upload state of one batch of attachments.
///
/// Equality is based on [id] only, so two snapshots of the same batch compare
/// equal even when their progress differs.
class _UploadBatch extends Equatable {
  /// Creates a snapshot; both progress lists default to empty.
  const _UploadBatch({
    required this.id,
    required this.attachments,
    this.uploadedAttachments = const <Attachment>[],
    this.failedAttachments = const <Attachment>[],
  });

  /// Identifier of the batch.
  final String id;

  /// Every attachment that belongs to the batch.
  final List<Attachment> attachments;

  /// Attachments of the batch that have been uploaded.
  final List<Attachment> uploadedAttachments;

  /// Attachments of the batch whose upload failed.
  final List<Attachment> failedAttachments;

  @override
  List<Object?> get props {
    return [id];
  }
}
/// A single unit of upload work: one [attachment] of the batch [batchId].
///
/// Two tasks compare equal when they target the same attachment id within
/// the same batch; the [cancelToken] does not take part in equality.
class _UploadTask extends Equatable {
  /// Creates a task for [attachment] in the batch [batchId].
  const _UploadTask({
    required this.batchId,
    required this.attachment,
    required this.cancelToken,
  });

  /// Identifier of the batch the attachment belongs to.
  final String batchId;

  /// The attachment to upload.
  final Attachment attachment;

  /// Token used to cancel the upload request of this task.
  final CancelToken cancelToken;

  @override
  List<Object> get props {
    return [batchId, attachment.id];
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Tests: