
I am doing a small POC in AWS. I am trying to read a CSV file from an S3 bucket and insert its data into DynamoDB. Everything goes fine until I iterate over the CSV data: the function terminates on the very first row.

I have created one table, vehicledata, inside DynamoDB, with one primary key, veh_price_id.

Everything works up to the log line "I am inside while loop"; after that I get the issue.

Code

public class LambdaFunctionHandler implements RequestHandler<S3Event, Report> {

    Region AWS_REGION = Region.getRegion(Regions.US_EAST_1);
    String DYNAMO_TABLE_NAME = "vehicledata";

    public Report handleRequest(S3Event s3event, Context context) {
        long startTime = System.currentTimeMillis();
        Report statusReport = new Report();
        LambdaLogger logger = context.getLogger();

        Helper helper = new Helper();

        try {
            S3EventNotificationRecord record = s3event.getRecords().get(0);
            String srcBucket = record.getS3().getBucket().getName();
            String srcKey = record.getS3().getObject().getKey().replace('+', ' ');

            srcKey = URLDecoder.decode(srcKey, "UTF-8");
            AmazonS3 s3Client = new AmazonS3Client();
            S3Object s3Object = s3Client.getObject(new GetObjectRequest(srcBucket, srcKey));
            logger.log("I am inside lambda function8");
            statusReport.setFileSize(s3Object.getObjectMetadata().getContentLength());
            logger.log("I am inside lambda function9");

            logger.log("S3 Event Received: " + srcBucket + "/" + srcKey);
            logger.log("I am inside lambda function10");

            BufferedReader br = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()));
            logger.log("I am inside lambda function13");
            CSVReader reader = new CSVReader(br);
            logger.log("I am inside lambda function14");

            AmazonDynamoDB dynamoDBClient = new AmazonDynamoDBClient();
            dynamoDBClient.setRegion(AWS_REGION);
            DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
            logger.log("I am inside DynamoDB");
            TableWriteItems energyDataTableWriteItems = new TableWriteItems(DYNAMO_TABLE_NAME);
            logger.log("I am inside DynamoDB-Table");
            List<Item> itemList = new ArrayList<Item>();

            String[] nextLine;
            while ((nextLine = reader.readNext()) != null) {
                logger.log("I am inside while loop");
                Item newItem = helper.parseIt(nextLine);
                itemList.add(newItem);
            }

            for (List<Item> partition : Lists.partition(itemList, 25)) {
                energyDataTableWriteItems.withItemsToPut(partition);
                BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(energyDataTableWriteItems);
                logger.log("I am inside for loop");
                do {
                    Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();

                    if (outcome.getUnprocessedItems().size() > 0) {
                        logger.log("Retrieving the unprocessed " + String.valueOf(outcome.getUnprocessedItems().size())
                                + " items.");
                        outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
                    }
                } while (outcome.getUnprocessedItems().size() > 0);
            }

            logger.log("Load finish in " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");

            reader.close();
            br.close();
            s3Object.close();

            statusReport.setStatus(true);
        } catch (Exception ex) {
            logger.log(ex.getMessage());
        }

        statusReport.setExecutiongTime(System.currentTimeMillis() - startTime);
        return statusReport;
    }
}
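As an aside on debugging this: the catch block above logs only ex.getMessage(), which can be null for some exceptions and says nothing about where the failure happened. A minimal sketch of a helper that would surface the full stack trace in CloudWatch (LogUtil and stackTraceOf are hypothetical names I made up, not AWS SDK API):

import java.io.PrintWriter;
import java.io.StringWriter;

public final class LogUtil {

    // Render a throwable's full stack trace as a String so it can be
    // handed to LambdaLogger.log() instead of just ex.getMessage().
    public static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}

Calling logger.log(LogUtil.stackTraceOf(ex)) in the catch block would show the underlying exception type and the exact line it came from, instead of a bare (possibly null) message.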

Helper.java

public class Helper {

    /**
     * Parses a CSV line into a DynamoDB item.
     *
     * @param nextLine the next CSV line
     * @return the item
     * @throws ParseException if the date column cannot be parsed
     */
    public Item parseIt(String[] nextLine) throws ParseException {
        Item newItem = new Item();
        String id = nextLine[0];

        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");

        // Specify your time zone
        df.setTimeZone(TimeZone.getTimeZone("GMT+8:00"));

        Date parsedDate = df.parse(nextLine[1]);

        // Convert ms to seconds
        Long dateTime = parsedDate.getTime() / 1000;

        newItem.withPrimaryKey("Id", id, "Date", dateTime);
        return newItem;
    }
}
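A second thing worth checking against the code above: the question says the table's primary key is veh_price_id, yet parseIt builds each item with key attributes "Id" and "Date". DynamoDB rejects writes whose key attributes don't match the table schema. A minimal sketch of parseIt aligned to that single key, assuming veh_price_id is a string column and the parsed date can be stored as an ordinary numeric attribute (the attribute name "date" is my assumption; the real CSV columns aren't shown):

import com.amazonaws.services.dynamodbv2.document.Item;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class Helper {

    public Item parseIt(String[] nextLine) throws ParseException {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        df.setTimeZone(TimeZone.getTimeZone("GMT+8:00"));

        // Epoch seconds, as in the original code.
        long dateTime = df.parse(nextLine[1]).getTime() / 1000;

        // veh_price_id is the table's only key attribute (per the question),
        // so it alone goes into withPrimaryKey; the date is a plain attribute.
        return new Item()
                .withPrimaryKey("veh_price_id", nextLine[0])
                .withNumber("date", dateTime);
    }
}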

Inside the CloudWatch console the output looks like this: [screenshot of the CloudWatch log output]

My CSV file data looks like this: [screenshot of the CSV file contents]

Can anyone tell me where I am making a mistake? How do I solve this issue?

Binay Kumar
  • It's a lot of code, so I haven't read it (see [here](https://stackoverflow.com/help/mcve))... but are you by chance trying to parse the first row, whose values are all non-date strings? – yshavit May 02 '18 at 14:22
  • +1 to yshavit. Your error output says that "vehicle_id" is not a date. Clearly, you are reading the wrong line and the wrong column. I suppose that vehicle_id is a string, not a date. I think you need to refactor your code. It is **heavy** lifting to transform a CSV to JSON. Use a supported library instead. For Node: [CSV to JSON](https://stackoverflow.com/questions/16831250/how-to-convert-csv-to-json-in-node-js) – Clément Duveau May 02 '18 at 14:31
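Following up on these comments: if the first CSV row is a header (column names such as vehicle_id), parseIt hands that header text to SimpleDateFormat, which throws on the very first iteration — consistent with the behavior described in the question. A minimal sketch of skipping the header row with opencsv (the names CsvHeaderSkip, readerSkippingHeader, and consumeHeader are illustrative, not from the question):

import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;

import java.io.BufferedReader;

public class CsvHeaderSkip {

    // Option 1: build the reader so it skips the first line outright.
    static CSVReader readerSkippingHeader(BufferedReader br) {
        return new CSVReaderBuilder(br).withSkipLines(1).build();
    }

    // Option 2: keep new CSVReader(br) and consume the header once
    // before the while loop, so only data rows reach Helper.parseIt().
    // (throws Exception to cover the checked exceptions readNext()
    // declares across opencsv versions)
    static void consumeHeader(CSVReader reader) throws Exception {
        reader.readNext(); // discards the header row, e.g. "vehicle_id", ...
    }
}

Either way, wrapping helper.parseIt(nextLine) in its own try/catch would also keep a single malformed row from aborting the entire load.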

0 Answers