Skip to content

Instantly share code, notes, and snippets.

@orweinberger
Last active February 13, 2022 07:17
Show Gist options
  • Save orweinberger/6d519fd5deaf59bed7dc to your computer and use it in GitHub Desktop.
Save orweinberger/6d519fd5deaf59bed7dc to your computer and use it in GitHub Desktop.
Extract Gmail messages and insert into Elasticsearch
[
{
"_id": "Gmail-Dashboard",
"_type": "dashboard",
"_source": {
"title": "Gmail Dashboard",
"hits": 0,
"description": "",
"panelsJSON": "[{\"col\":1,\"id\":\"Top-10-Worst-senders\",\"row\":4,\"size_x\":2,\"size_y\":5,\"type\":\"visualization\"},{\"col\":3,\"id\":\"Emails-Date-Histogram\",\"row\":1,\"size_x\":10,\"size_y\":3,\"type\":\"visualization\"},{\"col\":3,\"id\":\"Top-10-Senders\",\"row\":4,\"size_x\":2,\"size_y\":5,\"type\":\"visualization\"},{\"col\":1,\"id\":\"Total-Messages\",\"row\":1,\"size_x\":2,\"size_y\":3,\"type\":\"visualization\"},{\"col\":10,\"id\":\"Day-Of-Week\",\"row\":4,\"size_x\":3,\"size_y\":5,\"type\":\"visualization\"},{\"id\":\"Hour-Of-Day\",\"type\":\"visualization\",\"size_x\":5,\"size_y\":5,\"col\":5,\"row\":4}]",
"version": 1,
"timeRestore": false,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}}}]}"
}
}
},
{
"_id": "Top-10-Senders",
"_type": "visualization",
"_source": {
"title": "Top 10 Senders",
"visState": "{\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"from.raw\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Total-Messages",
"_type": "visualization",
"_source": {
"title": "Total Messages",
"visState": "{\"type\":\"metric\",\"params\":{\"fontSize\":60},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Top-10-Worst-senders",
"_type": "visualization",
"_source": {
"title": "Top 10 Worst senders",
"visState": "{\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"from.raw\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"dmarc: false AND spf: false AND dkim: false\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Emails-Date-Histogram",
"_type": "visualization",
"_source": {
"title": "Emails Date Histogram",
"visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Day-Of-Week",
"_type": "visualization",
"_source": {
"title": "Day Of Week",
"visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"dayOfWeek\",\"size\":7,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Hour-Of-Day",
"_type": "visualization",
"_source": {
"title": "Hour Of Day",
"visState": "{\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"hourOfDay\",\"size\":24,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"gmail\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
}
]
var fs = require('fs'),
readline = require('readline'),
google = require('googleapis'),
googleAuth = require('google-auth-library'),
async = require('async'),
addr = require('email-addresses'),
elasticsearch = require('elasticsearch');
// Shared Elasticsearch client used by pushToElastic(); it is configured for a
// node reachable at localhost:9200.
var client = new elasticsearch.Client({
host: 'localhost:9200'
});
// Day names indexed by the value of Date.prototype.getDay() (0 = Sunday).
var weekday = [
  "Sunday",
  "Monday",
  "Tuesday",
  "Wednesday",
  "Thursday",
  "Friday",
  "Saturday"
];
// OAuth scope requested when generating the consent URL: read-only Gmail access.
var SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'];
// Directory holding the cached OAuth token: '/.credentials/' appended to the
// first of HOME, HOMEPATH or USERPROFILE that is set in the environment.
var TOKEN_DIR = (process.env.HOME || process.env.HOMEPATH ||
process.env.USERPROFILE) + '/.credentials/';
// Token file read by authorize() and written by storeToken().
var TOKEN_PATH = TOKEN_DIR + 'gmail-api-quickstart.json';
// Entry point: read the OAuth client secrets from client_secret.json in the
// working directory, authorize, and then begin importing with start().
fs.readFile('client_secret.json', function processClientSecrets(err, content) {
  if (!err) {
    // Secrets loaded: build the OAuth2 client, then hand it to start().
    authorize(JSON.parse(content), start);
    return;
  }
  console.log('Error loading client secret file: ' + err);
});
/**
 * Build an OAuth2 client from the client secrets and pass it to `callback`
 * once it carries credentials. A token cached at TOKEN_PATH is reused when it
 * can be read; otherwise the interactive flow in getNewToken() is started.
 *
 * @param {Object} credentials Parsed client secrets; the `installed` section
 *     (client_id, client_secret, redirect_uris) is used.
 * @param {function} callback Called with the authorized OAuth2 client.
 */
function authorize(credentials, callback) {
  var installed = credentials.installed;
  var secret = installed.client_secret;
  var id = installed.client_id;
  var redirect = installed.redirect_uris[0];

  var authFactory = new googleAuth();
  var oauth2Client = new authFactory.OAuth2(id, secret, redirect);

  fs.readFile(TOKEN_PATH, function (readErr, cachedToken) {
    if (readErr) {
      // No readable cached token: ask the user to authorize.
      getNewToken(oauth2Client, callback);
      return;
    }
    oauth2Client.credentials = JSON.parse(cachedToken);
    callback(oauth2Client);
  });
}
/**
 * Interactive authorization: print the consent URL, read the resulting code
 * from stdin, exchange it for a token, persist the token with storeToken()
 * and finally invoke `callback` with the now-authorized client.
 *
 * If the code cannot be exchanged the error is logged and `callback` is not
 * invoked.
 *
 * @param {google.auth.OAuth2} oauth2Client The OAuth2 client to get a token for.
 * @param {function} callback Called with the authorized OAuth2 client.
 */
function getNewToken(oauth2Client, callback) {
  var consentUrl = oauth2Client.generateAuthUrl({
    access_type: 'offline',
    scope: SCOPES
  });
  console.log('Authorize this app by visiting this url: ', consentUrl);

  var prompt = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });

  // Receives the result of exchanging the pasted code for a token.
  function onToken(err, token) {
    if (err) {
      console.log('Error while trying to retrieve access token', err);
      return;
    }
    oauth2Client.credentials = token;
    storeToken(token);
    callback(oauth2Client);
  }

  prompt.question('Enter the code from that page here: ', function (code) {
    prompt.close();
    oauth2Client.getToken(code, onToken);
  });
}
/**
 * Store the token on disk so later program executions can reuse it.
 *
 * TOKEN_DIR is created when missing (an already existing directory is fine),
 * then the token is written synchronously as JSON to TOKEN_PATH. The write is
 * synchronous because the asynchronous fs.writeFile requires a callback and
 * because the success message must only be printed after the write succeeded.
 *
 * @param {Object} token The token to store to disk.
 * @throws {Error} If the directory cannot be created (for a reason other than
 *     already existing) or the file cannot be written.
 */
function storeToken(token) {
  try {
    fs.mkdirSync(TOKEN_DIR);
  } catch (err) {
    if (err.code !== 'EEXIST') {
      throw err;
    }
  }
  fs.writeFileSync(TOKEN_PATH, JSON.stringify(token));
  console.log('Token stored to ' + TOKEN_PATH);
}
/**
 * Fetch one page (up to 100 entries) of message references from the user's
 * mailbox.
 *
 * `cb` is always invoked exactly once: with the error when the request
 * fails, otherwise with the (possibly empty) list of messages and the token
 * of the next page. The response carries no `messages` property when the
 * page is empty, so that case is normalized to an empty array.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string=} pageToken Token of the page to fetch; omit for the first page.
 * @param {function} cb Called as cb(err) or cb(null, messages, nextToken).
 */
function listMessages(auth, pageToken, cb) {
  var gmail = google.gmail('v1');
  gmail.users.messages.list({
    auth: auth,
    userId: 'me',
    maxResults: 100,
    pageToken: pageToken
  }, function (err, response) {
    if (err) {
      console.log('The API returned an error: ' + err);
      return cb(err);
    }
    var messages = (response && response.messages) || [];
    var nextToken = response ? response.nextPageToken : undefined;
    if (messages.length === 0) {
      console.log('No messages found.');
    }
    return cb(null, messages, nextToken);
  });
}
/**
 * Fetch a single message and hand its header list to `cb`.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string} id Id of the message to fetch.
 * @param {function} cb Called as cb(err) or cb(null, headers), where headers
 *     is `payload.headers` of the response.
 */
function getEmail(auth, id, cb) {
  var request = {
    auth: auth,
    userId: 'me',
    id: id
  };
  google.gmail('v1').users.messages.get(request, function (err, response) {
    if (!err) {
      return cb(null, response.payload.headers);
    }
    console.log('The API returned an error: ' + err);
    return cb(err);
  });
}
/**
 * Reduce the header list of one message to the fields that get indexed.
 *
 * Header names are compared case-insensitively. Entries without a string
 * name and value are skipped.
 *
 * @param {string} id Gmail message id.
 * @param {Array} headers Header objects of the form {name, value}.
 * @returns {Object} {id, from, to, subject, timestamp, spf, dkim, dmarc};
 *     `subject` and `timestamp` stay undefined when the header is missing or
 *     the date cannot be converted.
 */
function collectHeaderFields(id, headers) {
  var doc = {
    id: id,
    from: [],
    to: [],
    subject: undefined,
    timestamp: undefined,
    spf: false,
    dkim: false,
    dmarc: false
  };

  // Append every address parsed from an address-list header to `target`.
  function pushAddresses(value, target) {
    var parsed = addr.parseAddressList(value);
    if (parsed && parsed.length > 0) {
      parsed.forEach(function (entry) {
        target.push(entry.address);
      });
    }
  }

  headers.forEach(function (h) {
    if (!h || typeof h.name !== 'string' || typeof h.value !== 'string') {
      return;
    }
    switch (h.name.toLowerCase()) {
      case 'to':
        if (h.value.indexOf('@') > -1) {
          pushAddresses(h.value, doc.to);
        }
        break;
      case 'from':
        pushAddresses(h.value, doc.from);
        break;
      case 'subject':
        doc.subject = h.value;
        break;
      case 'date':
        try {
          // toISOString() throws a RangeError for an unparseable date.
          doc.timestamp = new Date(h.value).toISOString();
        } catch (x) {
          console.log('Could not translate date', x, h.value);
        }
        break;
      case 'authentication-results':
        if (h.value.indexOf('spf=pass') > -1) {
          doc.spf = true;
        }
        if (h.value.indexOf('dkim=pass') > -1) {
          doc.dkim = true;
        }
        if (h.value.indexOf('dmarc=pass') > -1) {
          doc.dmarc = true;
        }
        break;
    }
  });

  return doc;
}

/**
 * Import one page of messages into Elasticsearch and continue with the next
 * page until the listing reports no further page token.
 *
 * For every message of the page the headers are fetched and condensed with
 * collectHeaderFields(); messages without headers are skipped. The collected
 * batch is handed to pushToElastic(). If listing or fetching fails, the error
 * is logged and the import stops.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string=} pageToken Token of the page to import; omit for the first page.
 */
function start(auth, pageToken) {
  console.log('Starting', pageToken);
  async.waterfall([function (callback) {
    listMessages(auth, pageToken, function (err, messages, nextToken) {
      if (err) return callback(err);
      callback(null, messages || [], nextToken);
    });
  }, function (messages, nextToken, callback) {
    var docs = [];
    var pending = messages.length;
    var finished = false;

    // Complete this step exactly once, no matter how many fetches report back.
    function finish(err) {
      if (finished) return;
      finished = true;
      if (err) return callback(err);
      callback(null, docs, nextToken);
    }

    if (pending === 0) {
      return finish();
    }

    messages.forEach(function (m) {
      getEmail(auth, m.id, function (err, headers) {
        console.log('Processing: ', m.id);
        if (err) return finish(err);
        if (headers && headers.length > 0) {
          docs.push(collectHeaderFields(m.id, headers));
        }
        pending--;
        if (pending === 0) {
          finish();
        }
      });
    });
  }], function (err, arr, nextToken) {
    if (err) {
      console.log('Import stopped: ' + err);
      return;
    }
    if (arr && arr.length > 0) {
      pushToElastic(arr);
    }
    // Without a next page token another call would start over at page one.
    if (nextToken) {
      start(auth, nextToken);
    } else {
      console.log('Import finished: no more pages.');
    }
  });
}
/**
 * Index a batch of message documents into the `gmail` index with a single
 * bulk request, using the Gmail message id as document id.
 *
 * An empty or missing batch sends nothing. `hourOfDay` and `dayOfWeek` are
 * derived from the timestamp with Date#getHours / Date#getDay, i.e. in the
 * local time zone of the running process; both are null when the message has
 * no usable timestamp. Transport errors and item-level failures reported by
 * the bulk response are logged.
 *
 * @param {Array<Object>} arr Documents with id, timestamp, subject, from, to,
 *     dmarc, spf and dkim properties.
 */
function pushToElastic(arr) {
  if (!arr || arr.length === 0) {
    return;
  }
  var final = [];
  arr.forEach(function (a) {
    var sent = a.timestamp ? new Date(a.timestamp) : null;
    var hasDate = sent !== null && !isNaN(sent.getTime());
    final.push({index: {_index: 'gmail', _type: 'message', _id: a.id}});
    final.push({
      timestamp: a.timestamp,
      hourOfDay: hasDate ? sent.getHours() : null,
      dayOfWeek: hasDate ? weekday[sent.getDay()] : null,
      subject: a.subject,
      from: a.from,
      to: a.to,
      dmarc: a.dmarc,
      spf: a.spf,
      dkim: a.dkim
    });
  });
  client.bulk({
    body: final
  }, function (err, resp) {
    if (err) {
      console.log(err);
      return;
    }
    // A bulk request can succeed as a whole while individual items fail.
    if (resp && resp.errors) {
      console.log('Bulk request reported failed items in a batch of ' + arr.length + ' messages');
    }
  });
}
@maartenvantigkhem
Copy link

var fs = require('fs'),
readline = require('readline'),
google = require('googleapis'),
googleAuth = require('google-auth-library'),
async = require('async'),
addr = require('email-addresses'),
elasticsearch = require('elasticsearch');

// Shared Elasticsearch client used by pushToElastic(); it is configured for a
// node reachable at localhost:9200.
var client = new elasticsearch.Client({
host: 'localhost:9200'
});

// Day names indexed by the value of Date.prototype.getDay() (0 = Sunday).
var weekday = [
  "Sunday",
  "Monday",
  "Tuesday",
  "Wednesday",
  "Thursday",
  "Friday",
  "Saturday"
];

// OAuth scope requested when generating the consent URL: read-only Gmail access.
var SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'];
// Directory holding the cached OAuth token: '/.credentials/' appended to the
// first of HOME, HOMEPATH or USERPROFILE that is set in the environment.
var TOKEN_DIR = (process.env.HOME || process.env.HOMEPATH ||
process.env.USERPROFILE) + '/.credentials/';
// Token file read by authorize() and written by storeToken().
var TOKEN_PATH = TOKEN_DIR + 'gmail-api-quickstart.json';

// Entry point: read the OAuth client secrets from client_secret.json in the
// working directory, authorize, and then begin importing with start().
fs.readFile('client_secret.json', function processClientSecrets(err, content) {
  if (!err) {
    // Secrets loaded: build the OAuth2 client, then hand it to start().
    authorize(JSON.parse(content), start);
    return;
  }
  console.log('Error loading client secret file: ' + err);
});

/**
 * Build an OAuth2 client from the client secrets and pass it to `callback`
 * once it carries credentials. A token cached at TOKEN_PATH is reused when it
 * can be read; otherwise the interactive flow in getNewToken() is started.
 *
 * @param {Object} credentials Parsed client secrets; the `installed` section
 *     (client_id, client_secret, redirect_uris) is used.
 * @param {function} callback Called with the authorized OAuth2 client.
 */
function authorize(credentials, callback) {
  var installed = credentials.installed;
  var secret = installed.client_secret;
  var id = installed.client_id;
  var redirect = installed.redirect_uris[0];

  var authFactory = new googleAuth();
  var oauth2Client = new authFactory.OAuth2(id, secret, redirect);

  fs.readFile(TOKEN_PATH, function (readErr, cachedToken) {
    if (readErr) {
      // No readable cached token: ask the user to authorize.
      getNewToken(oauth2Client, callback);
      return;
    }
    oauth2Client.credentials = JSON.parse(cachedToken);
    callback(oauth2Client);
  });
}

/**
 * Interactive authorization: print the consent URL, read the resulting code
 * from stdin, exchange it for a token, persist the token with storeToken()
 * and finally invoke `callback` with the now-authorized client.
 *
 * If the code cannot be exchanged the error is logged and `callback` is not
 * invoked.
 *
 * @param {google.auth.OAuth2} oauth2Client The OAuth2 client to get a token for.
 * @param {function} callback Called with the authorized OAuth2 client.
 */
function getNewToken(oauth2Client, callback) {
  var consentUrl = oauth2Client.generateAuthUrl({
    access_type: 'offline',
    scope: SCOPES
  });
  console.log('Authorize this app by visiting this url: ', consentUrl);

  var prompt = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });

  // Receives the result of exchanging the pasted code for a token.
  function onToken(err, token) {
    if (err) {
      console.log('Error while trying to retrieve access token', err);
      return;
    }
    oauth2Client.credentials = token;
    storeToken(token);
    callback(oauth2Client);
  }

  prompt.question('Enter the code from that page here: ', function (code) {
    prompt.close();
    oauth2Client.getToken(code, onToken);
  });
}

/**
 * Store the token on disk so later program executions can reuse it.
 *
 * TOKEN_DIR is created when missing (an already existing directory is fine),
 * then the token is written synchronously as JSON to TOKEN_PATH. The write is
 * synchronous because the asynchronous fs.writeFile requires a callback and
 * because the success message must only be printed after the write succeeded.
 *
 * @param {Object} token The token to store to disk.
 * @throws {Error} If the directory cannot be created (for a reason other than
 *     already existing) or the file cannot be written.
 */
function storeToken(token) {
  try {
    fs.mkdirSync(TOKEN_DIR);
  } catch (err) {
    if (err.code !== 'EEXIST') {
      throw err;
    }
  }
  fs.writeFileSync(TOKEN_PATH, JSON.stringify(token));
  console.log('Token stored to ' + TOKEN_PATH);
}

/**
 * Fetch one page (up to 100 entries) of message references from the user's
 * mailbox.
 *
 * `cb` is always invoked exactly once: with the error when the request
 * fails, otherwise with the (possibly empty) list of messages and the token
 * of the next page. The response carries no `messages` property when the
 * page is empty, so that case is normalized to an empty array.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string=} pageToken Token of the page to fetch; omit for the first page.
 * @param {function} cb Called as cb(err) or cb(null, messages, nextToken).
 */
function listMessages(auth, pageToken, cb) {
  var gmail = google.gmail('v1');
  gmail.users.messages.list({
    auth: auth,
    userId: 'me',
    maxResults: 100,
    pageToken: pageToken
  }, function (err, response) {
    if (err) {
      console.log('The API returned an error: ' + err);
      return cb(err);
    }
    var messages = (response && response.messages) || [];
    var nextToken = response ? response.nextPageToken : undefined;
    if (messages.length === 0) {
      console.log('No messages found.');
    }
    return cb(null, messages, nextToken);
  });
}

/**
 * Fetch a single message and hand the complete API response to `cb`.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string} id Id of the message to fetch.
 * @param {function} cb Called as cb(err) or cb(null, response).
 */
function getEmail(auth, id, cb) {
  var request = {
    auth: auth,
    userId: 'me',
    id: id
  };
  google.gmail('v1').users.messages.get(request, function (err, response) {
    if (!err) {
      return cb(null, response);
    }
    console.log('The API returned an error: ' + err);
    return cb(err);
  });
}

/**
 * Reduce one message response to the fields that get indexed.
 *
 * Header names are compared case-insensitively. Header entries without a
 * string name and value are skipped. `snippet` and `body` are copied from
 * `response.snippet` and `response.payload.body` unchanged.
 *
 * @param {string} id Gmail message id.
 * @param {Object} response Message response with a `payload.headers` array.
 * @returns {Object} {id, from, to, subject, timestamp, spf, dkim, dmarc,
 *     snippet, body}; `subject` and `timestamp` stay undefined when the
 *     header is missing or the date cannot be converted.
 */
function collectMessageFields(id, response) {
  var doc = {
    id: id,
    from: [],
    to: [],
    subject: undefined,
    timestamp: undefined,
    spf: false,
    dkim: false,
    dmarc: false,
    snippet: response.snippet,
    body: response.payload.body
  };

  // Append every address parsed from an address-list header to `target`.
  function pushAddresses(value, target) {
    var parsed = addr.parseAddressList(value);
    if (parsed && parsed.length > 0) {
      parsed.forEach(function (entry) {
        target.push(entry.address);
      });
    }
  }

  response.payload.headers.forEach(function (h) {
    if (!h || typeof h.name !== 'string' || typeof h.value !== 'string') {
      return;
    }
    switch (h.name.toLowerCase()) {
      case 'to':
        if (h.value.indexOf('@') > -1) {
          pushAddresses(h.value, doc.to);
        }
        break;
      case 'from':
        pushAddresses(h.value, doc.from);
        break;
      case 'subject':
        doc.subject = h.value;
        break;
      case 'date':
        try {
          // toISOString() throws a RangeError for an unparseable date.
          doc.timestamp = new Date(h.value).toISOString();
        } catch (x) {
          console.log('Could not translate date', x, h.value);
        }
        break;
      case 'authentication-results':
        if (h.value.indexOf('spf=pass') > -1) {
          doc.spf = true;
        }
        if (h.value.indexOf('dkim=pass') > -1) {
          doc.dkim = true;
        }
        if (h.value.indexOf('dmarc=pass') > -1) {
          doc.dmarc = true;
        }
        break;
    }
  });

  return doc;
}

/**
 * Import one page of messages into Elasticsearch and continue with the next
 * page until the listing reports no further page token.
 *
 * For every message of the page the full response is fetched and condensed
 * with collectMessageFields(); messages without headers are skipped. The
 * collected batch is handed to pushToElastic(). If listing or fetching
 * fails, the error is logged and the import stops.
 *
 * @param {google.auth.OAuth2} auth An authorized OAuth2 client.
 * @param {string=} pageToken Token of the page to import; omit for the first page.
 */
function start(auth, pageToken) {
  console.log('Starting', pageToken);
  async.waterfall([function (callback) {
    listMessages(auth, pageToken, function (err, messages, nextToken) {
      if (err) return callback(err);
      callback(null, messages || [], nextToken);
    });
  }, function (messages, nextToken, callback) {
    var docs = [];
    var pending = messages.length;
    var finished = false;

    // Complete this step exactly once, no matter how many fetches report back.
    function finish(err) {
      if (finished) return;
      finished = true;
      if (err) return callback(err);
      callback(null, docs, nextToken);
    }

    if (pending === 0) {
      return finish();
    }

    messages.forEach(function (m) {
      getEmail(auth, m.id, function (err, response) {
        // Check the error first: on failure there is no response to read.
        if (err) return finish(err);
        var headers = response && response.payload && response.payload.headers;
        if (headers && headers.length > 0) {
          docs.push(collectMessageFields(m.id, response));
        }
        pending--;
        if (pending === 0) {
          finish();
        }
      });
    });
  }], function (err, arr, nextToken) {
    if (err) {
      console.log('Import stopped: ' + err);
      return;
    }
    if (arr && arr.length > 0) {
      pushToElastic(arr);
    }
    // Without a next page token another call would start over at page one.
    if (nextToken) {
      start(auth, nextToken);
    } else {
      console.log('Import finished: no more pages.');
    }
  });
}

/**
 * Index a batch of message documents into the `gmail` index with a single
 * bulk request, using the Gmail message id as document id.
 *
 * An empty or missing batch sends nothing. `hourOfDay` and `dayOfWeek` are
 * derived from the timestamp with Date#getHours / Date#getDay, i.e. in the
 * local time zone of the running process; both are null when the message has
 * no usable timestamp. Transport errors and item-level failures reported by
 * the bulk response are logged.
 *
 * @param {Array<Object>} arr Documents with id, timestamp, subject, from, to,
 *     dmarc, spf, dkim, snippet and body properties.
 */
function pushToElastic(arr) {
  if (!arr || arr.length === 0) {
    return;
  }
  var final = [];
  arr.forEach(function (a) {
    var sent = a.timestamp ? new Date(a.timestamp) : null;
    var hasDate = sent !== null && !isNaN(sent.getTime());
    final.push({index: {_index: 'gmail', _type: 'message', _id: a.id}});
    final.push({
      timestamp: a.timestamp,
      hourOfDay: hasDate ? sent.getHours() : null,
      dayOfWeek: hasDate ? weekday[sent.getDay()] : null,
      subject: a.subject,
      from: a.from,
      to: a.to,
      dmarc: a.dmarc,
      spf: a.spf,
      dkim: a.dkim,
      snippet: a.snippet,
      body: a.body
    });
  });

  client.bulk({
    body: final
  }, function (err, resp) {
    if (err) {
      console.log(err);
      return;
    }
    // A bulk request can succeed as a whole while individual items fail.
    if (resp && resp.errors) {
      console.log('Bulk request reported failed items in a batch of ' + arr.length + ' messages');
    }
  });
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment