Skip to content

Instantly share code, notes, and snippets.

@xsahil03x
Created April 14, 2025 12:07
Show Gist options
  • Save xsahil03x/9a099ef228e297e5d81966e263870279 to your computer and use it in GitHub Desktop.
Save xsahil03x/9a099ef228e297e5d81966e263870279 to your computer and use it in GitHub Desktop.
Attachment Uploader
import 'dart:async';
import 'dart:collection';
import 'package:collection/collection.dart';
import 'package:dio/dio.dart';
import 'package:equatable/equatable.dart';
import 'package:logging/logging.dart';
import 'package:rate_limiter/rate_limiter.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/core/api/responses.dart';
import 'package:stream_chat/src/core/error/stream_chat_error.dart';
import 'package:stream_chat/src/core/models/attachment.dart';
/// Configuration for attachment uploads
class AttachmentUploadConfig {
const AttachmentUploadConfig({
this.maxConcurrentUploads = 3,
this.progressUpdateInterval = const Duration(milliseconds: 500),
required this.retryPolicy,
}) : assert(maxConcurrentUploads > 0, 'maxConcurrentUploads must be > 0');
/// How often to update progress during uploads.
///
/// Defaults to 500 milliseconds.
final Duration progressUpdateInterval;
/// Maximum number of concurrent uploads allowed.
///
/// Defaults to 3.
final int maxConcurrentUploads;
/// Policy for retrying failed uploads.
///
/// This is used to determine if a failed upload should be retried and when.
final RetryPolicy retryPolicy;
}
typedef OnAttachmentUploadStarted = void Function(
String batchId,
Attachment attachment,
);
typedef OnAttachmentUploadProgress = void Function(
String batchId,
Attachment attachment,
int sent,
int total,
);
typedef OnAttachmentUploadSuccess = void Function(
String batchId,
Attachment attachment,
SendAttachmentResponse response,
);
typedef OnAttachmentUploadFailed = void Function(
String batchId,
Attachment attachment,
Object? error,
StackTrace stackTrace,
);
typedef OnBatchUploadStarted = void Function(
String batchId,
List<Attachment> attachments,
);
typedef OnBatchUploadSuccess = void Function(
String batchId,
List<Attachment> attachments,
);
typedef OnBatchUploadFailed = void Function(
String batchId,
List<Attachment> attachments,
Object? error,
StackTrace stackTrace,
);
typedef OnBatchUploadCompleted = void Function(
String batchId,
List<Attachment> attachments,
);
class AttachmentUploadManager {
AttachmentUploadManager({
required this.channel,
required this.config,
this.onAttachmentUploadStarted,
this.onAttachmentUploadProgress,
this.onAttachmentUploadSuccess,
this.onAttachmentUploadFailed,
this.onBatchUploadStarted,
this.onBatchUploadSuccess,
this.onBatchUploadFailed,
this.logger,
});
final Channel channel;
/// Configuration for the attachment uploader.
final AttachmentUploadConfig config;
final OnAttachmentUploadStarted? onAttachmentUploadStarted;
final OnAttachmentUploadProgress? onAttachmentUploadProgress;
final OnAttachmentUploadSuccess? onAttachmentUploadSuccess;
final OnAttachmentUploadFailed? onAttachmentUploadFailed;
final OnBatchUploadStarted? onBatchUploadStarted;
final OnBatchUploadSuccess? onBatchUploadSuccess;
final OnBatchUploadFailed? onBatchUploadFailed;
/// The logger associated to this queue.
final Logger? logger;
final _batches = <String /* BatchId */, _UploadBatch>{};
final _pendingTasks = Queue<_UploadTask>();
/// Uploads an attachment.
void enqueue({
required String batchId,
required List<Attachment> attachments,
}) {
if (_batches.containsKey(batchId)) {
logger?.info('Batch $batchId already exists, skipping upload');
return;
}
if (attachments.isEmpty) {
logger?.warning('No attachments to upload in batch $batchId');
return;
}
final uploadableAttachments = attachments.where((attachment) {
if (attachment.file == null) {
logger?.warning('Skipping attachment without file: ${attachment.id}');
return false;
}
return true;
});
if (uploadableAttachments.isEmpty) {
logger?.warning('No valid attachments to upload in batch $batchId');
return;
}
// Create a new batch for the attachments.
_batches[batchId] = _UploadBatch(
id: batchId,
attachments: [...uploadableAttachments],
);
return _processBatch(batchId);
}
void _processBatch(String batchId) {
final batch = _batches[batchId];
if (batch == null) {
logger?.warning('Cannot process batch $batchId: batch not found');
return;
}
logger?.info(
'Processing batch $batchId with ${batch.attachments.length} attachments',
);
final attachments = batch.attachments;
final failedAttachments = batch.failedAttachments;
final uploadedAttachments = batch.uploadedAttachments;
for (final attachment in attachments) {
// Skip if already uploaded or failed.
if (failedAttachments.any((it) => it.id == attachment.id)) continue;
if (uploadedAttachments.any((it) => it.id == attachment.id)) continue;
final task = _UploadTask(
batchId: batchId,
attachment: attachment,
cancelToken: CancelToken(),
);
// Skip if already in the queue.
if (_pendingTasks.contains(task)) continue;
_pendingTasks.add(task);
}
// Notify that the batch upload is starting.
onBatchUploadStarted?.call(batchId, attachments);
// Start processing the queue
_processQueue();
}
bool _isProcessingQueue = false;
Future<void> _processQueue() async {
if (_isProcessingQueue) return;
_isProcessingQueue = true;
logger?.info('Started processing upload queue');
if (_pendingTasks.isEmpty) {
logger?.info('No pending tasks to process');
_isProcessingQueue = false;
return;
}
while (_pendingTasks.isNotEmpty) {
final task = _pendingTasks.first;
await _processTask(task);
_pendingTasks.removeFirst();
}
logger?.info('Finished processing upload queue');
_isProcessingQueue = false;
}
Future<void> _processTask(_UploadTask task) async {
final batchId = task.batchId;
final attachment = task.attachment;
logger?.info('Uploading attachment ${attachment.id} in batch $batchId');
onAttachmentUploadStarted?.call(batchId, attachment);
final attachmentFile = attachment.file;
if (attachmentFile == null) {
logger?.warning('Failed to upload attachment ${attachment.id}: no file');
onAttachmentUploadFailed?.call(
batchId,
attachment,
const StreamChatError('Attachment has no file to upload'),
StackTrace.current,
);
return;
}
try {
final response = await backOff(
() => switch (attachment.type) {
AttachmentType.image => channel.sendImage,
_ => channel.sendFile,
}(
attachmentFile,
cancelToken: task.cancelToken,
extraData: attachment.extraData,
onSendProgress: (sent, total) {
onAttachmentUploadProgress?.call(batchId, attachment, sent, total);
},
),
delayFactor: config.retryPolicy.delayFactor,
randomizationFactor: config.retryPolicy.randomizationFactor,
maxDelay: config.retryPolicy.maxDelay,
maxAttempts: config.retryPolicy.maxRetryAttempts,
retryIf: (error, attempt) {
if (error is! StreamChatError) return false;
return config.retryPolicy.shouldRetry(channel.client, attempt, error);
},
);
logger?.info('Attachment ${attachment.id} uploaded successfully');
onAttachmentUploadSuccess?.call(batchId, attachment, response);
if (_batches[batchId] case final batch?) {
final uploaded = [...batch.uploadedAttachments, attachment];
_batches[batchId] = _UploadBatch(
id: batchId,
attachments: batch.attachments,
uploadedAttachments: uploaded,
failedAttachments: batch.failedAttachments,
);
// Check if all attachments for this batch are uploaded successfully.
if (batch.attachments.length == uploaded.length) {
logger?.info('Batch $batchId uploaded successfully');
onBatchUploadSuccess?.call(batchId, uploaded);
_batches.remove(batchId);
}
}
} catch (e, stk) {
logger?.severe('Error uploading attachment ${attachment.id}', e, stk);
onAttachmentUploadFailed?.call(batchId, attachment, e, stk);
if (_batches[batchId] case final batch?) {
final failed = [...batch.failedAttachments, attachment];
_batches[batchId] = _UploadBatch(
id: batchId,
attachments: batch.attachments,
uploadedAttachments: batch.uploadedAttachments,
failedAttachments: failed,
);
// Check if all attachments for this batch are failed.
if (batch.attachments.length == failed.length) {
logger?.info('Batch $batchId failed to upload');
onBatchUploadFailed?.call(batchId, failed, e, stk);
_batches.remove(batchId);
}
}
}
}
/// Cancels the upload of a specific attachment in a batch.
///
/// Returns `true` if the attachment was found and cancellation was initiated,
/// `false` otherwise.
bool cancelAttachment({
required String batchId,
required String attachmentId,
String? reason,
}) {
logger?.info('Cancelling attachment $attachmentId in batch $batchId');
final batch = _batches[batchId];
if (batch == null) {
logger?.warning('Batch $batchId not found, cannot cancel');
return false;
}
// Find the attachment in the batch
final attachment = batch.attachments.firstWhereOrNull(
(it) => it.id == attachmentId,
);
if (attachment == null) {
logger?.warning('Attachment $attachmentId not found, cannot cancel');
return false;
}
// If already completed or failed, do nothing
if (batch.uploadedAttachments.any((it) => it.id == attachmentId)) {
logger?.info('Attachment $attachmentId already uploaded, cannot cancel');
return false;
}
if (batch.failedAttachments.any((it) => it.id == attachmentId)) {
logger?.info('Attachment $attachmentId already failed, cannot cancel');
return false;
}
// Find the task for this attachment
final task = _pendingTasks.firstWhereOrNull(
(task) => task.batchId == batchId && task.attachment.id == attachmentId,
);
if (task == null) {
logger?.info('Attachment $attachmentId task not found, cannot cancel');
return false;
}
final cancelToken = task.cancelToken;
if (cancelToken.isCancelled) {
logger?.info('Attachment $attachmentId already cancelled');
return false;
}
// Cancel the task
cancelToken.cancel(reason);
logger?.info('Successfully cancelled attachment $attachmentId');
return true;
}
/// Retries the upload of a failed attachment.
///
/// Returns `true` if the attachment was found and retry was initiated,
/// `false` otherwise.
bool retryAttachment({
required String batchId,
required String attachmentId,
}) {
logger?.info(
'Retrying upload of attachment $attachmentId in batch $batchId',
);
final batch = _batches[batchId];
if (batch == null) {
logger?.warning('Batch $batchId not found: cannot retry');
return false;
}
// Find the failed attachment
final failedAttachment = batch.failedAttachments.firstWhereOrNull((it) {
return it.id == attachmentId;
});
if (failedAttachment == null) {
logger?.warning('Attachment $attachmentId: not found: cannot retry');
return false;
}
final task = _pendingTasks.firstWhereOrNull(
(task) => task.batchId == batchId && task.attachment.id == attachmentId,
);
if (task != null) {
logger?.info('Attachment $attachmentId: already in pending tasks');
return false;
}
final retryTask = _UploadTask(
batchId: batchId,
attachment: failedAttachment,
cancelToken: CancelToken(),
);
_pendingTasks.add(retryTask);
_processQueue();
logger?.info('Retry initiated for attachment $attachmentId');
return true;
}
}
class _UploadBatch extends Equatable {
const _UploadBatch({
required this.id,
required this.attachments,
this.uploadedAttachments = const [],
this.failedAttachments = const [],
});
final String id;
final List<Attachment> attachments;
final List<Attachment> uploadedAttachments;
final List<Attachment> failedAttachments;
@override
List<Object?> get props => [id];
}
class _UploadTask extends Equatable {
const _UploadTask({
required this.batchId,
required this.attachment,
required this.cancelToken,
});
final String batchId;
final Attachment attachment;
final CancelToken cancelToken;
@override
List<Object> get props => [batchId, attachment.id];
}
@xsahil03x
Copy link
Author

Tests:

// ignore_for_file: cascade_invocations

import 'dart:async';

import 'package:dio/dio.dart';
import 'package:mocktail/mocktail.dart';
import 'package:stream_chat/src/client/attachment_uploader.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:test/test.dart';

import '../mocks.dart';

class MockAttachmentFile extends Mock implements AttachmentFile {}

class MockCancelToken extends Mock implements CancelToken {}

class MockSendFileResponse extends Mock implements SendFileResponse {
  MockSendFileResponse([this.thumbUrl]);

  @override
  String? get file => 'file-url';

  @override
  final String? thumbUrl;
}

class MockSendImageResponse extends Mock implements SendImageResponse {
  @override
  String? get file => 'image-url';
}

void main() {
  final config = AttachmentUploadConfig(
    maxConcurrentUploads: 3,
    progressUpdateInterval: const Duration(milliseconds: 100),
    retryPolicy: RetryPolicy(
      maxRetryAttempts: 3,
      shouldRetry: (_, __, error) {
        return error is StreamChatNetworkError && error.isRetriable;
      },
    ),
  );

  late Channel mockChannel;

  setUp(() => mockChannel = MockRetryQueueChannel());

  tearDown(() => reset(mockChannel));

  setUpAll(() => registerFallbackValue(MockAttachmentFile()));

  group('AttachmentUploadManager', () {
    test('enqueuing empty batch does not start uploads', () {
      final uploadManager = AttachmentUploadManager(
        channel: mockChannel,
        config: config,
      );

      uploadManager.enqueue(batchId: 'test-batch', attachments: []);

      verifyNever(() => mockChannel.sendFile(any()));
      verifyNever(() => mockChannel.sendImage(any()));
    });

    test('duplicate batch ID is skipped', () {
      final uploadManager = AttachmentUploadManager(
        channel: mockChannel,
        config: config,
      );

      final mockFile = MockAttachmentFile();
      when(() => mockFile.path).thenReturn('/path/to/file.pdf');

      final fileAttachment = Attachment(
        id: 'test-attachment',
        type: AttachmentType.file,
        file: mockFile,
      );

      when(
        () => mockChannel.sendFile(
          any(),
          cancelToken: any(named: 'cancelToken'),
          extraData: any(named: 'extraData'),
          onSendProgress: any(named: 'onSendProgress'),
        ),
      ).thenAnswer((_) async => MockSendFileResponse());

      // First batch is processed
      uploadManager.enqueue(
        batchId: 'test-batch',
        attachments: [fileAttachment],
      );

      // Second batch with same ID is ignored
      uploadManager.enqueue(
        batchId: 'test-batch',
        attachments: [fileAttachment],
      );

      // Verify sendFile is called only once (for the first batch)
      verify(
        () => mockChannel.sendFile(
          any(),
          cancelToken: any(named: 'cancelToken'),
          extraData: any(named: 'extraData'),
          onSendProgress: any(named: 'onSendProgress'),
        ),
      ).called(1);
    });

    test('attachments without files are skipped', () {
      final uploadManager = AttachmentUploadManager(
        channel: mockChannel,
        config: config,
      );

      final attachment = Attachment(
        id: 'test-attachment',
        type: AttachmentType.file,
      );

      uploadManager.enqueue(
        batchId: 'test-batch',
        attachments: [attachment],
      );

      verifyNever(() => mockChannel.sendFile(any()));
      verifyNever(() => mockChannel.sendImage(any()));
    });

    group('file attachment uploads', () {
      late Attachment fileAttachment;
      late MockAttachmentFile mockFile;

      setUp(() {
        mockFile = MockAttachmentFile();
        when(() => mockFile.size).thenReturn(100);
        when(() => mockFile.path).thenReturn('/path/to/file.pdf');

        fileAttachment = Attachment(
          id: 'file-attachment',
          type: AttachmentType.file,
          file: mockFile,
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async => MockSendFileResponse());
      });

      test('uploads file attachment successfully', () async {
        final completer = Completer<Attachment>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (batchId, attachment, response) {
            completer.complete(
              attachment.copyWith(
                assetUrl: response.file,
                uploadState: const UploadState.success(),
              ),
            );
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        final uploadedAttachment = await completer.future;

        expect(uploadedAttachment.assetUrl, 'file-url');
        expect(uploadedAttachment.uploadState, const UploadState.success());

        verify(
          () => mockChannel.sendFile(
            mockFile,
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);
      });

      test('reports upload progress', () async {
        final progressEvents = <double>[];
        final completer = Completer<void>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadProgress: (_, __, sent, total) {
            progressEvents.add(sent / total);
          },
          onAttachmentUploadSuccess: (_, __, ___) {
            completer.complete();
          },
        );

        // Capture and trigger the progress callback
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((invocation) {
          final progressCallback = invocation.namedArguments[#onSendProgress];

          progressCallback(25, 100); // 25%
          progressCallback(50, 100); // 50%
          progressCallback(100, 100); // 100%

          return Future.value(MockSendFileResponse());
        });

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        await completer.future;

        expect(progressEvents, [0.25, 0.5, 1.0]);
      });

      test('handles upload failure', () async {
        final failureCompleter = Completer<Object?>();

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenThrow(StreamChatNetworkError(ChatErrorCode.requestTimeout));

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadFailed: (_, __, error, ___) {
            failureCompleter.complete(error);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        final error = await failureCompleter.future;
        expect(error, isA<StreamChatNetworkError>());
      });

      test('includes attachment extraData in upload', () async {
        final completer = Completer<void>();
        Map<String, Object?>? capturedExtraData;

        // Set up mock to capture extraData
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer(
          (invocation) {
            capturedExtraData = invocation.namedArguments[#extraData];
            return Future.value(MockSendFileResponse());
          },
        );

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (_, __, ___) {
            completer.complete();
          },
        );

        final attachmentWithExtraData = fileAttachment.copyWith(
          extraData: const {'custom_field': 'custom_value'},
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [attachmentWithExtraData],
        );

        await completer.future;

        // Verify sendFile was called
        verify(
          () => mockChannel.sendFile(
            mockFile,
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);

        // Verify the captured extraData contains our custom field
        expect(capturedExtraData, isNotNull);
        expect(capturedExtraData!.containsKey('custom_field'), isTrue);
        expect(capturedExtraData!['custom_field'], equals('custom_value'));
      });
    });

    group('image attachment uploads', () {
      late Attachment imageAttachment;
      late MockAttachmentFile mockFile;

      setUp(() {
        mockFile = MockAttachmentFile();
        when(() => mockFile.size).thenReturn(100);
        when(() => mockFile.path).thenReturn('/path/to/image.jpg');

        imageAttachment = Attachment(
          id: 'image-attachment',
          type: AttachmentType.image,
          file: mockFile,
        );

        when(
          () => mockChannel.sendImage(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async => MockSendImageResponse());
      });

      test('uploads image attachment successfully', () async {
        final completer = Completer<Attachment>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (batchId, attachment, response) {
            completer.complete(
              attachment.copyWith(
                imageUrl: response.file,
                uploadState: const UploadState.success(),
              ),
            );
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [imageAttachment],
        );

        final uploadedAttachment = await completer.future;

        expect(uploadedAttachment.imageUrl, 'image-url');
        expect(uploadedAttachment.uploadState, const UploadState.success());

        verify(
          () => mockChannel.sendImage(
            mockFile,
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);
      });
    });

    group('video attachment uploads', () {
      late Attachment videoAttachment;
      late MockAttachmentFile mockFile;

      setUp(() {
        mockFile = MockAttachmentFile();
        when(() => mockFile.size).thenReturn(1000);
        when(() => mockFile.path).thenReturn('/path/to/video.mp4');

        videoAttachment = Attachment(
          id: 'video-attachment',
          type: AttachmentType.video,
          file: mockFile,
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async => MockSendFileResponse('thumb-url'));
      });

      test('uploads video with thumbnail URL', () async {
        final completer = Completer<Attachment>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (batchId, attachment, response) {
            if (response is SendFileResponse) {
              completer.complete(
                attachment.copyWith(
                  assetUrl: response.file,
                  thumbUrl: response.thumbUrl,
                  uploadState: const UploadState.success(),
                ),
              );
            }
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [videoAttachment],
        );

        final uploadedAttachment = await completer.future;

        expect(uploadedAttachment.assetUrl, 'file-url');
        expect(uploadedAttachment.thumbUrl, 'thumb-url');
        expect(uploadedAttachment.uploadState, const UploadState.success());

        verify(
          () => mockChannel.sendFile(
            mockFile,
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);
      });
    });

    group('batch operations', () {
      late Attachment fileAttachment;
      late Attachment imageAttachment;
      late MockAttachmentFile mockFileAttachment;
      late MockAttachmentFile mockImageAttachment;

      setUp(() {
        mockFileAttachment = MockAttachmentFile();
        when(() => mockFileAttachment.size).thenReturn(100);
        when(() => mockFileAttachment.path).thenReturn('/path/to/file.pdf');

        mockImageAttachment = MockAttachmentFile();
        when(() => mockImageAttachment.size).thenReturn(100);
        when(() => mockImageAttachment.path).thenReturn('/path/to/image.jpg');

        fileAttachment = Attachment(
          id: 'file-attachment',
          type: AttachmentType.file,
          file: mockFileAttachment,
        );

        imageAttachment = Attachment(
          id: 'image-attachment',
          type: AttachmentType.image,
          file: mockImageAttachment,
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async => MockSendFileResponse());

        when(
          () => mockChannel.sendImage(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async => MockSendImageResponse());
      });

      test('batch upload started callback is triggered', () {
        final completer = Completer<List<Attachment>>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onBatchUploadStarted: (_, attachments) {
            completer.complete(attachments);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment, imageAttachment],
        );

        expect(completer.isCompleted, isTrue);
        completer.future.then((attachments) {
          expect(attachments.length, 2);
          expect(attachments.first.id, 'file-attachment');
          expect(attachments.last.id, 'image-attachment');
        });
      });

      test('batch completes when all attachments uploaded', () async {
        final batchCompleter = Completer<List<Attachment>>();
        final uploadedAttachments = <Attachment>[];

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (_, attachment, response) {
            uploadedAttachments.add(attachment);
          },
          onBatchUploadSuccess: (_, attachments) {
            batchCompleter.complete(attachments);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment, imageAttachment],
        );

        final completedBatch = await batchCompleter.future;

        expect(completedBatch.length, 2);
        expect(uploadedAttachments.length, 2);

        // Verify both types of upload methods were called
        verify(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);

        verify(
          () => mockChannel.sendImage(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(1);
      });

      test('batch fails when all attachments fail', () async {
        final batchFailureCompleter = Completer<List<Attachment>>();

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenThrow(StreamChatNetworkError(ChatErrorCode.requestTimeout));

        when(
          () => mockChannel.sendImage(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenThrow(StreamChatNetworkError(ChatErrorCode.requestTimeout));

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onBatchUploadFailed: (_, attachments, __, ___) {
            batchFailureCompleter.complete(attachments);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment, imageAttachment],
        );

        final failedBatch = await batchFailureCompleter.future;

        expect(failedBatch.length, 2);
      });

      test(
        'batch with mixed success/failure does not trigger batch callbacks',
        () async {
          final completedAttachments = <Attachment>[];
          final failedAttachments = <Attachment>[];
          final batchCompletedCompleter = Completer<void>();
          final batchFailedCompleter = Completer<void>();

          // Make file upload succeed but image upload fail
          when(
            () => mockChannel.sendFile(
              any(),
              cancelToken: any(named: 'cancelToken'),
              extraData: any(named: 'extraData'),
              onSendProgress: any(named: 'onSendProgress'),
            ),
          ).thenAnswer((_) async => MockSendFileResponse());

          when(
            () => mockChannel.sendImage(
              any(),
              cancelToken: any(named: 'cancelToken'),
              extraData: any(named: 'extraData'),
              onSendProgress: any(named: 'onSendProgress'),
            ),
          ).thenThrow(StreamChatNetworkError(ChatErrorCode.requestTimeout));

          final uploadManager = AttachmentUploadManager(
            channel: mockChannel,
            config: config,
            onAttachmentUploadSuccess: (_, attachment, response) {
              completedAttachments.add(attachment);
            },
            onAttachmentUploadFailed: (_, attachment, __, ___) {
              failedAttachments.add(attachment);
            },
            onBatchUploadSuccess: (_, __) {
              batchCompletedCompleter.complete();
            },
            onBatchUploadFailed: (_, __, ___, ____) {
              batchFailedCompleter.complete();
            },
          );

          uploadManager.enqueue(
            batchId: 'test-batch',
            attachments: [fileAttachment, imageAttachment],
          );

          // Give time for all operations to complete
          await Future.delayed(const Duration(seconds: 3));

          expect(completedAttachments.length, 1);
          expect(failedAttachments.length, 1);
          expect(batchCompletedCompleter.isCompleted, isFalse);
          expect(batchFailedCompleter.isCompleted, isFalse);
        },
      );
    });

    group('retry policy', () {
      late Attachment fileAttachment;
      late MockAttachmentFile mockFile;

      setUp(() {
        mockFile = MockAttachmentFile();
        when(() => mockFile.size).thenReturn(100);
        when(() => mockFile.path).thenReturn('/path/to/file.pdf');

        fileAttachment = Attachment(
          id: 'file-attachment',
          type: AttachmentType.file,
          file: mockFile,
        );
      });

      test('uses retry policy on transient errors', () async {
        var attemptCount = 0;
        final completer = Completer<Attachment>();

        // Custom retry policy that retries twice
        final config = AttachmentUploadConfig(
          maxConcurrentUploads: 1, // Limit to 1 to avoid parallel uploads
          retryPolicy: RetryPolicy(
            maxRetryAttempts: 3,
            delayFactor: const Duration(milliseconds: 10),
            shouldRetry: (_, attempt, error) {
              return attempt < 3 && error is StreamChatNetworkError;
            },
          ),
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) {
          attemptCount++;
          if (attemptCount < 3) {
            throw StreamChatNetworkError(ChatErrorCode.requestTimeout);
          }
          return Future.value(MockSendFileResponse());
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (_, attachment, response) {
            completer.complete(
              attachment.copyWith(
                assetUrl: response.file,
                uploadState: const UploadState.success(),
              ),
            );
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        final uploadedAttachment = await completer.future;

        expect(uploadedAttachment.assetUrl, 'file-url');
        expect(attemptCount, 3); // 2 failures + 1 success

        // Only verify that it was called with attempts
        verify(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(attemptCount);
      });

      test('gives up after max attempts', () async {
        var attemptCount = 0;
        final completer = Completer<Object?>();

        // Custom retry policy with limited attempts
        final config = AttachmentUploadConfig(
          retryPolicy: RetryPolicy(
            maxRetryAttempts: 2,
            delayFactor: const Duration(milliseconds: 10),
            shouldRetry: (_, attempt, error) {
              return attempt < 2 && error is StreamChatNetworkError;
            },
          ),
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) {
          attemptCount++;
          throw StreamChatNetworkError(ChatErrorCode.requestTimeout);
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadFailed: (_, __, error, ___) {
            completer.complete(error);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        final error = await completer.future;

        expect(error, isA<StreamChatNetworkError>());
        expect(attemptCount, 2); // Only attempted twice per policy

        // Verify that it was called twice
        verify(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(attemptCount);
      });

      test('does not retry on permanent errors', () async {
        var attemptCount = 0;
        final completer = Completer<Object?>();

        // Custom retry policy that only retries network errors
        final config = AttachmentUploadConfig(
          retryPolicy: RetryPolicy(
            maxRetryAttempts: 3,
            delayFactor: const Duration(milliseconds: 10),
            shouldRetry: (_, __, error) {
              return error is StreamChatNetworkError;
            },
          ),
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) {
          attemptCount++;
          // This is not a network error, so should not be retried
          throw const StreamChatError('Permanent error');
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadFailed: (_, __, error, ___) {
            completer.complete(error);
          },
        );

        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        final error = await completer.future;

        expect(error, isA<StreamChatError>());
        expect(attemptCount, 1); // No retries for non-network errors

        // Verify that it was called once
        verify(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(attemptCount);
      });
    });

    group('cancellation and retry', () {
      late Attachment fileAttachment;
      late MockAttachmentFile mockFileAttachment;

      setUp(() {
        mockFileAttachment = MockAttachmentFile();
        when(() => mockFileAttachment.size).thenReturn(100);
        when(() => mockFileAttachment.path).thenReturn('/path/to/file.pdf');

        fileAttachment = Attachment(
          id: 'file-attachment',
          type: AttachmentType.file,
          file: mockFileAttachment,
        );

        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async {
          await Future.delayed(const Duration(milliseconds: 50));
          return MockSendFileResponse();
        });
      });

      test('cancelAttachment returns false for non-existing attachments', () {
        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
        );

        final result = uploadManager.cancelAttachment(
          batchId: 'non-existent-batch',
          attachmentId: 'non-existent-attachment',
        );

        expect(result, isFalse);
      });

      test('retryAttachment returns false for non-existing batch', () {
        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
        );

        final result = uploadManager.retryAttachment(
          batchId: 'non-existent-batch',
          attachmentId: 'non-existent-attachment',
        );

        expect(result, isFalse);
      });

      test(
        'retryAttachment returns false for attachment not found in failed attachments',
        () async {
          final uploadManager = AttachmentUploadManager(
            channel: mockChannel,
            config: config,
          );

          // Queue a successful upload
          uploadManager.enqueue(
            batchId: 'test-batch',
            attachments: [fileAttachment],
          );

          // Wait for upload to complete
          await Future.delayed(const Duration(milliseconds: 100));

          // Try to retry an attachment that wasn't failed
          final result = uploadManager.retryAttachment(
            batchId: 'test-batch',
            attachmentId: 'file-attachment',
          );

          expect(result, isFalse);
        },
      );

      test(
        'cancelAttachment returns correct value for non-existent items',
        () {
          final uploadManager = AttachmentUploadManager(
            channel: mockChannel,
            config: config,
          );

          // Verify cancelAttachment returns false for non-existent batch
          final result1 = uploadManager.cancelAttachment(
            batchId: 'non-existent-batch',
            attachmentId: 'file-attachment',
          );

          expect(result1, isFalse);

          // Enqueue a batch but with a different attachment ID
          uploadManager.enqueue(
            batchId: 'test-batch',
            attachments: [fileAttachment],
          );

          // Verify cancelAttachment returns false for non-existent attachment
          // in real batch
          final result2 = uploadManager.cancelAttachment(
            batchId: 'test-batch',
            attachmentId: 'non-existent-attachment',
          );

          expect(result2, isFalse);
        },
      );

      test('successfully cancels an attachment upload in progress', () async {
        final uploadStartedCompleter = Completer<void>();
        final uploadFailedCompleter = Completer<Object?>();
        CancelToken? capturedCancelToken;

        // Simulate a slow upload that can be cancelled
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((invocation) async {
          capturedCancelToken = invocation.namedArguments[#cancelToken];

          // Notify that upload has started
          uploadStartedCompleter.complete();

          // Wait a bit to simulate upload in progress
          await Future.delayed(const Duration(milliseconds: 300));

          // If we reach here without cancellation, complete the upload
          if (capturedCancelToken!.isCancelled) {
            throw DioException.requestCancelled(
              reason: 'Request cancelled',
              requestOptions: RequestOptions(),
            );
          }

          return MockSendFileResponse();
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadStarted: (_, __) {
            if (!uploadStartedCompleter.isCompleted) {
              uploadStartedCompleter.complete();
            }
          },
          onAttachmentUploadFailed: (_, __, error, ___) {
            uploadFailedCompleter.complete(error);
          },
        );

        // Queue the upload
        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        // Wait for upload to start
        await uploadStartedCompleter.future;

        // Directly capture and manually cancel the token to simulate real behavior
        // This simulates our implementation where we get the task and cancel its token
        await Future.delayed(const Duration(milliseconds: 50));
        capturedCancelToken?.cancel('Testing cancellation');

        // Verify that upload failed with a cancellation error
        final error = await uploadFailedCompleter.future;
        expect(error, isA<DioException>());
      });

      test('cancelAttachment returns false for already cancelled attachment',
          () async {
        final uploadStartedCompleter = Completer<void>();
        final failedCompleter = Completer<Object?>();
        CancelToken? capturedCancelToken;

        // Simulate a cancellable upload
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((invocation) async {
          capturedCancelToken =
              invocation.namedArguments[#cancelToken] as CancelToken;

          uploadStartedCompleter.complete();

          // Wait a bit
          await Future.delayed(const Duration(milliseconds: 200));

          if (capturedCancelToken!.isCancelled) {
            throw DioException(
              requestOptions: RequestOptions(),
              type: DioExceptionType.cancel,
              error: 'Request cancelled',
            );
          }

          return MockSendFileResponse();
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadStarted: (_, __) {
            if (!uploadStartedCompleter.isCompleted) {
              uploadStartedCompleter.complete();
            }
          },
          onAttachmentUploadFailed: (_, __, error, ___) {
            failedCompleter.complete(error);
          },
        );

        // Queue the upload
        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        // Wait for upload to start
        await uploadStartedCompleter.future;

        // Directly cancel the token to simulate real behavior
        await Future.delayed(const Duration(milliseconds: 50));
        capturedCancelToken?.cancel();

        // Wait for failure callback
        await failedCompleter.future;
      });

      test('successfully retries a failed attachment', () async {
        final failureCompleter = Completer<void>();
        final successCompleter = Completer<void>();
        bool hasFailed = false;

        // First fail, then succeed on retry
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((_) async {
          if (!hasFailed) {
            hasFailed = true;
            failureCompleter.complete();
            throw const StreamChatNetworkError(ChatErrorCode.inputError);
          }
          await Future.delayed(const Duration(milliseconds: 50));
          return MockSendFileResponse();
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadFailed: (_, __, ___, ____) {
            if (!failureCompleter.isCompleted) {
              failureCompleter.complete();
            }
          },
          onAttachmentUploadSuccess: (_, __, ___) {
            successCompleter.complete();
          },
        );

        // Queue the upload that will fail
        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        // Wait for it to fail
        await failureCompleter.future;
        await Future.delayed(const Duration(milliseconds: 100));

        // Manually set up conditions for retry to succeed
        final retryResult = uploadManager.retryAttachment(
          batchId: 'test-batch',
          attachmentId: 'file-attachment',
        );

        // Wait for success after retry
        await successCompleter.future;

        // Verify sendFile was called twice (initial failure + successful retry)
        verify(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).called(2);
      });

      test('retryAttachment returns false for attachment already in queue',
          () async {
        final failureCompleter = Completer<void>();
        final pendingTaskCompleter = Completer<void>();
        bool hasFailed = false;

        // First request fails, second one is delayed
        when(
          () => mockChannel.sendFile(
            any(),
            cancelToken: any(named: 'cancelToken'),
            extraData: any(named: 'extraData'),
            onSendProgress: any(named: 'onSendProgress'),
          ),
        ).thenAnswer((invocation) async {
          if (!hasFailed) {
            hasFailed = true;
            failureCompleter.complete();
            throw const StreamChatNetworkError(ChatErrorCode.inputError);
          }

          pendingTaskCompleter.complete();
          // Second request will hang to keep task in queue
          await Future.delayed(const Duration(hours: 1));
          return MockSendFileResponse();
        });

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadFailed: (_, __, ___, ____) {
            if (!failureCompleter.isCompleted) {
              failureCompleter.complete();
            }
          },
        );

        // Queue the upload that will fail
        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        // Wait for it to fail
        await failureCompleter.future;
        await Future.delayed(const Duration(milliseconds: 100));

        // First retry should work
        uploadManager.retryAttachment(
          batchId: 'test-batch',
          attachmentId: 'file-attachment',
        );

        // Wait for the pending task to start
        await pendingTaskCompleter.future;

        // Second retry should fail since task is already in queue
        final secondRetryResult = uploadManager.retryAttachment(
          batchId: 'test-batch',
          attachmentId: 'file-attachment',
        );

        expect(secondRetryResult, isFalse);
      });

      test('cancelAttachment returns false for already completed attachment',
          () async {
        final successCompleter = Completer<void>();

        final uploadManager = AttachmentUploadManager(
          channel: mockChannel,
          config: config,
          onAttachmentUploadSuccess: (_, __, ___) {
            successCompleter.complete();
          },
        );

        // Queue a successful upload
        uploadManager.enqueue(
          batchId: 'test-batch',
          attachments: [fileAttachment],
        );

        // Wait for it to complete
        await successCompleter.future;
        await Future.delayed(const Duration(milliseconds: 50));

        // Try to cancel completed attachment
        final cancelResult = uploadManager.cancelAttachment(
          batchId: 'test-batch',
          attachmentId: 'file-attachment',
        );

        expect(cancelResult, isFalse);
      });
    });
  });
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment