
I am new to Scala as well as Spark ML. I am trying to create a string matching algorithm based on the recommendation from PySpark String Matching. Based on it, I was able to implement the following so far:

import org.apache.spark.ml.Pipeline
import org.apache.spark.sql._
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, MinHashLSHModel, NGram, RegexTokenizer}

import spark.implicits._

// Load vendor JSON files into a Dataset
val vendorData = spark.read.option("header", "true").option("inferSchema", "true").json(path = "Data/*.json").as[vendorData]

// Load IMDB file into a Dataset
val imdbData = spark.read.option("header", "true").option("inferSchema", "true").option("sep", "\t").csv(path = "Data/title.basics.tsv").as[imdbData]

// Remove special characters
val newVendorData = vendorData.withColumn("newtitle", functions.regexp_replace(vendorData.col("title"), "[^A-Za-z0-9_]",""))
val newImdbData = imdbData.withColumn("newprimaryTitle", functions.regexp_replace(imdbData.col("primaryTitle"), "[^A-Za-z0-9_]", ""))

// Algorithm to find match percentage
val tokenizer = new RegexTokenizer().setPattern("").setInputCol("text").setMinTokenLength(1).setOutputCol("tokens") // split into single characters
val ngram = new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams") // character 3-grams
val vectorizer = new HashingTF().setInputCol("ngrams").setOutputCol("vectors") // hash n-grams into sparse vectors
val lsh = new MinHashLSH().setInputCol("vectors").setOutputCol("lsh")
val pipeline = new Pipeline().setStages(Array(tokenizer, ngram, vectorizer, lsh))
val model = pipeline.fit(newVendorData.select("newtitle"))
val vendorHashed = model.transform(newVendorData.select("newtitle"))
val imdbHashed = model.transform(newImdbData.select("newprimaryTitle"))
model.stages.last.asInstanceOf[MinHashLSHModel].approxSimilarityJoin(vendorHashed, imdbHashed, 0.85).show()
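
For context, the intended flow is character-level: the empty tokenizer pattern splits each title into single characters, NGram recombines every three consecutive characters into space-delimited 3-grams, HashingTF maps those n-grams to a sparse count vector, and MinHashLSH builds the signatures that approxSimilarityJoin consumes. Roughly, for one title (illustrative only, assuming RegexTokenizer's default lowercasing):

// "Fukrey" --RegexTokenizer--> [f, u, k, r, e, y]
//          --NGram(3)--------> ["f u k", "u k r", "k r e", "r e y"]
//          --HashingTF-------> sparse term-frequency vector over the n-grams
//          --MinHashLSH------> MinHash signature used by approxSimilarityJoin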

When running it, I am getting the error below. On further investigation I found that the issue is at this line:

val model = pipeline.fit(newVendorData.select("newtitle"))

but I can't see what is wrong with it.

Exception in thread "main" java.lang.IllegalArgumentException: text does not exist. Available: newtitle
at org.apache.spark.sql.types.StructType.$anonfun$apply$1(StructType.scala:278)
at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:168)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:277)
at org.apache.spark.ml.UnaryTransformer.transformSchema(Transformer.scala:109)
at org.apache.spark.ml.Pipeline.$anonfun$transformSchema$4(Pipeline.scala:184)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:198)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:184)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:136)
at MatchingJob$.$anonfun$main$1(MatchingJob.scala:84)
at MatchingJob$.$anonfun$main$1$adapted(MatchingJob.scala:43)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at MatchingJob$.main(MatchingJob.scala:43)
at MatchingJob.main(MatchingJob.scala)

Not sure what I am doing wrong.

My inputs are below:

+------------------+
|          newtitle|
+------------------+
|  BhaagMilkhaBhaag|
|            Fukrey|
| DilTohBacchaHaiJi|
|IndiasJungleHeroes|
|     HrudayaGeethe|

**newprimaryTitle**
BhaagMilkhaBhaag
Fukrey
Carmencita
Leclownetseschiens
PauvrePierrot
Unbonbock
BlacksmithScene
ChineseOpiumDen
DilTohBacchaHaiJi
IndiasJungleHeroes
CorbettandCourtne
EdisonKinetoscopi
MissJerry
LeavingtheFactory
AkrobatischesPotp
TheArrivalofaTrain
ThePhotographical
TheWatererWatered
Autourdunecabine
Barquesortantduport
ItalienischerBaue
DasboxendeKnguruh
TheClownBarber
TheDerby1895
  • `val tokenizer = new RegexTokenizer().setPattern("").setInputCol("text")` should be `val tokenizer = new RegexTokenizer().setPattern("").setInputCol("newtitle")` – mck Feb 21 '21 at 19:11
  • @mck Thanks, that was a foolish mistake to make. Although correcting it resulted in something still unknown: `Exception in thread "main" org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. udf((x: Int) => x, IntegerType), the result is 0 for null input. To get rid of this error, you could:` – Saurabh Feb 21 '21 at 19:29
  • `1. use typed Scala UDF APIs (without return type parameter), e.g. udf((x: Int) => x) 2. use Java UDF APIs, e.g. udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType), if input types are all non primitive 3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution` – Saurabh Feb 21 '21 at 19:34
  • try `spark.sql("set spark.sql.legacy.allowUntypedScalaUDF = true")`. I'm not sure why this popped up. You're not using any udf. – mck Feb 21 '21 at 19:35
  • @mck Thanks again. Adding the above command before creating the model resolved the last issue. I will try to deal with the remaining problems. – Saurabh Feb 21 '21 at 22:05
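
Putting the two fixes from the comment thread together, the failing pieces would look roughly like this (a sketch assembled from the comments above, not independently verified):

// Fix 1 (per mck): point the tokenizer at the column that actually exists
val tokenizer = new RegexTokenizer().setPattern("").setInputCol("newtitle").setMinTokenLength(1).setOutputCol("tokens")

// Fix 2 (per mck): re-enable untyped Scala UDFs before fitting; per the
// comments, the UDF in question comes from inside Spark's LSH code, not
// from anything defined here
spark.sql("set spark.sql.legacy.allowUntypedScalaUDF = true")

val model = pipeline.fit(newVendorData.select("newtitle"))

Note that `model.transform(newImdbData.select("newprimaryTitle"))` will then trip the same column check, since the tokenizer now expects `newtitle`; renaming the column first (for example with `.withColumnRenamed("newprimaryTitle", "newtitle")`, an illustrative workaround not taken from the thread) may be one of the "remaining problems" mentioned above.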

0 Answers