Created
July 7, 2018 00:45
-
-
Save glendaviesnz/0cb4ee94ec8cb7e221b4818ab7b6dc99 to your computer and use it in GitHub Desktop.
RxJs document chunker and uploader. This class takes a document and breaks it into 1Mb chunks and uploads those chunks sequentially to the server.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export class ChunkingService { | |
public chunkSize = 1024 * 1024; | |
private _chunkQueue$ = new Subject(); | |
private _maxConnections = 3; | |
constructor( | |
private _apiService: apiService, | |
private _store: Store<any>) { | |
Guard.notNothing(_apiService, '_apiService'); | |
Guard.notNothing(_store, '_store'); | |
// this is where the magic happens a queue of document chunk uploads that limits the number of | |
// concurrent uploads - in just 3 lines of code - got to love RxJs! | |
this._chunkQueue$ | |
.pipe(mergeMap((data: any) => data, null, this._maxConnections)) | |
.subscribe(); | |
} | |
public uploadDocument(documentData: UploadDocument) { | |
const crc32 = new CRC32(); // crc is used to validate file when chunks recombined on server | |
const chunkSizes = this.calculateChunks(documentData.document.size, this.chunkSize); | |
documentData.chunkSizes = chunkSizes; | |
const getChunks$ = this.getChunks(0, documentData); | |
const chunkArray$ = getChunks$.toArray(); | |
const calculateCrc$ = getChunks$.pipe(reduce((acc: number, chunk: Chunk) => { | |
return acc ^ crc32.computeHash(chunk.byteArray, 0, chunk.byteArray.length); | |
}, 0)); | |
this.startChunkUpload(chunkArray$, calculateCrc$); | |
getChunks$.connect(); | |
} | |
private calculateChunks(fileSize: number, chunkSize: number) { | |
const chunkSizes: ChunkSize[] = []; | |
const numberOfChunks = Math.max(Math.ceil(fileSize / chunkSize), 1); | |
for (let offset = 0; offset < numberOfChunks; offset++) { | |
const startByte = offset * chunkSize; | |
const endByte = Math.min(fileSize, (offset + 1) * chunkSize); | |
chunkSizes.push( | |
{ startByte, endByte } | |
); | |
} | |
return chunkSizes; | |
} | |
private getChunks(documentData: UploadDocument) { | |
return Observable.create((observer: Observer<Chunk>) => { | |
this.chunkReader(0, documentData, observer); | |
}).publish(); | |
} | |
private chunkReader(index: number, documentData: UploadDocument, observer: Observer<Chunk>) { | |
if (index >= documentData.chunkSizes.length) { | |
observer.complete(); | |
} else { | |
const fileReader = new FileReader(); | |
const chunk = documentData.chunkSizes[index]; | |
fileReader.onloadend = (evt: any) => { | |
const unint8Array = new Uint8Array(evt.target.result); | |
observer.next({ | |
documentId: documentData.documentId, | |
fileSize: documentData.document.size, | |
fileName: documentData.document.name, | |
byteArray: unint8Array, | |
sequence: index + 1, | |
totalChunks: documentData.chunkSizes.length, | |
isLast: ((index === documentData.chunkSizes.length - 1)) ? true : false, | |
data: null, | |
fileCrc: null, | |
}); | |
this.chunkReader(index + 1, documentData, observer); | |
}; | |
const chunkBlob = documentData.document.slice(chunk.startByte, chunk.endByte); | |
fileReader.readAsArrayBuffer(chunkBlob); | |
} | |
} | |
private startChunkUpload(chunkArray$: Observable<Chunk[]>, calculateCrc$: Observable<number>) { | |
Observable.combineLatest(chunkArray$, calculateCrc$, (chunksArray: Chunk[], crc: number) => { | |
const chunksWithCrc = chunksArray.map(chunk => { | |
chunk.fileCrc = crc; | |
return chunk; | |
}); | |
return chunksWithCrc; | |
}) | |
.pipe( | |
filter((chunks: Chunk[]) => chunks.length > 0), | |
map((chunks: Chunk[]) => this.uploadFirstChunk(chunks)) | |
) | |
.subscribe(); | |
} | |
private uploadFirstChunk(chunks: Chunk[]) { | |
const firstChunk = chunks[0]; | |
firstChunk.data = new Blob([firstChunk.byteArray]); | |
firstChunk.byteArray = null; | |
const firstChunk$ = this._apiService.uploadChunk(chunks[0]) | |
.pipe( | |
filter((event: HttpEvent<any>) => event.type === HttpEventType.UploadProgress || event instanceof HttpResponse), | |
tap((event: HttpProgressEvent | HttpResponse<any>) => { | |
if (event.type === HttpEventType.UploadProgress) { | |
this.trackProgress(firstChunk, event.loaded, event.total); | |
} | |
}), | |
filter((event: HttpEvent<any>) => event instanceof HttpResponse), | |
take(1), | |
map((event: HttpResponse<any>) => { | |
const body = event.body; | |
const uploadDocumentId = body.uploadDocumentId; | |
if (chunks.length > 1) { | |
return this.uploadRemainingChunks(chunks, uploadDocumentId); | |
} else { | |
const documentUploadRequestId = body.documentUploadRequestId; | |
this.completeUpload(firstChunk.placeholder, firstChunk.tabId, event.status, documentUploadRequestId); | |
} | |
}) | |
) | |
.catch((error) => this.handleError(firstChunk, error)); | |
this._chunkQueue$.next(firstChunk$); | |
} | |
private uploadRemainingChunks(chunks: Chunk[], uploadDocumentId: number) { | |
const remainingChunks = chunks | |
.filter((chunk: Chunk) => chunk.sequence > 1) | |
.map((chunk: Chunk) => { | |
chunk.uploadDocumentId = uploadDocumentId; | |
return chunk; | |
}); | |
this.uploadChunk(0, remainingChunks); | |
} | |
private uploadChunk(index: number, chunks: Chunk[]) { | |
const chunk = chunks[index]; | |
chunk.data = new Blob([chunk.byteArray]); | |
chunk.fileSize = chunk.data.size; | |
chunk.byteArray = null; | |
const chunk$ = this._apiService.uploadChunk(chunk) | |
.pipe(filter((event: HttpEvent<any>) => event.type === HttpEventType.UploadProgress | |
|| event instanceof HttpResponse), | |
tap((event: HttpProgressEvent | HttpResponse<any>) => { | |
if (event.type === HttpEventType.UploadProgress) { | |
this.trackProgress(chunk, event.loaded, event.total); | |
} | |
}), | |
filter((event: HttpEvent<any>) => event instanceof HttpResponse), | |
take(1), | |
map((event: HttpResponse<any>) => { | |
if (chunk.sequence === chunk.totalChunks) { | |
const documentUploadRequestId = event.body.documentUploadRequestId; | |
this.completeUpload(chunk.placeholder, chunk.tabId, event.status, documentUploadRequestId); | |
} else { | |
this.uploadChunk(index + 1, chunks); | |
} | |
return { status: 'done' }; | |
}) | |
) | |
.catch((error) => this.handleError(chunk, error)); | |
this._chunkQueue$.next(chunk$); | |
} | |
private trackProgress(chunk: Chunk, loaded: number, total: number) { | |
const chunkPercentage = (100 / chunk.totalChunks); | |
const percentageComplete = (chunk.sequence - 1) * chunkPercentage; | |
const percentDone = Math.round(((100 * loaded / total) / chunk.totalChunks) + percentageComplete); | |
this._store.dispatch(new UpdateUploadProgressAction({ | |
documentId: chunk.documentId, percentDone: percentDone | |
})); | |
} | |
private completeUpload(placeholder: Placeholder, tabId: number, status: number, documentUploadRequestId: number) { | |
const document = placeholder.document; | |
if (status === 200) { | |
document.uploading = true; | |
} | |
document.tempName = null; | |
document.clientUploading = false; | |
this._store.dispatch(new ClearUploadProgressAction(placeholder.document.documentId)); | |
this._store.dispatch(new EnableRefreshAction()); | |
this._store.dispatch(new UpdatePlaceholderAction({ | |
tabId: tabId, | |
placeholder: placeholder, | |
immutable: true | |
})); | |
} | |
private handleError(chunk: Chunk, error: any) { | |
const placeholder = chunk.placeholder; | |
placeholder.document.clientUploading = false; | |
const tabId = chunk.tabId; | |
this._store.dispatch(new UpdatePlaceholderAction({ tabId, placeholder, immutable: true })); | |
return Observable.of(error); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment