Created September 13, 2016 14:01
Save ndelitski/bd10dd1a09aea14197462c739e7a72e7 to your computer and use it in GitHub Desktop.
Gmail Threads Processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _ from 'lodash'; | |
import B, {promisifyAll} from 'bluebird'; | |
import {Log, trace} from '@synccloud/logging'; | |
import {assertType} from '@synccloud/utils'; | |
import {HtmlConverter, Markup, StripBlockquoteHandler} from '@synccloud/mime-letter-markup'; | |
import {serializeDate, deserializeDate} from './date-convertor.es6'; | |
import {MailParser} from 'mailparser'; | |
import s3Stream from 's3-upload-stream'; | |
import {S3} from 'aws-sdk'; | |
import streamLength from 'length-stream'; | |
import moment from 'moment'; | |
import contentDisposition from 'content-disposition'; | |
import {PermissionGmailError} from './errors'; | |
import mailcomposer from 'mailcomposer'; | |
import base64Url from 'base64url'; | |
import path from 'path'; | |
/**
 * How many threads are downloaded during an account's very first
 * synchronization (used as `maxThreads` when the backend has no threads yet).
 */
const FIRST_SYNC_THREADS_COUNT = 14;
/**
 * Synchronizes Gmail threads and messages into the backend.
 *
 * Raw MIME letters and their attachments are written to S3 (bucket
 * `options.s3.bucket`), parsed thread/message data is pushed through
 * `options.backend`, and Gmail is accessed through the `options.google` wrapper.
 */
export default class GmailThreads {
  /**
   * @param {Object} options
   * @param {Object} options.s3 - options passed to the AWS `S3` client; its
   *   `bucket` field names the bucket letters and attachments are written to.
   * @param {Object} options.google - Gmail API wrapper.
   * @param {Object} options.backend - backend API client.
   */
  constructor(options) {
    this.options = options;
    this.s3 = promisifyAll(new S3(options.s3));
    this.s3StreamUploader = s3Stream(this.s3);
    this.google = options.google;
    this.backend = options.backend;
  }

  /**
   * Fetches a single Gmail message and appends it to a thread the backend already has.
   *
   * @param {Object} params
   * @param {*} params.providingAccountId - backend account id.
   * @param {string} params.threadId - Gmail thread id.
   * @param {string} params.messageId - Gmail message id.
   * @returns {Promise<*>} whatever `backend.updateThreadAsync` resolves to.
   * @throws {Error} if the backend has no such thread, the message cannot be
   *   loaded, or the message belongs to another thread.
   */
  async addMessageToThreadAsync({providingAccountId, threadId, messageId}) {
    const backendThread = await this.backend.getThreadAsync({
      threadId,
      providingAccountId,
      withHidden: true,
      withTransformed: true,
      withMessages: true,
      withAttachments: false
    });
    if (!backendThread) {
      throw new Error(`Cannot add message because thread_id=${threadId}, account_id=${providingAccountId} doesnt exist on server`);
    }
    const [message] = await this.loadMessagesAsync({providingAccountId, messageIds: [messageId]});
    if (!message) {
      throw new Error(`Cannot load message id=${messageId} of thread id=${threadId} from Gmail`);
    }
    // Compare as strings so a numeric id from a caller still matches Gmail's string id.
    if (String(message.threadId) !== String(threadId)) {
      throw new Error(`message id=${messageId} doesn't belong to thread id=${threadId}`);
    }
    const threadJson = JSON.parse(backendThread.json);
    return await this.backend.updateThreadAsync({
      id: backendThread.id,
      historyId: parseInt(message.historyId, 10), // TODO: or take historyId from the latest message?
      json: _.assign(threadJson, {
        updatedAt: serializeDate(new Date()),
        messagesCount: threadJson.messagesCount + 1
      }),
      messages: [message]
    });
  }

  /**
   * Processes the given threads one at a time, in order.
   *
   * @param {Object} params
   * @param {Array<{id: *, threadId: string}>} params.threads
   * @param {*} params.providingAccountId
   */
  async processMultipleThreadsAsync({threads, providingAccountId}) {
    for (const {id, threadId} of threads) {
      await this.processSingleThreadAsync({id, providingAccountId, threadId});
    }
  }

  /**
   * Brings one backend thread in line with Gmail.
   *
   * Creates the thread (from its first message) if the backend does not have
   * it, fills thread-level info if it has no messages yet, then loads every
   * Gmail message the backend is missing and stores them with the thread.
   * Returns without changes if Gmail refuses to return the thread.
   *
   * @param {Object} params
   * @param {*} params.id - backend id used when the thread has to be created.
   * @param {*} params.providingAccountId
   * @param {string} params.threadId - Gmail thread id.
   */
  async processSingleThreadAsync({id, providingAccountId, threadId}) {
    let backendThread = await this.backend.getThreadAsync({
      threadId,
      providingAccountId,
      withHidden: true,
      withTransformed: true,
      withMessages: true,
      withAttachments: false
    });
    let gmailThread;
    try {
      gmailThread = await this.google.fetchGmailThreadAsync({
        threadId,
        fields: 'historyId,id,messages(id)'
      });
    } catch (err) {
      Log.warning(() => ({
        msg: 'Failed to get gmail thread with GMAIL API',
        threadId,
        error: err
      }), ({message: m}) => `${m.msg} thread_id=${m.threadId} error:${Log.format(m.error)}`);
      return;
    }
    Log.info(() => ({
      msg: 'Receive thread from Gmail',
      providingAccountId,
      thread: gmailThread
    }), ({message: m}) => `${m.msg} thread_id=${m.thread.id} of account_id=${m.providingAccountId}\n${Log.format(m.thread)}`);

    const {historyId} = gmailThread;
    // Guard against a response without `messages` so `.length` below is always safe.
    const gmailMessages = gmailThread.messages || [];
    const loadedMessages = [];
    let json;

    // Fill in thread-level info (without messages) when the backend has none yet.
    if (!backendThread || !backendThread.messages || !backendThread.messages.length) {
      const firstMessageId = gmailMessages.length ? gmailMessages[0].id : threadId;
      const [rootMessage] = await this.loadMessagesAsync({messageIds: [firstMessageId], providingAccountId});
      if (!rootMessage) {
        throw new Error(`Cannot load root message id=${firstMessageId} of thread_id=${threadId}`);
      }
      let lastMessageDate = rootMessage.mime.date;
      if (gmailMessages.length > 1) {
        const lastMessage = await this.google.getMessageWithDate({id: _.last(gmailMessages).id});
        lastMessageDate = lastMessage.date;
      }
      loadedMessages.push(rootMessage);
      json = {
        updatedAt: serializeDate(new Date()),
        messagesCount: gmailMessages.length,
        tags: rootMessage.json.labelIds,
        from: rootMessage.mime.from,
        to: rootMessage.mime.to,
        cc: rootMessage.mime.cc,
        body: rootMessage.mime.textBody,
        subject: rootMessage.mime.subject
      };
      if (!backendThread) {
        backendThread = await this.backend.createThreadAsync({
          id,
          threadId,
          providingAccountId,
          historyId: parseInt(historyId, 10),
          firstMessageMimeDate: rootMessage.mime.date,
          lastMessageMimeDate: lastMessageDate,
          json
        });
      }
    }

    // Load the messages the backend thread is missing and merge them in.
    const existingMessageIds = _.map(backendThread.messages, 'messageId');
    const knownMessageIds = existingMessageIds.concat(_.map(loadedMessages, 'messageId'));
    const missedMessageIds = _.difference(_.map(gmailMessages, 'id'), knownMessageIds);
    const fetchedMessages = await this.loadMessagesAsync({messageIds: missedMessageIds, providingAccountId});
    Log.info(() => ({
      msg: 'Merging missed messages into thread',
      threadId,
      existingMessageIds,
      missedMessageIds,
      fetchedMessageIds: _.map(fetchedMessages, 'messageId')
    }), ({message: m}) => `${m.msg} thread_id=${m.threadId} existing=${Log.format(m.existingMessageIds)} ` +
      `missed=${Log.format(m.missedMessageIds)} fetched=${Log.format(m.fetchedMessageIds)}`);

    await this.backend.updateThreadAsync({
      id: backendThread.id,
      historyId: parseInt(historyId, 10), // TODO: or take historyId from the latest message?
      json: _.assign(JSON.parse(backendThread.json), json || {}, {
        processingState: 'done',
        updatedAt: serializeDate(new Date()),
        messagesCount: gmailMessages.length
      }),
      messages: loadedMessages.concat(fetchedMessages)
    });
  }

  /**
   * Fetches Gmail messages in `raw` format and converts each into the backend
   * message shape (raw letter and attachments are uploaded to S3 on the way).
   *
   * @param {Object} params
   * @param {string[]} [params.messageIds=[]] - Gmail message ids.
   * @param {*} params.providingAccountId - used to build the S3 keys.
   * @returns {Promise<Object[]>} converted messages; empty for no ids.
   */
  async loadMessagesAsync({messageIds = [], providingAccountId}) {
    if (!messageIds.length) {
      return [];
    }
    let messages;
    if (messageIds.length === 1) {
      messages = [await this.google.fetchGmailMessageAsync({
        id: messageIds[0],
        format: 'raw'
      })];
    } else {
      messages = await this.google._fetchMessagesInBatchAsync({
        messageIds,
        format: 'raw',
        ignoreNotFound: true
      });
    }
    // NOTE(review): with `ignoreNotFound` the batch may contain empty entries
    // for deleted messages — confirm; they are dropped defensively here.
    const found = _.compact(await B.all(messages));
    return await B.map(found, (message) => this._toBackendMessageAsync(message, providingAccountId), {concurrency: 10});
  }

  /**
   * Converts one raw Gmail message into the backend message shape.
   *
   * @param {Object} gmailMessage - Gmail message resource fetched with format 'raw'.
   * @param {*} providingAccountId - used to build the S3 key.
   * @returns {Promise<Object>}
   */
  async _toBackendMessageAsync({raw, id, threadId, historyId, internalDate, labelIds}, providingAccountId) {
    const s3Key = `gmail.${providingAccountId}.${id}`;
    const {parsedText, letter} = await this.parseRawMimeLetterAsync(s3Key, raw);
    Log.info(() => ({
      msg: 'Parsed mime letter',
      id,
      letter
    }), ({message: m}) => `${m.msg} message_id=${m.id}, letter=${Log.format(m.letter)}`);
    return {
      messageId: id,
      threadId,
      historyId: parseInt(historyId, 10),
      json: {
        labelIds,
        fetchedAt: serializeDate(new Date()),
        htmlRaw: letter.html,
        tag: letter.headers['x-synccloud-tag'],
        headers: {
          messageId: letter.messageId,
          replyTo: letter.replyTo
        }
      },
      mime: {
        // Falls back to Gmail's internalDate when the letter has no Date header.
        // NOTE(review): Gmail documents internalDate as epoch milliseconds, so the
        // old "convert internalDate to utc" TODO is presumably moot — confirm.
        date: serializeDate(new Date(letter.date || parseInt(internalDate, 10))),
        bucket: this.options.s3.bucket,
        messageId: letter.messageId,
        key: s3Key,
        subject: letter.subject,
        textBody: parsedText,
        // parsedText comes from untrusted email content: escape it before
        // turning newlines into <br> so the HTML body cannot inject markup.
        htmlBody: parsedText && _.escape(parsedText).replace(/\r?\n/g, '<br>'),
        // Some letters have no From header; avoid crashing on `undefined[0]`.
        from: letter.from ? letter.from[0] : undefined,
        to: letter.to,
        cc: letter.cc,
        bcc: letter.bcc
      },
      attachments: letter.attachments.map(({bucket, length, key, name, type}) => ({
        bucket,
        key,
        length,
        type,
        name
      }))
    };
  }

  /**
   * Parses a raw Gmail letter, uploads it and its attachments to S3, and
   * extracts plain text with the mail signature stripped.
   *
   * @param {string} s3Key - S3 key for the raw letter (attachments derive from it).
   * @param {string} raw - Gmail `raw` field (base64/base64url-encoded MIME).
   * @returns {Promise<{parsedText: string, letter: Object}>} `letter` is the
   *   mailparser result with `attachments` replaced by the S3 upload results.
   */
  async parseRawMimeLetterAsync(s3Key, raw) {
    // Node's 'base64' decoder also accepts the URL-safe alphabet Gmail uses.
    const mimeBuffer = Buffer.from(raw, 'base64');
    const parser = new MailParser({
      showAttachmentLinks: false,
      streamAttachments: true
    });
    const letter = await new Promise((resolve, reject) => {
      const attachmentPromises = [];
      let index = 0;
      parser.on('end', async (mail) => {
        try {
          // Ignored attachments resolve to undefined; compact removes them.
          mail.attachments = _.compact(await Promise.all(attachmentPromises));
        } catch (err) {
          Log.error(() => ({
            msg: 'Failed to process mime attachment',
            err
          }), ({message: m}) => `${m.msg} error:${Log.format(m.err)}`);
          reject(err);
          return; // the letter failed: do not upload it to S3
        }
        try {
          await this.s3.putObjectAsync({
            Bucket: this.options.s3.bucket,
            Key: s3Key,
            // Store the decoded MIME bytes so the object really is message/rfc822
            // (previously the base64 text itself was uploaded).
            Body: mimeBuffer,
            ContentType: 'message/rfc822'
          });
        } catch (err) {
          Log.error(() => ({
            msg: 'Failed to save raw letter to S3',
            s3Options: this.options.s3,
            err
          }), ({message: m}) => `${m.msg} error:${Log.format(m.err)}`);
          reject(err);
          return;
        }
        resolve(mail);
      });
      parser.on('attachment', (attachment) => {
        const promise = (async () => {
          Log.info(
            () => ({
              msg: 'Parsed MIME attachment',
              attachment: _.omit(attachment, 'stream')
            }),
            ({message: m}) => `${m.msg} INDEX=${m.attachment.index}, ${Log.format(m.attachment)}`);
          // Skip small inline images (typically logos/signature icons).
          if (attachment.contentDisposition === 'inline') {
            const extension = path.extname(attachment.fileName || attachment.generatedFileName || '').toLowerCase();
            if (/^\.(jpe?g|png|gif)$/.test(extension)) {
              const buffer = attachment.buffer = await readToBufferAsync(attachment.stream);
              if (buffer.length <= 8 * 1024) { // 8 KB or smaller: ignore
                Log.info(
                  () => ({
                    msg: 'Ignoring inline attachment due to lower size < 8KB',
                    attachment: _.omit(attachment, 'stream', 'buffer')
                  }),
                  ({message: m}) => `${m.msg} INDEX=${m.attachment.index}, ${Log.format(m.attachment)}`);
                return;
              }
            }
          }
          // Indices are taken only by attachments that are actually uploaded.
          return await this._uploadAttachmentAsync(s3Key, attachment, index++);
        })();
        // Mark as handled now: the rejection is consumed by Promise.all in the
        // 'end' handler, but that may attach too late and trip Node's
        // unhandled-rejection handling.
        promise.catch(_.noop);
        attachmentPromises.push(promise);
      });
      parser.on('error', reject);
      parser.write(mimeBuffer);
      parser.end();
    });

    // Prefer the HTML part (converted with quoted blocks stripped); fall back to
    // the plain-text part, or to empty markup when the letter has neither.
    let markup;
    if (letter.html) {
      const htmlConverter = new HtmlConverter({
        handlers: [new StripBlockquoteHandler()]
      });
      markup = await htmlConverter.parseAsync(letter.html);
    } else {
      markup = Markup.fromPlain(letter.text || '');
    }
    const {plain} = markup.stripMailSignature();
    return {
      parsedText: plain,
      letter
    };
  }

  /**
   * Uploads one streamed MIME attachment to S3 under `${key}-attachment_${index}`.
   *
   * @param {string} key - S3 key of the owning letter.
   * @param {Object} attachment - mailparser attachment (`stream` or pre-read `buffer`).
   * @param {number} index - attachment ordinal used in the S3 key and fallback name.
   * @returns {Promise<Object|undefined>} upload descriptor, or undefined for
   *   zero-length attachments (which are skipped).
   * @throws rethrows any read/upload error after logging it.
   */
  async _uploadAttachmentAsync(key, attachment, index) {
    const result = {
      index,
      bucket: this.options.s3.bucket,
      key: `${key}-attachment_${index}`,
      name: attachment.fileName || attachment.generatedFileName || `attachment_${index}`,
      // length is filled after reading: streamed attachments report zero length
      type: attachment.contentType
    };
    try {
      Log.info(
        () => ({
          msg: 'Uploading MIME part',
          uploadResult: result
        }),
        ({message: m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, BUCKET=${m.uploadResult.bucket}`);
      const attachBodyBuffer = attachment.buffer || await readToBufferAsync(attachment.stream);
      result.length = attachBodyBuffer.length;
      if (!attachBodyBuffer.length) {
        Log.warning(
          () => ({
            msg: 'MIME part has zero bytes length, ignoring it',
            uploadResult: result
          }), ({message: m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, BUCKET=${m.uploadResult.bucket}`);
        return;
      }
      await this.s3.putObjectAsync({
        Key: result.key,
        Bucket: result.bucket,
        ContentType: result.type,
        ContentDisposition: contentDisposition(result.name),
        Body: attachBodyBuffer
      });
      Log.info(
        () => ({
          msg: 'Uploaded MIME part',
          uploadResult: result
        }),
        ({message: m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, ` +
          `BUCKET=${m.uploadResult.bucket}`);
      return result;
    } catch (exc) {
      Log.error(
        () => ({
          msg: 'Failed to upload attachment',
          uploadResult: result,
          exception: exc
        }),
        ({message: m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, ` +
          `BUCKET=${m.uploadResult.bucket}`);
      throw exc;
    }
  }

  /**
   * Lists Gmail messages that still need to be synchronized.
   *
   * - With `lastHistoryId`: read Gmail history since that id.
   * - With no backend threads: first sync — the last 2 months, at most
   *   FIRST_SYNC_THREADS_COUNT threads.
   * - Otherwise: Gmail history is only reliable for about a week, so if the
   *   newest backend thread is older than that, list by date; else read history
   *   from the thread's historyId, falling back to listing by date on failure.
   *
   * @param {Object} params
   * @param {string|number} [params.lastHistoryId]
   * @param {*} params.providingAccountId
   */
  async getNewMessagesAsync({lastHistoryId, providingAccountId}) {
    const HISTORY_VALIDITY_DAYS = 7;
    if (lastHistoryId) {
      return await this.google.getMessagesSinceLastHistoryIdAsync({lastHistoryId});
    }
    const topThread = await this.backend.getTopThreadAsync({providingAccountId});
    // No threads at all: treat this as the first synchronization.
    if (!topThread) {
      return await this.google.listMessagesAsync({
        afterDate: moment.utc().subtract(2, 'months').toDate(),
        maxThreads: FIRST_SYNC_THREADS_COUNT
      });
    }
    const lastMessageDate = deserializeDate(topThread.lastMessageMimeDate);
    const historyHorizon = moment.utc().subtract(HISTORY_VALIDITY_DAYS, 'days');
    // `<=` compares via valueOf(); the previous check against now + 9 days was
    // always true, which made the history branch unreachable.
    if (lastMessageDate <= historyHorizon) {
      // History has likely expired: list messages since the last downloaded message.
      return await this.google.listMessagesAsync({afterDate: lastMessageDate});
    }
    try {
      return await this.google.getMessagesSinceLastHistoryIdAsync({lastHistoryId: topThread.historyId});
    } catch (err) {
      Log.warning(() => ({
        msg: 'Failed to read Gmail history, falling back to listing messages by date',
        providingAccountId,
        historyId: topThread.historyId,
        error: err
      }), ({message: m}) => `${m.msg} account_id=${m.providingAccountId} history_id=${m.historyId} error:${Log.format(m.error)}`);
      return await this.google.listMessagesAsync({afterDate: lastMessageDate});
    }
  }

  /**
   * Optionally archives threads in Gmail, then hides them on the backend.
   * Gmail permission errors are logged and skipped; other errors propagate.
   *
   * @param {Object} params
   * @param {*} params.providingAccountId
   * @param {boolean} params.archiveInGmail
   * @param {string[]} params.threadIds - Gmail thread ids.
   */
  async archiveThreadsAsync({providingAccountId, archiveInGmail, threadIds}) {
    const threads = await this.backend.getThreadsByIdsAsync({providingAccountId, threadIds});
    if (archiveInGmail) {
      for (const threadId of threadIds) {
        try {
          await this.google.archiveThreadAsync({threadId});
        } catch (err) {
          if (!(err instanceof PermissionGmailError)) {
            throw err;
          }
          this._logPermissionWarning('Failed to archive due to insufficient permissions', providingAccountId, threadId);
        }
      }
    }
    return await this.backend.postAsync('/api/email/gmail/hide-threads', {
      threadIds: _.map(threads, 'id')
    });
  }

  /**
   * Marks threads as read in Gmail when `readInGmail` is set; otherwise a no-op.
   * Gmail permission errors are logged and skipped; other errors propagate.
   *
   * @param {Object} params
   * @param {*} params.providingAccountId
   * @param {boolean} params.readInGmail
   * @param {string[]} params.threadIds
   */
  async readThreadsAsync({providingAccountId, readInGmail, threadIds}) {
    if (!readInGmail) {
      return;
    }
    for (const threadId of threadIds) {
      try {
        await this.google.readThreadAsync({threadId});
      } catch (err) {
        if (!(err instanceof PermissionGmailError)) {
          throw err;
        }
        this._logPermissionWarning('Failed to mark thread as read due to insufficient permissions', providingAccountId, threadId);
      }
    }
  }

  /**
   * Composes and sends a reply in a Gmail thread, then labels it INBOX and
   * removes UNREAD. Replies to `replyToMessageId` if given, else to the latest
   * message of the thread.
   *
   * @returns {Promise<Object|undefined>} the sent Gmail message, or undefined
   *   when Gmail refused for lack of permissions (logged, not thrown).
   * @throws {Error} if `replyToMessageId` is not in `threadId`, or on other API errors.
   */
  async sendMessageAsync({providingAccountId, replyToMessageId, threadId, toEmails = [], ccEmails = [], htmlBody, textBody, attachmentStreams = [], tag}) {
    const account = await this.backend.getGoogleAccountAsync({providingAccountId});
    // NOTE(review): these ids go into the References header, which expects
    // Message-ID header values — confirm what this wrapper returns.
    const messageIds = await this.google.getAllMessageIdsInThreadAsync({threadId});
    const {subject, messageIdHeader} = await this._getReplyHeadersAsync({threadId, replyToMessageId});
    const fromEmail = {
      address: account.email,
      name: account.user.name // TODO: refresh on every send? It is cached by the backend at first Gmail login
    };
    const mail = mailcomposer({
      from: fromEmail,
      inReplyTo: messageIdHeader,
      references: messageIds,
      subject: GmailThreads._toReplySubject(subject),
      headers: {
        'x-synccloud-tag': tag
      },
      attachments: attachmentStreams.map(({fileName, mimeType, stream}) => ({
        filename: fileName,
        contentType: mimeType,
        content: stream
      })),
      to: toEmails.length ? toEmails : [fromEmail],
      cc: ccEmails,
      text: textBody,
      html: htmlBody
    });
    const rfc822 = await new Promise((resolve, reject) => {
      eos(mail.createReadStream(), (err, data) => (err ? reject(err) : resolve(data)));
    });
    const encoded = base64Url(rfc822);
    try {
      const message = await this.google.messagesClient.sendAsync({
        userId: account.email,
        resource: {
          raw: encoded,
          threadId
        }
      });
      await this.google.messagesClient.modifyAsync({
        userId: account.email,
        id: message.id,
        resource: {
          addLabelIds: ['INBOX'],
          removeLabelIds: ['UNREAD']
        }
      });
      return message;
    } catch (err) {
      if (!(err instanceof PermissionGmailError)) {
        throw err;
      }
      this._logPermissionWarning('Failed to send message due to insufficient permissions', providingAccountId, threadId);
    }
  }

  /**
   * Reads the Subject and Message-ID headers of the message being replied to.
   *
   * @returns {Promise<{subject: (string|undefined), messageIdHeader: (string|undefined)}>}
   */
  async _getReplyHeadersAsync({threadId, replyToMessageId}) {
    let headers;
    if (replyToMessageId) {
      // `id` matches how loadMessagesAsync calls fetchGmailMessageAsync.
      const replyToMessage = await this.google.fetchGmailMessageAsync({
        id: replyToMessageId,
        format: 'metadata',
        metadataHeaders: ['Subject', 'Message-ID']
      });
      if (replyToMessage.threadId !== threadId) {
        throw new Error(`replyMessageId=${replyToMessageId} doesnt belongs to thread_id=${threadId}`);
      }
      headers = replyToMessage.payload.headers;
    } else {
      const thread = await this.google.fetchGmailThreadAsync({
        threadId,
        format: 'metadata',
        metadataHeaders: ['Message-ID', 'Subject']
      });
      // The thread resource carries its messages in `.messages` (as used by
      // processSingleThreadAsync); accept a bare array defensively.
      const threadMessages = Array.isArray(thread) ? thread : (thread && thread.messages) || [];
      const latestMessage = _.last(threadMessages);
      if (!latestMessage) {
        throw new Error(`thread_id=${threadId} has no messages to reply to`);
      }
      headers = latestMessage.payload.headers;
    }
    return {
      subject: GmailThreads._findHeader(headers, 'subject'),
      messageIdHeader: GmailThreads._findHeader(headers, 'message-id')
    };
  }

  /**
   * Case-insensitive lookup of a header value in a Gmail `payload.headers` list.
   *
   * @param {Array<{name: string, value: string}>|undefined} headers
   * @param {string} headerName
   * @returns {string|undefined}
   */
  static _findHeader(headers, headerName) {
    const wanted = headerName.toLowerCase();
    const header = (headers || []).find(({name}) => name.toLowerCase() === wanted);
    return header && header.value;
  }

  /**
   * Prefixes a subject with "Re: " unless it already starts with "Re:" in any case.
   *
   * @param {string|undefined} subject
   * @returns {string|undefined} unchanged when empty/undefined.
   */
  static _toReplySubject(subject) {
    if (!subject || /^re:/i.test(subject)) {
      return subject;
    }
    return `Re: ${subject}`;
  }

  /** Logs a skipped Gmail permission error for one thread. */
  _logPermissionWarning(msg, providingAccountId, threadId) {
    Log.warning(() => ({
      msg,
      providingAccountId,
      threadId
    }), ({message: m}) => `${m.msg} account_id=${m.providingAccountId} threadId=${m.threadId}`);
  }
}
/**
 * Collects everything a readable stream emits and reports it as one string.
 *
 * Chunks are kept as bytes and decoded once at the end, so a multi-byte UTF-8
 * character split across two chunks is not corrupted (decoding each Buffer
 * chunk separately, as `buf += chunk` does, breaks such characters).
 *
 * @param {Object} stream - readable stream emitting Buffer or string chunks.
 * @param {function(?Error, string=)} cb - invoked exactly once: with the error,
 *   or with `null` and the UTF-8 decoded contents.
 */
function eos(stream, cb) {
  const chunks = [];
  let settled = false;
  // Guard against 'error' after 'end' (or repeated errors) calling back twice.
  const finish = (err, data) => {
    if (settled) {
      return;
    }
    settled = true;
    cb(err, data);
  };
  stream.on('data', (chunk) => {
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk);
  });
  stream.on('end', () => finish(null, Buffer.concat(chunks).toString('utf8')));
  stream.on('error', (err) => finish(err));
}
/**
 * Reads a stream to completion and resolves with all of its data as one Buffer.
 *
 * @param {Object} stream - readable stream emitting Buffer chunks.
 * @returns {Promise<Buffer>} concatenated chunks; rejects on the stream's 'error'.
 */
function readToBufferAsync(stream) {
  return new Promise((resolve, reject) => {
    const pieces = [];
    function onData(piece) {
      pieces.push(piece);
    }
    function onEnd() {
      resolve(Buffer.concat(pieces));
    }
    stream.on('data', onData);
    stream.on('end', onEnd);
    stream.on('error', reject);
  });
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.