Skip to content

Instantly share code, notes, and snippets.

@eashish93
Created August 3, 2024 11:51
Show Gist options
  • Save eashish93/85ebe26ccc78ef8cb87339c8c918b719 to your computer and use it in GitHub Desktop.
Save eashish93/85ebe26ccc78ef8cb87339c8c918b719 to your computer and use it in GitHub Desktop.
Parallel multipart upload to Cloudflare R2 using the S3 SDK (for large files)
/**
 * One successfully uploaded part of a multipart upload, in the shape that is
 * sent to the completion endpoint.
 */
type UploadParts = {
  /** 1-based position of the part within the file (chunk index + 1). */
  PartNumber: number;
  /** Value of the `ETag` response header returned when the part was uploaded. */
  ETag: string;
};
/**
 * Asks the backend (`POST /api/multipart`) to start a multipart upload.
 *
 * The object key is generated here as `<random UUID>-<file name>` and sent
 * together with the file's MIME type.
 *
 * @param file - File that is about to be uploaded; only `name` and `type` are read.
 * @returns The upload id and object key reported by the backend.
 * @throws Error if the request fails or the response lacks a usable
 *   `uploadId` / `key`.
 */
async function createMultipartUpload(file: File): Promise<{ uploadId: string; key: string }> {
  const fileKey = `${crypto.randomUUID()}-${file.name}`;
  const response = await fetch('/api/multipart', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ key: fileKey, contentType: file.type }),
  });
  if (!response.ok) {
    throw new Error('Failed to create multipart upload');
  }

  // Validate the payload instead of trusting it: an `undefined` upload id would
  // otherwise surface much later as confusing failures in the per-part requests.
  const json: unknown = await response.json();
  const { uploadId, key } = (json ?? {}) as { uploadId?: unknown; key?: unknown };
  if (typeof uploadId !== 'string' || uploadId === '' || typeof key !== 'string' || key === '') {
    throw new Error('Failed to create multipart upload: malformed server response');
  }
  return { uploadId, key };
}
/**
 * Fetches a presigned URL for uploading one part (`GET /api/multipart`).
 *
 * @param uploadId - Multipart upload id returned when the upload was created.
 * @param partNumber - 1-based part number.
 * @param key - Object key of the upload.
 * @returns The presigned URL to `PUT` the part to.
 * @throws Error if the request fails or the response carries no URL.
 */
async function getPresignedUrl(uploadId: string, partNumber: number, key: string): Promise<string> {
  // The key embeds the user's file name, which may contain `&`, `#`, `+`, `%`
  // or spaces. URLSearchParams escapes every value so the query stays intact.
  const query = new URLSearchParams({
    uploadId,
    partNumber: String(partNumber),
    key,
  });
  const response = await fetch(`/api/multipart?${query.toString()}`);
  if (!response.ok) {
    throw new Error(`Failed to get presigned URL for part ${partNumber}`);
  }
  const json: unknown = await response.json();
  const { url } = (json ?? {}) as { url?: unknown };
  if (typeof url !== 'string' || url === '') {
    throw new Error(`Failed to get presigned URL for part ${partNumber}`);
  }
  return url;
}
/**
 * Uploads one chunk to a presigned URL with `PUT`, retrying on failure.
 *
 * A network error or a non-2xx response counts as a failed attempt. Up to
 * `maxRetries` additional attempts are made after the first one.
 *
 * @param url - Presigned URL for this part.
 * @param chunk - Bytes of the part.
 * @param fileType - Value sent as the `Content-Type` header.
 * @param maxRetries - Number of retries after the initial attempt (default 3).
 * @returns The `ETag` response header of the uploaded part.
 * @throws Error when every attempt fails (the last failure is attached as
 *   `cause` and included in the message), or when the upload succeeds but no
 *   `ETag` header is readable.
 */
async function uploadChunkWithRetry(
  url: string,
  chunk: Blob,
  fileType: string,
  maxRetries: number = 3
): Promise<string> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    let uploaded: Response | undefined;
    let failure: unknown;
    try {
      const response = await fetch(url, {
        method: 'PUT',
        body: chunk,
        headers: {
          'Content-Type': fileType,
        },
      });
      if (!response.ok) {
        throw new Error(`Failed to upload chunk (HTTP ${response.status})`);
      }
      uploaded = response;
    } catch (error) {
      failure = error;
    }

    if (uploaded) {
      // The ETag is required to complete the multipart upload. Returning an
      // empty string would only defer the failure to the completion call, and
      // re-sending the same bytes cannot make the header appear, so this is
      // reported immediately instead of being retried.
      const eTag = uploaded.headers.get('ETag');
      if (!eTag) {
        throw new Error(
          'Chunk uploaded but no ETag header was readable (check that the bucket CORS configuration exposes ETag)'
        );
      }
      return eTag;
    }

    if (attempt < maxRetries) {
      console.log(`Retrying upload chunk (attempt ${attempt + 1} of ${maxRetries})...`);
    } else {
      const reason = failure instanceof Error ? failure.message : String(failure);
      // Keep the underlying failure so callers can tell a 403 from a network drop.
      throw Object.assign(new Error(`Max retries reached. Failed to upload chunk: ${reason}`), {
        cause: failure,
      });
    }
  }
  // Only reachable when maxRetries is negative and no attempt was made.
  throw new Error('Failed to upload chunk after retries');
}
/**
 * Tells the backend (`POST /api/multipart/complete`) that every part has been
 * uploaded so the object can be assembled.
 *
 * @param uploadId - Multipart upload id.
 * @param key - Object key of the upload.
 * @param parts - ETag / part-number pairs of the uploaded parts.
 * @returns The `publicURL` field of the backend's JSON response.
 * @throws Error if the backend answers with a non-2xx status.
 */
async function completeMultipartUpload(
  uploadId: string,
  key: string,
  parts: UploadParts[]
): Promise<string> {
  const payload = JSON.stringify({ uploadId, key, parts });
  const res = await fetch('/api/multipart/complete', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: payload,
  });

  if (res.ok) {
    const result = await res.json();
    return result.publicURL;
  }
  throw new Error('Failed to complete multipart upload');
}
/**
 * Uploads a file as a multipart upload: creates the upload, sends the file in
 * 8 MB chunks with bounded parallelism, then completes the upload.
 *
 * @param file - File to upload.
 * @param onProgress - Called after each finished chunk with the percentage
 *   (0-100) of bytes uploaded so far.
 * @param queueLimit - Maximum number of chunks in flight at once (default 3).
 *   Values below 1 or non-numeric values are treated as 1.
 * @returns The public URL returned by the completion endpoint.
 * @throws Rethrows the first error raised while creating the upload, uploading
 *   a chunk, or completing the upload. After a chunk fails, no further chunks
 *   are started.
 */
export default async function handleFileUpload(
  file: File,
  onProgress: (progress: number) => void,
  queueLimit = 3
): Promise<string> {
  const CHUNK_SIZE = 8 * 1024 * 1024; // 8MB
  const { uploadId, key } = await createMultipartUpload(file);
  const totalChunks = Math.ceil(file.size / CHUNK_SIZE);
  // Never run with fewer than one worker, nor with more workers than chunks.
  const workerCount = Math.min(totalChunks, Math.max(1, Math.floor(queueLimit) || 1));

  const uploadedBytes: number[] = Array(totalChunks).fill(0);
  // Indexed by chunk, so the list is already in ascending part order.
  const parts: UploadParts[] = [];

  const updateProgress = () => {
    const uploadedSize = uploadedBytes.reduce((acc, val) => acc + val, 0);
    const progress = (uploadedSize / file.size) * 100;
    onProgress(progress);
    console.log('Progress:', progress);
  };

  const uploadSingleChunk = async (chunkIndex: number): Promise<void> => {
    const start = chunkIndex * CHUNK_SIZE;
    const chunk = file.slice(start, Math.min(start + CHUNK_SIZE, file.size));
    try {
      const url = await getPresignedUrl(uploadId, chunkIndex + 1, key);
      const eTag = await uploadChunkWithRetry(url, chunk, file.type);
      parts[chunkIndex] = { ETag: eTag, PartNumber: chunkIndex + 1 };
      uploadedBytes[chunkIndex] = chunk.size;
      updateProgress();
    } catch (error) {
      console.error('Error during upload:', error);
      throw error;
    }
  };

  // Worker pool: each worker repeatedly claims the next unclaimed chunk, so at
  // most `workerCount` uploads are in flight and every started upload is
  // awaited. (The previous queue removed the wrong promise after
  // `Promise.race`, which both broke the concurrency limit and allowed the
  // upload to be completed while parts were still pending.)
  let nextChunk = 0;
  let failed = false;
  const runWorker = async (): Promise<void> => {
    while (!failed && nextChunk < totalChunks) {
      const chunkIndex = nextChunk++;
      try {
        await uploadSingleChunk(chunkIndex);
      } catch (error) {
        // Stop the other workers from claiming more chunks once one has failed.
        failed = true;
        throw error;
      }
    }
  };
  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));

  return completeMultipartUpload(uploadId, key, parts);
}
import { NextResponse, NextRequest } from 'next/server';
import apiError from '../../lib/apiError';
import { nanoid } from 'nanoid';
import {
  S3Client,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  UploadPartCommand,
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { z } from 'zod';
import { parseQuery } from '../../lib/utils';
// Exported route options. NOTE(review): these look like Next.js route segment
// config meant to disable caching for this handler — confirm against the
// Next.js version in use.
export const revalidate = 0;
export const dynamic = 'force-dynamic';
// R2 connection settings, read from the environment once at module load.
const R2_ENDPOINT = process.env.R2_ENDPOINT;
const R2_BUCKET = process.env.R2_BUCKET;
const R2_ACCESS_KEY = process.env.R2_ACCESS_KEY;
const R2_SECRET_KEY = process.env.R2_SECRET_KEY;
// Prefix used to build the public URL returned after an upload completes.
const R2_PUBLIC_URL = process.env.R2_PUBLIC_URL;
// Fail at import time when credentials are missing.
// NOTE(review): R2_ENDPOINT, R2_BUCKET and R2_PUBLIC_URL are not checked here
// and may be undefined when the handlers run — consider validating them too.
if (!R2_ACCESS_KEY || !R2_SECRET_KEY) {
throw new Error('R2_ACCESS_KEY and R2_SECRET_KEY must be set');
}
// Shared S3-compatible client pointed at the R2 endpoint; reused by every handler.
const S3 = new S3Client({
region: 'auto',
endpoint: R2_ENDPOINT,
credentials: {
accessKeyId: R2_ACCESS_KEY,
secretAccessKey: R2_SECRET_KEY,
},
});
/**
 * Create multipart upload. (POST: /api/multipart)
 *
 * Expects a JSON body `{ key: string, contentType?: string }` and responds
 * with `{ key, uploadId }` for the newly created upload.
 *
 * Responds with status 400 when the body is not of that shape. Any other
 * failure (invalid JSON, S3 error) is converted by `apiError`.
 */
export async function POST(req: NextRequest) {
  try {
    // The body comes from the browser and is untrusted: check it before it
    // reaches the S3 client.
    const body: unknown = await req.json();
    const { key, contentType } = (body ?? {}) as { key?: unknown; contentType?: unknown };

    // 500 matches the key length accepted by the presign (GET) query schema;
    // a longer key could be created here but never uploaded to.
    if (typeof key !== 'string' || key.length === 0 || key.length > 500) {
      return NextResponse.json(
        { error: '`key` must be a non-empty string of at most 500 characters' },
        { status: 400 }
      );
    }
    if (contentType !== undefined && typeof contentType !== 'string') {
      return NextResponse.json({ error: '`contentType` must be a string' }, { status: 400 });
    }

    // create multipart upload
    const command = new CreateMultipartUploadCommand({
      Key: key,
      ContentType: contentType,
      Bucket: R2_BUCKET,
    });
    const data = await S3.send(command);
    return NextResponse.json({
      key: data.Key,
      uploadId: data.UploadId,
    });
  } catch (e) {
    return apiError(e);
  }
}
// Shape of the query string accepted by the presign endpoint. Values arrive as
// strings; `partNumber` is converted and range-checked in the handler.
const querySchema = z.object({
  key: z.string().max(500),
  partNumber: z.string().max(100),
  uploadId: z.string().max(2048),
});

/**
 * Get signed url for upload part. (GET: /api/multipart)
 *
 * Query parameters: `uploadId`, `partNumber` (integer, 1-10000) and `key`.
 * Responds with `{ url }`, a presigned URL valid for 100 seconds to which the
 * part can be uploaded with `PUT`.
 *
 * Responds with status 400 when `partNumber` is not a valid part number. Other
 * failures are converted by `apiError`, as in the sibling handlers.
 */
export async function GET(req: NextRequest) {
  // S3-compatible multipart uploads allow part numbers 1 through 10,000.
  const MAX_PART_NUMBER = 10000;
  try {
    const sp = req.nextUrl.searchParams;
    // NOTE(review): parseQuery is presumably expected to throw on a schema
    // mismatch — confirm; the catch below relies on it.
    const { uploadId, partNumber: rawPartNumber, key } = parseQuery(sp, querySchema);

    // The SDK expects a numeric PartNumber; the query string only gives text.
    const partNumber = Number(rawPartNumber);
    if (!Number.isInteger(partNumber) || partNumber < 1 || partNumber > MAX_PART_NUMBER) {
      return NextResponse.json(
        { error: `\`partNumber\` must be an integer between 1 and ${MAX_PART_NUMBER}` },
        { status: 400 }
      );
    }

    const url = await getSignedUrl(
      // NOTE(review): suppression kept from the original; presumably a type
      // mismatch between the S3 client and presigner packages — confirm.
      // @ts-ignore
      S3,
      new UploadPartCommand({
        Bucket: R2_BUCKET,
        Key: key,
        UploadId: uploadId,
        PartNumber: partNumber,
        // Body: '',
        // ACL: 'public-read',
      }),
      // in seconds.
      { expiresIn: 100 }
    );
    return NextResponse.json({
      url,
    });
  } catch (e) {
    return apiError(e);
  }
}
/**
 * Complete multipart upload. (POST: /api/multipart/complete)
 *
 * Expects a JSON body
 * `{ key: string, uploadId: string, parts: { ETag: string, PartNumber: number }[] }`
 * and responds with `{ publicURL }` built from `R2_PUBLIC_URL` and the object key.
 *
 * Responds with status 400 when the body is not of that shape. Any other
 * failure is logged and converted by `apiError`.
 */
export async function POST(req: NextRequest) {
  try {
    // The body comes from the browser and is untrusted: check it before it
    // reaches the S3 client.
    const body: unknown = await req.json();
    const { key, uploadId, parts } = (body ?? {}) as {
      key?: unknown;
      uploadId?: unknown;
      parts?: unknown;
    };

    if (typeof key !== 'string' || key === '' || typeof uploadId !== 'string' || uploadId === '') {
      return NextResponse.json(
        { error: '`key` and `uploadId` must be non-empty strings' },
        { status: 400 }
      );
    }
    if (!Array.isArray(parts) || parts.length === 0) {
      return NextResponse.json({ error: '`parts` must be a non-empty array' }, { status: 400 });
    }

    const orderedParts: { ETag: string; PartNumber: number }[] = [];
    for (const part of parts) {
      const { ETag, PartNumber } = (part ?? {}) as { ETag?: unknown; PartNumber?: unknown };
      if (
        typeof ETag !== 'string' ||
        ETag === '' ||
        typeof PartNumber !== 'number' ||
        !Number.isInteger(PartNumber) ||
        PartNumber < 1
      ) {
        return NextResponse.json(
          { error: 'each part needs a non-empty `ETag` and a positive integer `PartNumber`' },
          { status: 400 }
        );
      }
      // Copy only the two expected fields; anything else the client sent is dropped.
      orderedParts.push({ ETag, PartNumber });
    }
    // The parts list must be in ascending part-number order for completion.
    orderedParts.sort((a, b) => a.PartNumber - b.PartNumber);

    const command = new CompleteMultipartUploadCommand({
      Key: key,
      UploadId: uploadId,
      MultipartUpload: {
        Parts: orderedParts,
      },
      Bucket: R2_BUCKET,
    });
    const data = await S3.send(command);
    return NextResponse.json({
      // Fall back to the requested key so the URL never contains "undefined".
      publicURL: `${R2_PUBLIC_URL}/${data.Key ?? key}`,
    });
  } catch (e) {
    console.error('Error [complete multipart]: ', e);
    return apiError(e);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment