
I have received a task to complete. I have a JSON file with 2000+ records. The requirements are:

  • Read from CSV
  • Form a JSON payload (cover a suitable number of fields, considering the order header fields)
  • Push to Kinesis (ensure each row has its own correct key)
  • Retrieve the same from Kinesis.

My JSON file looks like this:

```json
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000971728", "Payment Amount": "00000000000393.45", "BANK ID": "SBC", "Account": "0000007659007", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000829264", "Payment Amount": "00000000000211.20", "BANK ID": "SBC", "Account": "0515096412533", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00001070013", "Payment Amount": "00000000000329.72", "BANK ID": "BCOM", "Account": "017200075595", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
```
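Note that this file is in JSON Lines format: one JSON object per line rather than a single JSON array. Calling `json.load()` on the whole file (as tried in one of the comments) therefore fails with an "Extra data" error after the first object; each line has to be parsed on its own. The trailing `"": ""` pair comes from the trailing comma in the CSV header and can be dropped. A minimal sketch, where `read_json_lines` is a hypothetical helper name:

```python
import io
import json


def read_json_lines(lines):
    """Parse one JSON object per non-blank line, dropping the empty "" key."""
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines, e.g. at the end of the file
        record = json.loads(line)
        record.pop("", None)  # artefact of the trailing comma in the CSV header
        records.append(record)
    return records


if __name__ == "__main__":
    sample = io.StringIO(
        '{"Distributor Number": "00000971728", "": ""}\n'
        '{"Distributor Number": "00000829264", "": ""}\n'
    )
    for record in read_json_lines(sample):
        print(record)
```

With the real file, the same function would be called as `with open("output.json") as f: records = read_json_lines(f)`.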

My producer code looks like this:

```python
import boto3
import json
import csv
from datetime import datetime
import calendar
import time
import random


# Reading CSV and saving as json file

csvFilePath="062019.csv"
jsonFilePath="output.json"

data=[]

with open (csvFilePath) as csvFile:
    csvReader=csv.DictReader(csvFile)
    with open(jsonFilePath,"w") as jsonfile:

        for csvRow in csvReader:
            jsonfile.write(json.dumps(csvRow)+"\n")


print(data)

# putting data to Kinesis

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')


with open('output.json', 'r') as file:
    for line in file:
        put_response=kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=line,
            PartitionKey=str(random.randrange(100)))

        print(put_response)
```
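For requirement 1, the per-line `put_record()` calls can be replaced by batched `put_records()` calls. PutRecords accepts at most 500 records per request, and a request can partially fail: the response's `FailedRecordCount` and the per-record `ErrorCode` entries show which records were rejected (for example because of throttling). A minimal sketch, assuming `client` is a `boto3.client('kinesis')` and that the caller supplies `(data, partition_key)` pairs; `put_all` and `MAX_BATCH` are hypothetical names, and retrying failed entries immediately without backoff is a simplification.

```python
MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def put_all(client, stream_name, records, max_attempts=3):
    """Send (data, partition_key) pairs with put_records, in batches.

    Entries reported as failed are retried (hypothetical policy: no
    backoff, max_attempts calls per batch). Returns how many still failed.
    """
    entries = [{"Data": data.encode("utf-8"), "PartitionKey": key}
               for data, key in records]
    failed_total = 0
    for start in range(0, len(entries), MAX_BATCH):
        batch = entries[start:start + MAX_BATCH]
        for _ in range(max_attempts):
            response = client.put_records(StreamName=stream_name, Records=batch)
            if response["FailedRecordCount"] == 0:
                batch = []
                break
            # Results come back in request order; keep only entries whose
            # result carries an ErrorCode and try those again.
            batch = [entry for entry, result in zip(batch, response["Records"])
                     if "ErrorCode" in result]
        failed_total += len(batch)
    return failed_total
```

With the producer above, this could be called as `put_all(kinesis_client, my_stream_name, [(line, str(random.randrange(100))) for line in file])`, so 2000+ lines become a handful of requests instead of 2000+.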

My consumer code looks like this:

```python
import boto3
import json
from datetime import datetime
import time

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')

response=kinesis_client.describe_stream(StreamName=my_stream_name)

my_shard_id=response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator=kinesis_client.get_shard_iterator(
                            StreamName=my_stream_name,
                            ShardId=my_shard_id,
                            ShardIteratorType='LATEST')

my_shard_iterator=shard_iterator['ShardIterator']

record_response=kinesis_client.get_records(ShardIterator=my_shard_iterator,Limit=2)

print(record_response)

while 'NextShardIterator' in record_response:
    record_response=kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],Limit=2)

    if record_response['Records']:
        print(record_response)
```
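Three details in this consumer could explain seeing only one record, or none: `ShardIteratorType='LATEST'` starts just after the most recent record, so only records written after the iterator was created are returned; `Limit=2` caps each call at two records; and only the first shard is read. A sketch that instead reads every shard from the oldest available record (`TRIM_HORIZON`) and stops once a shard has caught up (`MillisBehindLatest == 0` with no records returned), assuming `client` is a `boto3.client('kinesis')`; `read_all` is a hypothetical helper, and `list_shards` pagination is ignored for brevity.

```python
import json
import time


def read_all(client, stream_name, poll_delay=0.2):
    """Return every record currently in the stream, from all shards."""
    records = []
    # Pagination via NextToken is ignored here for brevity.
    shards = client.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",  # oldest record, not LATEST
        )["ShardIterator"]
        while iterator:
            # Limit may be up to 10,000; 1000 is an arbitrary choice.
            response = client.get_records(ShardIterator=iterator, Limit=1000)
            for record in response["Records"]:
                records.append(json.loads(record["Data"]))
            # Stop when this call returned nothing and the shard is caught up.
            if not response["Records"] and response.get("MillisBehindLatest", 0) == 0:
                break
            iterator = response.get("NextShardIterator")  # None once a closed shard is exhausted
            time.sleep(poll_delay)  # stay under 5 GetRecords calls/second per shard
    return records
```

Unlike the `while` loop above, which keeps polling forever, this version returns once everything written so far has been read, which fits the "retrieve the same from Kinesis" requirement.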

But I am only getting a single record back. Can someone please help me with the following:

  1. I need to use `put_records()` instead of `put_record()`.
  2. I need the producer to send the data line by line (currently the partition key is set with `PartitionKey=str(random.randrange(100))`).
  3. When I run the consumer, I should get all the records as output.

I have received help from @John Rotenstein (thank you so much!); please help me get the results in exactly the way I need them.

John Rotenstein
binu456m
  • Where is the app obtaining the data? Or is that part working okay and you just want to get `put_to_stream()` working? Is it generating any errors at the moment? Did you write this code? If so, can you explain the `while` section? It seems to be doing an infinite loop of sending the same data. – John Rotenstein Jun 22 '20 at 08:14
  • I was using the code for testing purposes; since I am very new to Kinesis, I wanted to check how the `while` loop works. But now I need help reading the above JSON file and publishing it to Kinesis. Any help? – binu456m Jun 22 '20 at 08:19
  • Where is the JSON file — is it simply a text file on disk? If so, your first step would be to write some code that opens the file, then loops through each line in the file. Start by simply printing the line to the screen. Once you have that working, add the `put_record()` code that you have. If you have problems, feel free to edit your Question to show the updated code and let us know what problem you're having. – John Rotenstein Jun 22 '20 at 08:22
  • The JSON file is on my disk. Let me try the way you suggested and I will let you know the results. – binu456m Jun 22 '20 at 08:26
  • I wrote code like this: `data={} abc=open("output.json",'r') test_file=json.load(abc) for test in test_file: print(test) print(data)` – binu456m Jun 22 '20 at 08:52
  • Where are the "requirements" coming from? Is this an actual business need, or are you doing a course and this is your homework? – John Rotenstein Jun 23 '20 at 02:57
  • Side-comment: There is no reason to create a JSON file. You could simply read the CSV file and send that to Kinesis. – John Rotenstein Jun 23 '20 at 02:58
  • This is for a course, and I have been stuck for over 2 days. I submitted the code yesterday and was asked: 1) why am I getting only one result, and 2) I was asked to use put records, so why didn't I use it? Since I am very new to Kinesis, I don't know what to do or how to change the code. – binu456m Jun 23 '20 at 03:00
  • And I don't know how to read it directly from the CSV. I always convert to JSON and then push to Kinesis. – binu456m Jun 23 '20 at 03:01
  • Ah! So this is for your assignment. You're welcome to ask questions, but people typically won't write code for you here on StackOverflow. Your code (above) first reads a CSV file then outputs it to a JSON file, then reads the JSON file and sends it to Kinesis. There is no need for this. The loop that sends to Kinesis could simply read from the CSV, convert to JSON and send it to Kinesis without needing an "in between" file. If your code is not "getting" multiple records, it is either because Kinesis only has one record, or your code to retrieve and process the records is not correct. – John Rotenstein Jun 23 '20 at 03:07
  • Understood. Thank you so much. But I am still confused about why I was asked to use "Put records". How does it differ from Put record? – binu456m Jun 23 '20 at 03:13
  • Read the documentation! `put_record()` sends one record, `put_records()` sends multiple records (as per the name). – John Rotenstein Jun 23 '20 at 03:15
  • Yes, I got it. But when I use put records I get an error. Also, how does `PartitionKey=str(random.randrange(100))` work? Is it selecting records randomly for 100 lines? The file has 2000+ records, so I am a bit confused. – binu456m Jun 23 '20 at 03:22
  • Please see the link in my answer that explains the purpose of the PartitionKey. Basically, you don't want them all set to the same value, so the random function keeps them sufficiently different. You are welcome to make it a larger range, but it won't really matter, as long as there are a few different values. If you are receiving an error with `put_records()`, then you'll need to debug why the error is happening. – John Rotenstein Jun 23 '20 at 05:27
  • Okay, understood. But why am I getting only one record when I run the consumer? The JSON has 2000+ lines. I am rechecking the consumer file, but I can't see where it's going wrong. – binu456m Jun 23 '20 at 05:35
  • I face one more issue now: when I run the producer, the output shows an empty list. Yesterday I was able to see the records, but now I am not. – binu456m Jun 23 '20 at 05:59
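The empty list printed by the producer is consistent with the code in the question: `data` is initialised with `data=[]` and never appended to, so `print(data)` always prints `[]`. It says nothing about whether records reached Kinesis; the `put_response` with HTTP status 200 and a `SequenceNumber` is what confirms that. If the intent was to keep the rows in memory as well, a minimal sketch is to append each row while converting; `convert` is a hypothetical helper name.

```python
import csv
import io
import json


def convert(csv_file, json_file):
    """Write one JSON line per CSV row and also return the rows as a list."""
    data = []
    for csv_row in csv.DictReader(csv_file):
        data.append(csv_row)  # the step missing from the original loop
        json_file.write(json.dumps(csv_row) + "\n")
    return data


if __name__ == "__main__":
    src = io.StringIO("Distributor Number,Payment Amount\n00000971728,00000000000393.45\n")
    out = io.StringIO()
    print(len(convert(src, out)))  # prints 1
```

With the real files this would be `with open(csvFilePath, newline="") as csvFile, open(jsonFilePath, "w") as jsonfile: data = convert(csvFile, jsonfile)`.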

1 Answer


Your code would need to look something like this:

```python
import boto3
import json
import random

my_stream_name='ApacItTeamTstOrderStream'

kinesis_client=boto3.client('kinesis',region_name='us-east-1')

with open('foo.json', 'r') as file:
    for line in file:
        put_response=kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=line,
            PartitionKey=str(random.randrange(100)))
```

If you do not wish to store the whole line in Kinesis, then you'll need to extract the desired fields, similar to the code in your Question.
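Combining this with the side comment that no intermediate JSON file is needed, the fields can be extracted while reading the CSV. A minimal sketch, assuming the CSV header uses the same column names as the sample records; the `FIELDS` selection and the `rows_to_payloads` helper are hypothetical choices, not part of the assignment.

```python
import csv
import io
import json

# Hypothetical selection of header fields to keep in each payload; the names
# match the keys in the sample records (including the "Affliate" spelling).
FIELDS = [
    "Affliate Number",
    "Bonus Period",
    "Distributor Number",
    "Payment Amount",
    "Payment Date",
    "Payment Method",
]


def rows_to_payloads(csv_file):
    """Yield one JSON string per CSV row, keeping only FIELDS."""
    for row in csv.DictReader(csv_file):
        yield json.dumps({name: row[name] for name in FIELDS})


if __name__ == "__main__":
    sample = io.StringIO(
        "Affliate Number,Bonus Period,Business Entity,Distributor Number,"
        "Payment Amount,BANK ID,Account,Payment Date,Payment Group,Payment Method,\n"
        "350,12003,350,00000971728,00000000000393.45,SBC,0000007659007,"
        "2020-04-15,90,02,\n"
    )
    for payload in rows_to_payloads(sample):
        print(payload)
```

Each yielded string can be passed straight to `put_record()` as `Data`, replacing the loop over `foo.json`; with a real file, open it as `open("062019.csv", newline="")`.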

Note that I put a random number in the PartitionKey. This is common practice if records do not need to be consumed in a particular way. If, however, all records sharing the same value of a given field need to be consumed by the same consumer, then use that field as the PartitionKey. (If that doesn't make sense, see: What is partition key in AWS Kinesis all about?)
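To make the PartitionKey explanation concrete: Kinesis hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash-key range contains that value, so records with the same key always go to the same shard, while a random key spreads them out. The sketch below imitates that routing locally under a loudly stated assumption: that the shards split the 128-bit key space evenly (typical for a newly created stream, not guaranteed after resharding). `shard_for_key` is a hypothetical helper, not a boto3 call.

```python
import hashlib

HASH_SPACE = 2 ** 128  # partition keys are hashed to 128-bit integers


def shard_for_key(partition_key, shard_count):
    """Return the shard index a key maps to, ASSUMING evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    # Even-split assumption: shard i owns roughly [i, i + 1) * HASH_SPACE / shard_count.
    return hash_value * shard_count // HASH_SPACE


if __name__ == "__main__":
    for key in ["00000971728", "00000829264", "00001070013"]:
        print(key, shard_for_key(key, 4))
```

Using a row field such as `"Distributor Number"` as the key (one possible reading of "each row has its own correct key") means every payment for a distributor lands on the same shard, whereas `str(random.randrange(100))` only spreads rows across shards and does not select or limit records.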

John Rotenstein
  • I ran the code but got an error message like this: `Exception has occurred: ParamValidationError Parameter validation failed: Missing required parameter in input: "Records" Unknown parameter in input: "Data", must be one of: Records, StreamName Unknown parameter in input: "PartitionKey", must be one of: Records, StreamName` – binu456m Jun 22 '20 at 11:12
  • Are you sure you are using the above code? It sounds like you are using `put_records()` rather than `put_record()`. Oh, and `PartitionKey` needs to be a string, so I made a small update above. – John Rotenstein Jun 22 '20 at 12:08
  • Thanks John. I can see it's running, but it shows an empty list: `[] {'ShardId': 'shardId-000000000000', 'SequenceNumber': '49603116058855582341585161097865252438075008731773403138', 'ResponseMetadata': {'RequestId': 'dc173846-6fc2-a0ed-8b7d-9bdafb9cc987', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'dc173846-6fc2-a0ed-8b7d-9bdafb9cc987', 'x-amz-id-2': 'Y3yX5Vh9JqCpt5Yza0ef7XdDI9XnTF8cscmsdKpSm19k/wkJL9w7DXt/xcG35N+WlwxAjy5yAgH5DiUTZaKpRrCBIuM3lA6P', 'date': 'Mon, 22 Jun 2020 12:17:32 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110'}, 'RetryAttempts': 0}}` – binu456m Jun 22 '20 at 12:18
  • That `print(put_response)` was just there from your original code. I have removed it. The records should be in Kinesis, so your next task is to have a process consume the records from Kinesis. – John Rotenstein Jun 22 '20 at 12:21
  • Thanks John, I am getting results when I run the consumer file. But in the producer file I used put record instead of put records, so how do I change that? When I wrote "put records", I got an error. – binu456m Jun 23 '20 at 02:09
  • Sorry, but I do not understand. Do you wish to send multiple records in the one `put` command? Or are you fine with sending one record at a time (as shown in the code above)? – John Rotenstein Jun 23 '20 at 02:39
  • I have been asked to use "Put records instead of put record". The requirements I received are: Read from CSV, Form a JSON payload (cover a suitable number of fields, considering the order header fields), Push to Kinesis (ensure each row has its own correct key), Retrieve the same from Kinesis. – binu456m Jun 23 '20 at 02:49
  • Your question has now changed a lot from your original question. Unfortunately, it has become confusing for other people to read and has been marked as closed. I suggest you identify a **single question** that you would like answered, and create a new question with the details. Try to avoid simply providing lots of code, because people won't read all of it. Rather, you should identify the code that relates to the specific question you wish to ask (e.g. "Why is my `get_records()` only receiving one record?"). See: [How do I ask a good question?](http://stackoverflow.com/help/how-to-ask) – John Rotenstein Jun 23 '20 at 02:56
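The `ParamValidationError` quoted above is what boto3 reports when `put_records()` is called with `put_record()`'s arguments: `put_record()` takes a single `Data` and `PartitionKey`, whereas `put_records()` takes `StreamName` plus a `Records` list in which each entry carries its own `Data` and `PartitionKey`. A minimal sketch of building the `put_records()` arguments from JSON lines with one key per row; `build_put_records_args` is a hypothetical helper, and taking the key from `"Distributor Number"` is an assumption about what "its own correct key" means in the assignment.

```python
import json


def build_put_records_args(stream_name, lines, key_field="Distributor Number"):
    """Turn JSON lines into keyword arguments for client.put_records()."""
    entries = []
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        entries.append({
            "Data": line.encode("utf-8"),
            "PartitionKey": record[key_field],  # key taken from the row itself
        })
    return {"StreamName": stream_name, "Records": entries}
```

Usage would be `kinesis_client.put_records(**build_put_records_args(my_stream_name, batch))`, where `batch` holds at most 500 lines, since that is the per-request limit of PutRecords.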