
In Python, a byte string can simply be saved to a single XML file:

with open('/home/user/file.xml', 'wb') as f:
    f.write(b'<Value>1</Value>')

Current output: /home/user/file.xml (file saved on the local file system)

Question: How can I save a string to an XML file on HDFS in PySpark?

Expected output: 'hdfs://hostname:9000/file.xml'

Background: A large number of XML files are provided by third-party web APIs. I am building an ETL pipeline in PySpark that loads into Delta Lake. The data are extracted asynchronously with aiohttp; next I want to use spark-xml for the transformation before saving the Spark DataFrame to Delta Lake (which requires PySpark). I'm looking for the most efficient way to build the pipeline.

A similar question was asked of the spark-xml developers on GitHub: https://github.com/databricks/spark-xml/issues/515

Latest research:

  1. spark-xml takes as input either XML files stored as text on disk or a Spark DataFrame.

  2. So I'm limited to one of the two options below:

a) use some HDFS client (pyarrow, hdfs, aiohdfs) to save the files to HDFS (a plain text file on HDFS is not a very efficient format); a minimal sketch of this option follows the list

b) load the data into a Spark DataFrame for the spark-xml transformation (the native format for Delta Lake)
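
For illustration, a minimal sketch of option a) using the hdfs (WebHDFS) client; the endpoint, user, and target path below are placeholders, not values from an actual setup:

from hdfs import InsecureClient  # pip install hdfs

# WebHDFS endpoint and user are placeholders
client = InsecureClient('http://hostname:9870', user='hdfs')

# write the byte string directly to a single file on HDFS
client.write('/file.xml', data=b'<Value>1</Value>', overwrite=True)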

If you have other ideas, please let me know.

Dan
  • These seem to be unrelated questions. What issues are you having reading the files? – OneCricketeer Jan 15 '21 at 17:38
  • I have reduced the example to minimal code. The problem is that I don't know how to directly save a byte string to a single HDFS file. I download the XML files from a web API into a byte string in Python. I can save file.xml to the local file system in Python, but I cannot find a way to save the string b'1' directly to HDFS using PySpark, so at the moment I'm saving the file to the local file system first and then copying it from the local FS to HDFS, which is a waste of resources. – Dan Jan 15 '21 at 18:10
  • You don't need Spark for that. You can use WebHDFS or other Python libraries for writing files. https://stackoverflow.com/questions/47926758/python-write-to-hdfs-file However, you still need the local file in some way... – OneCricketeer Jan 15 '21 at 18:34
  • Thanks for your quick answer, I will test the proposed libraries. I have a large ETL pipeline where I'm downloading files from different APIs in different formats to a Databricks Delta Lake in PySpark. I'm trying to keep the number of libraries as small as possible, so I'm coding it directly in PySpark so that all the code stays in PySpark. I use asyncio for downloading, so I'm also trying aiohdfs. – Dan Jan 15 '21 at 18:50

1 Answer


Don't be misled by the Databricks spark-xml docs, which lead you to use uncompressed XML files on disk as input. That is very inefficient; it is much faster to load the downloaded XMLs directly into a Spark DataFrame. The spark-xml PySpark API doesn't expose this functionality directly, but there is a workaround:

# Python wrappers around the Scala from_xml / schema_of_xml_df functions of
# spark-xml; they assume an active SparkSession named `spark` and the
# com.databricks:spark-xml package on the classpath.
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # parse an XML string column into a struct column with the given schema
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # infer the XML schema from a single-column DataFrame of XML strings
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

XMLs downloaded into a list of (url, content) tuples:

xml = [('url',"""<Level_0 Id0="Id0_value_file1">
    <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
      <Level_2_A>A</Level_2_A>
      <Level_2>
        <Level_3>
          <Level_4>
            <Date>2021-01-01</Date>
            <Value>4_1</Value>
          </Level_4>
          <Level_4>
            <Date>2021-01-02</Date>
            <Value>4_2</Value>
          </Level_4>
        </Level_3>
      </Level_2>
    </Level_1>
  </Level_0>"""),

  ('url',"""<Level_0 I"d0="Id0_value_file2">
    <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
      <Level_2_A>A</Level_2_A>
      <Level_2>
        <Level_3>
          <Level_4>
            <Date>2021-01-01</Date>
            <Value>4_1</Value>
          </Level_4>
          <Level_4>
            <Date>2021-01-02</Date>
            <Value>4_2</Value>
          </Level_4>
        </Level_3>
      </Level_2>
    </Level_1>
  </Level_0>""")]

Spark DataFrame transformation of the XML strings:

from pyspark.sql import functions as F

# create a DataFrame with the XML strings
rdd = spark.sparkContext.parallelize(xml)
df = spark.createDataFrame(rdd, "url string, content string")

# infer the XML schema from the content column
payloadSchema = ext_schema_of_xml_df(df.select("content"))

# parse the XML strings into a struct column
parsed = df.withColumn("parsed", ext_from_xml(df.content, payloadSchema, {"rowTag": "Level_0"}))

# select the required data
df2 = parsed.select(
    F.col('parsed._Id0').alias('Id0'),
    F.explode_outer('parsed.Level_1.Level_2.Level_3.Level_4').alias('Level_4')
).select(
    'Id0',
    'Level_4.*'
)
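
From here the parsed DataFrame can be written to Delta Lake, which is the end goal described in the question; the output path below is a placeholder:

# write the parsed result to Delta Lake (output path is a placeholder)
df2.write.format("delta").mode("append").save("hdfs://hostname:9000/delta/xml_table")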

To decode bytes: b'string'.decode('utf-8')
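
For example, if the aiohttp downloads arrive as (url, bytes) pairs (the downloaded variable below is illustrative), they can be decoded before building the list used above:

# downloaded: list of (url, response_bytes) pairs from aiohttp (illustrative)
xml = [(url, payload.decode('utf-8')) for url, payload in downloaded]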

See @mck's answer for more information about parsing nested XMLs: How to transform to spark Data Frame data from multiple nested XML files with attributes

Dan