
I want to drop the duplicate values in my dataset.

For example, from:

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

to

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|                  a |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|      item_guid_list|                  b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  c |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  d |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  e |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  f |       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

But when I use flatMap, the result is:

++
||
++
||
||
||
||
||

My code is:

            StructType structType = new StructType();
            structType.add("e_key", DataTypes.StringType);
            structType.add("f_timestamp_day", DataTypes.StringType);
            structType.add("key", DataTypes.StringType);
            structType.add("value", DataTypes.StringType);
            structType.add("f_country", DataTypes.StringType);
            structType.add("f_os", DataTypes.StringType);
            structType.add("received_date", DataTypes.StringType);


            Dataset<Row> drop_duplicate_feature = 
            explode_events.flatMap(
                (FlatMapFunction<Row, Row>)row->{
                    List<Row> list = new ArrayList<Row>();
                    String value = row.getString(3);
                    String[] array_of_value = value.split("\\^");
                    array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
                    for(int index = 0; index < array_of_value.length; index++){
                        list.add(
                            RowFactory.create(row.get(0),row.get(1),row.get(2),array_of_value[index],row.get(4),row.get(5),row.get(6))
                        );
                    }
                    return list.iterator();
                }
                , RowEncoder.apply(structType)
            );

I use flatMap to generate the distinct rows and add them to a list.

Why doesn't RowEncoder.apply() work?
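
Note: StructType.add returns a new StructType rather than modifying the one it is called on, so the structType built above never actually receives any fields; RowEncoder.apply then gets an empty schema, which matches the zero-column output shown. A minimal sketch of the chained construction:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Each add(...) returns a new StructType, so chain the calls
    // (or reassign the result) to keep the fields.
    StructType structType = new StructType()
            .add("e_key", DataTypes.StringType)
            .add("f_timestamp_day", DataTypes.StringType)
            .add("key", DataTypes.StringType)
            .add("value", DataTypes.StringType)
            .add("f_country", DataTypes.StringType)
            .add("f_os", DataTypes.StringType)
            .add("received_date", DataTypes.StringType);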

Hero
3 Answers


Try this:

1. Load the test data provided

        String data = "   e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date\n" +
                "  Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01\n" +
                "  Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01";

        List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
                .map(s -> Arrays.stream(s.split("\\|"))
                        .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                        .collect(Collectors.joining(","))
                )
                .collect(Collectors.toList());

        Dataset<Row> dataset = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .option("sep", ",")
                .option("nullValue", "null")
                .csv(spark.createDataset(list1, Encoders.STRING()));
        dataset.show(false);
        dataset.printSchema();
        /**
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value      |f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a^a^a^b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c^c^d^e^f^f|FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         *
         * root
         *  |-- e_key: string (nullable = true)
         *  |-- f_timestamp_day: timestamp (nullable = true)
         *  |-- key: string (nullable = true)
         *  |-- value: string (nullable = true)
         *  |-- f_country: string (nullable = true)
         *  |-- f_os: string (nullable = true)
         *  |-- received_date: timestamp (nullable = true)
         */

2. Remove duplicate values from the array with array_distinct, then explode it

        dataset.withColumn("value", explode(array_distinct(split(col("value"), "\\^"))))
                .show(false);
        /**
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value|f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|item_guid_list|b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |d    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |e    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |f    |FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         */
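
Note that array_distinct is available in Spark 2.4 and later.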
Som

You can achieve the same result using Spark SQL and org.apache.spark.sql.functions (Scala shown; the $ column syntax requires import spark.implicits._):

explode_events.select(
    $"e_key",
    $"f_timestamp_day",
    $"key",
    explode(array_distinct(split($"value", "\\^"))).as("value"),
    $"f_country",
    $"f_os",
    $"received_date"
).show()
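
For the Java API used in the question, the equivalent would look roughly like this (a minimal sketch; explode_events is assumed to be the Dataset<Row> from the question):

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Split on "^", drop duplicates within each array, then explode
    // so each distinct value becomes its own row; other columns are kept.
    Dataset<Row> deduped = explode_events.withColumn(
            "value",
            explode(array_distinct(split(col("value"), "\\^"))));
    deduped.show(false);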
chlebek

Use the split function to split the value column, then use array_distinct and explode to achieve the result:

from pyspark.sql.functions import array_distinct, explode, split

# split the delimited string, drop duplicates, then explode into one row per value
df1 = df.withColumn("value", explode(array_distinct(split("value", "\\^"))))
RainaMegha