// summary : Photos indexation proof of concept using ZIO, opensearch and deep-java-learning - When you have ~89000 photos/videos tooling is mandatory
// keywords : scala, photos, memories, zio, ziostream, poc, djl, machine-learning, elasticsearch
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 131e1312-4b2b-48c4-a60a-e17bfa73754b
// created-on : 2021-12-19T09:29:59+01:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file

/*
Just run it once configured. Configuration is achieved through environment variables:
- PHOTOS_SEARCH_IGNORE_MASK=(?:(/Originals/)|(/[.]))
- PHOTOS_SEARCH_INCLUDE_MASK=(?i)[.](?:(jpg)|(png)|(jpeg)|(tiff)|(heic))
- PHOTOS_SEARCH_ROOTS=/place1,/place2
- PHOTOS_ELASTIC_URL=http://127.0.0.1:9200
- PHOTOS_ELASTIC_USERNAME=admin
- PHOTOS_ELASTIC_PASSWORD=admin
*/
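
/*
Launch sketch (the script filename below is illustrative):
  PHOTOS_SEARCH_ROOTS=/data/photos PHOTOS_ELASTIC_URL=http://127.0.0.1:9200 scala-cli photos-index.sc
*/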
/*
What it does, in a streamed way:
- searches for photos from the given search root directories
- extracts keywords from the directory name containing each photo
- extracts photo meta-data information (EXIF)
- classifies each photo using a deep java learning (DJL) classification algorithm
- identifies objects included in each photo using a dedicated DJL algorithm
- indexes into elasticsearch all the data extracted from each photo

Issues encountered while designing this proof of concept:
- DJL native libraries are not thread-safe and generate JVM SIGSEGV
- data (photos) related issues:
  - photo filenames using bad character encoding
  - invalid shootDateTime (negative year, or outside the expected year range [2000, now])
- a unified timestamp field is required for OpenSearch-Dashboards index patterns
  - priority goes to a valid shootDateTime, with a fallback to the image file last-updated datetime
- the right photo UUID to use has not been found yet
  - using a named UUID computed from the photo hash was not a good idea as it hides duplicate photos
  - currently using a filepath-only approach for the computed named UUID

To do next:
- extract GPS locations
- extract a third optional timestamp from the filepath (see the sketch after this comment block)
  - /home/ALBUMS/2017/2017-08-15 Cote Granite Rose/2Y7A0847.JPG => 2017-08-15
- face recognition with bounding boxes
  - add a people counter on each photo
- people faces classification and name indexation
*/
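
// A possible sketch for the "timestamp from the filepath" item above, assuming an ISO
// yyyy-MM-dd fragment somewhere in the path (dateFromPath is a hypothetical helper):
//   def dateFromPath(path: String): Option[String] =
//     """\d{4}-\d{2}-\d{2}""".r.findFirstIn(path)
//   dateFromPath("/home/ALBUMS/2017/2017-08-15 Cote Granite Rose/2Y7A0847.JPG") // Some("2017-08-15")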
// ---------------------
//> using scala "3.4.2"
//> using dep "dev.zio::zio:2.0.15"
//> using dep "dev.zio::zio-streams:2.0.15"
//> using dep "dev.zio::zio-json:0.6.0"
//> using dep "com.drewnoakes:metadata-extractor:2.18.0"
//> using dep "com.fasterxml.uuid:java-uuid-generator:4.2.0"
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.9.1"
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.9.1"
//----------------------
//> using dep "ai.djl:api:0.23.0"
//> using dep "ai.djl:basicdataset:0.23.0"
//> using dep "ai.djl:model-zoo:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-engine:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-model-zoo:0.23.0"
//> using dep "ai.djl.mxnet:mxnet-native-auto:1.8.0"
//> using dep "net.java.dev.jna:jna:5.13.0"
//----------------------
//> using dep "org.slf4j:slf4j-api:2.0.7"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import com.drew.imaging.ImageMetadataReader
import com.drew.metadata.exif.{ExifDirectoryBase, ExifIFD0Directory, ExifSubIFDDirectory}
import com.fasterxml.uuid.Generators
import zio.*
import zio.json.*
import zio.stream.*
import zio.stream.ZPipeline.{splitLines, utf8Decode}
import java.io.{File, IOException}
import java.nio.charset.Charset
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{Files, Path, Paths}
import java.time.{Instant, OffsetDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import java.util.UUID
import scala.util.matching.Regex
import scala.util.{Either, Failure, Left, Properties, Right, Success, Try}
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.temporal.ChronoField
import scala.jdk.CollectionConverters.*
import scala.Console.{BLUE, GREEN, RED, RESET, YELLOW}
// =====================================================================================================================
object HashOps {
  def sha1(that: String): String =
    import java.math.BigInteger
    import java.security.MessageDigest
    val content = if (that == null) "" else that // TODO - probably debatable, migrate to an effect
    val md      = MessageDigest.getInstance("SHA-1") // TODO - can fail => potential side effect !
    val digest  = md.digest(content.getBytes)
    val bigInt  = new BigInteger(1, digest)
    val hashedString = bigInt.toString(16)
    hashedString

  def fileDigest(path: Path, algo: String = "SHA-256"): String =
    import java.math.BigInteger
    import java.security.{MessageDigest, DigestInputStream}
    import java.io.FileInputStream
    val buffer = new Array[Byte](8192)
    val md     = MessageDigest.getInstance(algo)
    val dis    = new DigestInputStream(new FileInputStream(path.toFile), md)
    try { while (dis.read(buffer) != -1) {} }
    finally { dis.close() }
    md.digest.map("%02x".format(_)).mkString
}
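// Note on HashOps: sha1 hex-encodes through BigInteger.toString(16), which drops leading zeros
// (a digest whose first byte is below 0x10 yields a 39-character string), whereas fileDigest
// "%02x"-formats every byte and always returns fixed-width hex (64 characters for SHA-256).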
// =====================================================================================================================
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.Index
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.Response
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  // WARNING: the default elastic URL port is 9200 (and not 80 or 443) so be explicit !
  val elasticUrl      = envOrNone("PHOTOS_ELASTIC_URL").orElse(envOrNone("CEM_ELASTIC_URL")).getOrElse("http://127.0.0.1:9200")
  val elasticUrlTrust = envOrNone("PHOTOS_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername = envOrNone("PHOTOS_ELASTIC_USERNAME").orElse(envOrNone("CEM_ELASTIC_USERNAME"))
  val elasticPassword = envOrNone("PHOTOS_ELASTIC_PASSWORD").orElse(envOrNone("CEM_ELASTIC_PASSWORD"))

  private val client = { // TODO rewrite to be fully effect based
    val elasticProperties = ElasticProperties(elasticUrl)
    val commonRequestConfigBuilder: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) =>
      requestConfigBuilder
        .setConnectTimeout(10000)
        .setRedirectsEnabled(true)
        .setSocketTimeout(10000)
    if (elasticPassword.isEmpty || elasticUsername.isEmpty)
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder))
    else {
      lazy val provider = {
        val basicProvider = new BasicCredentialsProvider
        val credentials   = new UsernamePasswordCredentials(elasticUsername.get, elasticPassword.get)
        basicProvider.setCredentials(AuthScope.ANY, credentials)
        basicProvider
      }
      import org.apache.http.ssl.SSLContexts
      import org.apache.http.conn.ssl.TrustSelfSignedStrategy
      val sslContext = elasticUrlTrust match {
        case "true" => SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        case _      => SSLContexts.createDefault()
      }
      val httpClientConfigCallback: HttpClientConfigCallback =
        (httpClientBuilder: HttpAsyncClientBuilder) =>
          httpClientBuilder
            .setDefaultCredentialsProvider(provider)
            .setSSLContext(sslContext)
          // .setSSLHostnameVerifier(org.apache.http.conn.ssl.NoopHostnameVerifier.INSTANCE)
      ElasticClient(JavaClient(elasticProperties, commonRequestConfigBuilder, httpClientConfigCallback))
    }
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  private val timeout         = 20.seconds
  private val retrySchedule   = (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((state, out, decision) =>
    decision match {
      case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
      case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
    }
  )
  val upsertGrouping = 50
  val searchPageSize = 500

  // ------------------------------------------------------
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    val day   = timestamp.get(ChronoField.DAY_OF_MONTH)
    val week  = timestamp.get(ChronoField.ALIGNED_WEEK_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }
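  // e.g. indexNameFromTimestamp("photos", 2021-12-19T09:29+01:00) == "photos-2021-12".
  // Note that the month is not zero-padded ("photos-2021-3" in March) and that day/week
  // are computed but unused so far, should finer-grained indices become necessary.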
  // ------------------------------------------------------
  private def streamFromScroll(scrollId: String) = {
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response     <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        nextScrollId  = response.result.scrollId
        results       = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
        _            <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.nonEmpty) nextScrollId else None)
    }
  }
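  // paginateChunkZIO keeps pulling scroll pages while the state is Some(scrollId),
  // i.e. until elasticsearch returns an empty page of hits.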
  def fetchAll[T](indexName: String)(implicit decoder: JsonDecoder[T]) = {
    // TODO something is going wrong here: sometimes not all results are returned, yet no error is raised
    // TODO deep pagination issue, see https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
    val result = for {
      response <- client.execute(search(Index(indexName)).size(searchPageSize).scroll(scrollKeepAlive))
      scrollId <- ZIO.fromOption(response.result.scrollId)
      firstResults      = Chunk.fromArray(response.result.hits.hits.map(_.sourceAsString))
      _                <- ZIO.log(s"Got ${firstResults.size} first documents")
      nextResultsStream = streamFromScroll(scrollId)
    } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
    ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------
  def upsert[T](indexPrefix: String, documents: Chunk[T])(timestampExtractor: T => OffsetDateTime, idExtractor: T => String)(implicit encoder: JsonEncoder[T]) = {
    val responseEffect = client.execute {
      bulk {
        for { document <- documents } yield {
          val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
          val id        = idExtractor(document)
          indexInto(indexName).id(id).doc(document)
        }
      }
    }
    val upsertEffect = for {
      response <- responseEffect
      failures  = response.result.failures.flatMap(_.error).map(_.toString)
      _        <- ZIO.log(s"${if (response.isSuccess) "Upserted" else "Failed to upsert"} ${documents.size} into elasticsearch")
      _        <- ZIO.cond(response.isSuccess, (), failures.mkString("\n"))
    } yield ()
    upsertEffect.timeout(timeout).retry(retrySchedule)
  }
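
  // Usage sketch (mirrors the actual call in Photos.run below):
  //   ElasticOps.upsert[Photo]("photos", photosChunk)(
  //     timestampExtractor = _.timestamp,
  //     idExtractor = _.uuid.toString
  //   )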
}
// =====================================================================================================================
object AIPhotos {
  import ai.djl.Application
  import ai.djl.engine.Engine
  import ai.djl.modality.cv.Image
  import ai.djl.modality.cv.ImageFactory
  import ai.djl.modality.Classifications
  import ai.djl.modality.cv.output.DetectedObjects
  import ai.djl.repository.zoo.Criteria
  import ai.djl.repository.zoo.ModelZoo
  import ai.djl.repository.zoo.ZooModel
  import ai.djl.training.util.ProgressBar
  import ai.djl.modality.Classifications.Classification
  import java.nio.file.Files
  import java.nio.file.Path
  import java.nio.file.Paths
  import scala.jdk.CollectionConverters.*

  def basename(filename: String): String = {
    filename
      .split("[/](?=[^/]*$)", 2)
      .last
      .split("[.]", 2)
      .head
  }
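  // e.g. basename("/albums/2017/2Y7A0847.JPG") == "2Y7A0847" (helper kept around, currently unused)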
  val objectDetectionsCriteria =
    Criteria.builder
      .optApplication(Application.CV.OBJECT_DETECTION)
      .setTypes(classOf[Image], classOf[DetectedObjects])
      .optFilter("backbone", "mobilenet1.0")
      .optProgress(new ProgressBar)
      .build
  val objectDetectionsModel    = ModelZoo.loadModel(objectDetectionsCriteria)
  val objectDetectionPredictor = objectDetectionsModel.newPredictor()

  def detectedObjects(path: Path): List[String] = {
    val loadedImage: Image         = ImageFactory.getInstance().fromFile(path)
    val detection: DetectedObjects = objectDetectionPredictor.predict(loadedImage)
    val detected: List[String] =
      detection
        .items()
        .iterator()
        .asScala
        .toList
        .asInstanceOf[List[Classifications.Classification]]
        .filter(_.getProbability >= 0.5d)
        .map(_.getClassName())
        .flatMap(_.split(""",\s+"""))
        .distinct
    detected
  }

  val imageClassificationCriteria =
    Criteria.builder
      .optApplication(Application.CV.IMAGE_CLASSIFICATION)
      .setTypes(classOf[Image], classOf[Classifications])
      // ------------------------------------
      // .optFilter("flavor","v1")
      // .optFilter("dataset","cifar10")
      // ------------------------------------
      // .optFilter("flavor","v3_large")
      // .optFilter("dataset","imagenet")
      // ------------------------------------
      .optFilter("flavor", "v1d")
      .optFilter("dataset", "imagenet")
      .optProgress(new ProgressBar)
      .build
  val imageClassificationModel     = ModelZoo.loadModel(imageClassificationCriteria)
  val imageClassificationPredictor = imageClassificationModel.newPredictor()

  def cleanupClassName(input: String): String =
    input.replaceAll("""^n\d+ """, "")
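
  // e.g. cleanupClassName("n02123045 tabby, tabby cat") == "tabby, tabby cat"
  // (strips the leading ImageNet synset identifier from DJL class names)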
  def classifyImage(path: Path): List[String] = {
    val img                    = ImageFactory.getInstance().fromFile(path)
    val found: Classifications = imageClassificationPredictor.predict(img)
    found
      .items()
      .asScala
      .toList
      .asInstanceOf[List[Classification]]
      .filter(_.getProbability > 0.5d)
      .map(_.getClassName)
      .map(cleanupClassName)
      .flatMap(_.split(""",\s+"""))
      .distinct
  }
}
// =====================================================================================================================
object Photos extends ZIOAppDefault {
  val generatorPUUID = Generators.nameBasedGenerator()

  /* Attempt to generate a unique photo identifier */
  def makePUUID(camera: Option[String], shootDateTime: Option[Instant], filePath: Path, fileHash: String): UUID = {
    // generatorPUUID.generate(filePath.getFileName().toString + shootDateTime.map(_.toString).getOrElse(""))
    generatorPUUID.generate(filePath.toString)
  }
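
  // Being name-based, the UUID is stable for a given file path: re-indexing the same file
  // overwrites the same document, while the same photo stored under two different paths gets
  // two distinct ids, which keeps duplicates visible (see the design notes above).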
  case class GeoPoint(lat: Double, lon: Double) derives JsonCodec

  case class Photo(
    uuid: UUID,
    timestamp: OffsetDateTime,
    filePath: Path,
    fileSize: Long,
    fileHash: String,
    fileLastUpdated: OffsetDateTime,
    category: Option[String],
    shootDateTime: Option[OffsetDateTime],
    camera: Option[String],
    tags: Map[String, String],
    keywords: List[String],
    classifications: List[String],
    detectedObjects: List[String],
    place: Option[GeoPoint]
  ) derives JsonCodec

  object Photo {
    implicit val pathEncoder: JsonEncoder[Path] = JsonEncoder[String].contramap(p => p.toString)
    implicit val pathDecoder: JsonDecoder[Path] = JsonDecoder[String].map(p => Path.of(p))

    def makeTagKey(tag: com.drew.metadata.Tag): String = {
      val prefix = tag.getDirectoryName().trim.replaceAll("""\s+""", "")
      val name   = tag.getTagName().trim.replaceAll("""\s+""", "")
      val key    = s"$prefix$name"
      key.head.toLower + key.tail
    }

    def tagsToMap(tags: List[com.drew.metadata.Tag]): Map[String, String] = {
      tags
        .filterNot(_.getDescription == null)
        .map(tag => makeTagKey(tag) -> tag.getDescription)
        .toMap
    }

    def now = OffsetDateTime.now() // TODO: migrate to ZIO Clock.now

    def checkTimestampValid(ts: OffsetDateTime) = ts.get(ChronoField.YEAR) >= 2000 && ts.isBefore(now)

    def computeTimestamp(mayBeShootDateTime: Option[OffsetDateTime], fileLastUpdated: OffsetDateTime): OffsetDateTime =
      mayBeShootDateTime match
        case Some(shootDateTime) if checkTimestampValid(shootDateTime) => shootDateTime
        case _                                                         => fileLastUpdated

    def makePhoto(
      uuid: UUID,
      filePath: Path,
      fileSize: Long,
      fileHash: String,
      fileLastUpdated: Instant,
      category: Option[String],
      shootDateTime: Option[Instant],
      camera: Option[String],
      metaDataTags: List[com.drew.metadata.Tag],
      keywords: List[String],        // Extracted from category
      classifications: List[String], // Extracted from AI DJL
      detectedObjects: List[String]  // Extracted from AI DJL
    ): Photo = {
      val shootOffsetDateTime           = shootDateTime.map(_.atOffset(ZoneOffset.UTC))
      val fileLastUpdatedOffsetDateTime = fileLastUpdated.atOffset(ZoneOffset.UTC)
      val tags                          = tagsToMap(metaDataTags)
      Photo(
        uuid = uuid,
        timestamp = computeTimestamp(shootOffsetDateTime, fileLastUpdatedOffsetDateTime),
        filePath = filePath,
        fileSize = fileSize,
        fileHash = fileHash,
        fileLastUpdated = fileLastUpdatedOffsetDateTime,
        category = category,
        shootDateTime = shootOffsetDateTime,
        camera = camera,
        tags = tags,
        keywords = keywords,
        classifications = classifications,
        detectedObjects = detectedObjects,
        place = computeGeoPoint(tags)
      )
    }
  }

  case class PhotoFileIssue(filepath: Path, throwable: Throwable)
  case class PhotoFileContentIssue(filepath: Path, throwable: Throwable)
  case class PhotoMetadataIssue(filepath: Path, throwable: Throwable)
  case class PhotoUUIDIssue(filepath: Path, throwable: Throwable)
  case class PhotoAIIssue(filepath: Path, throwable: Throwable)
  type PhotoIssue = PhotoFileContentIssue | PhotoMetadataIssue | PhotoUUIDIssue | PhotoFileIssue | PhotoAIIssue

  def categoryFromFilepath(filePath: Path, searchPath: Path): Option[String] =
    Option(filePath.getParent)
      .map(parent => searchPath.relativize(parent))
      .map(_.toString)

  def camelTokenize(that: String): Array[String] = that.split("(?=[A-Z][^A-Z])|(?:(?<=[^A-Z])(?=[A-Z]+))")

  def camelToKebabCase(that: String): String = camelTokenize(that).map(_.toLowerCase).mkString("-")
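  // e.g. camelToKebabCase("CoteGraniteRose") == "cote-granite-rose"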
  val excludes = Set("et", "par", "le", "la", "de", "du", "au", "aux", "pour", "à", "a", "les", "des", "avec", "dans", "sur", "d")

  def extractKeywords(input: Option[String]): List[String] =
    input match {
      case None => Nil
      case Some(category) =>
        category
          .split("[- /]+")
          .toList
          .filter(_.trim.size > 0)
          .filterNot(_.matches("^[0-9]+$"))
          .filterNot(_.contains("'"))
          .flatMap(key => camelToKebabCase(key).split("-"))
          .map(_.toLowerCase)
          .filter(_.size > 1)
          .filterNot(key => excludes.contains(key))
    }
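
  // e.g. extractKeywords(Some("2017-08-15 Cote Granite Rose")) == List("cote", "granite", "rose")
  // (pure digit tokens, words with apostrophes, single letters and common French stop words are dropped)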
  /*
  tags.gPSGPSLatitude     : 45° 19' 19,29"
  tags.gPSGPSLatitudeRef  : N
  tags.gPSGPSLongitude    : 6° 32' 39,47"
  tags.gPSGPSLongitudeRef : E
  */
  val dmsRE = """[-+]?(\d+)[°]\s*(\d+)['′]\s*(\d+(?:[.,]\d+)?)(?:(?:")|(?:'')|(?:′′)|(?:″))""".r

  def convert(d: Double, m: Double, s: Double): Double = d + m / 60d + s / 3600d

  def degreesMinuteSecondsToDecimalDegrees(
    dms: String,
    ref: String
  ): Try[Double] = Try {
    val dd = dms.trim match {
      case dmsRE(d, m, s) => convert(d.toDouble, m.toDouble, s.replaceAll(",", ".").toDouble)
    }
    if ("NE".contains(ref.trim.toUpperCase.head)) dd else -dd
  }
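
  // e.g. degreesMinuteSecondsToDecimalDegrees("45° 19' 19,29\"", "N") == Success(45.322025)
  // since 45 + 19/60 + 19.29/3600 = 45.322025 ; S and W references negate the result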
  def computeGeoPoint(photoTags: Map[String, String]): Option[GeoPoint] =
    // Degrees Minutes Seconds to Decimal Degrees
    for {
      latitude     <- photoTags.get("gPSGPSLatitude")
      latitudeRef  <- photoTags.get("gPSGPSLatitudeRef")
      longitude    <- photoTags.get("gPSGPSLongitude")
      longitudeRef <- photoTags.get("gPSGPSLongitudeRef")
      lat          <- degreesMinuteSecondsToDecimalDegrees(latitude, latitudeRef).toOption   // TODO enhance error processing
      lon          <- degreesMinuteSecondsToDecimalDegrees(longitude, longitudeRef).toOption // TODO enhance error processing
    } yield GeoPoint(lat, lon)

  def makePhoto(searchPath: Path, filePath: Path) =
    for
      metadataEither <- ZIO
                          .attemptBlockingIO(ImageMetadataReader.readMetadata(filePath.toFile))
                          .tapError(th => ZIO.logWarning(s"readMetadata issue with $filePath : ${th.getMessage}"))
                          .either
      exifSubIFD    = metadataEither.toOption.flatMap(metaData => Option(metaData.getFirstDirectoryOfType(classOf[ExifSubIFDDirectory])))
      exifIFD0      = metadataEither.toOption.flatMap(metaData => Option(metaData.getFirstDirectoryOfType(classOf[ExifIFD0Directory])))
      shootDateTime = exifIFD0.flatMap(dir => Option(dir.getDate(ExifDirectoryBase.TAG_DATETIME))).map(_.toInstant)
      camera        = exifIFD0.flatMap(dir => Option(dir.getString(ExifDirectoryBase.TAG_MODEL)))
      fileSize        <- ZIO.attemptBlockingIO(filePath.toFile.length()).mapError(th => PhotoFileIssue(filePath, th))
      fileLastUpdated <- ZIO.attemptBlockingIO(filePath.toFile.lastModified()).mapAttempt(Instant.ofEpochMilli).mapError(th => PhotoFileIssue(filePath, th))
      fileHash        <- ZIO.attemptBlockingIO(HashOps.fileDigest(filePath)).mapError(th => PhotoFileContentIssue(filePath, th))
      category        = categoryFromFilepath(filePath, searchPath)
      metaDirectories = metadataEither.map(_.getDirectories.asScala).getOrElse(Nil)
      metaDataTags    = metaDirectories.flatMap(dir => dir.getTags.asScala).toList
      puuid <- ZIO.attemptBlockingIO(makePUUID(camera, shootDateTime, filePath, fileHash)).mapError(th => PhotoUUIDIssue(filePath, th))
      classificationsEither <- ZIO
                                 .attemptBlockingIO(AIPhotos.classifyImage(filePath))
                                 .tapError(th => ZIO.logWarning(s"classifyImage issue with $filePath : ${th.getMessage}"))
                                 .either
      detectedObjectsEither <- ZIO
                                 .attemptBlockingIO(AIPhotos.detectedObjects(filePath))
                                 .tapError(th => ZIO.logWarning(s"detectedObjects issue with $filePath : ${th.getMessage}"))
                                 .either
      classifications = classificationsEither.toOption.getOrElse(Nil)
      detectedObjects = detectedObjectsEither.toOption.getOrElse(Nil)
      keywords        = extractKeywords(category)
      _ <- Console.printLine(
             s"$filePath - $RED${camera.getOrElse("")}$RESET - $YELLOW${keywords.mkString(",")}$RESET - $GREEN${classifications.mkString(",")}$RESET - $BLUE${detectedObjects
               .mkString(",")}$RESET - $RED$shootDateTime$RESET - $fileLastUpdated"
           )
      photo = Photo.makePhoto(
                uuid = puuid,
                filePath = filePath,
                fileSize = fileSize,
                fileHash = fileHash,
                fileLastUpdated = fileLastUpdated,
                category = category,
                shootDateTime = shootDateTime,
                camera = camera,
                metaDataTags = metaDataTags,
                keywords = keywords,
                classifications = classifications,
                detectedObjects = detectedObjects
              )
      // _ <- ZIO.logInfo(s"processed $photo")
    yield photo

  // -------------------------------------------------------------------------------------------------------------------
  def searchPredicate(includeMaskRegex: Option[Regex], ignoreMaskRegex: Option[Regex])(path: Path, attrs: BasicFileAttributes): Boolean = {
    attrs.isRegularFile &&
    (ignoreMaskRegex.isEmpty || ignoreMaskRegex.get.findFirstIn(path.toString).isEmpty) &&
    (includeMaskRegex.isEmpty || includeMaskRegex.get.findFirstIn(path.toString).isDefined)
  }

  def findFromSearchRoot(
    searchRoot: Path,
    includeMaskRegex: Option[Regex],
    ignoreMaskRegex: Option[Regex]
  ) = {
    val result = for {
      searchPath <- ZIO.attempt(searchRoot)
      javaStream  = Files.find(searchPath, 10, searchPredicate(includeMaskRegex, ignoreMaskRegex))
      pathStream  = ZStream.fromJavaStream(javaStream).map(path => searchRoot -> path)
    } yield pathStream
    ZStream.unwrap(result)
  }

  def fetch() = {
    val result = for {
      _ <- ZIO.logInfo("photos inventory")
      searchRoots <- System
                       .env("PHOTOS_SEARCH_ROOTS")
                       .someOrFail("nowhere to search")
                       .map(_.split("[,;]").toList.map(_.trim))
      includeMask      <- System.env("PHOTOS_SEARCH_INCLUDE_MASK")
      includeMaskRegex <- ZIO.attempt(includeMask.map(_.r))
      ignoreMask       <- System.env("PHOTOS_SEARCH_IGNORE_MASK")
      ignoreMaskRegex  <- ZIO.attempt(ignoreMask.map(_.r))
      searchRootsStreams = Chunk.fromIterable(searchRoots).map(searchRoot => findFromSearchRoot(Path.of(searchRoot), includeMaskRegex, ignoreMaskRegex))
      zCandidatesStream  = ZStream.concatAll(searchRootsStreams)
    } yield zCandidatesStream
    ZStream.unwrap(result)
  }

  def run = for {
    started          <- Clock.instant
    _                <- Console.printLine(s"${GREEN}Synchronizing photos database$RESET")
    alreadyPublished <- ElasticOps.fetchAll[Photo]("photos-*").runCollect
    _                <- Console.printLine(s"$YELLOW${alreadyPublished.size} photos already published$RESET")
    excludePaths      = alreadyPublished.map(_.filePath.normalize().toString).toSet
    results <- fetch() // AI DJL external native libraries are not thread-safe :(
                 .filterNot((searchPath, path) => excludePaths.contains(path.normalize().toString))
                 .mapZIOParUnordered(1)((searchPath, path) => makePhoto(searchPath, path))
                 .grouped(ElasticOps.upsertGrouping)
                 .mapZIOParUnordered(1)(group =>
                   ElasticOps.upsert[Photo]("photos", group)(
                     timestampExtractor = photo => photo.timestamp,
                     idExtractor = photo => photo.uuid.toString
                   )
                 )
                 .runDrain
                 .tapError(err => Console.printLine(err))
    finished <- Clock.instant
    duration  = finished.getEpochSecond - started.getEpochSecond
    _        <- Console.printLine(s"${GREEN}Synchronize operations done in $duration seconds$RESET")
  } yield ()
}
Photos.main(Array.empty)