33

I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.

My record object looks like

ItemPurchase {
    String personId,
    String itemId
}

The data written to S3 looks like:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

NO COMMA SEPARATION.

NO STARTING BRACKET as in a JSON array

[

NO ENDING BRACKET as in a JSON array

]

I want to read this data and get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

What is the correct way to read this data?

John Rotenstein
learner_21
  • Kinesis delivers the data in this odd format so that it can be queried via Athena [link](https://stackoverflow.com/questions/52682426/amazon-athena-cant-read-s3-json-object-files-and-athena-select-query-returns-em). It is strange why they settled on this kind of format in the first place. – sjain24 Jan 11 '23 at 09:27

13 Answers

26

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSON raw_decode method.

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
Tom Chapin
6

I also had the same problem; here is how I solved it:

  1. Replace "}{" with "}\n{".
  2. Split lines by "\n".

    input_json_rdd.map(lambda x: re.sub("}{", "}\n{", x, flags=re.UNICODE)) \
                  .flatMap(lambda line: line.split("\n"))
    

A nested JSON object contains several "}" characters, so splitting the line by "}" doesn't solve the problem.

Xuehua Jiang
  • I considered doing something like this, but I think that if one of the strings inside the JSON object happens to include a }{ then this technique will break. Maybe if you go through each character, toggle a boolean when you hit a " (to indicate either entering or leaving a string), count the levels of objects you are in (increment on seeing { outside a string, decrement on seeing } outside a string), then consider the end of the object to be when your level counter hits 0 again. – Krenair Mar 09 '18 at 16:14
  • The separator `}{` is problematic because inner strings can contain JSON like `}{\"` (with escaped quotes), so using `}{"` as a separator is a bit better because inner strings can't contain unescaped quotes. – Eran H. Feb 11 '21 at 08:13
  • To build on Eran's answer, I used a negative lookahead to account for the case when `}{` occurs at the end of a string: `re.sub('}{"(?![,}])', '}\n{"', string)` – twhyte Jun 03 '21 at 14:11
3

I've had the same issue.

It would have been better if AWS allowed us to set a delimiter but we can do it on our own.

In my use case, I've been listening to a stream of tweets, and upon receiving a new tweet I immediately put it to Firehose.

This, of course, resulted in a 1-line file which could not be parsed.

So, to solve this, I appended a \n to each tweet's JSON before sending it. This, in turn, let me use packages that emit lines when reading the stream contents, and parse the file easily.
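
For illustration, a minimal sketch of this approach using boto3; the delivery stream name and the send_tweet helper are hypothetical:

import json
import boto3

firehose = boto3.client('firehose')

def send_tweet(tweet):
    # Append a newline so each record lands on its own line in the S3 object
    firehose.put_record(
        DeliveryStreamName='my-tweet-stream',  # hypothetical stream name
        Record={'Data': (json.dumps(tweet) + '\n').encode('utf-8')}
    )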

Hope this helps you.

johni
3

I think the best way to tackle this is to first create a properly formatted JSON file containing well-separated JSON objects. In my case I added ',' to the events that were pushed into Firehose. Then, after a file is saved in S3, all the files will contain JSON objects separated by a delimiter (a comma, in our case). Another thing that must be added are '[' and ']' at the beginning and end of the file. Then you have a proper JSON file containing multiple JSON objects, and parsing them is now possible.
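
For the '[' and ']' step, a minimal read-side sketch, assuming each record already ends with a comma and the brackets have not yet been added; the bucket and key are placeholders:

import json
import boto3

s3 = boto3.client('s3')

def read_events(bucket, key):
    # The file holds comma-separated JSON objects; wrap them in [ ] to form a valid array
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
    return json.loads('[' + body.rstrip().rstrip(',') + ']')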

  • This works for JSON but not for more complex markup such as XML. If every record is an XML document, they would need to be parsed and the root elements wrapped into a new XML document with some kind of enclosing element (I've used ``). I'm currently trying to figure out how to read from S3 this way. – Martynas Jusevičius Aug 12 '19 at 13:16
  • This won't work if you have multiple producers sending valid json arrays to Firehose. – alextsil Jun 07 '22 at 07:34
3

If the input source for the Firehose is an Analytics application, this concatenated JSON without a delimiter is a known issue, as cited here. You should have a Lambda function, as here, that outputs JSON objects on multiple lines.

3

I used a transformation Lambda to add a line break at the end of every record

import base64
import copy

def lambda_handler(event, context):
    output = []

    for record in event['records']:

        # Decode from base64 (Firehose records are base64 encoded)
        payload = base64.b64decode(record['data'])

        # Read json as utf-8    
        json_string = payload.decode("utf-8")

        # Add a line break
        output_json_with_line_break = json_string + "\n"

        # Encode the data
        encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
        encoded_string = str(encoded_bytes, 'utf-8')

        # Create a deep copy of the record and append to output with transformed data
        output_record = copy.deepcopy(record)
        output_record['data'] = encoded_string
        output_record['result'] = 'Ok'

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
2

Use this simple Python code.

import json

input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

data_str = "[{}]".format(input_str.replace("}{","},{"))
data_json = json.loads(data_str)

And then (if you want) convert to Pandas.

import pandas as pd   
df = pd.DataFrame().from_records(data_json)
print(df)

And this is the result:

itemId personId
0  i-111    p-111
1  i-222    p-222
2  i-333    p-333
y0j0
1

If there's a way to change the way the data is written, separate all the records with a newline. That way you can read the data simply, line by line. If not, build a scanner object that takes "}" as a delimiter and use the scanner to read. That would do the job.
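
A minimal sketch of the line-by-line case, assuming the records are newline-separated; the file name is hypothetical:

import json

purchases = []
with open('newline_delimited_records.json', 'r') as f:  # hypothetical file with one JSON object per line
    for line in f:
        line = line.strip()
        if line:
            purchases.append(json.loads(line))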

psychorama
1

You can find each valid JSON object by counting the braces. Assuming the file starts with a {, this Python snippet should work:

import json

def read_block(stream):
    open_brackets = 0
    block = ''
    while True:
        c = stream.read(1)
        if not c:
            break

        if c == '{':
            open_brackets += 1
        elif c == '}':
            open_brackets -= 1

        block += c

        if open_brackets == 0:
            yield block
            block = ''


if __name__ == "__main__":
    with open('firehose_json_blob', 'r') as f:
        for block in read_block(f):
            record = json.loads(block)
            print(record)
Rafael Barbosa
  • Warning: This is just a blind stream reader, so it will break if any of the JSON blobs contain strings that happen to have braces in them. – Tom Chapin Jul 26 '19 at 23:06
1

This problem can be solved with a JSON parser that consumes objects one at a time from a stream. The raw_decode method of the JSONDecoder exposes just such a parser, but I've written a library that makes it straightforward to do this with a one-liner.

from firehose_sipper import sip

for entry in sip(bucket=..., key=...):
    do_something_with(entry)

I've added some more details in this blog post

Bob Gregory
0

In Spark, we had the same problem. We're using the following:

import re

from pyspark.sql.functions import *

@udf
def concatenated_json_to_array(text):
  final = "["
  separator = ""
  
  for part in text.split("}{"):
    final += separator + part
    separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
      
  return final + "]"


def read_concatenated_json(path, schema):
  return (spark.read
          .option("lineSep", None)
          .text(path)
          .withColumn("value", concatenated_json_to_array("value"))
          .withColumn("value", from_json("value", schema))
          .withColumn("value", explode("value"))
          .select("value.*"))  

It works as follows:

  1. Read the data as one string per file (no delimiters!)
  2. Use a UDF to introduce the JSON array and split the JSON objects by introducing a comma. Note: be careful not to break any strings with }{ in them!
  3. Parse the JSON with a schema into DataFrame fields.
  4. Explode the array into separate rows
  5. Expand the value object into columns.

Use it like this:

from pyspark.sql.types import *

schema = ArrayType(
  StructType([
    StructField("type", StringType(), True),
    StructField("value", StructType([
      StructField("id", IntegerType(), True),
      StructField("joke", StringType(), True),
      StructField("categories", ArrayType(StringType()), True)  
    ]), True)
  ])
)

path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)

I've written more details and considerations here: Parsing JSON data from S3 (Kinesis) with Spark. Do not just split by }{, as it can mess up your string data! For example: { "line": "a\"r}{t" }.

Kees C. Bakker
0

You can use the script below.

If the streamed data size does not exceed the buffer size that you set, each S3 file will contain one pair of brackets ([]) and comma-separated records.

import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')+',\n'

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    last = len(event['records'])-1
    print('Successfully processed {} records.'.format(len(event['records'])))
    
    start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
    # Strip the trailing ',\n' from the last record before closing the array
    end = base64.b64decode(output[last]['data']).decode('utf-8').rstrip(',\n')+']'
    
    output[0]['data'] = base64.b64encode(start.encode('utf-8')).decode('utf-8')
    output[last]['data'] = base64.b64encode(end.encode('utf-8')).decode('utf-8')
    return {'records': output}

Emma
0

Using a JavaScript regex:

JSON.parse(`[${item.replace(/}\s*{/g, '},{')}]`);
Jha Nitesh