This might be a long shot, but I figured it couldn't hurt to ask. I'm attempting to use Elsevier's open-source spark-xml-utils package in pyspark to transform some XML records with XSLT.
I've had a bit of success with some exploratory code getting a transformation to work:
# open XSLT processor from spark's jvm context
with open('/tmp/foo.xsl', 'r') as f:
    proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(f.read())

# transform XML record with 'proc'
with open('/tmp/bar.xml', 'r') as f:
    transformed = proc.transform(f.read())
However, in a more realistic situation, I was unable to drop proc.transform into a lambda map function, getting errors similar to:
"An error occurred while calling o55.getstate. Trace: py4j.Py4JException: Method getstate([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)"
The small example above worked on a single record in a pyspark shell, which I'm assuming runs on the Spark driver. But the map function mentioned above was running in Spark, via Livy and YARN, which introduces workers. This SO question/answer suggests that perhaps I cannot use the function from the JVM in that context.
Now, the spark-xml-utils library provides some examples in Scala that do precisely what I'd like to do:
import com.elsevier.spark_xml_utils.xslt.XSLTProcessor
val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect.head
val srctitles = xmlKeyPair.mapPartitions(recsIter => {
  val proc = XSLTProcessor.getInstance(stylesheet)
  recsIter.map(rec => proc.transform(rec._2))
})
I'm wondering how I can translate this to pyspark so that I can run it over an RDD, ideally one with the following input and output format:
id | document | other | columns
-----------------------------------------------------
sprog | <xml here...> | more | data
baz | <xml here...> | more | data
that could become
id | document | other | columns
-----------------------------------------------------
sprog | <*transformed* xml here...> | more | data
baz | <*transformed* xml here...> | more | data
Any help or suggestions would be most appreciated.
Update 8/28/2018: Also tried running through mapPartitions, no dice. Same __getstate__() error.
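For concreteness, that attempt looked roughly like this (a sketch mirroring the Scala example; xml_rdd and the 'document' field stand in for our actual RDD and column, and the other columns are omitted):

with open('/tmp/foo.xsl', 'r') as f:
    stylesheet = f.read()

# proc is still a py4j JavaObject built on the driver, so capturing it
# in the partition function hits the same pickling problem
proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(stylesheet)

def transform_partition(rows):
    for row in rows:
        yield proc.transform(row['document'])

srctitles = xml_rdd.mapPartitions(transform_partition)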