Created
November 10, 2017 16:30
-
-
Save thomassuckow/d409c006d780d4dbf7979276626f95bd to your computer and use it in GitHub Desktop.
Nifi Extract Email Body Text Processor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package foo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

import javax.mail.BodyPart;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.Part;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) | |
@Tags({"split", "email"}) | |
@CapabilityDescription("Extracts the plain/text body from a multipart email") | |
public class ExtractEmailBodyText extends AbstractProcessor { | |
public static final Relationship REL_SUCCESS = new Relationship.Builder() | |
.name("success") | |
.description("All successfully extracted FlowFiles are routed to this relationship") | |
.build(); | |
public static final Relationship REL_FAILURE = new Relationship.Builder() | |
.name("failure") | |
.description("FlowFiles are transferred to this relationship when an error occurs") | |
.build(); | |
public static final Set<Relationship> relationships = Collections.unmodifiableSet( | |
new HashSet<>(Arrays.asList(new Relationship[]{REL_SUCCESS, REL_FAILURE}))); | |
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( | |
Arrays.asList()); | |
@Override | |
public Set<Relationship> getRelationships() { | |
return relationships; | |
} | |
@Override | |
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { | |
return properties; | |
} | |
private String getTextFromMessage(Message message) throws MessagingException, IOException { | |
if (message.isMimeType("text/plain")){ | |
return message.getContent().toString(); | |
}else if (message.isMimeType("multipart/*")) { | |
String result = ""; | |
MimeMultipart mimeMultipart = (MimeMultipart)message.getContent(); | |
int count = mimeMultipart.getCount(); | |
for (int i = 0; i < count; i ++){ | |
BodyPart bodyPart = mimeMultipart.getBodyPart(i); | |
if (bodyPart.isMimeType("text/plain")){ | |
result = result + "\n" + bodyPart.getContent(); | |
break; //without break same text appears twice in my tests | |
} | |
} | |
return result; | |
} | |
return ""; | |
} | |
@Override | |
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { | |
final FlowFile flowFile = session.get(); | |
if (flowFile == null) { | |
return; | |
} | |
final ComponentLog logger = getLogger(); | |
try { | |
final FlowFile newFlowFile = session.write(flowFile, (inputStream, outputStream) -> { | |
try { | |
MimeMessage mime = new MimeMessage(null, inputStream); | |
outputStream.write(getTextFromMessage(mime).getBytes(StandardCharsets.UTF_8)); | |
} catch( MessagingException e) { | |
throw new ProcessException("Not an Email", e); | |
} | |
}); | |
session.putAttribute(newFlowFile, "mime.type", "text/plain"); | |
session.transfer(newFlowFile, REL_SUCCESS); | |
} | |
catch (ProcessException e) { | |
logger.error("Error: " + e.getLocalizedMessage()); | |
session.transfer(flowFile, REL_FAILURE); | |
} | |
session.commit(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment