Using spark.kubernetes.driver.volumes...
configurations:
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.mount.path=/tmp/data1
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.mount.readOnly=false
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp1.options.path=/data1
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.mount.path=/tmp/data2
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.mount.readOnly=false
spark.kubernetes.executor.volumes.hostPath.spark-local-dir-tmp2.options.path=/data2
KubernetesVolumeUtils.scala:
def parseVolumesWithPrefix(sparkConf: SparkConf, prefix: String): Seq[KubernetesVolumeSpec] = {
val properties = sparkConf.getAllWithPrefix(prefix).toMap
getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.get(subPathKey).getOrElse(""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
}.toSeq
}
private def parseVolumeSpecificConf(
options: Map[String, String],
volumeType: String,
volumeName: String): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
KubernetesHostPathVolumeConf(options(pathKey))
case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
KubernetesPVCVolumeConf(options(claimNameKey))
case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey))
case _ =>
throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
}
}
k8s/Config.scala:
val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
- name: spark-local-dir-tmp
persistentVolumeClaim:
claimName: shuffle-pvc-claim
volumeMounts:
- name: spark-local-dir-tmp
mountPath: /shuffletmp
env:
- name: SPARK_LOCAL_DIRS
value: /shuffletmp
Pod is pending, Shoud I make the PVC ?
conditions:
- type: PodScheduled
status: 'False'
lastProbeTime: null
lastTransitionTime: '2020-05-21T07:24:20Z'
reason: Unschedulable
message: persistentvolumeclaim "shuffle-pvc-claim" not found
TBD