How can we achieve using Apache Pig :
File :
A 2014/10/01
A 2014/09/01
A 2014/08/01
A 2014/02/01
Result should A count 3, since i want to count the number of records using rolling window of 30 days between records group by A.
How can we achieve using Apache Pig :
File :
A 2014/10/01
A 2014/09/01
A 2014/08/01
A 2014/02/01
Result should A count 3, since i want to count the number of records using rolling window of 30 days between records group by A.
Please find the solution, i hope you can do further enhancement if it required. Try to execute with your input and let me know how it works.
input.txt
A 2014/12/01
A 2014/11/01
A 2014/10/01
A 2014/07/01
A 2014/05/01
A 2014/04/01
B 2014/09/01
B 2014/07/01
B 2014/06/01
B 2014/02/01
C 2014/09/01
C 2014/07/01
C 2014/05/01
Expected output
A 5
B 2
C 0
PigScript:
REGISTER rollingCount.jar;
A = LOAD 'input.txt' Using PigStorage(' ') AS (f1:chararray,f2:chararray);
B = GROUP A BY f1;
C = FOREACH B GENERATE mypackage.ROLLINGCOUNT(BagToString($1)) AS rollingCnt;
DUMP C;
OutPut from the Script:
(A,5)
(B,2)
(C,0)
Java Code:
1. Compile the below java code and create jar file name rollingCount.jar
2. I just wrote the code temporarily, you can optimize if required.
ROLLINGCOUNT.java
package mypackage;
import java.io.*;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;
import java.util.*;
public class ROLLINGCOUNT extends EvalFunc<Integer> {
public Integer exec(Tuple input) throws IOException {
//Get the input String from request
String inputString = (String)input.get(0);
Date[] arrayOfDates = getArrayOfDate(inputString);
long diffDays[] = getDaysBetweenList(arrayOfDates);
int rollingCount = getRollingCount(diffDays);
return rollingCount;
}
//Function to convert strings to array of dates
static protected Date[] getArrayOfDate(String inputString)
{
//Get the 1st column, this will be the Id
String ID = inputString.split("_")[0];
//Replace all the Ids with Null, bcoz its a duplicate columns
String modifiedString = inputString.replace(ID+"_","");
//Split the string into multiple columns using '_' as delimiter
String list[] = modifiedString.split("_");
//Convert the string to list of array dates
Date[] dateList = new Date[list.length];
int index=0;
for (String dateString: list)
{
try
{
//Convert the date string to date object in the give format
SimpleDateFormat dFormat = new SimpleDateFormat("yyyy/MM/dd");
dateList[index++] = dFormat.parse(dateString);
}
catch(Exception e)
{
// error handling goes here
}
}
return dateList;
}
//Function to get difference between two dates
static protected long[] getDaysBetweenList(Date[] arrayOfDate)
{
long diffDays[] = new long[arrayOfDate.length-1];
int cnt=0;
for (int index=0; index<arrayOfDate.length-1;index++)
{
long diff = Math.abs(arrayOfDate[index+1].getTime() - arrayOfDate[index].getTime());
long days = TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS);
diffDays[cnt++] = days;
}
return diffDays;
}
//Function to get the total rolling count
static protected int getRollingCount(long diffDays[])
{
int result =0;
for(int index=0;index<diffDays.length;index++)
{
int cnt =0;
//hardcoded the values of 30 and 31 days, may need to handle Feb month 28 or 29 days
while((index<diffDays.length)&&((diffDays[index]==30)||(diffDays[index]==31)))
{
cnt++;
index++;
}
if(cnt>0)
{
result = result + cnt+1;
}
}
return result;
}
}