Created
July 10, 2015 01:59
-
-
Save dbousamra/2d955936388646a79b03 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.cammy.firehose.uploader | |
| import java.io.File | |
| import java.text.DecimalFormat | |
| import com.cammy.firehose.config.Configuration | |
| import com.cammy.firehose.ftp.user.impl.CameraDetails | |
| import com.cammy.firehose.uploader.domain._ | |
| import com.cammy.firehose.uploader.imaging.Resizer | |
| import com.cammy.firehose.uploader.storage.{StorageLocation, RemoteFilename, Container, Storage} | |
| import com.cammy.metrics.MetricsCollector | |
| import com.typesafe.scalalogging.LazyLogging | |
| import org.joda.time.Duration | |
| import scala.concurrent.{ExecutionContext, Await, Future} | |
| import scala.concurrent.blocking | |
| import scalaz._ | |
| import Scalaz._ | |
| // TODO - Fix the abuse of the suffix URL. I am bad | |
| // TODO - Fix the RemoteFilename stuff | |
/**
 * Uploads a camera snapshot to `storage` in two renditions: a thumbnail and a
 * standard-resolution ("std res") image, recording metrics along the way.
 *
 * @param resizer produces the thumbnail rendition
 * @param storage destination the renditions are uploaded to
 * @param metrics collector for timers, counters and histograms
 * @param ec      execution context that runs the resize and upload futures
 */
case class CammyCameraSnapshotUploader(
  resizer: Resizer,
  storage: Storage,
  metrics: MetricsCollector
)(implicit val ec: ExecutionContext) extends CameraSnapshotUploader with LazyLogging {

  /**
   * Parses `file` together with `cameraDetails` into an
   * [[UnuploadedCameraSnapshot]] and uploads it.
   *
   * @return the uploaded snapshot, or a failed future carrying the exception
   *         built by [[handleParsingFileErrors]] when parsing is rejected
   */
  // TODO: Fix the buffer
  def uploadCameraSnapshot(file: File, cameraDetails: CameraDetails): Future[UploadedCameraSnapshot] = {
    // TODO - Pull this out somehow
    val parseBuffer = Duration.standardMinutes(15)

    // Left = parsing error, right = successfully parsed snapshot.
    val parsed: Future[UnuploadedCameraSnapshot] =
      UnuploadedCameraSnapshot
        .fromFileAndCameraDetails(file, cameraDetails, parseBuffer)
        .fold(
          parseError => Future.failed[UnuploadedCameraSnapshot](handleParsingFileErrors(parseError)),
          snapshot => Future.successful(snapshot)
        )

    parsed.flatMap(snapshot => uploadCameraSnapshot(snapshot))
  }

  /**
   * Records image metrics for `snapshot`, then resizes and uploads both
   * renditions. The two resize futures are created before either upload is
   * chained on, so neither pipeline waits for the other to finish.
   *
   * @return the uploaded snapshot once both uploads have succeeded; the
   *         "upload.count" counter is incremented only in that case
   */
  def uploadCameraSnapshot(snapshot: UnuploadedCameraSnapshot): Future[UploadedCameraSnapshot] = {
    sendMetrics(snapshot)

    val targetContainer = Container.fromCameraImageTimestamp(snapshot.timestamps.uploaded.value)

    val pendingThumbnail = metrics.timeFuture("resize.thumb") { resizeToThumbnail(snapshot) }
    val pendingStdRes = metrics.timeFuture("resize.stdres") { resizeToStdRes(snapshot) }

    // TODO - Fix
    // NOTE(review): the boolean flag appears to select the thumbnail (true) vs.
    // std-res (false) filename -- confirm against RemoteFilename.
    val thumbnailName = RemoteFilename.fromUnuploadedCameraSnapshot(snapshot, true)
    val stdResName = RemoteFilename.fromUnuploadedCameraSnapshot(snapshot, false)

    val thumbnailUploaded = pendingThumbnail.flatMap { thumbnail =>
      metrics.timeFuture("upload.thumb") { uploadThumbnail(targetContainer, thumbnailName, thumbnail) }
    }
    val stdResUploaded = pendingStdRes.flatMap { stdRes =>
      metrics.timeFuture("upload.stdres") { uploadStdRes(targetContainer, stdResName, stdRes) }
    }

    stdResUploaded.flatMap { uploadedStdRes =>
      thumbnailUploaded.map { uploadedThumbnail =>
        metrics.increment("upload.count")
        UploadedCameraSnapshot(snapshot, uploadedStdRes, uploadedThumbnail)
      }
    }
  }

  /** Creates the 360x240 thumbnail of the snapshot's full-resolution image inside a `blocking` future. */
  private def resizeToThumbnail(snapshot: UnuploadedCameraSnapshot) = {
    Future {
      blocking {
        val source = snapshot.fullRes
        val thumbnailSize = ImageDimensions(Width(360), Height(240))
        resizer.createThumbnail(source, thumbnailSize)
      }
    }
  }

  /** Wraps the full-resolution image as a `StdRes` in an already-completed future; the resizer is not invoked. */
  private def resizeToStdRes(snapshot: UnuploadedCameraSnapshot) = {
    val stdRes = StdRes(snapshot.fullRes.image)
    Future.successful(stdRes)
  }

  /** Uploads `image` and pairs it with the storage location it was written to. */
  private def uploadThumbnail(container: Container, filename: RemoteFilename, image: Thumbnail): Future[UploadedThumbnail] = {
    for (location <- uploadCameraImage(container, filename, image))
      yield UploadedThumbnail(image, location)
  }

  /** Uploads `image` and pairs it with the storage location it was written to. */
  private def uploadStdRes(container: Container, filename: RemoteFilename, image: StdRes): Future[UploadedStdRes] = {
    for (location <- uploadCameraImage(container, filename, image))
      yield UploadedStdRes(image, location)
  }

  /**
   * Sends the image bytes to `storage` inside a `blocking` future and flattens
   * the disjunction it returns: a right value becomes the future's result, a
   * left value becomes the future's failure.
   */
  private def uploadCameraImage(container: Container, filename: RemoteFilename, image: CameraImage): Future[StorageLocation] = {
    val attempt = Future {
      blocking {
        storage.uploadFile(container, image.asBytes, filename)
      }
    }

    attempt.flatMap { outcome =>
      outcome.fold(
        uploadError => Future.failed[StorageLocation](uploadError),
        location => Future.successful(location)
      )
    }
  }

  /**
   * Emits per-image metrics: a megapixel histogram, an image-size histogram and
   * a counter recording which timestamp is available for the snapshot.
   */
  def sendMetrics(snapshot: UnuploadedCameraSnapshot) = {
    // Megapixel histogram, tagged with the value formatted by the "#.#" pattern
    // (at most one fractional digit).
    val megapixels = snapshot.fullRes.megapixels
    val megapixelLabel = new DecimalFormat("#.#").format(megapixels)
    metrics.histogram("image.megapixel", megapixels, s"megapixel:$megapixelLabel")

    // Size of image in bytes
    metrics.histogram("image.size", snapshot.fullRes.asBytes.length)

    // Whether we use canonical or uploaded.
    val timestampMetric =
      if (snapshot.timestamps.canonical.isDefined) "image.timestamps.canonical"
      else "image.timestamps.uploaded"
    metrics.increment(timestampMetric)
  }

  /**
   * Increments the error counter matching the kind of parsing failure and
   * returns a plain `Exception` carrying that failure's reason.
   */
  def handleParsingFileErrors(error: InvalidSnapshotException): Exception = {
    // The counter is bumped before the exception is constructed (by-name argument).
    def counted(metricName: String)(failure: => Exception): Exception = {
      metrics.increment(metricName)
      failure
    }

    error match {
      case CameraIdParsingException(reason) => counted("parsing.camera_id.error")(new Exception(reason))
      case CTimeOfImageException(reason)    => counted("parsing.ctime.error")(new Exception(reason))
      case TimestampException(reason)       => counted("parsing.timestamp.error")(new Exception(reason))
      case ImageReadingException(reason)    => counted("parsing.image.error")(new Exception(reason))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment