I am newbie on spark and minikube. I faced the problem while running spark job in sparkapplication.yaml, spark driver and executor s' are created successfully but each of them does not mount hostPath. I referred Tom Louis's minikube-spark example. Everything runs fine if I put data into the sparkjob image file directly via Dockfile// COPY ~~//.
Currently, data(*.csv) are in localFolder - (mounted) - minikube - (not mounted) - spark driver Pod.
I don't know why hostPath does not mounted, there might be some error I did^^; Anybody can take a look into my problem? Appreciated..!
template/sparkapplication.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: {{ .Release.Name | trunc 63 }}
labels:
chartname: {{ .Chart.Name | trunc 63 | quote }}
release: {{ .Release.Name | trunc 63 | quote }}
revision: {{ .Release.Revision | quote }}
sparkVersion: {{ .Values.sparkVersion | quote }}
version: {{ .Chart.Version | quote }}
spec:
type: Scala
mode: cluster
image: {{ list .Values.imageRegistry .Values.image | join "/" | quote }}
imagePullPolicy: {{ .Values.imagePullPolicy }}
{{- if .Values.imagePullSecrets }}
imagePullSecrets:
{{- range .Values.imagePullSecrets }}
- {{ . | quote }}
{{- end }}
{{- end }}
mainClass: {{ .Values.mainClass | quote }}
mainApplicationFile: {{ .Values.jar | quote }}
{{- if .Values.arguments }}
arguments:
{{- range .Values.arguments }}
- {{ . | quote }}
{{- end }}
{{- end }}
sparkVersion: {{ .Values.sparkVersion | quote }}
restartPolicy:
type: Never
{{- if or .Values.jarDependencies .Values.fileDependencies .Values.sparkConf .Values.hadoopConf }}
deps:
{{- if .Values.jarDependencies }}
jars:
{{- range .Values.jarDependencies }}
- {{ . | quote }}
{{- end }}
{{- end }}
{{- if .Values.fileDependencies }}
files:
{{- range .Values.fileDependencies }}
- {{ . | quote }}
{{- end }}
{{- end }}
{{- if .Values.sparkConf }}
sparkConf:
{{- range $conf, $value := .Values.sparkConf }}
{{ $conf | quote }}: {{ $value | quote }}
{{- end }}
{{- end }}
{{- if .Values.hadoopConf }}
hadoopConf:
{{- range $conf, $value := .Values.hadoopConf }}
{{ $conf | quote }}: {{ $value | quote }}
{{- end }}
{{- end }}
{{- end }}
driver:
{{- if .Values.envSecretKeyRefs }}
envSecretKeyRefs:
{{- range $name, $value := .Values.envSecretKeyRefs }}
{{ $name }}:
name: {{ $value.name}}
key: {{ $value.key}}
{{- end }}
{{- end }}
{{- if .Values.envVars }}
envVars:
{{- range $name, $value := .Values.envVars }}
{{ $name }}: {{ $value | quote }}
{{- end }}
{{- end }}
securityContext:
runAsUser: {{ .Values.userId }}
cores: {{ .Values.driver.cores }}
coreLimit: {{ .Values.driver.coreLimit | default .Values.driver.cores | quote }}
memory: {{ .Values.driver.memory }}
hostNetwork: {{ .Values.hostNetwork }}
labels:
release: {{ .Release.Name | trunc 63 | quote }}
revision: {{ .Release.Revision | quote }}
sparkVersion: {{ .Values.sparkVersion | quote }}
version: {{ .Chart.Version | quote }}
serviceAccount: {{ .Values.serviceAccount }}
{{- if .Values.javaOptions }}
javaOptions: {{ .Values.javaOptions | quote}}
{{- end }}
{{- if .Values.mounts }}
volumeMounts:
{{- range $name, $path := .Values.mounts }}
- name: {{ $name }}
mountPath: {{ $path }}
{{- end }}
{{- end }}
{{- if .Values.tolerations }}
tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
{{- end }}
executor:
{{- if .Values.envVars }}
envVars:
{{- range $name, $value := .Values.envVars }}
{{ $name | quote }}: {{ $value | quote }}
{{- end }}
{{- end }}
securityContext:
runAsUser: {{ .Values.userId }}
cores: {{ .Values.executor.cores }}
coreLimit: {{ .Values.executor.coreLimit | default .Values.executor.cores | quote }}
instances: {{ .Values.executor.instances }}
memory: {{ .Values.executor.memory }}
labels:
release: {{ .Release.Name | trunc 63 | quote }}
revision: {{ .Release.Revision | quote }}
sparkVersion: {{ .Values.sparkVersion | quote }}
version: {{ .Chart.Version | quote }}
serviceAccount: {{ .Values.serviceAccount }}
{{- if .Values.javaOptions }}
javaOptions: {{ .Values.javaOptions }}
{{- end }}
{{- if .Values.mounts }}
volumeMounts:
{{- range $name, $path := .Values.mounts }}
- name: {{ $name }}
mountPath: {{ $path }}
{{- end }}
{{- end }}
{{- if .Values.tolerations }}
tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
{{- end }}
{{- if .Values.jmxExporterJar }}
monitoring:
exposeDriverMetrics: true
exposeExecutorMetrics: true
prometheus:
port: {{ .Values.jmxPort | default 8090 }}
jmxExporterJar: {{ .Values.jmxExporterJar }}
{{- end }}
{{- if .Values.volumes }}
volumes:
- name: input-data
hostPath:
path: /input-data
- name: output-data
hostPath:
path: /output-data
{{- end }}
{{- if .Values.nodeSelector }}
nodeSelector:
{{ toYaml .Values.nodeSelector | indent 4 }}
{{- end }}
values.yaml
# Generated by build.sbt. Please don't manually update
version: 0.1
sparkVersion: 3.0.2
image: kaspi/kaspi-sparkjob:0.1
jar: local:///opt/spark/jars/kaspi-kaspi-sparkjob.jar
mainClass: kaspi.sparkjob
fileDependencies: []
environment: minikube
serviceAccount: spark-spark
imageRegistry: localhost:5000
arguments:
- "/mnt/data-in/"
- "/mnt/data-out/"
volumes:
- name: input-data
hostPath:
path: /input-data
- name: output-data
hostPath:
path: /output-data
mounts:
input-data: /mnt/data-in
output-data: /mnt/data-out
driver:
cores: 1
memory: "2g"
executor:
instances: 2
cores: 1
memory: "1g"
hadoopConf:
sparkConf:
hostNetwork: false
imagePullPolicy: Never
userId: 0
build.sbt
val sparkVersion = "3.0.2"
val sparkLibs = Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
lazy val commonSettings = Seq(
organization := "kaspi",
scalaVersion := "2.12.13",
version := "0.1",
libraryDependencies ++= sparkLibs
)
val domain = "kaspi"
// for building FAT jar
lazy val assemblySettings = Seq(
assembly / assemblyOption := (assemblyOption in assembly).value.copy(includeScala = false),
assembly / assemblyOutputPath := baseDirectory.value / "output" / s"${domain}-${name.value}.jar"
)
val targetDockerJarPath = "/opt/spark/jars"
val baseRegistry = sys.props.getOrElse("baseRegistry", default = "localhost:5000")
// for building docker image
lazy val dockerSettings = Seq(
imageNames in docker := Seq(
ImageName(s"$domain/${name.value}:latest"),
ImageName(s"$domain/${name.value}:${version.value}"),
),
buildOptions in docker := BuildOptions(
cache = false,
removeIntermediateContainers = BuildOptions.Remove.Always,
pullBaseImage = BuildOptions.Pull.Always
),
dockerfile in docker := {
// The assembly task generates a fat JAR file
val artifact: File = assembly.value
val artifactTargetPath = s"$targetDockerJarPath/$domain-${name.value}.jar"
new Dockerfile {
from(s"$baseRegistry/spark-runner:0.1")
}.add(artifact, artifactTargetPath)
}
)
// Include "provided" dependencies back to default run task
lazy val runLocalSettings = Seq(
// https://stackoverflow.com/questions/18838944/how-to-add-provided-dependencies-back-to-run-test-tasks-classpath/21803413#21803413
Compile / run := Defaults
.runTask(
fullClasspath in Compile,
mainClass in (Compile, run),
runner in (Compile, run)
)
.evaluated
)
lazy val root = (project in file("."))
.enablePlugins(sbtdocker.DockerPlugin)
.enablePlugins(AshScriptPlugin)
.settings(
commonSettings,
assemblySettings,
dockerSettings,
runLocalSettings,
name := "kaspi-sparkjob",
Compile / mainClass := Some("kaspi.sparkjob"),
Compile / resourceGenerators += createImporterHelmChart.taskValue
)
// Task to create helm chart
lazy val createImporterHelmChart: Def.Initialize[Task[Seq[File]]] = Def.task {
val chartFile = baseDirectory.value / "helm" / "Chart.yaml"
val valuesFile = baseDirectory.value / "helm" / "values.yaml"
val chartContents =
s"""# Generated by build.sbt. Please don't manually update
|apiVersion: v1
|name: $domain-${name.value}
|version: ${version.value}
|appVersion: ${version.value}
|description: ETL Job
|home: https://github.com/jyyoo0530/kaspi
|sources:
| - https://github.com/jyyoo0530/kaspi
|maintainers:
| - name: Jeremy Yoo
| email: jyyoo0530@gmail.com
| url: https://www.linkedin.com/in/jeeyoungyoo
|""".stripMargin
val valuesContents =
s"""# Generated by build.sbt. Please don't manually update
|version: ${version.value}
|sparkVersion: ${sparkVersion}
|image: $domain/${name.value}:${version.value}
|jar: local://$targetDockerJarPath/$domain-${name.value}.jar
|mainClass: ${(Compile / run / mainClass).value.getOrElse("__MAIN_CLASS__")}
|fileDependencies: []
|environment: minikube
|serviceAccount: spark-spark
|imageRegistry: localhost:5000
|arguments:
| - "/mnt/data-in/"
| - "/mnt/data-out/"
|volumes:
| - name: input-data
| hostPath:
| path: /input-data
| - name: output-data
| hostPath:
| path: /output-data
|mounts:
| input-data: /mnt/data-in
| output-data: /mnt/data-out
|driver:
| cores: 1
| memory: "2g"
|executor:
| instances: 2
| cores: 1
| memory: "1g"
|hadoopConf:
|sparkConf:
|hostNetwork: false
|imagePullPolicy: Never
|userId: 0
|""".stripMargin
IO.write(chartFile, chartContents)
IO.write(valuesFile, valuesContents)
Seq(chartFile, valuesFile)
}
lazy val showVersion = taskKey[Unit]("Show version")
showVersion := {
println((version).value)
}
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
******2021/2/25 UPDATES ********
I tried below yaml for test purpose, then volume from hostpath mounted successfully in Pod. There are no differences, but the object characteristic is different, one is "container", one is "driver","executor"...etc. (Same problem happened while using gaffer-hdfs which k8s object name is "namenode", "datanode"...etc). Can it be a problem using custom kubernetes object name?? But if it is still inherited container properties,,, there is no reason to not to be mounted. .... so.... still struggling..! :)
apiVersion: v1
kind: Pod
metadata:
name: hostpath
namespace: spark-apps
spec:
containers:
- name: nginx
image: nginx
volumeMounts:
- name: volumepath
mountPath: /mnt/data
volumes:
- name: volumepath
hostPath:
path: /input-data
type: Directory