I have been given a task to complete. I have a JSON file with 2000+ records (generated from a CSV file), and the requirements are:
- Read from CSV
- Form a JSON payload (include a suitable set of fields; consider the order header fields)
- Push to Kinesis (ensure each row has its own correct key)
- Retrieve the same records from Kinesis.
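A minimal sketch of the first three steps, assuming the CSV header matches the keys shown in the JSON below and that "Distributor Number" is a suitable per-row key (both are assumptions; HEADER_FIELDS, KEY_FIELD and build_records are hypothetical names). Each CSV row becomes a dict in the {'Data', 'PartitionKey'} shape that boto3's put_records expects for its Records entries, and the empty "" column produced by the trailing comma in the CSV header is left out of the payload.

```python
import csv
import io
import json

# Assumption: these column names match the CSV header seen in output.json;
# adjust the list to whichever "order header" fields you actually need.
HEADER_FIELDS = [
    "Affliate Number",
    "Bonus Period",
    "Distributor Number",
    "Payment Amount",
    "Payment Date",
    "Payment Method",
]
# Assumption: Distributor Number identifies a row; swap in another column
# (or a combination of columns) if it is not unique in your data.
KEY_FIELD = "Distributor Number"


def build_records(csv_file):
    """Turn each CSV row into a Kinesis-ready {'Data', 'PartitionKey'} dict."""
    records = []
    for row in csv.DictReader(csv_file):
        # Keep only the selected fields, which also drops the "" column.
        payload = {field: row[field] for field in HEADER_FIELDS}
        records.append({
            "Data": json.dumps(payload).encode("utf-8"),
            "PartitionKey": row[KEY_FIELD],
        })
    return records


if __name__ == "__main__":
    # In-memory stand-in for 062019.csv, using the first record from the question.
    sample = io.StringIO(
        "Affliate Number,Bonus Period,Business Entity,Distributor Number,"
        "Payment Amount,BANK ID,Account,Payment Date,Payment Group,Payment Method,\n"
        "350,12003,350,00000971728,00000000000393.45,SBC,0000007659007,"
        "2020-04-15,90,02,\n"
    )
    for record in build_records(sample):
        print(record["PartitionKey"], record["Data"])
```

For the real file, open it with open("062019.csv", newline="") and pass the file object to build_records.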
My JSON file looks like this:
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000971728", "Payment Amount": "00000000000393.45", "BANK ID": "SBC", "Account": "0000007659007", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00000829264", "Payment Amount": "00000000000211.20", "BANK ID": "SBC", "Account": "0515096412533", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
{"Affliate Number": "350", "Bonus Period": "12003", "Business Entity": "350", "Distributor Number": "00001070013", "Payment Amount": "00000000000329.72", "BANK ID": "BCOM", "Account": "017200075595", "Payment Date": "2020-04-15", "Payment Group": "90", "Payment Method": "02", "": ""}
My producer code looks like this:
import boto3
import json
import csv
from datetime import datetime
import calendar
import time
import random

# Reading CSV and saving as json file
csvFilePath="062019.csv"
jsonFilePath="output.json"
data=[]
with open (csvFilePath) as csvFile:
    csvReader=csv.DictReader(csvFile)
    with open(jsonFilePath,"w") as jsonfile:
        for csvRow in csvReader:
            jsonfile.write(json.dumps(csvRow)+"\n")
print(data)

# putting data to Kinesis
my_stream_name='ApacItTeamTstOrderStream'
kinesis_client=boto3.client('kinesis',region_name='us-east-1')
with open('output.json', 'r') as file:
    for line in file:
        put_response=kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=line,
            PartitionKey=str(random.randrange(100)))
        print(put_response)
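A sketch of the same producer loop using PutRecords instead of PutRecord, assuming the records have already been built as {'Data', 'PartitionKey'} dicts. PutRecords accepts at most 500 records per call and can partially fail (the call succeeds but individual entries come back with an ErrorCode), so the hypothetical put_all helper below sends 500-record chunks and resends only the failed entries. The client is passed in (for example boto3.client('kinesis', region_name='us-east-1')) so the logic can be exercised without AWS; the retry count and backoff values are arbitrary choices.

```python
import time

MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def chunks(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_all(client, stream_name, records, max_attempts=3):
    """Send records with PutRecords, resending only the entries that failed.

    `records` is a list of {'Data': ..., 'PartitionKey': ...} dicts.
    Returns the number of records that still failed after max_attempts.
    """
    failed_total = 0
    for batch in chunks(records, MAX_BATCH):
        pending = batch
        for attempt in range(max_attempts):
            response = client.put_records(StreamName=stream_name, Records=pending)
            # Results come back in the same order as the request entries;
            # a failed entry carries an 'ErrorCode' key instead of a SequenceNumber.
            pending = [rec for rec, result in zip(pending, response["Records"])
                       if "ErrorCode" in result]
            if not pending:
                break
            if attempt < max_attempts - 1:
                time.sleep(0.1 * 2 ** attempt)  # simple exponential backoff
        failed_total += len(pending)
    return failed_total
```

Usage would be along the lines of put_all(kinesis_client, my_stream_name, records), where records is built from output.json with one partition key per line.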
My consumer code looks like this:
import boto3
import json
from datetime import datetime
import time

my_stream_name='ApacItTeamTstOrderStream'
kinesis_client=boto3.client('kinesis',region_name='us-east-1')
response=kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id=response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator=kinesis_client.get_shard_iterator(
    StreamName=my_stream_name,
    ShardId=my_shard_id,
    ShardIteratorType='LATEST')
my_shard_iterator=shard_iterator['ShardIterator']
record_response=kinesis_client.get_records(ShardIterator=my_shard_iterator,Limit=2)
print(record_response)
while 'NextShardIterator' in record_response:
    record_response=kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],Limit=2)
    if record_response['Records']:
        print(record_response)
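A sketch of a consumer that returns every record currently in the stream rather than only new arrivals on one shard. Two changes from the code above are assumptions about what is needed here: it loops over every shard returned by list_shards instead of only Shards[0], and it starts each shard at TRIM_HORIZON (the oldest retained record) instead of LATEST, which only returns records written after the iterator is created. An open shard always returns a NextShardIterator, so the loop stops a shard once a call returns no records and MillisBehindLatest is 0. read_all is a hypothetical name, the client is passed in so it can be tested without AWS, and ListShards pagination (NextToken) is omitted for brevity.

```python
import json
import time


def read_all(client, stream_name, poll_delay=0.2):
    """Read every record currently in the stream, from all shards.

    Each shard starts at TRIM_HORIZON and is read until it is caught up.
    Returns the decoded JSON payloads in shard order.
    """
    items = []
    shards = client.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        while iterator:
            response = client.get_records(ShardIterator=iterator, Limit=1000)
            items.extend(json.loads(r["Data"]) for r in response["Records"])
            # An empty batch alone is not enough: GetRecords can return no
            # records while data still lies ahead (MillisBehindLatest > 0).
            if not response["Records"] and response.get("MillisBehindLatest", 0) == 0:
                break
            # A closed shard returns no NextShardIterator, which ends the loop.
            iterator = response.get("NextShardIterator")
            time.sleep(poll_delay)  # stay under the per-shard GetRecords rate limit
    return items
```

Starting at TRIM_HORIZON means re-running the consumer returns the same records again; tracking the last SequenceNumber per shard (and using AFTER_SEQUENCE_NUMBER) would avoid that.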
But I am only getting results for a single record. Can someone please help me with the following:
- Using PutRecords (put_records) instead of PutRecord.
- Sending the producer data line by line (currently the partition key is set with PartitionKey=str(random.randrange(100))).
- When I run the consumer, getting all records as output.
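One way to see why a consumer that reads only Shards[0] can miss records: Kinesis hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose HashKeyRange contains that value. The hypothetical helper below (not a boto3 function) reproduces that mapping from the Shards list returned by list_shards or describe_stream, so you can check which shard a given key, random or per-row, would land on.

```python
import hashlib


def shard_for_key(partition_key, shards):
    """Return the ShardId whose hash key range contains the partition key.

    The key is hashed with MD5 and read as a 128-bit integer; `shards` is the
    'Shards' list from list_shards/describe_stream, whose HashKeyRange bounds
    are decimal strings.
    """
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for shard in shards:
        key_range = shard["HashKeyRange"]
        if int(key_range["StartingHashKey"]) <= hashed <= int(key_range["EndingHashKey"]):
            return shard["ShardId"]
    raise ValueError("no shard covers this partition key")
```

With keys from random.randrange(100) on a stream with more than one shard, records spread over several shards, so reading only the first shard returns only part of the data. A deterministic key such as a row's Distributor Number always maps to the same shard, but it does not by itself put all rows on one shard.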
I have already received help from @john Rotenstein, thank you so much. Please help me get the results in exactly the way I need.