You need to set the multiline option to true for multiline JSON; refer to this answer.
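For context: by default Spark's JSON reader expects one complete JSON document per physical line, so a pretty-printed file read without the option typically collapses into a single _corrupt_record column. A minimal sketch of that failure mode (same file name as in the snippets below):

# Reading a pretty-printed JSON file without multiline: Spark cannot parse
# the individual lines and falls back to a _corrupt_record string column.
broken = spark.read.json("file.json")
broken.printSchema()  # root |-- _corrupt_record: string (nullable = true)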
Here's the code to read your JSON and transform it into multiple columns:
from pyspark.sql.functions import col, explode

df = spark.read.option("inferSchema", "true").option("multiline", "true").json("file.json")
# rows is an array of arrays, so it is exploded twice to reach the scalar values
df.withColumn("tables", explode(col("tables"))).select("tables.*") \
    .withColumn("rows", explode(col("rows"))).withColumn("rows", explode(col("rows"))) \
    .withColumn("columns", explode(col("columns"))) \
    .select("columns.*", "name", "rows").show()
Result:
+-------------+--------+-------------+---------------+
| name| type| name| rows|
+-------------+--------+-------------+---------------+
| TenantId| string|PrimaryResult| 1341234|
|TimeGenerated|datetime|PrimaryResult| 1341234|
|CorrelationId| string|PrimaryResult| 1341234|
| UserName| string|PrimaryResult| 1341234|
| TenantId| string|PrimaryResult| 5143534|
|TimeGenerated|datetime|PrimaryResult| 5143534|
|CorrelationId| string|PrimaryResult| 5143534|
| UserName| string|PrimaryResult| 5143534|
| TenantId| string|PrimaryResult| 14314123|
|TimeGenerated|datetime|PrimaryResult| 14314123|
|CorrelationId| string|PrimaryResult| 14314123|
| UserName| string|PrimaryResult| 14314123|
| TenantId| string|PrimaryResult|test@test.cloud|
|TimeGenerated|datetime|PrimaryResult|test@test.cloud|
|CorrelationId| string|PrimaryResult|test@test.cloud|
| UserName| string|PrimaryResult|test@test.cloud|
+-------------+--------+-------------+---------------+
UPDATE:
If you want to pivot your data, you can just call .pivot(col). Note that I applied first() when aggregating, which picks an arbitrary value; if you want a specific value, you have to adjust the agg function:
from pyspark.sql.functions import col, explode, first, lit

df = spark.read.option("inferSchema", "true").option("multiline", "true").json("file.json")
df = df.withColumn("tables", explode(col("tables"))).select("tables.*") \
    .withColumn("rows", explode(col("rows"))).withColumn("rows", explode(col("rows"))).drop("name") \
    .withColumn("columns", explode(col("columns"))).select("columns.*", "rows").orderBy("name")
df.groupBy(lit(1)).pivot("name").agg(first(col("rows"))).drop("1").show()
Result:
+-------------+--------+-------------+--------+
|CorrelationId|TenantId|TimeGenerated|UserName|
+-------------+--------+-------------+--------+
| 1341234| 1341234| 1341234| 1341234|
+-------------+--------+-------------+--------+
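If, for example, you'd rather keep every value per column than one arbitrary value, a sketch swapping first for collect_list (continuing from the df above):

from pyspark.sql.functions import col, collect_list, lit

# collect_list gathers all row values under each pivoted column name
df.groupBy(lit(1)).pivot("name").agg(collect_list(col("rows"))).drop("1").show(truncate=False)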
UPDATE 2:
If you want to associate a value from an array column with the value at the same index in another array column, you can use the arrays_zip function:
from pyspark.sql.functions import arrays_zip, col, explode, first, lit

df = spark.read.option("inferSchema", "true").option("multiline", "true").json("file.json")
# arrays_zip pairs columns[i] with rows[i], so each value lands under its own column name
df = df.withColumn("tables", explode(col("tables"))).select("tables.*").withColumn("rows", explode(col("rows"))) \
    .withColumn("tmp", explode(arrays_zip("columns", "rows"))).select("tmp.columns.name", "tmp.rows")
df.groupBy(lit(1)).pivot("name").agg(first(col("rows"))).drop("1").show()
Result:
+-------------+--------+-------------+---------------+
|CorrelationId|TenantId|TimeGenerated| UserName|
+-------------+--------+-------------+---------------+
| 14314123| 1341234| 5143534|test@test.cloud|
+-------------+--------+-------------+---------------+
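To see what arrays_zip does in isolation, here is a minimal self-contained sketch with toy data (not from your file):

from pyspark.sql.functions import arrays_zip, col

demo = spark.createDataFrame([(["a", "b"], [1, 2])], ["names", "values"])
# Produces an array of structs pairing names[i] with values[i]: [{a, 1}, {b, 2}]
demo.select(arrays_zip(col("names"), col("values")).alias("zipped")).show(truncate=False)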