I am using Spark ML_pipelines
to easily deploy operations that I have developed in Sparklyr
in a production environment using SCALA
. It is working pretty well, except for one part: it seems that when I read a table from Hive
and then create a pipeline that applies operations to this table the pipeline will also save the table reading operation and thereby the name of the table. However I want the pipeline to be independent of this.
Here is a reproducible example:
Sparklyr
part:
sc = spark2_context(memory = "4G")
iris <- copy_to(sc, iris, overwrite=TRUE)
spark_write_table(iris, "base.iris")
spark_write_table(iris, "base.iris2")
df1 <- tbl(sc, "base.iris")
df2 <- df1 %>%
mutate(foo = 5)
pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(df2) %>%
ml_fit(df1)
ml_save(pipeline,
paste0(save_pipeline_path, "test_pipeline_reading_from_table"),
overwrite = TRUE)
df2 <- pipeline %>% ml_transform(df1)
dbSendQuery(sc, "drop table base.iris")
SCALA
part:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val df1 = spark.sql("select * from base.iris2")
val pipeline = PipelineModel.load(pipeline_path + "/test_pipeline_reading_from_table")
val df2 = pipeline.transform(df1)
I get this error:
org.apache.spark.sql.AnalysisException: Table or view not found: `base`.`iris`; line 2 pos 5;
'Project ['Sepal_Length, 'Sepal_Width, 'Petal_Length, 'Petal_Width, 'Species, 5.0 AS foo#110]
+- 'UnresolvedRelation `base`.`iris`
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
at org.apache.spark.ml.feature.SQLTransformer.transformSchema(SQLTransformer.scala:86)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:310)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:304)
... 71 elided
I can see 2 solutions:
It seems that persisting
dataframe
would be a solution, but then I would need to find a way not to overload my memory, hence my question on unpersistingPassing the name of the table in Hive as a parameter of the pipeline, which I am trying to solve in this question
Now, all of this being said, I might be missing something as I am only a beginner...
EDIT: this is significantly different from this question as this concerns the specific problem of integrating a dataframe that was just read in a pipeline, as specified in the title.
EDIT: as for my project, persisting the tables after I read them is a viable solution. I don't know if there is any better solution.