I am trying to get the unique value tuples from columns [1, 2, 3] of tab-separated files. The data is very large and spread across multiple files (about 1 TB in total).
I just want to keep the lines that contain the string "client", extract columns [1, 2, 3], and find the combinations that are unique within each file.
I first tried mapping each line to a tuple and calling distinct(), but the process stops with a Java out-of-memory error.
import subprocess
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="someapp")
    # list the input files on HDFS
    cmd = 'hdfs dfs -ls /user/path'.split()
    files = subprocess.check_output(cmd).strip().split('\n')
    rdds = []
    for ff in files[1:]:  # skip the "Found N items" header line
        rdd = sc.textFile(ff.split()[-1])
        # keep only lines containing "client"
        rdd2 = rdd.filter(lambda x: "client" in x.lower())
        # extract columns 1, 2 and 3 as a tuple
        rdd3 = rdd2.map(lambda x: tuple(x.split("\t")[y] for y in [1, 2, 3]))
        rdd4 = rdd3.distinct()
        rdds.append(rdd4)
    rdd0 = sc.union(rdds)
    rdd0.collect()  # pulls everything back to the driver
    rdd0.saveAsTextFile('/somedir')
So I tried another script using the reduceByKey() method instead, and it works fine.
import subprocess
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="someapp")
    cmd = "hdfs dfs -ls airties/eventU".split()
    files = subprocess.check_output(cmd).strip().split('\n')
    rdds = []
    for ff in files[1:]:  # skip the "Found N items" header line
        rdd = sc.textFile(ff.split()[-1])
        rdd2 = rdd.filter(lambda x: "client" in x.lower())
        # join columns 1, 2 and 3 into a single comma-separated key
        rdd3 = rdd2.map(lambda x: ','.join([x.split("\t")[y] for y in [1, 2, 3]]))
        rdds.append(rdd3)
    rdd0 = sc.union(rdds)
    # count occurrences of each key; the distinct keys are the unique combinations
    rddA = rdd0.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    rddA.collect()
    rddA.saveAsTextFile('/somedir')
But I am trying to understand why distinct() does not work well here while the reduceByKey() approach does. Is distinct() not a proper way to find unique values?
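From what I can see in the PySpark RDD source, distinct() itself seems to be implemented roughly as a reduceByKey(), something like the simplified sketch below (not the exact library code, and it may differ between Spark versions):

# Simplified sketch of how RDD.distinct() appears to be implemented in PySpark;
# the actual library code may differ between versions.
def distinct_sketch(rdd, num_partitions=None):
    return (rdd.map(lambda x: (x, None))                      # turn every element into a key
               .reduceByKey(lambda x, _: x, num_partitions)   # collapse duplicate keys
               .map(lambda kv: kv[0]))                        # drop the dummy value

If that is the case, I do not understand why the explicit reduceByKey() version behaves so differently for me.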
I am also trying to find out whether there is a better way to optimize processing multiple files: finding the unique value pairs within each file and then aggregating them. When the files contain mutually exclusive contents, I only need to apply the uniqueness step per file and aggregate everything in a final step, but my current code seems to create too much overhead on the system.
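For example, I am wondering whether letting Spark read the whole directory in one job, instead of looping over the files on the driver, would reduce that overhead. A rough sketch of what I mean (the /user/path/* glob and the output path are just placeholders):

import subprocess
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="someapp")
    # read all files under the directory as one RDD instead of looping over them
    rdd = sc.textFile('/user/path/*')
    unique = (rdd.filter(lambda x: "client" in x.lower())
                 .map(lambda x: tuple(x.split("\t")[y] for y in [1, 2, 3]))
                 .distinct())
    # write the result directly, without collect() on the driver
    unique.saveAsTextFile('/somedir_out')

Would this kind of single-job approach be preferable, or is the per-file loop fine in principle?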
The data looks like this (lots of redundancy):
+-----+---+------+----------+
|1 |2 |3 |4 |
+-----+---+------+----------+
| 1| 1| A|2017-01-01|
| 2| 6|client|2017-01-02|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 3| 5|client|2017-01-03|
| 2| 2|client|2017-01-02|
| 3| 5| A|2017-01-03|
| 1| 3| B|2017-01-02|
| 3| 5|client|2017-01-03|
| 3| 5|client|2017-01-04|
+-----+---+------+----------+
After filtering for "client" and keeping columns [1, 2, 3], it looks like this (still with lots of redundancy):
+-----+---+------+
|1 |2 |3 |
+-----+---+------+
| 2| 6|client|
| 3| 5|client|
| 2| 2|client|
| 3| 5|client|
| 3| 5|client|
+-----+---+------+
Column 3 is redundant after the filtering, but take this just as an example scenario.