Last active
August 20, 2025 21:44
-
-
Save arturaz/9a45a8b3807fc625e4feb006b3a94ab1 to your computer and use it in GitHub Desktop.
Utility class for extracting 7-Zip archives using the fs2 and fs2-compress Scala libraries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.utils.compression | |
import cats.effect.kernel.Async | |
import cats.effect.std.{Dispatcher, Queue, QueueSink, QueueSource} | |
import de.lhns.fs2.compress.ArchiveEntry | |
import fs2.io.file.{Files, Path} | |
import fs2.Chunk | |
import net.sf.sevenzipjbinding.* | |
import net.sf.sevenzipjbinding.impl.RandomAccessFileInStream | |
import java.io.{File, RandomAccessFile} | |
import java.util.Date | |
// Allows `ExtractAskMode` values to be compared with `==` / `!=`; this file compares them in
// `itemContentsStream` to decide whether an item should actually be extracted.
given CanEqual[ExtractAskMode, ExtractAskMode] = CanEqual.derived
/** Reads archives through 7-Zip-JBinding and exposes their entries as fs2 streams.
  *
  * Entry streams read from the underlying `IInArchive` handle, so they are only usable while the `Resource`
  * returned by `unarchive` is still open.
  */
class SevenZipUnarchiver[F[_]: Async] {

  /** An archive entry: its metadata paired with a stream that extracts its contents when run. */
  type Entry = (ArchiveEntry[Option, Unit], Stream[F, Byte])

  /** Unarchives the contents of a SevenZip archive.
    *
    * Note that this method first writes the whole input stream to a temporary file and then opens that file as
    * an archive. The temporary file is deleted when the returned resource is released.
    *
    * @param stream
    *   The raw bytes of the archive.
    */
  def unarchive(stream: Stream[F, Byte]): Resource[F, Stream[F, Entry]] = {
    val tempFileResource =
      Resource.make(
        Async[F].blocking(Path.fromNioPath(File.createTempFile("SevenZipUnarchiver-", ".tmp").toPath))
      )(path => Async[F].blocking(path.toNioPath.toFile.delete(): Unit))

    tempFileResource.flatMap { tempFile =>
      Resource.eval(Files.forAsync[F].writeAll(tempFile)(stream).compile.drain) >> unarchive(tempFile)
    }
  }

  /** Unarchives the contents of a SevenZip archive.
    *
    * You must consume the entries before the resource is closed, otherwise the archive will be closed.
    *
    * @param path
    *   The path to the SevenZip archive.
    */
  def unarchive(path: Path): Resource[F, Stream[F, Entry]] =
    for {
      // Opening the file touches the file system, so it runs as a blocking effect.
      raf <- Resource.fromAutoCloseable(Async[F].blocking(new RandomAccessFile(path.toNioPath.toFile, "r")))
      inStream <- Resource.fromAutoCloseable(Async[F].delay(new RandomAccessFileInStream(raf)))
      archive <- Resource.fromAutoCloseable(
        Async[F].blocking(SevenZip.openInArchive(null /* autodetect */, inStream))
      )
    } yield archiveEntriesResource(archive)

  /** Streams every entry of an already opened archive.
    *
    * The number of items is read from the archive when this method is called; an archive without items yields
    * an empty stream.
    *
    * @param archive
    *   The opened archive. It must stay open for as long as the returned stream, or any entry body taken from
    *   it, is being consumed.
    */
  def archiveEntriesResource(
    archive: IInArchive
  ): Stream[F, Entry] = {
    val numOfItems = archive.getNumberOfItems

    // Reads a timestamp property, which the archive may not provide (`null`).
    def timestamp(index: Int, id: PropID) =
      Option(archive.getProperty(index, id)).map(_.asInstanceOf[Date].toInstant())

    def makeArchiveEntry(index: Int) = ArchiveEntry(
      name = archive.getProperty(index, PropID.PATH).asInstanceOf[String],
      isDirectory = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean],
      // A missing or negative size is reported as unknown.
      uncompressedSize = Option(archive.getProperty(index, PropID.SIZE)).map(_.asInstanceOf[Long]).filter(_ >= 0),
      lastModified = timestamp(index, PropID.LAST_MODIFICATION_TIME),
      lastAccess = timestamp(index, PropID.LAST_ACCESS_TIME),
      creation = timestamp(index, PropID.CREATION_TIME),
      underlying = (),
    )

    def listEntriesToQueue(dispatcher: Dispatcher[F], queue: QueueSink[F, Entry]) =
      Async[F]
        .blocking {
          val indices = Array.range(0, numOfItems)
          val callback = new IArchiveExtractCallback {
            override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val entry = makeArchiveEntry(index)
              val body = itemContentsStream(archive, index)
              dispatcher.unsafeRunSync(queue.offer((entry, body)))
              null // do not actually extract the file
            }
            // Nothing is extracted during listing, so the operation results are not inspected here.
            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit = {}
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }
          archive.extract(indices, /* testMode */ false, callback)
        }

    if (numOfItems == 0) Stream.empty
    else produceConsume(queueSize = 100)(listEntriesToQueue)(Stream.fromQueueNoneTerminated(_))
  }

  /** Extracts a single item from the archive, given its index.
    *
    * Folders produce an empty stream. If the archive reports anything other than `ExtractOperationResult.OK`
    * for the item, the callback throws a `SevenZipException` instead of letting the stream end as if the
    * extraction had succeeded.
    *
    * @param archive
    *   The opened archive to read from.
    * @param index
    *   The index of the item within the archive.
    */
  def itemContentsStream(archive: IInArchive, index: Int): Stream[F, Byte] = {
    def extractToQueue(dispatcher: Dispatcher[F], queue: QueueSink[F, Chunk[Byte]]) =
      Async[F]
        .blocking {
          val callback = new IArchiveExtractCallback {
            override def getStream(itemIndex: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val skipExtraction = archive.getProperty(itemIndex, PropID.IS_FOLDER).asInstanceOf[Boolean]
              if (skipExtraction || extractAskMode != ExtractAskMode.EXTRACT) null
              else
                data => {
                  dispatcher.unsafeRunSync(queue.offer(Chunk.from(data)))
                  data.length // return how many bytes we have consumed
                }
            }
            // Reference comparison is enough for a Java enum constant.
            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit =
              if (extractOperationResult ne ExtractOperationResult.OK)
                throw new SevenZipException(s"Failed to extract archive item $index: $extractOperationResult")
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }
          archive.extract(Array(index), /* testMode */ false, callback)
        }

    produceConsume(queueSize = 10)(extractToQueue)(Stream.fromQueueNoneTerminatedChunk(_))
  }

  /** Connects a callback-driven producer to a stream consumer through a bounded queue.
    *
    * Every value the producer offers is wrapped in `Some`; once the producer effect completes, a single `None`
    * is enqueued. The producer runs concurrently with the consumer stream.
    *
    * NOTE(review): the producers in this class run inside `Async[F].blocking` and wait on `queue.offer` when
    * the queue is full; confirm that a consumer which stops early cannot leave the producer stuck. TODO confirm
    *
    * @param queueSize
    *   Capacity of the bounded queue between producer and consumer.
    * @param makeProducer
    *   Builds the producer effect from a dispatcher (for running `F` from callback code) and the queue sink.
    * @param makeConsumer
    *   Builds the output stream from the queue source.
    */
  private def produceConsume[In, Out](queueSize: Int)(makeProducer: (Dispatcher[F], QueueSink[F, In]) => F[Unit])(
    makeConsumer: QueueSource[F, Option[In]] => Stream[F, Out]
  ): Stream[F, Out] = {
    val resource = for {
      dispatcher <- Dispatcher.sequential[F]
      queue <- Resource.eval(Queue.bounded[F, Option[In]](queueSize))
    } yield (dispatcher, queue)

    Stream.resource(resource).flatMap { case (dispatcher, queue) =>
      val producerSink = queue.asSink.contramap[In](_.some)
      val producerF = makeProducer(dispatcher, producerSink).flatMap(_ => queue.offer(None))
      val producer = Stream.exec(producerF)
      val consumer = makeConsumer(queue)
      consumer.concurrently(producer)
    }
  }
}
object SevenZipUnarchiver {

  /** Returns the `SevenZipUnarchiver` instance available in implicit scope. */
  def apply[F[_]](implicit instance: SevenZipUnarchiver[F]): SevenZipUnarchiver[F] = instance

  /** Creates a new `SevenZipUnarchiver` for any effect type that has an `Async` instance. */
  def make[F[_]: Async]: SevenZipUnarchiver[F] = new SevenZipUnarchiver[F]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment