Skip to content

Instantly share code, notes, and snippets.

@otobrglez
Last active November 11, 2024 09:08
Show Gist options
  • Save otobrglez/238cfbedbb9553aee8b3147b2df853e3 to your computer and use it in GitHub Desktop.
Save otobrglez/238cfbedbb9553aee8b3147b2df853e3 to your computer and use it in GitHub Desktop.
A simple stream of emails from an IMAP server.
import eu.timepit.refined.auto.autoUnwrap
import jakarta.mail.event.{MessageCountEvent, MessageCountListener}
import jakarta.mail.{Message, Session, Store}
import org.eclipse.angus.mail.imap.{IMAPFolder, SortTerm}
import zio.ZIO.{attempt, logInfo}
import zio.stream.{Stream, ZStream}
import zio.{Chunk, RIO, Scope, Task, ZIO}
import zio.durationInt
/** The two identifiers carried for an e-mail; either one may be absent.
  *
  * @param uid
  *   IMAP UID
  * @param messageID
  *   RFC 822 - Message ID Header
  */
final case class MessageID(uid: Option[Long], messageID: Option[String])
/** Immutable snapshot of a single e-mail as emitted by the stream.
  *
  * @param messageID
  *   identifiers of the message (IMAP UID and/or Message-ID header)
  * @param subject
  *   subject line of the message
  * @param from
  *   textual form of the sender
  * @param content
  *   message content rendered as a string
  * @param flags
  *   flags set on the message, expressed as [[SafeFlags]]
  */
final case class EmailMessage(
  messageID: MessageID,
  subject: String,
  from: String,
  content: String,
  flags: Set[SafeFlags]
)
/** Builds a stream of e-mails from an IMAP folder: first the most recent existing messages,
  * then every message that arrives while the stream is being consumed.
  */
object EmailStream:
  import FlagOps.*

  /** Runs `TrustAllX509TrustManager.trustAllCertificates()`.
    *
    * NOTE(review): judging by its name this disables TLS certificate validation for the
    * process — confirm that this is acceptable outside of development environments.
    */
  private def setupSSL: Task[Unit] =
    attempt(TrustAllX509TrustManager.trustAllCertificates())

  /** Creates a mail session from the JVM system properties, without an authenticator. */
  private def createSession: Task[Session] =
    attempt(Session.getInstance(System.getProperties))

  /** Connects to the `imaps` store and closes it again when the enclosing scope ends.
    *
    * @param config
    *   supplies host, port, username and password
    * @param session
    *   the mail session to obtain the store from
    */
  private def connectStore(config: SMTPConfig, session: Session): RIO[Scope, Store] =
    val connect: Task[Store] = attempt:
      val store = session.getStore("imaps")
      store.connect(config.host, config.port, config.username, config.password)
      store

    // `close()` can throw, so it is run with `attempt`; `ZIO.succeed` would turn the exception
    // into a defect that `ignoreLogged` cannot handle.
    ZIO.acquireRelease(connect)(store => attempt(store.close()).ignoreLogged <* logInfo("Store closed"))

  /** Opens the named folder in `config.folderMode` and closes it (expunging deleted messages)
    * when the enclosing scope ends.
    *
    * @param config
    *   supplies the folder open mode
    * @param store
    *   a connected store
    * @param name
    *   folder name, `INBOX` by default
    */
  private def openFolder(config: SMTPConfig, store: Store, name: String = "INBOX"): RIO[Scope, IMAPFolder] =
    val open: Task[IMAPFolder] = attempt:
      // A failed cast is captured by `attempt` and surfaces as a regular failure.
      val folder = store.getFolder(name).asInstanceOf[IMAPFolder]
      folder.open(config.folderMode)
      folder

    // Same reasoning as for the store: a throwing `close` must stay in the error channel.
    ZIO.acquireRelease(open)(folder =>
      attempt(folder.close(true)).ignoreLogged <* logInfo(s"Folder ${folder.getName} closed")
    )

  /** Reads the IMAP UID (kept only when positive) and the first `Message-ID` header, if any. */
  private def getMessageID(folder: IMAPFolder, message: Message): Task[MessageID] = attempt:
    val uid = folder.getUID(message)
    MessageID(
      uid = Option.when(uid > 0)(uid),
      messageID = Option(message.getHeader("Message-ID")).flatMap(_.headOption)
    )

  /** Converts a Jakarta Mail message into an [[EmailMessage]].
    *
    * All accessors are evaluated inside `attempt`, so an exception raised while reading the
    * message becomes a failure of the returned `Task` rather than a defect.
    */
  private def parseEmailMessage(folder: IMAPFolder)(message: Message): Task[EmailMessage] =
    for
      messageID <- getMessageID(folder, message)
      emailMessage <- attempt(
        EmailMessage(
          messageID = messageID,
          subject = message.getSubject,
          // `getFrom` returns null when the header is absent; treat that like an empty list.
          from = Option(message.getFrom).flatMap(_.headOption).map(_.toString).getOrElse("UNKNOWN"),
          content = message.getContent.toString,
          flags = message.getFlags.getSystemFlags.toSet.map(_.asSafeFlag)
        )
      )
    yield emailMessage

  /** Streams at most `top` messages already in the folder, sorted by date in reverse order.
    *
    * @param config
    *   currently unused
    * @param folder
    *   an open IMAP folder
    * @param top
    *   maximum number of messages to emit
    */
  private def fetchExisting(config: SMTPConfig, folder: IMAPFolder, top: Int = 100): Stream[Throwable, EmailMessage] =
    ZStream
      .fromZIO(attempt(folder.getSortedMessages(Array(SortTerm.REVERSE, SortTerm.DATE))))
      // The limit has to be applied to the messages, not to the single-element stream that
      // carries the whole array.
      .flatMap(messages => ZStream.fromChunk(Chunk.fromArray(messages).take(top)))
      .mapZIO(parseEmailMessage(folder))

  /** Streams messages that are added to the folder while the stream is running.
    *
    * A listener pushes every added message into the stream. A background fiber keeps the
    * connection receiving notifications: it uses IMAP IDLE when the first `idle()` call
    * succeeds, and otherwise falls back to polling the message count every 200ms.
    *
    * @param config
    *   currently unused
    * @param folder
    *   an open IMAP folder
    */
  private def observeForNew(config: SMTPConfig, folder: IMAPFolder): Stream[Throwable, EmailMessage] =
    ZStream.logInfo(s"Starting monitoring of ${folder.getName} for new messages") *>
      ZStream.asyncZIO[Any, Throwable, EmailMessage]: callback =>
        val listener = new MessageCountListener:
          def messagesRemoved(event: MessageCountEvent): Unit = ()
          def messagesAdded(event: MessageCountEvent): Unit =
            event.getMessages.foreach: message =>
              callback(parseEmailMessage(folder)(message).mapBoth(Some(_), Chunk(_)))

        // `idle()` returns once a notification has been processed, so it is re-issued in a loop.
        val idleOnce = ZIO.attemptBlocking(folder.idle())
        // Fallback to interval polling with 200ms delay.
        val pollLoop = (ZIO.attemptBlocking(folder.getMessageCount) *> ZIO.sleep(200.millis)).forever

        val watch = idleOnce.foldZIO(
          error =>
            logInfo(s"IDLE unavailable for ${folder.getName} (${error.getMessage}), falling back to polling") *>
              pollLoop,
          _ => idleOnce.forever
        )

        attempt(folder.addMessageCountListener(listener)) *>
          watch
            // Surface failures of the background loop to the stream instead of losing them.
            .catchAll(error => ZIO.succeed(callback(ZIO.fail(Some(error)))))
            // Forked so that registration returns immediately.
            .fork

  /** Connects to the IMAP server and streams e-mails from `INBOX`: first up to `top` existing
    * messages, then newly arriving ones. The store and folder are released when the stream ends.
    *
    * @param config
    *   connection settings
    * @param top
    *   maximum number of existing messages to emit before switching to live monitoring
    */
  def make(config: SMTPConfig, top: Int = 100): ZStream[Scope, Throwable, EmailMessage] =
    ZStream.scoped {
      for
        _       <- setupSSL
        session <- createSession
        store   <- connectStore(config, session)
        inbox   <- openFolder(config, store)
      yield fetchExisting(config, inbox, top) ++ observeForNew(config, inbox)
    }.flatten
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment