@arturaz · Last active August 20, 2025
Utility class to unzip 7zip archives using the fs2 and fs2-compress Scala libraries
package app.utils.compression

import cats.effect.kernel.{Async, Resource}
import cats.effect.std.{Dispatcher, Queue, QueueSink, QueueSource}
import cats.syntax.all.*
import de.lhns.fs2.compress.ArchiveEntry
import fs2.io.file.{Files, Path}
import fs2.{Chunk, Stream}
import net.sf.sevenzipjbinding.*
import net.sf.sevenzipjbinding.impl.RandomAccessFileInStream

import java.io.{File, RandomAccessFile}
import java.util.Date

given CanEqual[ExtractAskMode, ExtractAskMode] = CanEqual.derived

class SevenZipUnarchiver[F[_]: Async] {
  type Entry = (ArchiveEntry[Option, Unit], Stream[F, Byte])

  /** Unarchives the contents of a SevenZip archive.
    *
    * Note that this method creates a temporary file to store the archive contents before processing them.
    */
  def unarchive(stream: Stream[F, Byte]): Resource[F, Stream[F, Entry]] = {
    val tempFileResource =
      Resource.make(Async[F].blocking(Path.fromNioPath(File.createTempFile("SevenZipUnarchiver-", ".tmp").toPath)))(
        path => Async[F].blocking(path.toNioPath.toFile.delete(): Unit)
      )

    tempFileResource.flatMap { tempFile =>
      Resource.eval(Files.forAsync[F].writeAll(tempFile)(stream).compile.drain) >> unarchive(tempFile)
    }
  }

  /** Unarchives the contents of a SevenZip archive.
    *
    * You must consume the entries before the resource is released: releasing it closes the underlying archive, which
    * invalidates any unconsumed entry streams.
    *
    * @param path
    *   The path to the SevenZip archive.
    */
  def unarchive(path: Path): Resource[F, Stream[F, Entry]] = {
    def randomAccessFile =
      Resource.fromAutoCloseable(Async[F].delay(new RandomAccessFile(path.toNioPath.toFile, "r")))

    def randomAccessFileInputStream(raf: RandomAccessFile) =
      Resource.fromAutoCloseable(Async[F].delay(new RandomAccessFileInStream(raf)))

    def sevenZipArchive(stream: RandomAccessFileInStream) =
      Resource.fromAutoCloseable(Async[F].blocking(SevenZip.openInArchive(null /* autodetect */, stream)))

    for {
      raf <- randomAccessFile
      inStream <- randomAccessFileInputStream(raf)
      archive <- sevenZipArchive(inStream)
    } yield archiveEntriesResource(archive)
  }

  /** Emits every entry of the already-opened archive together with a lazy stream of its contents. Entries are
    * produced by a blocking extract callback running in the background.
    */
  def archiveEntriesResource(
    archive: IInArchive
  ): Stream[F, Entry] = {
    val numOfItems = archive.getNumberOfItems
    if (numOfItems == 0) return Stream.empty

    def makeArchiveEntry(index: Int) = ArchiveEntry(
      name = archive.getProperty(index, PropID.PATH).asInstanceOf[String],
      isDirectory = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean],
      uncompressedSize = {
        val raw = archive.getProperty(index, PropID.SIZE)
        if (raw == null) None
        else {
          val size = raw.asInstanceOf[Long]
          if (size < 0) None else Some(size)
        }
      },
      lastModified = Option(archive.getProperty(index, PropID.LAST_MODIFICATION_TIME))
        .map(_.asInstanceOf[Date].toInstant()),
      lastAccess = Option(archive.getProperty(index, PropID.LAST_ACCESS_TIME))
        .map(_.asInstanceOf[Date].toInstant()),
      creation = Option(archive.getProperty(index, PropID.CREATION_TIME))
        .map(_.asInstanceOf[Date].toInstant()),
      underlying = (),
    )

    def listEntriesToQueue(dispatcher: Dispatcher[F], queue: QueueSink[F, Entry]) =
      Async[F]
        .blocking {
          val indices = Array.tabulate(numOfItems)(identity)
          val callback = new IArchiveExtractCallback {
            override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val entry = makeArchiveEntry(index)
              val body = itemContentsStream(archive, index)
              dispatcher.unsafeRunSync(queue.offer((entry, body)))
              null // do not actually extract the file
            }

            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit = {}
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }

          archive.extract(indices, /* testMode */ false, callback)
        }

    produceConsume(queueSize = 100)(listEntriesToQueue)(Stream.fromQueueNoneTerminated(_))
  }

  /** Extracts a single item from the archive, given its index. */
  def itemContentsStream(archive: IInArchive, index: Int): Stream[F, Byte] = {
    def extractToQueue(dispatcher: Dispatcher[F], queue: QueueSink[F, Chunk[Byte]]) =
      Async[F]
        .blocking {
          val callback = new IArchiveExtractCallback {
            override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val skipExtraction = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean]
              if (skipExtraction || extractAskMode != ExtractAskMode.EXTRACT) null
              else
                data => {
                  dispatcher.unsafeRunSync(queue.offer(Chunk.from(data)))
                  data.length // return how many bytes we have consumed
                }
            }

            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit = {}
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }

          archive.extract(Array(index), /* testMode */ false, callback)
        }

    produceConsume(queueSize = 10)(extractToQueue)(Stream.fromQueueNoneTerminatedChunk(_))
  }

  /** Runs a blocking producer and a stream consumer concurrently, connected by a bounded queue. Producer completion
    * is signalled by offering `None` to the queue.
    */
  private def produceConsume[In, Out](queueSize: Int)(makeProducer: (Dispatcher[F], QueueSink[F, In]) => F[Unit])(
    makeConsumer: QueueSource[F, Option[In]] => Stream[F, Out]
  ): Stream[F, Out] = {
    val resource = for {
      dispatcher <- Dispatcher.sequential[F]
      queue <- Resource.eval(Queue.bounded[F, Option[In]](queueSize))
    } yield (dispatcher, queue)

    Stream.resource(resource).flatMap { case (dispatcher, queue) =>
      val producerSink = queue.asSink.contramap[In](_.some)
      val producerF = makeProducer(dispatcher, producerSink).flatMap(_ => queue.offer(None))
      val producer = Stream.exec(producerF)
      val consumer = makeConsumer(queue)
      consumer.concurrently(producer)
    }
  }
}

object SevenZipUnarchiver {
  def apply[F[_]](implicit instance: SevenZipUnarchiver[F]): SevenZipUnarchiver[F] = instance

  def make[F[_]: Async]: SevenZipUnarchiver[F] =
    new SevenZipUnarchiver
}
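
A minimal usage sketch, not part of the original gist: it assumes a cats.effect.IO runtime and writes every non-directory entry of a local archive into an output directory. The object name and the paths (archive.7z, out/) are illustrative, and entries are written sequentially for simplicity.

// Hypothetical usage example (assumptions: IO runtime, local "archive.7z", output dir "out/").
object SevenZipUnarchiverExample extends cats.effect.IOApp.Simple {
  import cats.effect.IO
  import fs2.io.file.{Files, Path}

  // Illustrative paths; adjust to your environment.
  private val archivePath = Path("archive.7z")
  private val outDir = Path("out")

  def run: IO[Unit] =
    SevenZipUnarchiver.make[IO].unarchive(archivePath).use { entries =>
      entries
        // Skip directory entries; only files carry bytes worth writing.
        .filter { case (entry, _) => !entry.isDirectory }
        .evalMap { case (entry, body) =>
          // `entry.name` comes straight from the 7z PATH property and may contain subdirectories.
          val target = outDir / entry.name
          // Entries must be consumed inside `use`, while the archive is still open.
          Files[IO].createDirectories(target.parent.getOrElse(outDir)) >>
            body.through(Files[IO].writeAll(target)).compile.drain
        }
        .compile
        .drain
    }
}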