Created March 26, 2020 21:30
Save arturaz/9a45a8b3807fc625e4feb006b3a94ab1 to your computer and use it in GitHub Desktop.
Utility object for unzipping 7-Zip archives using the fs2 Scala library.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.utils

import java.io.{File, FileOutputStream, RandomAccessFile}

import scala.util.control.NonFatal

import cats.effect.{ConcurrentEffect, IO}
import com.typesafe.scalalogging.Logger
import fs2._
import fs2.concurrent.Queue
import net.sf.sevenzipjbinding._
import net.sf.sevenzipjbinding.impl.RandomAccessFileInStream
/** fs2 helpers for reading archives through sevenzipjbinding.
  *
  * The incoming bytes are spooled to a temporary file (the archive is opened through a
  * `RandomAccessFile`), the archive is opened from that file, and one [[Entry]] is emitted per
  * archive item.
  */
object SevenZipStreams {

  /** Default for the `chunkSize` parameter of [[unzipPipe]].
    *
    * NOTE(review): 4098 looks like a typo for 4096, and `unzipPipe` does not currently use the
    * parameter at all; the value is kept unchanged for source compatibility.
    */
  val chunkSize = 4098

  /** Metadata of one archive item, read from its `IS_FOLDER` and `PATH` properties. */
  case class ItemInfo(isFolder: Boolean, path: String)

  /** One archive item plus a stream of its decompressed bytes.
    *
    * `data` performs the extraction when it is run, so it must be run while the archive that
    * produced this entry is still open.
    */
  case class Entry[F[_]](item: ItemInfo, data: Stream[F, Byte])

  /** Pipe that consumes the raw bytes of an archive and emits its entries.
    *
    * The input is written to a temp file first. The archive opened from that file and the temp
    * file itself are released by brackets when the returned stream's scope closes, so each
    * entry's `data` must be consumed while this stream is still running (e.g. in a downstream
    * `evalMap`), not after it has completed.
    *
    * @param chunkSize currently unused; kept for source compatibility.
    */
  def unzipPipe[F[_]](chunkSize: Int = SevenZipStreams.chunkSize)(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Pipe[F, Byte, Entry[F]] = archiveBytes => {
    val tempFileStream = Stream.bracket(F.delay {
      val file = File.createTempFile("SevenZipStreams-", ".temp")
      log.debug(s"Created temp file @ ${file.getAbsolutePath}")
      val out =
        try new FileOutputStream(file)
        catch {
          case NonFatal(e) =>
            // The bracket release never runs when acquire fails, so remove the file here.
            file.delete()
            throw e
        }
      (out, file)
    }) { case (out, file) =>
      F.delay {
        val path = file.toPath
        log.debug(s"Closing stream (from bracket) for temp file @ $path")
        // Delete the file even if closing the stream fails.
        try out.close()
        finally {
          log.debug(s"Deleting temp file @ $path")
          if (!file.delete()) log.warn(s"Failed to delete temp file @ $path")
        }
      }
    }

    tempFileStream.flatMap { case (tempOut, tempFile) =>
      // Write all input to disk, then flush and close so the file can be reopened for reading.
      val spoolToTempFile =
        archiveBytes.chunks
          .evalMap(chunk => F.delay(tempOut.write(chunk.toArray)))
          .append(Stream.eval(F.delay {
            log.debug(s"Flushing & closing stream (from reader) for temp file @ ${tempFile.getPath}")
            tempOut.flush()
            tempOut.close()
          }))
          .drain

      final case class OpenedArchive(
        file: RandomAccessFile,
        stream: RandomAccessFileInStream,
        archive: IInArchive
      ) {
        /** Closes archive, stream and file in that order; a failure in one does not skip the rest. */
        def close(): Unit = {
          closeQuietly("archive")(archive.close())
          closeQuietly("stream")(stream.close())
          closeQuietly("file")(file.close())
        }
      }

      val openArchive = Stream.bracket(F.delay {
        log.debug(s"Opening archive from ${tempFile.getPath}")
        val file = new RandomAccessFile(tempFile, "r")
        val stream = new RandomAccessFileInStream(file)
        val archive =
          try SevenZip.openInArchive(null /* autodetect */, stream)
          catch {
            case NonFatal(e) =>
              // If opening fails, release the file handle before failing: the bracket release
              // below only runs for a successful acquire.
              closeQuietly("stream")(stream.close())
              closeQuietly("file")(file.close())
              throw e
          }
        OpenedArchive(file, stream, archive)
      })(opened => F.delay {
        log.debug("Closing archive")
        opened.close()
      })

      spoolToTempFile ++ openArchive.flatMap(opened => archiveStream(opened.archive)(F, log))
    }
  }

  /** Emits one [[Entry]] per item of an already-opened archive, in index order.
    *
    * Only item properties are read here; nothing is decompressed until an entry's `data` is run
    * (see [[itemStream]]). The caller remains responsible for keeping `archive` open.
    */
  def archiveStream[F[_]](archive: IInArchive)(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Stream[F, Entry[F]] =
    Stream.eval(F.delay {
      val count = archive.getNumberOfItems
      log.debug(s"Listing $count archive items")
      Vector.tabulate(count) { index =>
        val info = ItemInfo(
          isFolder = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean],
          path = archive.getProperty(index, PropID.PATH).asInstanceOf[String]
        )
        Entry(info, itemStream[F](archive, index, info.path))
      }
    }).flatMap(entries => Stream.emits(entries))

  /** Decompresses item `idx` of `archive` and emits its bytes.
    *
    * The whole item is extracted and buffered in memory before the first byte is emitted.
    * Folders produce an empty stream.
    *
    * @param debugString free-form text included in log messages only.
    */
  def itemStream[F[_]](archive: IInArchive, idx: Int, debugString: String = "")(
    implicit F: ConcurrentEffect[F], log: Logger
  ): Stream[F, Byte] =
    Stream
      .eval(F.delay(extractItem(archive, idx, debugString)))
      .flatMap(chunks => Stream.emits(chunks))
      .flatMap(chunk => Stream.chunk(chunk))

  /** Synchronously extracts one item, returning the data chunks in the order 7-Zip wrote them. */
  private def extractItem(archive: IInArchive, idx: Int, debugString: String)(
    implicit log: Logger
  ): Vector[Chunk[Byte]] = {
    val chunks = Vector.newBuilder[Chunk[Byte]]
    log.debug(s"Extracting item idx=$idx from archive (debug: $debugString)")
    archive.extract(
      Array(idx),
      false,
      new IArchiveExtractCallback {
        override def getStream(index: Int, extractAskMode: ExtractAskMode): ISequentialOutStream = {
          val skipExtraction = archive.getProperty(index, PropID.IS_FOLDER).asInstanceOf[Boolean]
          // Returning null tells 7-Zip to skip writing data for this item.
          if (skipExtraction || extractAskMode != ExtractAskMode.EXTRACT) null
          else
            new ISequentialOutStream {
              override def write(data: Array[Byte]): Int = {
                chunks += Chunk.bytes(data)
                data.length
              }
            }
        }

        override def setOperationResult(extractOperationResult: ExtractOperationResult): Unit =
          if (extractOperationResult == ExtractOperationResult.OK)
            log.debug(s"Extracted item idx=$idx from archive (debug: $debugString), result=$extractOperationResult")
          else
            // Surface data errors loudly: the emitted bytes may be truncated or corrupt.
            log.error(s"Extracted item idx=$idx from archive (debug: $debugString), result=$extractOperationResult; data may be incomplete")

        override def prepareOperation(extractAskMode: ExtractAskMode): Unit = {}
        override def setTotal(total: Long): Unit = {}
        override def setCompleted(complete: Long): Unit = {}
      }
    )
    log.debug(s"Extract item idx=$idx from archive (debug: $debugString) done.")
    chunks.result()
  }

  /** Runs `close`, logging (not rethrowing) any non-fatal failure as "can't close <what>". */
  private def closeQuietly(what: String)(close: => Unit)(implicit log: Logger): Unit =
    try close
    catch { case NonFatal(t) => log.error(s"can't close $what", t) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.