
How to upload data from a CSV file to AWS Kinesis using boto3

I have tried three methods and all of them are working for me.

  1. Upload the data from a CSV to Kinesis in chunks.
  2. Upload randomly generated data from local to Kinesis.
  3. Upload the CSV data row by row from local to Kinesis using boto3.

Additionally, how can data be consumed from Kinesis using the Python SDK?


4 Answers


Method 1 - Chunk by chunk

import csv
import json
import boto3


def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


kinesis = boto3.client("kinesis")

with open("flights.csv") as f:
    # csv.DictReader yields each row as an ordered dict
    reader = csv.DictReader(f)
    # Build one Kinesis record per row and group the records into chunks of 100
    # (put_records accepts at most 500 records per call)
    records = chunkit([{"PartitionKey": 'sau', "Data": json.dumps(row)} for row in reader], 100)

for chunk in records:
    kinesis.put_records(StreamName="Flight-Simulator", Records=chunk)
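Note that put_records can partially fail even when the call itself succeeds: the response carries a FailedRecordCount and, for each failed entry, an ErrorCode. The sketch below retries only the failed records with a simple backoff (the helper name put_chunk_with_retry is my own, not part of the answer above):

import time

def put_chunk_with_retry(kinesis, stream_name, chunk, attempts=3):
    """Re-send only the records that Kinesis reports as failed."""
    for attempt in range(attempts):
        response = kinesis.put_records(StreamName=stream_name, Records=chunk)
        if response["FailedRecordCount"] == 0:
            return
        # Result entries come back in the same order as the request records;
        # keep only those whose entry carries an ErrorCode.
        chunk = [rec for rec, res in zip(chunk, response["Records"]) if "ErrorCode" in res]
        time.sleep(2 ** attempt)
    raise RuntimeError(f"{len(chunk)} records still failing after {attempts} attempts")

You would then call put_chunk_with_retry(kinesis, "Flight-Simulator", chunk) inside the for loop instead of calling put_records directly.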
Method 2 - Randomly generated JSON to Kinesis
# Generate records with random values and send them to the Kinesis data stream

import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'Flight-Simulator'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')


def put_to_stream(thing_id, property_value, property_timestamp):
    payload = {
        'prop': str(property_value),
        'timestamp': str(property_timestamp),
        'thing_id': thing_id
    }

    print(payload)

    put_response = kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=json.dumps(payload),
        PartitionKey=thing_id)


while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 seconds between records
    time.sleep(5)
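One thing to be aware of: every record here uses the same partition key ('aa-bb'), so all of them land on a single shard. If you want the records spread across shards, a random partition key works; a minimal variation of the put call above (the function name put_to_stream_random_key is my own, and it reuses kinesis_client and my_stream_name from the snippet above):

import uuid

def put_to_stream_random_key(payload):
    # A random partition key distributes records across all shards,
    # at the cost of losing per-device ordering guarantees.
    return kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=json.dumps(payload),
        PartitionKey=str(uuid.uuid4()))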
Method 3 - Row by row from CSV to Kinesis
# Send the data from the CSV to the Kinesis data stream row by row
import csv
import json
import boto3

my_stream_name = 'Flight-Simulator'
thing_id = 'XYZ'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

with open("flights_Test.csv") as f:
    # csv.DictReader yields each row as an ordered dict
    reader = csv.DictReader(f)
    for row in reader:
        put_response = kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=json.dumps(row),
            PartitionKey=thing_id)
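One put_record call per row is simple, but it is slow for large files and can hit the per-shard write limit of 1 MB/s or 1,000 records/s. A sketch that catches the throttling error and backs off (the put_row helper and retry policy are my own additions, reusing the client and variables defined above):

import time

def put_row(row, max_attempts=5):
    # Retry with exponential backoff when the shard's write limit is hit.
    for attempt in range(max_attempts):
        try:
            return kinesis_client.put_record(
                StreamName=my_stream_name,
                Data=json.dumps(row),
                PartitionKey=thing_id)
        except kinesis_client.exceptions.ProvisionedThroughputExceededException:
            time.sleep(2 ** attempt)
    raise RuntimeError("row could not be written after retries")

For bulk loads, the put_records batching shown in Method 1 is the better option.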
Consumer - Reading records back with the Python SDK
# Consumer SDK using python3
import boto3
import json
import time

my_stream_name = 'Flight-Simulator'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Describe the stream; the response contains the shard IDs.
# This example reads from the first shard only.
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# 'LATEST' starts reading just after the most recent record, so only
# records produced after this point are returned.
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')

my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                             Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(
        ShardIterator=record_response['NextShardIterator'],
        Limit=2)
    if len(record_response['Records']) > 0:
        # Print the first record of each batch, if any
        print(json.loads(record_response['Records'][0]['Data']))

    time.sleep(5)
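This consumer reads only the first shard and prints only the first record of each batch, which is fine for a quick test. Below is a sketch that walks every shard via list_shards, starts from the oldest available record with TRIM_HORIZON, and prints every record (the loop structure is my own; for anything production-grade, the Kinesis Client Library handles resharding and checkpointing for you):

import boto3
import json
import time

stream_name = 'Flight-Simulator'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# One iterator per shard; TRIM_HORIZON starts at the oldest record still retained.
shards = kinesis_client.list_shards(StreamName=stream_name)['Shards']
iterators = [
    kinesis_client.get_shard_iterator(StreamName=stream_name,
                                      ShardId=shard['ShardId'],
                                      ShardIteratorType='TRIM_HORIZON')['ShardIterator']
    for shard in shards
]

while iterators:
    next_iterators = []
    for it in iterators:
        resp = kinesis_client.get_records(ShardIterator=it, Limit=100)
        for record in resp['Records']:
            print(json.loads(record['Data']))
        if resp.get('NextShardIterator'):
            next_iterators.append(resp['NextShardIterator'])
    iterators = next_iterators
    time.sleep(5)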