
I am using Spark 3.4 with Python 3.10. I am trying to convert the following data, which is in Python dictionary format, into a PySpark DataFrame.

import pyspark
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql.types import *

spark = SparkSession.builder.appName('MyApp').getOrCreate()
sc = spark.sparkContext

raw_data = {'regiment': ['Nighthawks', 'Nighthawks', 'Nighthawks', 'Nighthawks', 'Dragoons', 'Dragoons', 'Dragoons', 'Dragoons', 'Scouts', 'Scouts', 'Scouts', 'Scouts'],
            'company': ['1st', '1st', '2nd', '2nd', '1st', '1st', '2nd', '2nd','1st', '1st', '2nd', '2nd'],
            'deaths': [523, 52, 25, 616, 43, 234, 523, 62, 62, 73, 37, 35],
            'battles': [5, 42, 2, 2, 4, 7, 8, 3, 4, 7, 8, 9],
            'size': [1045, 957, 1099, 1400, 1592, 1006, 987, 849, 973, 1005, 1099, 1523],
            'veterans': [1, 5, 62, 26, 73, 37, 949, 48, 48, 435, 63, 345],
            'readiness': [1, 2, 3, 3, 2, 1, 2, 3, 2, 1, 2, 3],
            'armored': [1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1],
            'deserters': [4, 24, 31, 2, 3, 4, 24, 31, 2, 3, 2, 3],
            'origin': ['Arizona', 'California', 'Texas', 'Florida', 'Maine', 'Iowa', 'Alaska', 'Washington', 'Oregon', 'Wyoming', 'Louisana', 'Georgia']}

I tried various approaches but none of them seem to work.

Here is one of the approaches I followed:

  1. Convert the dictionary of lists into a list of dictionaries:
raw_data_size = len(raw_data['origin'])
raw_data_keys = raw_data.keys()
mapping_list=[]
mapping = {}
for i in range(raw_data_size):
    for key in raw_data_keys:
        mapping[key]=raw_data[key][i]
    mapping_list.append(mapping)
    mapping={}

Output of mapping_list[:1]

[{'regiment': 'Nighthawks',
  'company': '1st',
  'deaths': 523,
  'battles': 5,
  'size': 1045,
  'veterans': 1,
  'readiness': 1,
  'armored': 1,
  'deserters': 4,
  'origin': 'Arizona'}]
  2. Create the schema:
schema = StructType([
    StructField('regiment', StringType(), True),
    StructField('company', StringType(), True),
    StructField('deaths', IntegerType(), True),
    StructField('battles', IntegerType(), True),
    StructField('size', IntegerType(), True),
    StructField('veterans', IntegerType(), True),
    StructField('readiness', IntegerType(), True),
    StructField('armored', IntegerType(), True),
    StructField('deserters', IntegerType(), True),
    StructField('origin', StringType(), True)
])
  3. Create a DataFrame from it. I am getting an error in this step:
army=spark.read.schema(schema).json(mapping_list)

Error stack:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[23], line 1
----> 1 army=spark.read.schema(schema).json(mapping_list)

File C:\software\programming\spark-3.4.0-bin-hadoop3\python\pyspark\sql\readwriter.py:418, in DataFrameReader.json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, allowNonNumericNumbers)
    416 if type(path) == list:
    417     assert self._spark._sc._jvm is not None
--> 418     return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    419 elif isinstance(path, RDD):
    421     def func(iterator: Iterable) -> Iterable:

File C:\ProgramData\anaconda3\lib\site-packages\py4j\java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File C:\software\programming\spark-3.4.0-bin-hadoop3\python\pyspark\errors\exceptions\captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File C:\ProgramData\anaconda3\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o99.json.
: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class java.lang.String (java.util.HashMap and java.lang.String are in module java.base of loader 'bootstrap')
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
    at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:362)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

Please let me know what I could be missing or doing wrong, and how I can get this to work.

Thanks in advance.

Some of the other approaches I tried are:

  1. spark.createDataFrame(mapping_list).show()
Py4JJavaError: An error occurred while calling o174.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 27) (host.docker.internal executor driver): java.io.IOException: Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5, Access is denied
  2. spark.createDataFrame(Row(**x) for x in mapping_list).show()
Py4JJavaError: An error occurred while calling o153.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 26) (host.docker.internal executor driver): java.io.IOException: Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5, Access is denied
  3. I also tried converting it into a pandas DataFrame and loading that, but it didn't work either.
AJ22

1 Answer


You were on the right track by defining your schema! One way to do this is to put your input data (your dictionary) into a format that the spark.createDataFrame method can use.

Let's reuse your code

import pyspark
from pyspark.sql.types import *

schema = StructType([
    StructField('regiment', StringType(), True),
    StructField('company', StringType(), True),
    StructField('deaths', IntegerType(), True),
    StructField('battles', IntegerType(), True),
    StructField('size', IntegerType(), True),
    StructField('veterans', IntegerType(), True),
    StructField('readiness', IntegerType(), True),
    StructField('armored', IntegerType(), True),
    StructField('deserters', IntegerType(), True),
    StructField('origin', StringType(), True)
])

raw_data = {'regiment': ['Nighthawks', 'Nighthawks', 'Nighthawks', 'Nighthawks', 'Dragoons', 'Dragoons', 'Dragoons', 'Dragoons', 'Scouts', 'Scouts', 'Scouts', 'Scouts'],
            'company': ['1st', '1st', '2nd', '2nd', '1st', '1st', '2nd', '2nd','1st', '1st', '2nd', '2nd'],
            'deaths': [523, 52, 25, 616, 43, 234, 523, 62, 62, 73, 37, 35],
            'battles': [5, 42, 2, 2, 4, 7, 8, 3, 4, 7, 8, 9],
            'size': [1045, 957, 1099, 1400, 1592, 1006, 987, 849, 973, 1005, 1099, 1523],
            'veterans': [1, 5, 62, 26, 73, 37, 949, 48, 48, 435, 63, 345],
            'readiness': [1, 2, 3, 3, 2, 1, 2, 3, 2, 1, 2, 3],
            'armored': [1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1],
            'deserters': [4, 24, 31, 2, 3, 4, 24, 31, 2, 3, 2, 3],
            'origin': ['Arizona', 'California', 'Texas', 'Florida', 'Maine', 'Iowa', 'Alaska', 'Washington', 'Oregon', 'Wyoming', 'Louisana', 'Georgia']}

By turning your raw_data (which is a dictionary of lists) into a list of dictionaries, you can then use it in the createDataFrame method:

df = spark.createDataFrame([dict(zip(raw_data, col)) for col in zip(*raw_data.values())])

df.show()
+-------+-------+-------+------+---------+----------+---------+----------+----+--------+
|armored|battles|company|deaths|deserters|    origin|readiness|  regiment|size|veterans|
+-------+-------+-------+------+---------+----------+---------+----------+----+--------+
|      1|      5|    1st|   523|        4|   Arizona|        1|Nighthawks|1045|       1|
|      0|     42|    1st|    52|       24|California|        2|Nighthawks| 957|       5|
|      1|      2|    2nd|    25|       31|     Texas|        3|Nighthawks|1099|      62|
|      1|      2|    2nd|   616|        2|   Florida|        3|Nighthawks|1400|      26|
|      0|      4|    1st|    43|        3|     Maine|        2|  Dragoons|1592|      73|
|      1|      7|    1st|   234|        4|      Iowa|        1|  Dragoons|1006|      37|
|      0|      8|    2nd|   523|       24|    Alaska|        2|  Dragoons| 987|     949|
|      1|      3|    2nd|    62|       31|Washington|        3|  Dragoons| 849|      48|
|      0|      4|    1st|    62|        2|    Oregon|        2|    Scouts| 973|      48|
|      0|      7|    1st|    73|        3|   Wyoming|        1|    Scouts|1005|     435|
|      1|      8|    2nd|    37|        2|  Louisana|        2|    Scouts|1099|      63|
|      1|      9|    2nd|    35|        3|   Georgia|        3|    Scouts|1523|     345|
+-------+-------+-------+------+---------+----------+---------+----------+----+--------+
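
Note that the one-liner above lets Spark infer the column types and sorts the columns alphabetically, which is why the column order in the output differs from raw_data. If you want to keep the column order and the string/integer types you declared, you can (as a minimal sketch, reusing the spark session and schema defined above) pass the schema to createDataFrame as well:

# zip(*raw_data.values()) pairs up the i-th value of every column;
# dict(zip(raw_data, col)) maps those values back to the column names.
rows = [dict(zip(raw_data, col)) for col in zip(*raw_data.values())]

# Passing the schema keeps the declared column order and types
# instead of letting Spark infer them.
df = spark.createDataFrame(rows, schema=schema)
df.show()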

EDIT: Ahh damn, I didn't look closely enough at the stack traces in your question, and only now noticed a crucial detail in one of your other attempts.

Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5, Access is denied

This is not a Spark problem (the first "other approach" you listed looks like it should work). If you look at this SO post, it seems like something might be wrong with your C:\ProgramData\anaconda3 executable. Try changing the permissions on that executable, as suggested in one of the answers to that post :)
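
If fixing the permissions doesn't solve it, another workaround I've seen for that error is to point Spark at the full path of the Python executable rather than the Anaconda folder, before the SparkSession is created. A rough sketch (the python.exe path below is an assumption, adjust it to your installation):

import os

# Tell Spark which interpreter to launch for the workers and the driver.
# NOTE: the exact path is an assumption; use your real python.exe location.
os.environ["PYSPARK_PYTHON"] = r"C:\ProgramData\anaconda3\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\ProgramData\anaconda3\python.exe"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MyApp').getOrCreate()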

Koedlt
  • I tried following this approach but I am still getting an error ( Py4JJavaError: An error occurred while calling o86.showString. ). Could this be a bug/installation issue? It's really weird because DataFrames created from external CSVs are working fine. – AJ22 Aug 07 '23 at 17:07
  • I've added an edit to my answer. Seems like I didn't fully read all of your stack traces, sorry. I hope you get it fixed now! – Koedlt Aug 07 '23 at 19:33
  • Thanks for redirecting me! It was a PySpark issue. To be specific I followed https://stackoverflow.com/questions/60414394/createprocess-error-5-access-is-denied-pyspark – AJ22 Aug 11 '23 at 06:48
  • Happy to help you! – Koedlt Aug 11 '23 at 07:05