@ndelitski
Created September 13, 2016 14:01
Gmail Threads Processing
import _ from 'lodash';
import B, {promisifyAll} from 'bluebird';
import {Log, trace} from '@synccloud/logging';
import {assertType} from '@synccloud/utils';
import {HtmlConverter, Markup, StripBlockquoteHandler} from '@synccloud/mime-letter-markup';
import {serializeDate, deserializeDate} from './date-convertor.es6';
import {MailParser} from 'mailparser';
import s3Stream from 's3-upload-stream';
import {S3} from 'aws-sdk';
import streamLength from 'length-stream';
import moment from 'moment';
import contentDisposition from 'content-disposition';
import {PermissionGmailError} from './errors';
import mailcomposer from 'mailcomposer';
import base64Url from 'base64url';
import path from 'path';
// how many threads to download during the first sync
const FIRST_SYNC_THREADS_COUNT = 14;
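/**
 * Syncs Gmail threads into the Synccloud backend: fetches raw messages via the
 * Gmail API, parses MIME with mailparser, stores raw letters and attachments in
 * S3, and can archive, mark as read and reply to threads in the linked account.
 */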
export default class GmailThreads {
constructor(options) {
this.options = options;
this.s3 = promisifyAll(new S3(options.s3));
this.s3StreamUploader = s3Stream(this.s3);
this.google = options.google;
this.backend = options.backend;
}
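// Appends a single Gmail message to a thread already known to the backend.
// Throws if the thread is missing or the message belongs to another thread.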
async addMessageToThreadAsync({providingAccountId, threadId, messageId}) {
let backendThread = await this.backend.getThreadAsync({
threadId,
providingAccountId,
withHidden: true,
withTransformed: true,
withMessages: true,
withAttachments: false
});
if (backendThread) {
const [message] = await this.loadMessagesAsync({providingAccountId, messageIds: [messageId]});
if (message.threadId != threadId) {
throw new Error(`message id=${messageId} doesn't belong to thread id=${threadId}`);
}
const threadJson = JSON.parse(backendThread.json);
return await this.backend.updateThreadAsync({
id: backendThread.id,
historyId: parseInt(message.historyId), // todo or get from latest message historyId?
json: _.assign(threadJson, {
updatedAt: serializeDate(new Date()),
messagesCount: threadJson.messagesCount + 1
}),
messages: [message]
});
} else {
throw new Error(`Cannot add message because thread_id=${threadId}, account_id=${providingAccountId} doesn't exist on the server`);
}
}
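// Processes several threads sequentially, one thread at a time.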
async processMultipleThreadsAsync({threads, providingAccountId}) {
for (let {id, threadId} of threads) {
await this.processSingleThreadAsync({id, providingAccountId, threadId});
}
}
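// Syncs one thread: fetches it from Gmail, creates the backend thread from its
// first message if needed, then downloads and merges any missing messages.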
async processSingleThreadAsync({id, providingAccountId, threadId}) {
let backendThread = await this.backend.getThreadAsync({
threadId,
providingAccountId,
withHidden: true,
withTransformed: true,
withMessages: true,
withAttachments: false
});
let gmailThread;
try {
gmailThread = await this.google.fetchGmailThreadAsync({
threadId,
fields: 'historyId,id,messages(id)'
});
} catch (err) {
Log.warning(()=>({
msg: 'Failed to get gmail thread with GMAIL API',
threadId,
error: err
}), ({message:m})=>`${m.msg} thread_id=${m.threadId} error:${Log.format(m.error)}`);
return;
}
Log.info(()=> ({
msg: 'Receive thread from Gmail',
providingAccountId,
thread: gmailThread
}), ({message:m})=>`${m.msg} thread_id=${m.thread.id} of account_id=${m.providingAccountId}\n${Log.format(m.thread)}`);
const {
historyId,
messages,
} = gmailThread;
let loadedMessages = [];
let json;
// Fill in the thread info (without messages)
if (!backendThread || !backendThread.messages || !backendThread.messages.length) {
const firstMessageId = gmailThread.messages[0]
? gmailThread.messages[0].id
: threadId;
const [rootMessage] = await this.loadMessagesAsync({messageIds: [firstMessageId], providingAccountId});
let lastMessageDate = rootMessage.mime.date;
if (gmailThread.messages.length > 1) {
const lastMessage = await this.google.getMessageWithDate({id: _.last(gmailThread.messages).id});
lastMessageDate = lastMessage.date;
}
loadedMessages.push(rootMessage);
json = {
updatedAt: serializeDate(new Date()),
messagesCount: gmailThread.messages.length,
tags: rootMessage.json.labelIds,
from: rootMessage.mime.from,
to: rootMessage.mime.to,
cc: rootMessage.mime.cc,
body: rootMessage.mime.textBody,
subject: rootMessage.mime.subject
};
if (!backendThread) {
backendThread = await this.backend.createThreadAsync({
id,
threadId,
providingAccountId,
historyId: parseInt(historyId),
firstMessageMimeDate: rootMessage.mime.date,
lastMessageMimeDate: lastMessageDate,
json
});
}
}
// Fetch the messages missing from the thread and merge them in
const existingMessageIds = _.map(backendThread.messages, 'messageId');
console.log('existingMessageIds', existingMessageIds);
const missedMessagesIds = _.difference(_.map(gmailThread.messages, 'id'), existingMessageIds.concat(_.map(loadedMessages, 'messageId')));
console.log('missedMessagesIds', missedMessagesIds);
const fetchedMessages = await this.loadMessagesAsync({messageIds: missedMessagesIds, providingAccountId});
console.log('fetchedMessagesIds', _.map(fetchedMessages, 'messageId'));
const newMessages = loadedMessages.concat(fetchedMessages);
await this.backend.updateThreadAsync({
id: backendThread.id,
historyId: parseInt(historyId), // todo or get from latest message historyId?
json: _.assign(JSON.parse(backendThread.json), json || {}, {
processingState: 'done',
updatedAt: serializeDate(new Date()),
messagesCount: gmailThread.messages.length
}),
messages: newMessages
});
}
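// Fetches raw Gmail messages (single or batched), parses each MIME letter and
// maps it to a backend message record; raw content and attachments go to S3.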
async loadMessagesAsync({messageIds=[], providingAccountId}){
let messages = [];
if (!messageIds.length) {
return [];
} else if (messageIds.length == 1) {
messages = [await this.google.fetchGmailMessageAsync({
id: messageIds[0],
format: 'raw'
})];
} else {
messages = await this.google._fetchMessagesInBatchAsync({
messageIds,
format: 'raw',
ignoreNotFound: true
});
}
return await B.all(messages)
.map(async ({raw, id, threadId, historyId, internalDate, labelIds}) => {
const s3Key = `gmail.${providingAccountId}.${id}`;
const {parsedText, letter} = await this.parseRawMimeLetterAsync(s3Key, raw);
Log.info(() => ({
msg: 'Parsed mime letter',
id,
letter
}), ({message:m})=> `${m.msg} message_id=${m.id}, letter=${Log.format(m.letter)}`);
return {
messageId: id,
threadId,
historyId: parseInt(historyId),
json: {
labelIds,
fetchedAt: serializeDate(new Date()),
htmlRaw: letter.html,
tag: letter.headers['x-synccloud-tag'],
headers: {
messageId: letter.messageId,
replyTo: letter.replyTo,
}
},
mime: {
date: serializeDate(new Date(letter.date || parseInt(internalDate))), // todo convert internalDate to utc
bucket: this.options.s3.bucket,
messageId: letter.messageId,
key: s3Key,
subject: letter.subject,
textBody: parsedText,
htmlBody: parsedText && parsedText.replace(/(\r?\n)/g, '<br>'),
from: letter.from[0],
to: letter.to,
cc: letter.cc,
bcc: letter.bcc,
},
attachments: letter.attachments.map(({bucket, length, key, name, type})=>({
bucket,
key,
length,
type,
name
}))
};
}, {concurrency: 10});
}
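// Decodes the base64-encoded raw letter, parses it with MailParser (attachments
// streamed), saves the raw letter and non-trivial attachments to S3, and returns
// the parsed letter plus a plain-text body with quotes and signature stripped.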
async parseRawMimeLetterAsync(s3Key, raw) {
const buff = new Buffer(raw, 'base64');
const parser = new MailParser({
showAttachmentLinks: false,
streamAttachments: true
});
const letter = await new Promise((resolve, reject) => {
const promises = [];
parser.on('end', async (mail) => {
try {
mail.attachments = _.compact(await Promise.all(promises)); // drop empty results (ignored attachments)
console.log('attachments', mail.attachments);
} catch (err) {
Log.error(()=>({
msg: 'Failed to process mime attachment',
err,
mail
}), ({message:m})=>`${m.msg} error:${Log.format(m.err)}`);
return reject(err);
}
try {
await this.s3.putObjectAsync({
Bucket: this.options.s3.bucket,
Key: s3Key,
Body: new Buffer(raw, 'utf8'),
ContentType: 'message/rfc822'
});
} catch (err) {
Log.error(()=>({
msg: 'Failed to save raw letter to S3',
s3Options: this.options.s3,
err,
mail
}), ({message:m})=>`${m.msg} error:${Log.format(m.err)}`);
return reject(err);
}
resolve(mail);
});
let index = 0;
parser.on('attachment', (attachment, mail) => {
let promise = (async() => {
Log.info(
() => ({
msg: 'Parsed MIME attachment',
attachment: _.omit(attachment, 'stream')
}),
({message:m}) => `${m.msg} INDEX=${m.attachment.index}, ${Log.format(m.attachment)}`);
// ignore small inline image attachments
if (attachment.contentDisposition === 'inline') {
const extension = path.extname(attachment.fileName || attachment.generatedFileName || '').toLowerCase();
if (extension && extension.match(/(jpg|jpeg|png|gif)$/)) {
const buffer = attachment.buffer = await readToBufferAsync(attachment.stream);
if (buffer.length <= 8 * 1024) { // ignore inline images of 8 KB or less
Log.info(
() => ({
msg: 'Ignoring inline attachment because it is 8 KB or smaller',
attachment: _.omit(attachment, 'stream', 'buffer')
}),
({message:m}) => `${m.msg} INDEX=${m.attachment.index}, ${Log.format(m.attachment)}`);
return;
}
}
}
return await this._uploadAttachmentAsync(s3Key, attachment, index++);
})();
promises.push(Promise.resolve(promise));
});
parser.on('error', reject);
parser.write(buff);
parser.end();
});
const htmlConverter = new HtmlConverter({
handlers: [new StripBlockquoteHandler()]
});
let markup;
// prefer the html version; fall back to plain text
if (letter.html) {
markup = await htmlConverter.parseAsync(letter.html);
} else if (letter.text) {
// no html part: build markup from the plain-text body
markup = Markup.fromPlain(letter.text);
} else {
markup = Markup.fromPlain(''); // empty letter with no html and text
}
const {plain} = markup.stripMailSignature();
return {
parsedText: plain,
letter
};
}
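// Buffers a single MIME attachment and uploads it to S3 under
// `${key}-attachment_${index}`; returns upload metadata, or nothing for empty parts.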
async _uploadAttachmentAsync(key, attachment, index) {
const result = {
index: index,
bucket: this.options.s3.bucket,
key: `${key}-attachment_${index}`,
name: attachment.fileName || attachment.generatedFileName || `attachment_${index}`,
//length: attachment.length, // zero length for streamed file
type: attachment.contentType
};
try {
Log.info(
() => ({
msg: 'Uploading MIME part',
uploadResult: result
}),
({message:m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, BUCKET=${m.uploadResult.bucket}`);
const attachBodyBuffer = attachment.buffer || await readToBufferAsync(attachment.stream);
result.length = attachBodyBuffer.length;
if (!attachBodyBuffer.length) {
Log.warning(
() => ({
msg: 'MIME part has zero bytes length, ignoring it',
uploadResult: result
}), ({message:m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, BUCKET=${m.uploadResult.bucket}`);
return;
}
await this.s3.putObjectAsync({
Key: result.key,
Bucket: result.bucket,
ContentType: result.type,
ContentDisposition: contentDisposition(result.name),
Body: attachBodyBuffer
});
Log.info(
() => ({
msg: 'Uploaded MIME part',
uploadResult: result
}),
({message:m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, ` +
`BUCKET=${m.uploadResult.bucket}`);
return result;
}
catch (exc) {
Log.error(
() => ({
msg: 'Failed to upload attachment',
uploadResult: result,
exception: exc
}),
({message:m}) => `${m.msg} INDEX=${m.uploadResult.index}, KEY=${m.uploadResult.key}, ` +
`BUCKET=${m.uploadResult.bucket}`);
throw exc;
}
}
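// Lists new messages for the account: via Gmail history when a history id is
// usable, otherwise by date (the first sync covers the last two months, capped
// at FIRST_SYNC_THREADS_COUNT threads).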
async getNewMessagesAsync({lastHistoryId, providingAccountId}) {
if (lastHistoryId) {
return await this.google.getMessagesSinceLastHistoryIdAsync({lastHistoryId});
}
const topThread = await this.backend.getTopThreadAsync({providingAccountId});
const params = {};
let messages;
// If there are no threads at all, treat this as the first sync
if (!topThread) {
params.afterDate = moment.utc().subtract(2, 'months').toDate();
params.maxThreads = FIRST_SYNC_THREADS_COUNT;
messages = await this.google.listMessagesAsync({...params});
} else {
const {lastMessageMimeDate} = topThread;
// Gmail history is only valid for roughly a week
if (deserializeDate(lastMessageMimeDate) <= moment.utc().subtract(9, 'days')) {
// history has likely expired: request messages starting from the date of the last downloaded message
params.afterDate = deserializeDate(topThread.lastMessageMimeDate);
messages = await this.google.listMessagesAsync({...params});
} else {
// history is still valid: take new messages from Gmail history
try {
messages = await this.google.getMessagesSinceLastHistoryIdAsync({lastHistoryId: topThread.historyId})
} catch (err) {
console.error(err);
console.error(err.stack);
throw err;
}
}
}
return messages;
}
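// Hides the given threads on the backend and optionally archives them in Gmail,
// skipping Gmail permission errors.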
async archiveThreadsAsync({providingAccountId, archiveInGmail, threadIds}) {
const threads = await this.backend.getThreadsByIdsAsync({providingAccountId, threadIds});
if (archiveInGmail) {
for (let threadId of threadIds) {
try {
await this.google.archiveThreadAsync({threadId});
} catch (err) {
if (err instanceof PermissionGmailError) {
Log.warning(() => ({
msg: 'Failed to archive due to insufficient permissions',
providingAccountId,
threadId
}), ({message:m}) => `${m.msg} account_id=${m.providingAccountId} threadId=${m.threadId}`);
// skip permission errors
} else {
throw err;
}
}
}
}
return await this.backend.postAsync('/api/email/gmail/hide-threads', {
threadIds: _.map(threads, 'id'),
});
}
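// Marks the given threads as read in Gmail (when requested), skipping permission errors.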
async readThreadsAsync({providingAccountId, readInGmail, threadIds}) {
if (!readInGmail) {
return;
}
for (let threadId of threadIds) {
try {
await this.google.readThreadAsync({threadId});
} catch (err) {
if (err instanceof PermissionGmailError) {
Log.warning(() => ({
msg: 'Failed to mark thread as read due to insufficient permissions',
providingAccountId,
threadId
}), ({message:m}) => `${m.msg} account_id=${m.providingAccountId} threadId=${m.threadId}`);
// skip permission errors
} else {
throw err;
}
}
}
}
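// Composes an RFC 822 reply inside an existing thread (subject, In-Reply-To and
// References derived from the thread), sends it via the Gmail API and labels the
// sent message as read in INBOX.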
async sendMessageAsync({providingAccountId, replyToMessageId, threadId, toEmails = [], ccEmails = [], htmlBody, textBody, attachmentStreams=[], tag}) {
const account = await this.backend.getGoogleAccountAsync({providingAccountId});
const messageIds = await this.google.getAllMessageIdsInThreadAsync({threadId});
let subject;
let messageIdHeader;
if (replyToMessageId) {
const replyToMessage = await this.google.fetchGmailMessageAsync({
messageId: replyToMessageId,
format: 'metadata',
metadataHeaders: ['Subject', 'Message-ID']
});
if (replyToMessage.threadId !== threadId) {
throw new Error(`replyToMessageId=${replyToMessageId} doesn't belong to thread_id=${threadId}`);
}
subject = findHeader(replyToMessage.payload.headers, 'subject');
messageIdHeader = findHeader(replyToMessage.payload.headers, 'message-id');
} else {
const threadMessages = await this.google.fetchGmailThreadAsync({
threadId,
format: 'metadata',
metadataHeaders: ['Message-ID', 'Subject']
});
const latestMessage = _.last(threadMessages.messages);
subject = findHeader(latestMessage.payload.headers, 'subject');
messageIdHeader = findHeader(latestMessage.payload.headers, 'message-id');
}
function findHeader(headers, headerName) {
const header = _.find(headers, ({name}) => {
return name.toLowerCase() === headerName.toLowerCase();
});
return header && header.value;
}
if (subject && !subject.startsWith('Re:')) {
subject = 'Re: '+ subject;
}
const fromEmail = {
address: account.email,
name: account.user.name // TODO: refresh on every send? The name is cached in our backend from the first Gmail login
};
const mail = mailcomposer({
from: fromEmail,
inReplyTo: messageIdHeader,
references: messageIds,
subject,
headers: {
'x-synccloud-tag': tag,
},
attachments: attachmentStreams.map(({fileName, mimeType, stream})=>({
filename: fileName,
contentType: mimeType,
content: stream
})),
to: !toEmails.length ? [fromEmail] : toEmails,
cc: ccEmails,
text: textBody,
html: htmlBody
});
const rfc822 = await new Promise((resolve, reject) => {
eos(mail.createReadStream(), (err, data) => {
if (err) {
reject(err);
} else {
resolve(data)
}
})
});
console.log(rfc822);
const encoded = base64Url(rfc822);
try {
const message = await this.google.messagesClient.sendAsync({
userId: account.email,
resource: {
raw: encoded,
threadId,
}
});
await this.google.messagesClient.modifyAsync({
userId: account.email,
id: message.id,
resource: {
addLabelIds: [
'INBOX'
],
removeLabelIds: [
'UNREAD'
]
}
});
return message;
} catch (err) {
if (err instanceof PermissionGmailError) {
Log.warning(() => ({
msg: 'Failed to send message due to insufficient permissions',
providingAccountId,
threadId
}), ({message:m}) => `${m.msg} account_id=${m.providingAccountId} threadId=${m.threadId}`);
// skip permission errors
} else {
throw err;
}
}
}
}
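// Reads a stream to completion and calls cb(err, data) with the concatenated string.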
function eos(stream, cb) {
let buf = '';
stream.on('data', function(chunk) {
buf+= chunk;
});
stream.on('end', function() {
cb(null, buf);
});
stream.on('error', cb);
}
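// Reads a stream to completion and resolves with the concatenated Buffer.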
function readToBufferAsync(stream) {
let buff = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => {
buff.push(chunk);
});
stream.on('end', ()=>{
resolve(Buffer.concat(buff))
});
stream.on('error', reject);
});
}
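// Example usage (a minimal sketch; `googleClient`, `backendClient` and the ids
// below are assumptions for illustration, not part of this gist):
//
//   const threads = new GmailThreads({
//     s3: {bucket: 'mail-bucket', region: 'us-east-1'},
//     google: googleClient,
//     backend: backendClient
//   });
//   await threads.processSingleThreadAsync({
//     id: 'backend-thread-id',
//     providingAccountId: 42,
//     threadId: 'gmail-thread-id'
//   });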