Last active
November 11, 2024 09:08
-
-
Save otobrglez/238cfbedbb9553aee8b3147b2df853e3 to your computer and use it in GitHub Desktop.
A simple stream of emails from an IMAP server.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eu.timepit.refined.auto.autoUnwrap | |
import jakarta.mail.event.{MessageCountEvent, MessageCountListener} | |
import jakarta.mail.{Message, Session, Store} | |
import org.eclipse.angus.mail.imap.{IMAPFolder, SortTerm} | |
import zio.ZIO.{attempt, logInfo} | |
import zio.stream.{Stream, ZStream} | |
import zio.{Chunk, RIO, Scope, Task, ZIO} | |
import zio.durationInt | |
/** Identifiers that can be attached to a single email message.
  *
  * @param uid
  *   the IMAP UID, if one is known
  * @param messageID
  *   the RFC 822 `Message-ID` header value, if one is known
  */
final case class MessageID(uid: Option[Long], messageID: Option[String])
/** Immutable snapshot of one email as emitted by the stream.
  *
  * @param messageID
  *   identifiers of the message (IMAP UID and/or `Message-ID` header)
  * @param subject
  *   the subject line
  * @param from
  *   textual form of the sender
  * @param content
  *   textual form of the message content
  * @param flags
  *   flags set on the message
  */
final case class EmailMessage(
  messageID: MessageID,
  subject: String,
  from: String,
  content: String,
  flags: Set[SafeFlags]
)
/** Builds a stream of [[EmailMessage]]s from an IMAP mailbox: the most recent existing messages
  * first, followed by messages that arrive while the stream is running.
  */
object EmailStream:
  import FlagOps.*

  // SECURITY NOTE(review): judging by its name, `trustAllCertificates` disables TLS certificate
  // validation. Confirm that this is acceptable for the environments this code runs in.
  private def setupSSL: Task[Unit] = attempt(TrustAllX509TrustManager.trustAllCertificates())

  /** Creates a mail session from the JVM system properties, without an authenticator. */
  private def createSession: Task[Session] = attempt(Session.getInstance(System.getProperties, null))

  /** Connects to the `imaps` store described by `config`; the store is closed when the scope closes. */
  private def connectStore(config: SMTPConfig, session: Session): RIO[Scope, Store] =
    ZIO.acquireRelease(
      ZIO.attemptBlocking {
        val store = session.getStore("imaps")
        store.connect(config.host, config.port, config.username, config.password)
        store
      }
    )(store =>
      // `close()` can throw; it has to be captured as a failure for `ignoreLogged` to handle it.
      ZIO.attemptBlocking(store.close()).ignoreLogged <* logInfo("Store closed")
    )

  /** Opens folder `name` in the mode given by `config.folderMode`; the folder is closed (with
    * `expunge = true`) when the scope closes.
    */
  private def openFolder(config: SMTPConfig, store: Store, name: String = "INBOX"): RIO[Scope, IMAPFolder] =
    ZIO.acquireRelease(
      ZIO.attemptBlocking {
        // A failing cast is captured by `attemptBlocking` and surfaces as a regular failure.
        val folder = store.getFolder(name).asInstanceOf[IMAPFolder]
        folder.open(config.folderMode)
        folder
      }
    )(folder =>
      // `close` can throw (e.g. when the folder is no longer open); capture it so it is only logged.
      ZIO.attemptBlocking(folder.close(true)).ignoreLogged <* logInfo(s"Folder ${folder.getName} closed")
    )

  /** Reads the IMAP UID (kept only when positive) and the first `Message-ID` header of `message`. */
  private def getMessageID(folder: IMAPFolder, message: Message): Task[MessageID] =
    ZIO.attemptBlocking {
      MessageID(
        uid = Some(folder.getUID(message)).filter(_ > 0),
        messageID = Option(message.getHeader("Message-ID")).flatMap(_.headOption)
      )
    }

  /** Converts a mail `Message` into an [[EmailMessage]].
    *
    * All accessors run inside an effect so that exceptions thrown by the mail API become failures
    * of the returned `Task`. A missing subject becomes `""` and a missing sender becomes `"UNKNOWN"`.
    */
  private def parseEmailMessage(folder: IMAPFolder)(message: Message): Task[EmailMessage] =
    getMessageID(folder, message).flatMap { messageID =>
      ZIO.attemptBlocking {
        EmailMessage(
          messageID = messageID,
          subject = Option(message.getSubject).getOrElse(""),
          from = Option(message.getFrom).flatMap(_.headOption).map(_.toString).getOrElse("UNKNOWN"),
          // NOTE(review): this is `toString` of whatever object `getContent` returns; for non-text
          // content that may not be the readable body. Confirm this is what consumers expect.
          content = message.getContent.toString,
          flags = message.getFlags.getSystemFlags.toSet.map(_.asSafeFlag)
        )
      }
    }

  /** Emits at most `top` messages of `folder`, sorted by date in reverse order.
    *
    * `config` is currently unused; it is kept so the signature stays unchanged.
    */
  private def fetchExisting(config: SMTPConfig, folder: IMAPFolder, top: Int = 100): Stream[Throwable, EmailMessage] =
    ZStream
      .fromZIO(ZIO.attemptBlocking(folder.getSortedMessages(Array(SortTerm.REVERSE, SortTerm.DATE))))
      .flatMap(ZStream.fromIterable)
      // The limit must follow the flattening: before it, the stream holds a single element (the
      // whole array), so `take` would not restrict the number of messages at all.
      .take(top)
      .mapZIO(parseEmailMessage(folder))

  /** Emits messages that are added to `folder` while the stream is running.
    *
    * A `MessageCountListener` forwards every added message to the stream. A background fiber keeps
    * the connection in IDLE mode; if the first IDLE attempt fails, it falls back to calling
    * `getMessageCount` every 200ms. Failures of that background fiber fail the stream.
    *
    * `config` is currently unused; it is kept so the signature stays unchanged.
    */
  private def observeForNew(config: SMTPConfig, folder: IMAPFolder): Stream[Throwable, EmailMessage] =
    ZStream.logInfo(s"Starting monitoring of ${folder.getName} for new messages") *>
      ZStream.asyncZIO[Any, Throwable, EmailMessage] { callback =>
        val listener = new MessageCountListener:
          def messagesRemoved(event: MessageCountEvent): Unit = ()
          def messagesAdded(event: MessageCountEvent): Unit =
            event.getMessages.foreach(message =>
              callback(parseEmailMessage(folder)(message).mapBoth(Some(_), Chunk(_)))
            )

        // One round of IDLE: blocks until IDLE mode ends, so it has to be re-entered in a loop.
        val idleOnce = ZIO.attemptBlocking(folder.idle())

        // Fallback to interval polling with a 200ms delay.
        val poll = (ZIO.attemptBlocking(folder.getMessageCount) *> ZIO.sleep(200.millis)).forever

        // If the first IDLE attempt fails, assume IDLE is unavailable and poll instead; otherwise
        // keep re-entering IDLE for as long as the stream runs.
        val monitor: Task[Nothing] = idleOnce.foldZIO(_ => poll, _ => idleOnce.forever)

        // NOTE(review): the listener is never removed from the folder; it relies on the folder
        // being closed by its owning scope.
        ZIO.attempt(folder.addMessageCountListener(listener)) *>
          monitor
            .catchAll(error => ZIO.succeed(callback(ZIO.fail(Some(error)))))
            .fork
      }

  /** Connects to the mailbox described by `config` and streams its emails.
    *
    * @param config
    *   connection settings (host, port, credentials, folder mode)
    * @param top
    *   maximum number of already existing messages to emit before switching to new arrivals
    * @return
    *   the existing messages followed by newly arriving ones; the store and the folder are released
    *   when the stream terminates
    */
  def make(config: SMTPConfig, top: Int = 100): ZStream[Scope, Throwable, EmailMessage] =
    ZStream.scoped {
      for
        _       <- setupSSL
        session <- createSession
        store   <- connectStore(config, session)
        inbox   <- openFolder(config, store)
      yield fetchExisting(config, inbox, top) ++ observeForNew(config, inbox)
    }.flatten
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment