Created
December 1, 2024 06:09
-
-
Save onefoursix/f1234189a2f2ce5c5d866d8dbd952045 to your computer and use it in GitHub Desktop.
StreamSets Groovy script to get email attachments (work in progress)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import jakarta.mail.*
import jakarta.mail.internet.MimeUtility
import jakarta.mail.search.*

import java.nio.file.*
import java.text.SimpleDateFormat
import java.util.*
import java.util.Properties
// IMAP connection parameters, read from the pipeline's user parameters
String host = sdc.userParams["IMAP_SERVER_HOST"]
String user = sdc.userParams["EMAIL_USER"]
String password = sdc.userParams["EMAIL_PASSWORD"]
// Optional user parameters. The port default reproduces the previously
// hard-coded value; INBOX is used when EMAIL_FOLDER is not set.
String port = sdc.userParams["IMAP_SERVER_PORT"] ?: "993"
String timeoutMillis = sdc.userParams["IMAP_TIMEOUT_MILLIS"] ?: "60000"
String folderName = sdc.userParams["EMAIL_FOLDER"] ?: "INBOX"

// Properties for an IMAP-over-SSL/TLS connection
Properties properties = new Properties()
properties.put("mail.store.protocol", "imap")
properties.put("mail.imap.host", host)
properties.put("mail.imap.port", port)
properties.put("mail.imap.ssl.enable", "true")
// Connect and socket-read timeouts in milliseconds. Without them an
// unresponsive server blocks the script indefinitely, so a pipeline stop
// request is never observed.
properties.put("mail.imap.connectiontimeout", timeoutMillis)
properties.put("mail.imap.timeout", timeoutMillis)

// Create an IMAP session and connect to the store
Session session = Session.getInstance(properties, null)
Store store = session.getStore("imap")
store.connect(user, password)

// Open the desired folder read-only. If that fails, close the connection
// before propagating the error so it is not leaked.
Folder folder = store.getFolder(folderName)
try {
    folder.open(Folder.READ_ONLY)
} catch (Exception e) {
    store.close()
    throw e
}
// The origin is single-threaded and tracks a single offset, so the entity
// name is the empty string.
entityName = ''
// Flag set by handleMultipartMessage once it has emitted at least one
// attachment record for the message currently being handled
recordCreatedforMessage = false

// The offset is an epoch-millisecond timestamp. Resume from the saved one; on
// the very first run, start STARTING_OFFSET_LOOKBACK_DAYS days before now.
if (!sdc.lastOffsets.containsKey(entityName)) {
    int lookbackDays = Integer.parseInt(sdc.userParams['STARTING_OFFSET_LOOKBACK_DAYS'])
    Calendar lookbackStart = Calendar.getInstance()
    lookbackStart.add(Calendar.DAY_OF_MONTH, -lookbackDays)
    offset = lookbackStart.getTimeInMillis()
} else {
    offset = sdc.lastOffsets.get(entityName) as long
}
// IMAP SEARCH compares dates only at day granularity, so the search window
// opens one day before the offset. Messages at or before the exact offset are
// filtered out later, while iterating over the results.
Calendar searchFrom = Calendar.getInstance()
searchFrom.setTimeInMillis(offset)
searchFrom.add(Calendar.DAY_OF_MONTH, -1)
Date startDate = searchFrom.getTime()

// All messages in the folder received after startDate
Message[] messages = folder.search(new ReceivedDateTerm(ComparisonTerm.GT, startDate))
// Emit records for the messages found by the search: one record per saved
// attachment, or a single record for a message without attachments. Batches
// are processed once they reach sdc.batchSize. The offset committed with a
// batch is the received time (epoch millis) of the last message it contains.
//
// Known limitation: IMAP received dates (INTERNALDATE) have one-second
// resolution, so after a restart a message received in the same second as the
// saved offset is skipped.

// Offset we resumed from; everything received after it is new
long startOffset = offset as long
cur_batch = sdc.createBatch()

try {
    if (messages.length > 0) {
        // Fetch envelope data, including received dates, for all messages in
        // one round trip instead of one request per getReceivedDate() call
        FetchProfile fetchProfile = new FetchProfile()
        fetchProfile.add(FetchProfile.Item.ENVELOPE)
        folder.fetch(messages, fetchProfile)
    }
    // Search results come back in folder order, which need not be received-date
    // order. Sort them so the committed offset only ever moves forward.
    // Arrays.sort is stable, so messages with equal dates keep folder order.
    Arrays.sort(messages, { Message a, Message b ->
        (a.getReceivedDate()?.getTime() ?: 0L) <=> (b.getReceivedDate()?.getTime() ?: 0L)
    } as Comparator<Message>)

    for (message in messages) {
        if (sdc.isStopped()) {
            break
        }
        try {
            long receivedTime = message.getReceivedDate().getTime()
            // The search starts a day early, so skip messages that were already
            // processed. Compare against the offset we started from, not the
            // running one, so messages sharing a timestamp are all emitted.
            if (receivedTime <= startOffset) {
                continue
            }
            recordCreatedforMessage = false
            if (message.isMimeType("multipart/*")) {
                // Adds attachment records and sets recordCreatedforMessage
                handleMultipartMessage(message)
            }
            if (!recordCreatedforMessage) {
                record = createRecord(message)
                cur_batch.add(record)
            }
            offset = receivedTime
            // Check the batch only after every record of this message has been
            // added, so a committed offset never covers a half-handled message
            if (cur_batch.size() >= sdc.batchSize) {
                cur_batch.process(entityName, offset.toString())
                cur_batch = sdc.createBatch()
            }
        } catch (Exception e) {
            sdc.log.error("Error processing email message " + message.getMessageNumber() + ": " + e.toString())
            // Report the failing message with a fresh record. `record` may still
            // hold an earlier message's record that was already processed.
            def errorRecord = sdc.createRecord('error-message-' + message.getMessageNumber())
            errorRecord.value = sdc.createMap(false)
            errorRecord.value['messageNumber'] = message.getMessageNumber()
            cur_batch.addError(errorRecord, e.toString())
            cur_batch.process(entityName, offset.toString())
            // A processed batch must not be reused
            cur_batch = sdc.createBatch()
        }
    }

    // Flush whatever is left (records, errors or events)
    if (cur_batch.size() + cur_batch.errorCount() + cur_batch.eventCount() > 0) {
        cur_batch.process(entityName, offset.toString())
    }
} finally {
    // Release the IMAP folder and connection opened earlier in the script
    try {
        if (folder.isOpen()) {
            folder.close(false)
        }
    } finally {
        store.close()
    }
}
// Handle a multipart message. Every attachment (as identified by
// getAttachmentFileName) is saved to a local file and gets its own record in
// the current batch. Attachments inside nested multipart parts, such as a
// multipart/mixed wrapping a multipart/related, are handled too.
//
// Updates script-level state: sets recordCreatedforMessage to true when at
// least one attachment record is added, and reassigns `record` and `offset`.
// The batch is deliberately not processed here; processing is left to the
// caller, so an offset is never committed while some attachments of this
// message are still pending.
def handleMultipartMessage(Message message) {
    handleMultipart(message, (Multipart) message.getContent())
}

// Walks the parts of one Multipart, recursing into nested multipart parts
private void handleMultipart(Message message, Multipart multipart) {
    for (int i = 0; i < multipart.getCount(); i++) {
        BodyPart part = multipart.getBodyPart(i)
        if (part == null) {
            continue
        }
        if (part.isMimeType("multipart/*")) {
            handleMultipart(message, (Multipart) part.getContent())
            continue
        }
        if (part.getSize() <= 0) {
            continue
        }
        String fileName = getAttachmentFileName(message, part)
        if (!fileName) {
            // Not an attachment (null or empty name)
            continue
        }
        String dir = createOutputDir(message, part)
        // Saving to a local file is a stop-gap. Emitting a whole-file record
        // and/or parsing the attachment into multiple records would be better.
        saveAttachmentToLocalFile(message, part, dir, fileName)
        recordCreatedforMessage = true
        record = createRecord(message, dir, fileName)
        cur_batch.add(record)
        offset = message.getReceivedDate().getTime()
    }
}
// Returns the file name of bodyPart if it is an attachment, or null otherwise.
// A part counts as an attachment when its Content-Disposition is "attachment"
// or "inline" and it carries a non-empty file name.
// The name comes from the e-mail, i.e. untrusted input, and is later used to
// build a local path. It is therefore MIME-decoded and reduced to its last
// path component; "." and ".." are rejected.
// The message parameter is currently unused.
// NOTE(review): parts that have a file name but no Content-Disposition are
// not treated as attachments - confirm that is intended.
String getAttachmentFileName(Message message, BodyPart bodyPart) {
    String disposition = bodyPart.getDisposition()
    boolean isAttachment = disposition != null &&
            (disposition.equalsIgnoreCase(Part.ATTACHMENT) || disposition.equalsIgnoreCase(Part.INLINE))
    if (!isAttachment) {
        return null
    }
    String fileName = bodyPart.getFileName()
    if (fileName == null || fileName.isEmpty()) {
        return null
    }
    try {
        // getFileName() does not decode RFC 2047 encoded-words by default
        fileName = MimeUtility.decodeText(fileName)
    } catch (UnsupportedEncodingException ignored) {
        // Unknown charset: keep the raw name rather than dropping the attachment
    }
    // Strip any directory components (both separators) to prevent path traversal
    fileName = fileName.replaceAll('(?s).*[/\\\\]', '')
    if (fileName.isEmpty() || fileName == '.' || fileName == '..') {
        return null
    }
    return fileName
}
// Returns the output directory for a message's attachments, in the form
// sdc.userParams["LOCAL_FILE_DIR"]/<sender>/<yyyyMMddHHmmss received time>.
// The sender comes from the untrusted From header. It is reduced to a safe
// character set so it cannot introduce path separators or "..". For ordinary
// addresses the result is unchanged: '<' and '>' are removed and '@' and
// spaces become '_'. The bodyPart parameter is currently unused.
def createOutputDir(Message message, BodyPart bodyPart) {
    String parentDir = sdc.userParams["LOCAL_FILE_DIR"]

    Address[] from = message.getFrom()
    String sender = from ? from[0].toString() : 'unknown_sender'
    sender = sender.replace("<", "").replace(">", "").replaceAll('[^A-Za-z0-9._+-]', '_')
    if (sender.isEmpty() || sender ==~ /\.+/) {
        // "", "." or ".." would escape or collapse the directory structure
        sender = 'unknown_sender'
    }

    String receivedDate = new SimpleDateFormat("yyyyMMddHHmmss").format(message.getReceivedDate())
    return parentDir + "/" + sender + "/" + receivedDate
}
// Writes the content of bodyPart to outputDir/attachmentFileName, creating
// directories as needed and overwriting any existing file.
// The file name originates from the e-mail, so the resolved path is checked
// to stay inside outputDir. An IllegalArgumentException is thrown otherwise.
// The message parameter is currently unused.
def saveAttachmentToLocalFile(Message message, BodyPart bodyPart, String outputDir, String attachmentFileName) {
    Path dirPath = Paths.get(outputDir).toAbsolutePath().normalize()
    Path outputPath = Paths.get(outputDir, attachmentFileName).toAbsolutePath().normalize()
    if (!outputPath.startsWith(dirPath) || outputPath == dirPath) {
        throw new IllegalArgumentException(
                "Refusing to write attachment outside " + dirPath + ": " + attachmentFileName)
    }
    Files.createDirectories(outputPath.getParent())
    InputStream inputStream = bodyPart.getInputStream()
    try {
        Files.copy(inputStream, outputPath, StandardCopyOption.REPLACE_EXISTING)
    } finally {
        // Always release the attachment stream
        inputStream.close()
    }
}
// Creates the record for one saved attachment: the message-level fields from
// createRecord(Message), plus where the attachment was written locally and
// has_attachment set to true. A local variable is used so the script-level
// `record` is not modified behind the caller's back.
def createRecord(Message message, String outputDir, String attachmentFileName) {
    def attachmentRecord = createRecord(message)
    // NOTE(review): this key looks truncated (attachment_file_dir?); kept as-is
    // so downstream stages that read it keep working.
    attachmentRecord.value['attachment_file_'] = outputDir
    attachmentRecord.value['attachment_file_name'] = attachmentFileName
    attachmentRecord.value['has_attachment'] = true
    return attachmentRecord
}
// Creates a record describing one e-mail message, with these fields:
// messageId, receivedDate, subject, from (first From address, or null if
// there is none), contentType, toList and ccList (address strings, empty when
// there are no recipients of that type) and has_attachment (false).
// All temporaries are locals, so the script-level binding is not touched.
def createRecord(Message message) {
    String messageId = message.getMessageID()
    // Message-ID is optional; fall back to the folder's message number so
    // the record still gets a non-null source id
    def newRecord = sdc.createRecord(messageId ?: 'message-' + message.getMessageNumber())
    newRecord.value = sdc.createMap(false)
    newRecord.value['messageId'] = messageId
    newRecord.value['receivedDate'] = message.getReceivedDate()
    newRecord.value['subject'] = message.getSubject()
    Address[] from = message.getFrom()
    newRecord.value['from'] = from ? from[0].toString() : null
    newRecord.value['contentType'] = message.getContentType()
    newRecord.value['toList'] = addressStrings(message.getRecipients(Message.RecipientType.TO))
    newRecord.value['ccList'] = addressStrings(message.getRecipients(Message.RecipientType.CC))
    newRecord.value['has_attachment'] = false
    return newRecord
}

// Converts a possibly-null address array to a list of strings (empty if null)
private List<String> addressStrings(Address[] addresses) {
    return addresses ? addresses.collect { it.toString() } : []
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment