Created
December 21, 2016 13:10
-
-
Save Brooooooklyn/1cc45249701c11320f7c657845be72b2 to your computer and use it in GitHub Desktop.
RxJS file uploader demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Fetch, Utils } from 'teambition-sdk' | |
import { Observable } from 'rxjs/Observable' | |
import { Subject } from 'rxjs/Subject' | |
import { ReplaySubject } from 'rxjs/ReplaySubject' | |
import { AjaxResponse } from 'rxjs/observable/dom/AjaxObservable' | |
import { Subscriber } from 'rxjs/Subscriber' | |
import * as config from 'config' | |
export interface ChunkMeta { | |
fileType: string | |
fileName: string | |
mimeType: string | |
fileCategory: string | |
fileSize: number | |
chunkSize: number | |
chunks: number | |
created: string | |
fileMD5: string | |
lastUpdated: string | |
uploadedChunks: number[] | |
storage: string | |
token: { | |
userId: string | |
exp: number | |
storage: string | |
} | |
fileKey: string | |
downloadUrl: string | |
thumbnailUrl: string | |
previewUrl: string | |
} | |
export class ChunkUploader { | |
static resume$ = new Subject<void>() | |
static pause$ = new Subject<void>() | |
private fetch = new Fetch() | |
private progress = new Subject<number>() | |
chunkUploader$: Observable<ChunkMeta> = this.getChunkMeta() | |
.concatMap((chunkMeta) => { | |
const progress$ = this.progress | |
const blobs = this.slice(this.file, chunkMeta.chunks, chunkMeta.chunkSize) | |
const uploaded: number[] = [] | |
const dists = blobs.map((blob, index) => { | |
let currentLoaded = 0 | |
const result = this.uploadChunk(chunkMeta, index, blob) | |
return result.progress$ | |
.catch(() => Promise.resolve({ loaded: uploaded[index] * this.file.size })) | |
.do((r) => { | |
currentLoaded = r.loaded / this.file.size | |
uploaded[index] = currentLoaded | |
const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0)) | |
progress$.next(percent) | |
if (percent >= 0.99) { | |
progress$.complete() | |
} | |
}) | |
}) | |
const uploadStream = Observable.from(dists) | |
.mergeAll(this.concurrency) | |
return Observable.forkJoin(uploadStream) | |
.mapTo(chunkMeta) | |
}) | |
progress$ = this.progress | |
.distinctUntilChanged((x, y) => x - y >= 0) | |
settleFile$ = this.chunkUploader$ | |
.concatMap((meta) => { | |
return this.fetch.post(`/upload/chunk/${meta.fileKey}`) | |
.retryWhen(() => ChunkUploader.resume$) | |
.catch((e) => Promise.resolve(e)) | |
}) | |
constructor( | |
private file: File, | |
private strikerToken: string, | |
private chunkRetryCount = 2, | |
private concurrency = 3 | |
) { } | |
slice(file: File, n: number, chunkSize: number): Blob[] { | |
const result: Blob[] = [] | |
for (let i = 0; i < n; i ++) { | |
const startSize = i * chunkSize | |
const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize) | |
result.push(slice) | |
} | |
return result | |
} | |
private getChunkMeta() { | |
this.fetch.setAPIHost(config.FILE_HOST) | |
this.fetch.setHeaders({ | |
Authorization: this.strikerToken | |
}) | |
return this.fetch | |
.post<ChunkMeta>(`/upload/chunk`, { | |
fileSize: this.file.size, | |
fileMD5: Utils.uuid(), | |
lastUpdated: this.file.lastModifiedDate, | |
fileName: this.file.name | |
}) | |
} | |
private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) { | |
const host = `${config.FILE_HOST}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}` | |
const complete$ = new ReplaySubject<void>(1) | |
const progress$: Observable<ProgressEvent | AjaxResponse> = Observable | |
.create((subscriber: Subscriber<ProgressEvent | AjaxResponse>) => { | |
const ajax$ = Observable.ajax({ | |
url: host, | |
body: blob, | |
method: 'post', | |
crossDomain: true, | |
headers: { | |
Authorization: this.strikerToken, | |
'Content-Type': 'application/octet-stream' | |
}, | |
progressSubscriber: subscriber | |
}) | |
.retry(this.chunkRetryCount) | |
.takeUntil(ChunkUploader.pause$) | |
.catch((e) => Promise.resolve(e)) | |
.repeatWhen(() => { | |
return ChunkUploader.resume$ | |
.takeUntil(complete$) | |
}) | |
const subscription = ajax$.subscribe() | |
return () => subscription.unsubscribe() | |
}) | |
.retryWhen(() => ChunkUploader.resume$) | |
.publish() | |
.refCount() | |
return { progress$, meta } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment