Skip to content

Instantly share code, notes, and snippets.

@chux0519
Last active June 22, 2018 03:27
Show Gist options
  • Save chux0519/ec33e664817827691fc65748a63af920 to your computer and use it in GitHub Desktop.
Save chux0519/ec33e664817827691fc65748a63af920 to your computer and use it in GitHub Desktop.
AWS S3 wrapper
const AWS = require('aws-sdk')
/**
 * Build a thin wrapper around an AWS SDK S3 client.
 *
 * @param {{s3Key: string, s3Secret: string}} options - credentials; `s3Key`
 *   is passed to the client as `accessKeyId`, `s3Secret` as `secretAccessKey`.
 * @returns {{getObject: Function, getObjectStream: Function}} helpers bound
 *   to a single S3 client instance.
 * @throws {TypeError} when `options` is null/undefined.
 */
function buildS3Service (options) {
  const { s3Key: accessKeyId, s3Secret: secretAccessKey } = options
  const client = new AWS.S3({ accessKeyId, secretAccessKey })

  /**
   * Fetch an object in one request.
   * @param {{Bucket: string, Key: string}} params - only Bucket/Key are forwarded.
   * @returns {Promise<Object>} whatever the SDK request's `promise()` resolves to.
   */
  const getObject = async ({ Bucket, Key }) => client.getObject({ Bucket, Key }).promise()

  /**
   * Open a readable stream over an object's body.
   * @param {{Bucket: string, Key: string}} params - only Bucket/Key are forwarded.
   * @returns {*} the SDK request's `createReadStream()` result.
   */
  const getObjectStream = ({ Bucket, Key }) => client.getObject({ Bucket, Key }).createReadStream()

  return { getObject, getObjectStream }
}
module.exports = {
buildS3Service
}
// Get a readable stream over the S3 object and split it into lines.
// NOTE(review): s3, Bucket, Key, fp, handle, doc, rowsLimit and the mutable
// counters (remain, content, rows, totalRows, illegalRows) are declared
// outside this snippet; `remain` presumably starts as '' and `content` as []
// — confirm at the declaration site.
const rstream = s3.getObjectStream({Bucket, Key})
// Decode inside the stream: setEncoding uses a StringDecoder, which holds back
// the bytes of a multi-byte UTF-8 character that straddles a chunk boundary.
// Calling Buffer#toString() on each raw chunk would garble such characters.
rstream.setEncoding('utf8')
rstream.on('data', chunk => {
  // A chunk may end mid-line: prepend the leftover from the previous chunk,
  // then keep this chunk's incomplete tail for the next one.
  const rowContent = (remain + chunk).split('\n')
  const len = rowContent.length - 1
  // Incomplete trailing piece (empty string when the chunk ended on '\n').
  remain = rowContent[len]
  // Append the complete lines.
  content = fp.concat(content)(fp.slice(0, len)(rowContent))
  rows += len
  totalRows += len
  // Hand off full batches of rowsLimit rows. handle() is called synchronously
  // and its result summed, so no further 'data' event can fire while this
  // loop runs. A pause()/resume() pair around it in the same tick would be a
  // no-op, so none is used.
  while (rows >= rowsLimit) {
    illegalRows += handle({content, rowsLimit, doc})
    rows -= rowsLimit
    content = fp.slice(rowsLimit, content.length)(content)
  }
})
// TODO(review): no 'error' listener is attached. A failed S3 read emits
// 'error', and an unhandled 'error' event crashes the process. Route it to
// the caller's error path.
rstream.on('end', () => {
  // If the object does not end with '\n', its last line is still in `remain`.
  // Flush it so that row is not silently dropped.
  if (remain) {
    content = fp.concat(content)([remain])
    remain = ''
    rows += 1
    totalRows += 1
  }
  // Process whatever rows are left over.
  illegalRows += handle({content, rowsLimit, doc})
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment