Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Created December 1, 2024 06:09
Show Gist options
  • Save onefoursix/f1234189a2f2ce5c5d866d8dbd952045 to your computer and use it in GitHub Desktop.
Save onefoursix/f1234189a2f2ce5c5d866d8dbd952045 to your computer and use it in GitHub Desktop.
StreamSets Groovy script to get email attachments (work in progress)
import java.util.Properties
import jakarta.mail.*
import jakarta.mail.search.*
import java.nio.file.*
import java.util.*
import java.text.SimpleDateFormat
// Main script body: connects to an IMAP mailbox, finds the messages received
// after the saved offset and emits one record per attachment (or one record
// per message when the message has no attachment).
//
// NOTE: the untyped variables below (entityName, recordCreatedforMessage,
// offset, cur_batch, record) are intentionally left undeclared so that they
// live in the script binding and are shared with the helper methods.

// IMAP connection parameters
String host = sdc.userParams["IMAP_SERVER_HOST"]
String user = sdc.userParams["EMAIL_USER"]
String password = sdc.userParams["EMAIL_PASSWORD"]

// Set the properties for the IMAP connection
Properties properties = new Properties()
properties.put("mail.store.protocol", "imap")
properties.put("mail.imap.host", host)
properties.put("mail.imap.port", "993")
properties.put("mail.imap.ssl.enable", "true")

// Create an IMAP session
Session session = Session.getInstance(properties, null)
Store store = session.getStore("imap")
Folder folder = null

try {
    store.connect(user, password)

    // Open the desired folder
    folder = store.getFolder(sdc.userParams["EMAIL_FOLDER"])
    folder.open(Folder.READ_ONLY)

    // Single threaded - no entityName because we need only one offset
    entityName = ''

    // Keeps track of whether a record has been created for the current message
    recordCreatedforMessage = false

    // Get the previously saved offset if it exists
    if (sdc.lastOffsets.containsKey(entityName)) {
        offset = sdc.lastOffsets.get(entityName) as long
    } else {
        // There is no saved offset, so start from STARTING_OFFSET_LOOKBACK_DAYS ago
        Calendar calendar = Calendar.getInstance()
        calendar.add(Calendar.DAY_OF_MONTH, (-1 * Integer.parseInt(sdc.userParams['STARTING_OFFSET_LOOKBACK_DAYS'])))
        offset = calendar.getTimeInMillis()
    }

    // IMAP can only search for messages by DAY, so the search term is moved back
    // by one day and the exact offset is then used to skip already-seen messages.
    Date startDate = new Date(offset)
    Calendar cal = Calendar.getInstance()
    cal.setTime(startDate)
    cal.add(Calendar.DAY_OF_MONTH, -1)
    startDate = cal.getTime()
    SearchTerm searchTerm = new ReceivedDateTerm(ComparisonTerm.GT, startDate)

    // Get messages using the search term
    Message[] messages = folder.search(searchTerm)

    // Create a batch
    cur_batch = sdc.createBatch()

    // Create a dummy record so that an error raised before any real record
    // exists still has a record to be attached to
    record = sdc.createRecord('dummyRecord')

    // Loop through every message retrieved
    for (message in messages) {
        try {
            // Ignore the message if it was received at or before the offset
            if (message.getReceivedDate().getTime() > offset) {
                recordCreatedforMessage = false

                // Multipart messages may carry attachments; the handler sets
                // recordCreatedforMessage when it has emitted at least one record
                if (message.isMimeType("multipart/*")) {
                    handleMultipartMessage(message)
                }

                if (recordCreatedforMessage == false) {
                    record = createRecord(message)
                    // Bump the offset to the current message
                    offset = message.getReceivedDate().getTime()
                    // Add the record to the current batch
                    cur_batch.add(record)
                    // Every record is flushed immediately (effective batch size of 1)
                    if (cur_batch.size() >= 1) {
                        cur_batch.process(entityName, offset.toString())
                        cur_batch = sdc.createBatch()
                    }
                }
            }
        } catch (Exception e) {
            sdc.log.error("Error processing email message: " + e.toString(), e)
            cur_batch.addError(record, e.toString())
            cur_batch.process(entityName, offset.toString())
            // A processed batch must not be reused: start a fresh one so that the
            // following messages (and the final flush) work on a new batch
            cur_batch = sdc.createBatch()
        }
        if (sdc.isStopped()) {
            break
        }
    }

    // Flush whatever is left in the last batch
    if (cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0) {
        cur_batch.process(entityName, offset.toString())
    }
} finally {
    // Always release the IMAP resources, even when the script fails
    try {
        if (folder != null && folder.isOpen()) {
            // false: do not expunge deleted messages (the folder is read-only)
            folder.close(false)
        }
    } catch (Exception closeError) {
        sdc.log.warn("Failed to close mail folder: " + closeError.toString())
    }
    try {
        store.close()
    } catch (Exception closeError) {
        sdc.log.warn("Failed to close mail store: " + closeError.toString())
    }
}
/**
 * Handles a multipart message: every attachment found is saved to a local
 * file and emitted as its own record.
 *
 * Side effects on the script binding: sets recordCreatedforMessage to true
 * when at least one attachment record is created, and updates record,
 * offset and cur_batch.
 *
 * @param message a message whose content is a Multipart
 */
def handleMultipartMessage(Message message) {
    handleMultipartParts(message, (Multipart) message.getContent())
}

/**
 * Walks the parts of a multipart container, descending into nested
 * multipart containers so attachments below the top level are found too.
 */
def handleMultipartParts(Message message, Multipart multipart) {
    for (int i = 0; i < multipart.getCount(); i++) {
        BodyPart bodyPart = multipart.getBodyPart(i)

        // Skip only parts known to be empty; getSize() returns -1 when the
        // size cannot be determined, and such parts may still hold content
        if (bodyPart == null || bodyPart.getSize() == 0) {
            continue
        }

        String attachmentFileName = getAttachmentFileName(message, bodyPart)

        if (attachmentFileName) {
            // Create the output directory name for the local file
            String outputDir = createOutputDir(message, bodyPart)

            // Save the attachment to a local file.
            // This is a bit of a hack for now; it would be better
            // to emit a WholeFileDataFormat record and/or to
            // parse the attachment into multiple records
            saveAttachmentToLocalFile(message, bodyPart, outputDir, attachmentFileName)

            // Create a record for the attachment
            recordCreatedforMessage = true
            record = createRecord(message, outputDir, attachmentFileName)

            // Add the record to the current batch
            cur_batch.add(record)

            // Bump the offset to the current message
            offset = message.getReceivedDate().getTime()

            // Every record is flushed immediately (effective batch size of 1)
            if (cur_batch.size() >= 1) {
                cur_batch.process(entityName, offset.toString())
                cur_batch = sdc.createBatch()
            }
        } else if (bodyPart.isMimeType("multipart/*")) {
            // Nested container: look for attachments inside it
            def nestedContent = bodyPart.getContent()
            if (nestedContent instanceof Multipart) {
                handleMultipartParts(message, (Multipart) nestedContent)
            }
        }
    }
}
/**
 * Returns the file name of the attachment carried by a body part.
 *
 * A part counts as an attachment when its disposition is "attachment" or
 * "inline" (case-insensitive) and it has a non-empty file name.
 *
 * @param message  the message the part belongs to (currently unused)
 * @param bodyPart the part to inspect
 * @return the attachment file name, or null when the part is not an attachment
 */
String getAttachmentFileName(Message message, BodyPart bodyPart) {
    String disposition = bodyPart.getDisposition()

    // equalsIgnoreCase(null) is false, so a missing disposition is rejected here
    boolean isAttachment = Part.ATTACHMENT.equalsIgnoreCase(disposition) ||
            Part.INLINE.equalsIgnoreCase(disposition)
    if (!isAttachment) {
        return null
    }

    // Local variable on purpose: nothing is written to the script binding
    String fileName = bodyPart.getFileName()
    if (fileName != null && fileName.length() > 0) {
        return fileName
    }
    return null
}
/**
 * Builds the output directory name for the attachments of a message, in the
 * form sdc.userParams["LOCAL_FILE_DIR"]/sender/timestamp, where timestamp is
 * the received date formatted as yyyyMMddHHmmss.
 *
 * The directory is not created here; only its name is returned.
 *
 * @param message  the message the attachment belongs to
 * @param bodyPart the attachment part (currently unused)
 * @return the directory path as a String
 */
def createOutputDir(Message message, BodyPart bodyPart) {
    String parentDir = sdc.userParams["LOCAL_FILE_DIR"]

    // The From header is optional, so getFrom() may be null or empty
    def fromAddresses = message.getFrom()
    String sender = fromAddresses ? fromAddresses[0].toString() : "unknown_sender"
    sender = sender.replace("@", "_").replace("<", "").replace(">", "").replace(" ", "_")

    // The sender comes from the email itself (untrusted input): replace every
    // remaining character that is not a letter, digit, '.', '_' or '-' so the
    // value cannot contain path separators or other unsafe characters
    sender = sender.replaceAll(/[^\p{L}\p{N}._-]/, "_")
    if (sender.isEmpty() || sender == "." || sender == "..") {
        sender = "unknown_sender"
    }

    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    String receivedDate = dateFormat.format(message.getReceivedDate())

    return parentDir + "/" + sender + "/" + receivedDate
}
/**
 * Saves the content of an attachment part to outputDir/attachmentFileName,
 * creating missing directories and replacing an existing file.
 *
 * @param message            the message the attachment belongs to (currently unused)
 * @param bodyPart           the part whose content is written
 * @param outputDir          the directory to write into
 * @param attachmentFileName the file name taken from the email
 * @throws IOException when the file name would resolve outside outputDir,
 *                     or when writing fails
 */
def saveAttachmentToLocalFile(Message message, BodyPart bodyPart, String outputDir, String attachmentFileName) {
    Path baseDir = Paths.get(outputDir).toAbsolutePath().normalize()
    Path outputPath = Paths.get(baseDir.toString(), attachmentFileName).normalize()

    // The file name comes from the email (untrusted input): refuse names such
    // as "../../x" that would escape the output directory
    if (!outputPath.startsWith(baseDir) || outputPath.equals(baseDir)) {
        throw new IOException("Refusing to write attachment outside of " + baseDir + ": " + attachmentFileName)
    }

    // Create directories if they don't exist
    Files.createDirectories(outputPath.getParent())

    // Save the attachment, always closing the stream afterwards
    InputStream inputStream = bodyPart.getInputStream()
    try {
        Files.copy(inputStream, outputPath, StandardCopyOption.REPLACE_EXISTING)
    } finally {
        inputStream.close()
    }
}
/**
 * Creates a record for one attachment of a message: the plain message record
 * plus the attachment location fields, with has_attachment set to true.
 *
 * The record is also stored in the script-binding variable "record".
 *
 * @param message            the message the attachment belongs to
 * @param outputDir          the directory the attachment was saved to
 * @param attachmentFileName the attachment file name
 * @return the populated record
 */
def createRecord(Message message, String outputDir, String attachmentFileName){
    record = createRecord(message)

    // NOTE(review): the key 'attachment_file_' looks truncated (it holds the
    // output directory) - confirm the intended field name before renaming it
    def attachmentFields = [
        'attachment_file_'    : outputDir,
        'attachment_file_name': attachmentFileName,
        'has_attachment'      : true
    ]
    for (field in attachmentFields) {
        record.value[field.key] = field.value
    }

    return record
}
/**
 * Creates a record describing a message: message id, received date, subject,
 * sender, content type and the TO / CC recipient lists. has_attachment is
 * set to false.
 *
 * The record is also stored in the script-binding variable "record", which
 * the main loop uses when reporting errors.
 *
 * @param message the message to describe
 * @return the populated record
 */
def createRecord(Message message){
    def messageId = message.getMessageID()
    def receivedDate = message.getReceivedDate()

    // Create a record
    record = sdc.createRecord(messageId)
    record.value = sdc.createMap(false)
    record.value['messageId'] = messageId
    record.value['receivedDate'] = receivedDate
    record.value['subject'] = message.getSubject()

    // The From header is optional: getFrom() may return null or an empty array
    def fromAddresses = message.getFrom()
    record.value['from'] = fromAddresses ? fromAddresses[0].toString() : null

    record.value['contentType'] = message.getContentType()

    // getRecipients() returns null when there is no recipient of that type;
    // in that case the list is left empty
    def toRecipients = message.getRecipients(Message.RecipientType.TO)
    record.value['toList'] = toRecipients ? toRecipients.collect { it.toString() } : []

    def ccRecipients = message.getRecipients(Message.RecipientType.CC)
    record.value['ccList'] = ccRecipients ? ccRecipients.collect { it.toString() } : []

    record.value['has_attachment'] = false
    return record
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment