I am using Spark 3.4 with Python 3.10. I am trying to convert the following data, a Python dictionary of column lists, into a PySpark DataFrame.
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

spark = SparkSession.builder.appName('MyApp').getOrCreate()
sc = spark.sparkContext
raw_data = {
    'regiment': ['Nighthawks', 'Nighthawks', 'Nighthawks', 'Nighthawks', 'Dragoons', 'Dragoons', 'Dragoons', 'Dragoons', 'Scouts', 'Scouts', 'Scouts', 'Scouts'],
    'company': ['1st', '1st', '2nd', '2nd', '1st', '1st', '2nd', '2nd', '1st', '1st', '2nd', '2nd'],
    'deaths': [523, 52, 25, 616, 43, 234, 523, 62, 62, 73, 37, 35],
    'battles': [5, 42, 2, 2, 4, 7, 8, 3, 4, 7, 8, 9],
    'size': [1045, 957, 1099, 1400, 1592, 1006, 987, 849, 973, 1005, 1099, 1523],
    'veterans': [1, 5, 62, 26, 73, 37, 949, 48, 48, 435, 63, 345],
    'readiness': [1, 2, 3, 3, 2, 1, 2, 3, 2, 1, 2, 3],
    'armored': [1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1],
    'deserters': [4, 24, 31, 2, 3, 4, 24, 31, 2, 3, 2, 3],
    'origin': ['Arizona', 'California', 'Texas', 'Florida', 'Maine', 'Iowa', 'Alaska', 'Washington', 'Oregon', 'Wyoming', 'Louisana', 'Georgia']
}
I tried various approaches, but none of them seems to work.
Here is one of the approaches I followed:
- Convert the dictionary of column lists into a list of row dictionaries
raw_data_size = len(raw_data['origin'])
raw_data_keys = raw_data.keys()
mapping_list = []
mapping = {}
for i in range(raw_data_size):
    # Collect the i-th value of every column into one row dict
    for key in raw_data_keys:
        mapping[key] = raw_data[key][i]
    mapping_list.append(mapping)
    mapping = {}
Output of mapping_list[:1]:
[{'regiment': 'Nighthawks',
'company': '1st',
'deaths': 523,
'battles': 5,
'size': 1045,
'veterans': 1,
'readiness': 1,
'armored': 1,
'deserters': 4,
'origin': 'Arizona'}]
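As an aside, I believe the same transposition from a dict of column lists to a list of row dicts can be written more compactly; this sketch should be equivalent to the loop above:
# Transpose the column lists into per-row tuples, then pair each tuple
# with the keys (keys() and values() iterate in the same insertion order)
mapping_list = [dict(zip(raw_data.keys(), row)) for row in zip(*raw_data.values())]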
- Create a schema
schema = StructType([
StructField('regiment', StringType(), True),
StructField('company', StringType(), True),
StructField('deaths', IntegerType(), True),
StructField('battles', IntegerType(), True),
StructField('size', IntegerType(), True),
StructField('veterans', IntegerType(), True),
StructField('readiness', IntegerType(), True),
StructField('armored', IntegerType(), True),
StructField('deserters', IntegerType(), True),
StructField('origin', StringType(), True)
])
- Create a DataFrame from it. I am getting an error in this step:
army = spark.read.schema(schema).json(mapping_list)
Error stack:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[23], line 1
----> 1 army=spark.read.schema(schema).json(mapping_list)
File C:\software\programming\spark-3.4.0-bin-hadoop3\python\pyspark\sql\readwriter.py:418, in DataFrameReader.json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, allowNonNumericNumbers)
416 if type(path) == list:
417 assert self._spark._sc._jvm is not None
--> 418 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
419 elif isinstance(path, RDD):
421 def func(iterator: Iterable) -> Iterable:
File C:\ProgramData\anaconda3\lib\site-packages\py4j\java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File C:\software\programming\spark-3.4.0-bin-hadoop3\python\pyspark\errors\exceptions\captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File C:\ProgramData\anaconda3\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o99.json.
: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class java.lang.String (java.util.HashMap and java.lang.String are in module java.base of loader 'bootstrap')
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:362)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
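From the ClassCastException (java.util.HashMap cannot be cast to java.lang.String) and the checkAndGlobPathIfNecessary frame, my reading is that json() treats each element of the list as a file path string rather than as a record. If that is right, feeding it an RDD of JSON strings instead might sidestep the path interpretation; an untested sketch:
import json
# Serialize each row dict to a JSON string and parallelize it, since
# DataFrameReader.json also accepts an RDD of JSON strings
json_rdd = sc.parallelize([json.dumps(d) for d in mapping_list])
army = spark.read.schema(schema).json(json_rdd)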
Please let me know what I could be missing or doing wrong, and how I can make this work.
Thanks in advance.
Some of the other approaches I tried are:
spark.createDataFrame(mapping_list).show()
Py4JJavaError: An error occurred while calling o174.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 27) (host.docker.internal executor driver): java.io.IOException: Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5, Access is denied
spark.createDataFrame(Row(**x) for x in mapping_list).show()
Py4JJavaError: An error occurred while calling o153.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 26) (host.docker.internal executor driver): java.io.IOException: Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5, Access is denied
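The 'Cannot run program "C:\ProgramData\anaconda3": CreateProcess error=5' part of these two failures looks environmental rather than data-related: Spark appears to be launching its Python worker with a directory path instead of an executable. If so, pointing PYSPARK_PYTHON at the interpreter itself (set before creating the SparkSession; the exact path below is an assumption based on my trace) might help, though I have not confirmed this:
import os
# Hypothetical fix: use the python.exe inside the Anaconda directory,
# not the directory itself, for both the driver and the workers
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\anaconda3\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = r'C:\ProgramData\anaconda3\python.exe'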
- I also tried converting it into a pandas DataFrame and loading it, roughly as sketched below, but that didn't work either.
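import pandas as pd
# Roughly the pandas route I tried: build a pandas DataFrame from the
# dict of column lists and hand it to Spark
pdf = pd.DataFrame(raw_data)
army = spark.createDataFrame(pdf)
army.show()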