
I am new to Spark and minikube. I ran into a problem while running a Spark job defined in sparkapplication.yaml: the Spark driver and executor pods are created successfully, but none of them mounts the hostPath. I followed Tom Louis's minikube-spark example. Everything runs fine if I bake the data into the Spark job image directly via the Dockerfile (`COPY ...`).

Currently, the data (*.csv) goes: local folder - (mounted) - minikube - (not mounted) - Spark driver pod.

I don't know why the hostPath is not mounted; there may be some mistake on my part. Can anybody take a look at my problem? Appreciated..!
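For context, the working "local folder - (mounted) - minikube" hop can be sketched with standard minikube commands (the directory names are taken from values.yaml below; this is a sketch for verification, not part of the original setup):

```shell
# Expose the local CSV folder to the minikube node (runs in the foreground)
minikube mount ./input-data:/input-data

# In another terminal: verify the minikube node itself sees the files
minikube ssh -- ls -la /input-data
```

If the second command lists the CSV files, the host-to-node mount is fine and the problem is isolated to the node-to-pod hop.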

template/sparkapplication.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name:  {{ .Release.Name | trunc 63 }}
  labels:
    chartname: {{ .Chart.Name | trunc 63 | quote }}
    release: {{ .Release.Name | trunc 63 | quote }}
    revision: {{ .Release.Revision | quote }}
    sparkVersion: {{ .Values.sparkVersion | quote }}
    version: {{ .Chart.Version | quote }}
spec:
  type: Scala
  mode: cluster
  image: {{ list .Values.imageRegistry .Values.image | join "/" | quote }}
  imagePullPolicy: {{ .Values.imagePullPolicy }}
  {{- if .Values.imagePullSecrets }}
  imagePullSecrets:
  {{- range .Values.imagePullSecrets }}
    - {{ . | quote }}
  {{- end }}
  {{- end }}
  mainClass: {{ .Values.mainClass | quote }}
  mainApplicationFile: {{ .Values.jar | quote }}
  {{- if .Values.arguments }}
  arguments:
  {{- range .Values.arguments }}
    - {{ . | quote }}
  {{- end }}
  {{- end }}
  sparkVersion: {{ .Values.sparkVersion | quote }}
  restartPolicy:
    type: Never
  {{- if or .Values.jarDependencies .Values.fileDependencies .Values.sparkConf .Values.hadoopConf }}
  deps:
    {{- if .Values.jarDependencies }}
    jars:
    {{- range .Values.jarDependencies }}
      - {{ . | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.fileDependencies }}
    files:
    {{- range .Values.fileDependencies }}
      - {{ . | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.sparkConf }}
    sparkConf:
    {{- range $conf, $value := .Values.sparkConf }}
      {{ $conf | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    {{- if .Values.hadoopConf }}
    hadoopConf:
    {{- range $conf, $value := .Values.hadoopConf }}
      {{ $conf | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
  {{- end }}
  driver:
    {{- if .Values.envSecretKeyRefs }}
    envSecretKeyRefs:
    {{- range $name, $value := .Values.envSecretKeyRefs }}
      {{ $name }}:
        name: {{ $value.name}}
        key: {{ $value.key}}
    {{- end }}
    {{- end }}
    {{- if .Values.envVars }}
    envVars:
    {{- range $name, $value := .Values.envVars }}
      {{ $name }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    securityContext:
      runAsUser: {{ .Values.userId }}
    cores: {{ .Values.driver.cores }}
    coreLimit: {{ .Values.driver.coreLimit | default .Values.driver.cores | quote }}
    memory: {{ .Values.driver.memory }}
    hostNetwork: {{ .Values.hostNetwork }}
    labels:
      release: {{ .Release.Name | trunc 63 | quote }}
      revision: {{ .Release.Revision | quote }}
      sparkVersion: {{ .Values.sparkVersion | quote }}
      version: {{ .Chart.Version | quote }}
    serviceAccount: {{ .Values.serviceAccount }}
    {{- if .Values.javaOptions }}
    javaOptions: {{ .Values.javaOptions | quote}}
    {{- end }}
    {{- if .Values.mounts }}
    volumeMounts:
    {{- range $name, $path := .Values.mounts }}
      - name: {{ $name }}
        mountPath: {{ $path }}
    {{- end }}
    {{- end }}
    {{- if .Values.tolerations }}
    tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
    {{- end }}
  executor:
    {{- if .Values.envVars }}
    envVars:
    {{- range $name, $value := .Values.envVars }}
      {{ $name | quote }}: {{ $value | quote }}
    {{- end }}
    {{- end }}
    securityContext:
      runAsUser: {{ .Values.userId }}
    cores: {{ .Values.executor.cores }}
    coreLimit: {{ .Values.executor.coreLimit | default .Values.executor.cores | quote }}
    instances: {{ .Values.executor.instances }}
    memory: {{ .Values.executor.memory }}
    labels:
      release: {{ .Release.Name | trunc 63 | quote }}
      revision: {{ .Release.Revision | quote }}
      sparkVersion: {{ .Values.sparkVersion | quote }}
      version: {{ .Chart.Version | quote }}
    serviceAccount: {{ .Values.serviceAccount }}
    {{- if .Values.javaOptions }}
    javaOptions: {{ .Values.javaOptions }}
    {{- end }}
    {{- if .Values.mounts }}
    volumeMounts:
    {{- range $name, $path := .Values.mounts }}
      - name: {{ $name }}
        mountPath: {{ $path }}
    {{- end }}
    {{- end }}
    {{- if .Values.tolerations }}
    tolerations:
{{ toYaml .Values.tolerations | indent 6 }}
    {{- end }}
  {{- if .Values.jmxExporterJar }}
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      port: {{ .Values.jmxPort | default 8090 }}
      jmxExporterJar: {{ .Values.jmxExporterJar }}
  {{- end }}
  {{- if .Values.volumes }}
  volumes:
    - name: input-data
      hostPath:
        path: /input-data
    - name: output-data
      hostPath:
        path: /output-data
  {{- end }}
  {{- if .Values.nodeSelector }}
  nodeSelector:
{{ toYaml .Values.nodeSelector | indent 4 }}
  {{- end }}
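As a side note, the `volumes:` block at the end of this template hardcodes the two hostPath entries even though `.Values.volumes` already carries exactly the same structure. A generic rendering (a sketch using standard Helm `toYaml`/`indent`, behavior-equivalent for the values below) would be:

```yaml
  {{- if .Values.volumes }}
  volumes:
{{ toYaml .Values.volumes | indent 4 }}
  {{- end }}
```

This keeps the template and values.yaml from drifting apart if volume names or paths change.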

values.yaml

# Generated by build.sbt. Please don't manually update
version: 0.1
sparkVersion: 3.0.2
image: kaspi/kaspi-sparkjob:0.1
jar: local:///opt/spark/jars/kaspi-kaspi-sparkjob.jar
mainClass: kaspi.sparkjob
fileDependencies: []
environment: minikube
serviceAccount: spark-spark
imageRegistry: localhost:5000
arguments:
  - "/mnt/data-in/"
  - "/mnt/data-out/"
volumes:
  - name: input-data
    hostPath:
      path: /input-data
  - name: output-data
    hostPath:
      path: /output-data
mounts:
  input-data: /mnt/data-in
  output-data: /mnt/data-out
driver:
  cores: 1
  memory: "2g"
executor:
  instances: 2
  cores: 1
  memory: "1g"
hadoopConf:
sparkConf:
hostNetwork: false
imagePullPolicy: Never
userId: 0

build.sbt

val sparkVersion = "3.0.2"

val sparkLibs = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

lazy val commonSettings = Seq(
  organization := "kaspi",
  scalaVersion := "2.12.13",
  version := "0.1",
  libraryDependencies ++= sparkLibs
)

val domain = "kaspi"

// for building FAT jar
lazy val assemblySettings = Seq(
  assembly / assemblyOption := (assemblyOption in assembly).value.copy(includeScala = false),
  assembly / assemblyOutputPath := baseDirectory.value / "output" / s"${domain}-${name.value}.jar"
)

val targetDockerJarPath = "/opt/spark/jars"
val baseRegistry = sys.props.getOrElse("baseRegistry", default = "localhost:5000")

// for building docker image
lazy val dockerSettings = Seq(
  imageNames in docker := Seq(
    ImageName(s"$domain/${name.value}:latest"),
    ImageName(s"$domain/${name.value}:${version.value}"),
  ),
  buildOptions in docker := BuildOptions(
    cache = false,
    removeIntermediateContainers = BuildOptions.Remove.Always,
    pullBaseImage = BuildOptions.Pull.Always
  ),
  dockerfile in docker := {
    // The assembly task generates a fat JAR file
    val artifact: File = assembly.value
    val artifactTargetPath = s"$targetDockerJarPath/$domain-${name.value}.jar"
    new Dockerfile {
      from(s"$baseRegistry/spark-runner:0.1")
    }.add(artifact, artifactTargetPath)
  }
)

// Include "provided" dependencies back to default run task
lazy val runLocalSettings = Seq(
  // https://stackoverflow.com/questions/18838944/how-to-add-provided-dependencies-back-to-run-test-tasks-classpath/21803413#21803413
  Compile / run := Defaults
    .runTask(
      fullClasspath in Compile,
      mainClass in (Compile, run),
      runner in (Compile, run)
    )
    .evaluated
)

lazy val root = (project in file("."))
  .enablePlugins(sbtdocker.DockerPlugin)
  .enablePlugins(AshScriptPlugin)
  .settings(
    commonSettings,
    assemblySettings,
    dockerSettings,
    runLocalSettings,
    name := "kaspi-sparkjob",
    Compile / mainClass := Some("kaspi.sparkjob"),
    Compile / resourceGenerators += createImporterHelmChart.taskValue
  )

// Task to create helm chart
lazy val createImporterHelmChart: Def.Initialize[Task[Seq[File]]] = Def.task {
  val chartFile = baseDirectory.value / "helm" / "Chart.yaml"
  val valuesFile = baseDirectory.value / "helm" / "values.yaml"

  val chartContents =
    s"""# Generated by build.sbt. Please don't manually update
       |apiVersion: v1
       |name: $domain-${name.value}
       |version: ${version.value}
       |appVersion: ${version.value}
       |description: ETL Job
       |home: https://github.com/jyyoo0530/kaspi
       |sources:
       |  - https://github.com/jyyoo0530/kaspi
       |maintainers:
       |  - name: Jeremy Yoo
       |    email: jyyoo0530@gmail.com
       |    url: https://www.linkedin.com/in/jeeyoungyoo
       |""".stripMargin

  val valuesContents =
    s"""# Generated by build.sbt. Please don't manually update
       |version: ${version.value}
       |sparkVersion: ${sparkVersion}
       |image: $domain/${name.value}:${version.value}
       |jar: local://$targetDockerJarPath/$domain-${name.value}.jar
       |mainClass: ${(Compile / run / mainClass).value.getOrElse("__MAIN_CLASS__")}
       |fileDependencies: []
       |environment: minikube
       |serviceAccount: spark-spark
       |imageRegistry: localhost:5000
       |arguments:
       |  - "/mnt/data-in/"
       |  - "/mnt/data-out/"
       |volumes:
       |  - name: input-data
       |    hostPath:
       |      path: /input-data
       |  - name: output-data
       |    hostPath:
       |      path: /output-data
       |mounts:
       |  input-data: /mnt/data-in
       |  output-data: /mnt/data-out
       |driver:
       |  cores: 1
       |  memory: "2g"
       |executor:
       |  instances: 2
       |  cores: 1
       |  memory: "1g"
       |hadoopConf:
       |sparkConf:
       |hostNetwork: false
       |imagePullPolicy: Never
       |userId: 0
       |""".stripMargin

  IO.write(chartFile, chartContents)
  IO.write(valuesFile, valuesContents)
  Seq(chartFile, valuesFile)
}

lazy val showVersion = taskKey[Unit]("Show version")
showVersion := {
  println((version).value)
}

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

******** 2021/2/25 UPDATES ********

I tried the yaml below for test purposes, and the volume from the hostPath mounted successfully in the Pod. There is no real difference, except the object kind: one uses plain "containers", the other uses "driver", "executor", etc. (The same problem happened while using gaffer-hdfs, whose Kubernetes object names are "namenode", "datanode", etc.) Could using a custom Kubernetes object name be the problem? But if those objects still inherit container properties, there is no reason for the volume not to be mounted... so... still struggling..! :)

apiVersion: v1
kind: Pod
metadata:
  name: hostpath
  namespace: spark-apps
spec:
  containers:
    - name: nginx
      image: nginx
      volumeMounts:
        - name: volumepath
          mountPath: /mnt/data
  volumes:
    - name: volumepath
      hostPath:
        path: /input-data
        type: Directory
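To compare the two cases, these diagnostic commands may help (a sketch; pod and namespace names are the ones from the question, and `spark-role=driver` is the label Spark puts on driver pods):

```shell
# 1) Confirm the plain test Pod really sees the host directory
kubectl -n spark-apps exec hostpath -- ls /mnt/data

# 2) Inspect what actually landed in the driver pod spec;
#    an empty result means the volumes were never injected into the pod
kubectl -n spark-apps get pods -l spark-role=driver \
  -o jsonpath='{.items[*].spec.volumes}'
```

If (1) works but (2) comes back empty, the SparkApplication-to-pod translation is dropping the volumes, rather than the hostPath itself being broken.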
******** 2021/2/25 Updates2 ********

I pulled spark-operator locally and modified it to adapt apiextensions.k8s.io/v1beta1 -> apiextensions.k8s.io/v1. Nothing was solved; the problem remains the same. – 유지영 Feb 25 '21 at 06:51

0 Answers