Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created March 26, 2020 21:30
Show Gist options
  • Save arturaz/9a45a8b3807fc625e4feb006b3a94ab1 to your computer and use it in GitHub Desktop.
Save arturaz/9a45a8b3807fc625e4feb006b3a94ab1 to your computer and use it in GitHub Desktop.
Utility object for extracting 7-Zip archives using the fs2 Scala library
package app.utils
import java.io.{File, FileOutputStream, RandomAccessFile}
import cats.effect.{ConcurrentEffect, IO}
import com.typesafe.scalalogging.Logger
import fs2._
import fs2.concurrent.Queue
import net.sf.sevenzipjbinding._
import net.sf.sevenzipjbinding.impl.RandomAccessFileInStream
/** fs2 helpers for extracting 7-Zip (and other sevenzipjbinding-supported) archives.
  *
  * The incoming byte stream is first spooled to a temporary file, because
  * `SevenZip.openInArchive` is given a `RandomAccessFileInStream`. The archive format
  * is auto-detected, and every item is emitted as an [[Entry]].
  */
object SevenZipStreams {
  import scala.util.control.NonFatal

  /** Default for the `chunkSize` parameter of [[unzipPipe]].
    *
    * NOTE(review): not used by the current implementation; the value looks like it may
    * have been meant to be 4096 — confirm before relying on it.
    */
  val chunkSize = 4098

  /** Metadata of one archive item, read from its `IS_FOLDER` and `PATH` properties. */
  case class ItemInfo(isFolder: Boolean, path: String)

  /** An archive item plus a stream of its extracted bytes.
    *
    * `data` calls `archive.extract` on the archive that produced this entry every time it
    * is run, so it must be consumed while that archive is still open (i.e. inside the
    * stream returned by [[unzipPipe]], before it finalizes).
    */
  case class Entry[F[_]](item: ItemInfo, data: Stream[F, Byte])

  /** Pipe that treats all incoming bytes as one archive and emits an [[Entry]] per item.
    *
    * The whole input is written to a temporary file before the archive is opened; the
    * temp file is closed and deleted when the resulting stream finalizes.
    *
    * @param chunkSize currently unused; kept for source compatibility.
    */
  def unzipPipe[F[_]](chunkSize: Int = chunkSize)(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Pipe[F, Byte, Entry[F]] = s => {
    val tempFileStream = Stream.bracket(F.delay {
      val file = File.createTempFile("SevenZipStreams-", ".temp")
      log.debug(s"Created temp file @ ${file.getAbsolutePath}")
      // If acquisition fails here, the bracket release never runs, so remove the
      // freshly created file ourselves instead of leaking it.
      val out =
        try new FileOutputStream(file)
        catch {
          case NonFatal(e) =>
            file.delete()
            throw e
        }
      (out, file)
    }) { case (stream, file) =>
      F.delay {
        val path = file.toPath
        log.debug(s"Closing stream (from bracket) for temp file @ $path")
        // Delete the file even if closing the (possibly already closed) stream fails.
        try stream.close()
        finally {
          log.debug(s"Deleting temp file @ $path")
          if (!file.delete()) log.warn(s"Could not delete temp file @ $path")
        }
      }
    }

    tempFileStream.flatMap { case (tempFileOutputStream, tempFile) =>
      // Copies every input chunk into the temp file, then flushes and closes it so the
      // archive reader opened afterwards sees the complete contents. Emits nothing.
      def pipeChunksToTempFile =
        s.chunks
          .evalMap(c => F.delay(tempFileOutputStream.write(c.toArray)))
          .append(Stream.eval(F.delay {
            log.debug(s"Flushing & closing stream (from reader) for temp file @ ${tempFile.getPath}")
            tempFileOutputStream.flush()
            tempFileOutputStream.close()
          }))
          .drain

      // The three handles backing an opened archive.
      case class OpenedArchive(file: RandomAccessFile, stream: RandomAccessFileInStream, archive: IInArchive) {
        /** Closes archive, stream and file in that order; failures are logged, not rethrown. */
        def close(): Unit = {
          try archive.close() catch { case NonFatal(t) => log.error("can't close archive", t) }
          try stream.close() catch { case NonFatal(t) => log.error("can't close stream", t) }
          try file.close() catch { case NonFatal(t) => log.error("can't close file", t) }
        }
      }

      // Opens the spooled temp file as an archive for the lifetime of the inner stream.
      def openFile = Stream.bracket(F.delay {
        log.debug(s"Opening archive from ${tempFile.getPath}")
        val file = new RandomAccessFile(tempFile, "r")
        val stream = new RandomAccessFileInStream(file)
        // If opening / format detection fails the bracket release never runs, so close
        // the underlying file here rather than leaking the handle.
        val archive =
          try SevenZip.openInArchive(null /* autodetect */, stream)
          catch {
            case NonFatal(e) =>
              try file.close() catch { case NonFatal(t) => e.addSuppressed(t) }
              throw e
          }
        OpenedArchive(file, stream, archive)
      })(opened => F.delay {
        log.debug(s"Closing archive")
        opened.close()
      })

      pipeChunksToTempFile ++ openFile.flatMap(a => archiveStream(a.archive)(F, log))
    }
  }

  /** Emits an [[Entry]] for every item of `archive`.
    *
    * Metadata is gathered by running `archive.extract` over all item indices with a
    * callback whose `getStream` always returns `null`. Entries are emitted in the order
    * `getStream` is invoked, and only after that `extract` call has returned. Each entry's
    * `data` performs its own extraction via [[itemStream]].
    */
  def archiveStream[F[_]](archive: IInArchive)(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Stream[F, Entry[F]] =
    Stream.eval(Queue.unbounded[F, Option[Entry[F]]]).flatMap { q =>
      Stream.eval(F.delay {
        // Blocks until the enqueue has completed, so entries are queued in callback
        // order and any failure is rethrown instead of being silently dropped.
        def enqueue(v: Option[Entry[F]]): Unit = {
          val io: IO[Unit] = F.toIO(q.enqueue1(v))
          io.unsafeRunSync()
        }

        log.debug("archive.extract start")
        archive.extract(
          Array.tabulate(archive.getNumberOfItems)(identity), false,
          new IArchiveExtractCallback {
            override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val info = ItemInfo(
                isFolder = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean],
                path = archive.getProperty(index, PropID.PATH).asInstanceOf[String]
              )
              enqueue(Some(Entry(info, itemStream(archive, index, info.path))))
              // No output stream: item data is not collected here.
              null
            }
            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit = {}
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }
        )
        log.debug("archive.extract end")
        enqueue(None)
      }).drain ++ q.dequeue.unNoneTerminate
    }

  /** Stream of the extracted bytes of the item at index `idx`.
    *
    * Each run calls `archive.extract` for that single index. The item is extracted
    * completely into an unbounded in-memory queue before its first byte is emitted, so
    * memory use grows with the item's extracted size. Folders, and ask modes other than
    * `EXTRACT`, yield no bytes.
    *
    * Fails with a `SevenZipException` if sevenzipjbinding reports a non-OK
    * `ExtractOperationResult` for the item, instead of emitting possibly corrupt data.
    *
    * @param debugString extra context that only appears in log / error messages.
    */
  def itemStream[F[_]](archive: IInArchive, idx: Int, debugString: String = "")(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Stream[F, Byte] =
    for {
      queue <- Stream.eval(Queue.unbounded[F, Option[Chunk[Byte]]])
      _ <- Stream.eval(F.delay {
        // Blocks until the enqueue has completed; failures are rethrown.
        def enqueue(v: Option[Chunk[Byte]]): Unit = {
          val io: IO[Unit] = F.toIO(queue.enqueue1(v))
          io.unsafeRunSync()
        }
        // Set by the callback when a non-OK result is reported for this item.
        var failure: Option[ExtractOperationResult] = None

        log.debug(s"Extracting item idx=$idx from archive (debug: $debugString)")
        archive.extract(
          Array(idx), false,
          new IArchiveExtractCallback {
            override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
              val skipExtraction = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean]
              if (skipExtraction || extractAskMode != ExtractAskMode.EXTRACT) null
              else data => {
                enqueue(Some(Chunk.bytes(data)))
                data.length
              }
            }
            override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit = {
              log.debug(s"Extracted item idx=$idx from archive (debug: $debugString), result=$extractOperationResult")
              if (extractOperationResult != ExtractOperationResult.OK) failure = Some(extractOperationResult)
            }
            override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
            override def setTotal(total: Long): Unit = {}
            override def setCompleted(complete: Long): Unit = {}
          }
        )
        log.debug(s"Extract item idx=$idx from archive (debug: $debugString) done.")
        failure.foreach { result =>
          throw new SevenZipException(
            s"Extracting item idx=$idx from archive (debug: $debugString) failed: $result"
          )
        }
        enqueue(None)
      })
      byte <- queue.dequeue.unNoneTerminate.flatMap(Stream.chunk)
    } yield byte
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment