Last active
February 2, 2019 01:27
-
-
Save mmlac/136b95ae0edcd914027ec280a8c882b4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* MIT LICENSE | |
* | |
* Copyright 2019 Markus Lachinger | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy of | |
* this software and associated documentation files (the "Software"), to deal in | |
* the Software without restriction, including without limitation the rights to | |
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
* the Software, and to permit persons to whom the Software is furnished to do so, | |
* subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
* | |
*/ | |
/* This is just a simple wrapper for the kubernetes-java library, written in Scala, to | |
* deal with Watches. | |
* | |
* I would not recommend using this if you don't already have committed to the Java library, as | |
* skuber makes this watch much more elegant using Akka Streams. Anyway, I figured this might be | |
* useful for someone. | |
* | |
* The Future will eventually end up on the OkHttp thread, so shouldn't be blocked when returned. | |
*/ | |
import io.kubernetes.client.ApiClient | |
import io.kubernetes.client.custom.Quantity | |
import io.kubernetes.client.models._ | |
import io.kubernetes.client.util.{Config, Watch} | |
import io.kubernetes.client.apis.{CoreV1Api => JavaCoreV1Api} | |
// And some more, I used Finagle for the Futurepool, etc | |
/** | |
* Generic Watch "waiting" implementation for the Kubernetes Java Client | |
* | |
* The hasNext call of the iterator is blocking, so we expect the [[apiClient.getHttpClient.getReadTimeout]] | |
* to be a reasonably low number (i.e. 30s) to either restart the Watch or realize the task has been interrupted | |
* and we should not keep watching. | |
* | |
* | |
* @param apiClient client | |
* @param call Function producing a new Call to execute. Parameter is whether this is a 'watch' Call or not. | |
* Watch calls are made when we are waiting for progress, a "regular" call is made after a | |
* watch is restarted to make sure nothing happened in between the two watches that made the | |
* watch succeed / fail | |
* @param validate Validation function for watch progress. | |
* None means everything is still pending / waiting, | |
* Some(Left(exception)) means what we wanted to do failed, i.e. no PV available, no auto provisioning, etc | |
* Some(Right(object)) means what we were watching for was successful | |
* @param watchTypeToken Type Token for GSON to decode the Watch response | |
* @param callTypeToken Type Token for GSON to decode the 'normal' call. | |
* Both of these token need to be supplied into the call is type erasure | |
* will not allow to create them correctly from a [[T]] | |
* @tparam T Type of the return object of the Watch | |
* @return On success or failure, Future of the succeeding / failing object. Otherwise throws an exception into the Future | |
*/ | |
def watcher[T](apiClient: ApiClient, | |
call: Boolean => Call, | |
validate: T => Option[Either[Exception, T]], | |
watchTypeToken: TypeToken[Watch.Response[T]], | |
callTypeToken: TypeToken[T] | |
) : Future[T] = | |
Concurrency.kubernetesFuturePool({ // generic interruptable Finagle Futurepool | |
def watchOut: T = { | |
if (Thread.currentThread().isInterrupted) throw new InterruptedException("Thread was interrupted") | |
val watch = Watch.createWatch[T]( | |
apiClient, | |
call(true), | |
watchTypeToken.getType()) | |
//Make blocking call to make sure we didn't miss a success / fail state between Watch restarts | |
validate(apiClient.execute[T](call(false), callTypeToken.getType).getData) match { | |
case Some(Left(ex)) => throw ex | |
case Some(Right(successfulObject)) => successfulObject | |
case None => | |
//Evaluate the watch | |
try { | |
// Drop all data that doesn't either succeed or fail (i.e. everything is still just pending) | |
val iterator = watch.iterator().asScala.map(_.`object`).dropWhile(validate(_).isEmpty) | |
Try{validate(iterator.next).get} match { | |
case Failure(a) => | |
try{watch.close} catch {case NonFatal(ex)=>} | |
watchOut | |
case Success(Left(ex)) => throw ex | |
case Success(Right(successfulObject)) => successfulObject | |
} | |
} catch { | |
case timeout: RuntimeException if timeout.getCause.isInstanceOf[java.net.SocketTimeoutException] => | |
watchOut | |
} finally { | |
try{watch.close} catch {case NonFatal(_) =>} // ignore Watch close exception | |
} | |
} | |
} | |
watchOut | |
}) | |
} | |
// Example for creating a PVC | |
def watchPVCBinding(apiClient: ApiClient, | |
name: String, | |
namespace: String | |
) : Future[V1PersistentVolumeClaim] = | |
{ | |
def pvcCall(client: JavaCoreV1Api)(watch: Boolean) = | |
client.listNamespacedPersistentVolumeClaimCall( | |
namespace, "false", null, s"metadata.name=$name", true, | |
null, 1, null, null, watch, | |
null, null) | |
def watchPVCBound(pvc: V1PersistentVolumeClaim): Option[Either[Exception, V1PersistentVolumeClaim]] = | |
Option(pvc.getStatus) match { | |
case None => None | |
case Some(status) => | |
status.getPhase match { | |
case "Bound" => Some(Right(pvc)) | |
case _ => None | |
} | |
} | |
watcher[V1PersistentVolumeClaim]( | |
apiClient, pvcCall(new JavaCoreV1Api(apiClient)), watchPVCBound, | |
new TypeToken[Watch.Response[V1PersistentVolumeClaim]](){}, | |
new TypeToken[V1PersistentVolumeClaim](){}) | |
} | |
def createPVC( | |
apiClient: ApiClient, | |
namespace: String, | |
claimName: String, | |
accessMode: String, // change to access mode object | |
size: String, | |
storageClass: String, | |
deadline: Instant = Instant.now.plusSeconds(30L), | |
watchApiClient: ApiClient = volumeWatchClient | |
) = { | |
val waitFor = Duration(Instant.now.until(deadline, ChronoUnit.SECONDS), TimeUnit.SECONDS) | |
val watch = | |
watchPVCBinding(apiClient, claimName, namespace) | |
.raiseWithin(waitFor) | |
// here is how you define how long to maximally wait before giving up and failing | |
// this will also set the futurepool thread used for waiting to interrupted and therefore | |
// not renewing the watch as soon as it times out. Even if the watch would produce a result | |
// after the raise, it wouldn't be set as the Future was already fulfilled | |
val create = createPVCAsync(apiClient, namespace, claimName, accessMode, size, storageClass) | |
Future.collect(Seq(create, watch)) | |
// simple way of immediately exiting if the create call didn't succeed | |
// won't raise the watch thread early | |
} | |
def createPVCAsync(apiClient: ApiClient, | |
namespace: String, | |
claimName: String, | |
accessMode: String, // change to access mode object | |
size: String, | |
storageClass: String | |
) = { | |
val meta = new V1ObjectMeta() | |
.namespace(namespace) | |
.name(claimName) | |
val spec = new V1PersistentVolumeClaimSpec() | |
.accessModes(List("ReadWriteOnce").asJava) | |
.volumeMode("Filesystem") | |
.resources(new V1ResourceRequirements().requests(Map("storage" -> new Quantity(size)) asJava)) | |
.storageClassName("standard") | |
val promise = Promise[V1PersistentVolumeClaim]() | |
new JavaCoreV1Api(apiClient) | |
.createNamespacedPersistentVolumeClaimAsync( | |
namespace, | |
new V1PersistentVolumeClaim().metadata(meta).spec(spec), | |
"false", | |
FACB[V1PersistentVolumeClaim](promise)) // Just a callback wrapper to set the promise | |
promise | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment