
I am trying to bring JIRA data into Foundry using an external API. When it comes in via Magritte, the data is stored as Avro with a column called response. The response column has data that looks like this:


[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]

Because this imports as Avro, the documentation that describes how to convert this kind of data in Foundry doesn't work. How can I convert this data into individual columns and rows?

Here is the code that I've attempted to use:

from transforms.api import transform_df, Input, Output
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T


@transform_df(
    Output("json output"),
    json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):

    spark = ctx.spark_session

    # Collect the raw JSON strings from the `response` column
    source = [row.response for row in json_raw.select('response').collect()]  # noqa

    # Read the strings back into a single-column data frame
    df = spark.createDataFrame([(s,) for s in source], ['response'])

    json_schema = T.StructType([
        T.StructField("id", T.StringType(), False),
        T.StructField("name", T.StringType(), False),
        T.StructField("custom", T.StringType(), False),
        T.StructField("orderable", T.StringType(), False),
        T.StructField("navigable", T.StringType(), False),
        T.StructField("searchable", T.StringType(), False),
        T.StructField("clauseNames", T.StringType(), False),
        T.StructField("schema", T.StringType(), False)
    ])

    udf_parse_json = udf(lambda s: parse_json(s), json_schema)

    df_new = df.select(udf_parse_json(df.response).alias("response"))

    return df_new


# Function to convert a JSON array string into a tuple matching json_schema
# (note this only keeps the first element of the array)
def parse_json(array_str):
    item = json.loads(array_str)[0]
    return (item["id"], item["name"], str(item["custom"]),
            str(item["orderable"]), str(item["navigable"]),
            str(item["searchable"]), str(item["clauseNames"]),
            str(item["schema"]))

2 Answers


Parsing JSON in a string column into a struct column (and then into separate columns) can be done easily with the F.from_json function (where F is pyspark.sql.functions).

In your case, you need to do:

df = df.withColumn("response_parsed", F.from_json("response", json_schema))

Then you can do this or similar to get the contents into different columns:

df = df.select("response_parsed.*")

However, this won't work as written because your schema is incorrect: each row actually contains a list of JSON structs, not just one. You need to wrap the whole thing in T.ArrayType(your_schema), and you'll also need to do an F.explode before selecting, to get each array element into its own row.
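
Putting that together, a sketch of the full pattern might look like the following (the nested field types are guesses from the sample data in the question, so treat the schema as an assumption):

import pyspark.sql.functions as F
import pyspark.sql.types as T

# Schema for ONE element of the array; note the T.ArrayType wrapper below.
field_schema = T.StructType([
    T.StructField("id", T.StringType()),
    T.StructField("name", T.StringType()),
    T.StructField("custom", T.BooleanType()),
    T.StructField("orderable", T.BooleanType()),
    T.StructField("navigable", T.BooleanType()),
    T.StructField("searchable", T.BooleanType()),
    T.StructField("clauseNames", T.ArrayType(T.StringType())),
    T.StructField("schema", T.StructType([
        T.StructField("type", T.StringType()),
        T.StructField("custom", T.StringType()),
        T.StructField("customId", T.LongType()),
    ])),
])

df = df.withColumn("response_parsed",
                   F.from_json("response", T.ArrayType(field_schema)))
# One row per array element, then one column per struct field.
df = df.withColumn("response_parsed", F.explode("response_parsed"))
df = df.select("response_parsed.*")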

An additional useful function is F.get_json_object, which allows you to extract a single JSON object from a JSON string.
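
For example (a sketch; the $[0].name path pulls the name field of the first array element):

# Extract one value by path from the raw JSON string column.
df = df.withColumn("first_name", F.get_json_object("response", "$[0].name"))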

Using a UDF like you've done could work, but UDFs are generally much less performant than native Spark functions.

Additionally, all the Avro file format does in this case is merge multiple JSON files into one big file, with each file in its own row. That means the example under "Rest API Plugin" - "Processing JSON in Foundry" should work, as long as you skip the 'put this schema on the raw dataset' step.
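
As a rough sketch of that flow (assuming json_raw is the raw Avro-backed input from the question, inside a transform where ctx is available):

# Each row's `response` string is a complete JSON document, so it can be
# handed to spark.read.json directly and the schema inferred from the data.
raw_strings = json_raw.select('response').rdd.map(lambda r: r.response)
parsed_df = ctx.spark_session.read.json(raw_strings)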

  • Thanks for the quick response. When I run the code you provided, I get the following in the response_parsed field: {"id":"null","name":"null","custom":"null","orderable":"null","navigable":"null","searchable":"null","clauseNames":"null","schema":"null"} – Robert F Aug 27 '21 at 15:45
  • That is because your schema is wrong, you need to add an ArrayType bit, see my updated answer – ollie299792458 Aug 27 '21 at 19:25
  • Thanks for the response. I had to use from_json as you suggested and then explode. My schema for the from_json was simply T.ArrayType(T.StringType()). From there I had to call each JSON object individually using withColumn and F.get_json_object. – Robert F Aug 31 '21 at 20:33
  • That also works; however, if the schema is consistent across JSON objects, calling from_json with T.ArrayType(T.StructType(rest of schema)) should work. – ollie299792458 Sep 01 '21 at 00:18
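
For completeness, a minimal sketch of the approach Robert F describes in the comments above (illustrative only; just two of the fields are shown):

import pyspark.sql.functions as F
import pyspark.sql.types as T

# Parse the array as raw JSON strings, one object per row after the explode...
df = df.withColumn(
    "item", F.explode(F.from_json("response", T.ArrayType(T.StringType())))
)
# ...then pull each field out of the per-row JSON object individually.
df = (df
      .withColumn("id", F.get_json_object("item", "$.id"))
      .withColumn("name", F.get_json_object("item", "$.name")))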

I used the magritte-rest connector to walk through the paged results from search:

type: rest-source-adapter2
restCalls:
  - type: magritte-paging-inc-param-call
    method: GET
    path: search
    paramToIncrease: startAt
    increaseBy: 50
    initValue: 0
    parameters:
      startAt: '{%startAt%}'
    extractor:
      - type: json
        assign:
          issues: /issues
        allowNull: true
    condition:
      type: magritte-rest-non-empty-condition
      var: issues
    maxIterationsAllowed: 4096
cacheToDisk: false
oneFilePerResponse: false

That yielded a dataset that looked like this:

[screenshot: raw JIRA issues dataset]

Once I had that, the following code expanded and parsed the returned JSON issues into a properly-typed DataFrame, with fields holding the inner structure of each issue as a (very complex) struct:

import json
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import explode

def issues_enumerated(All_Issues_Paged):

    # In a Foundry transform you would typically pass in ctx.spark_session
    # instead of grabbing the active session like this.
    spark = SparkSession.builder.getOrCreate()

    def generate_issue_row(input_row: Row) -> Row:
        """
        Maps one response row to a Row holding that response's issue
        array as a list of raw JSON strings.
        """
        d = input_row.asDict()
        resp_json = d['response']
        resp_obj = json.loads(resp_json)
        issues = list(map(json.dumps, resp_obj['issues']))

        return Row(issues=issues)

    # array-per-record
    unexploded_df = All_Issues_Paged.rdd.map(generate_issue_row).toDF()
    # row-per-record ("col" is explode's default output column name)
    row_per_record_df = unexploded_df.select(explode(unexploded_df.issues))
    # raw JSON string per-record RDD
    issue_json_strings_rdd = row_per_record_df.rdd.map(lambda r: r.col)
    # JSON object dataframe with an inferred schema
    issues_df = spark.read.json(issue_json_strings_rdd)
    issues_df.printSchema()
    return issues_df
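
For instance, wired into a Foundry transform it might look like this (the dataset paths are placeholders):

from transforms.api import transform_df, Input, Output

@transform_df(
    Output("/path/to/jira_issues_parsed"),
    All_Issues_Paged=Input("/path/to/all_issues_paged"),
)
def compute(All_Issues_Paged):
    return issues_enumerated(All_Issues_Paged)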