How do I remove multiple occurrences of a row based on SessionId in the Apache Beam Java SDK? I have tried Distinct as well as Deduplicate, but both compare the entire row when removing duplicates.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.values.PCollection;

public class DistinctExample {
    
    public static void main(String[] s) {
        Pipeline p = Pipeline.create();
        PCollection<String> input = p.apply(TextIO.read().from("C:\\ApacheBeam\\Beam\\distinct\\user.csv"));
        PCollection<String> unique = input.apply(Distinct.<String>create());
        unique.apply(TextIO.write().to("C:\\ApacheBeam\\Beam\\distinct\\dis_user.csv").withNumShards(1).withSuffix(".csv"));
        p.run();
    }

}

user.csv is as follows:

SessionId,UserId,UserName,VideoId,Duration,StartTime,Gender,Phone
1001,1,James,1,500,2011-11-11T10:00:00Z,M,1111111111
1002,2,Leena,2,500,2011-11-12T10:00:00Z,F,1111111112
1003,3,James,2,1500,2011-12-12T10:00:00Z,M,11111113
1004,3,James,2,1500,2011-12-12T10:00:00Z,,1111111114
1005,3,James,2,1500,2011-12-12T10:00:00Z,O,1111111115
1005,3,abcd,2,1500,2011-12-12T10:00:00Z,O,1111111116
1004,3,def,2,1500,2011-12-12T10:00:00Z,,1111111117


Expected output
SessionId,UserId,UserName,VideoId,Duration,StartTime,Gender,Phone
1001,1,James,1,500,2011-11-11T10:00:00Z,M,1111111111
1002,2,Leena,2,500,2011-11-12T10:00:00Z,F,1111111112
1003,3,James,2,1500,2011-12-12T10:00:00Z,M,11111113
Ashok

1 Answer

By default, the Distinct transform compares whole input elements (rows). To filter duplicates based on a specific column only, use Distinct.withRepresentativeValueFn(SerializableFunction) and provide a function that maps each element to a representative value, which here would be the SessionId.
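
A minimal sketch of that approach, reusing the file paths from the question. The class name `DistinctBySessionId` and the constant `SESSION_ID_OF` are illustrative names, not part of Beam, and the function assumes SessionId is always the first comma-separated field of each line.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DistinctBySessionId {

    // Hypothetical helper: maps a CSV line to its first column (assumed to be SessionId).
    // Lines that map to the same value are treated as duplicates by Distinct.
    static final SerializableFunction<String, String> SESSION_ID_OF =
            line -> line.split(",", 2)[0];

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<String> input =
                p.apply(TextIO.read().from("C:\\ApacheBeam\\Beam\\distinct\\user.csv"));

        PCollection<String> unique = input.apply(
                Distinct.withRepresentativeValueFn(SESSION_ID_OF)
                        // Tells Beam the type of the representative value so it can pick a coder.
                        .withRepresentativeType(TypeDescriptors.strings()));

        unique.apply(TextIO.write()
                .to("C:\\ApacheBeam\\Beam\\distinct\\dis_user.csv")
                .withNumShards(1)
                .withSuffix(".csv"));
        p.run().waitUntilFinish();
    }
}
```

This keeps one row per SessionId, and which of the duplicate rows survives should not be relied on. With the sample data it would therefore still emit one row each for 1004 and 1005. The expected output in the question drops those sessions entirely, which Distinct alone does not do; that would probably need grouping by SessionId and keeping only the groups that contain a single row.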

Alexey Romanenko
Hey @Alexey, can you please share an example of the SerializableFunction in this case? – Yoyo Mar 01 '23 at 17:20