
I have a DataFrame whose rows I want to loop through, adding their values to a list that the driver can use. Broadcast variables are read-only, and as far as I know accumulators only support sums.

Is there a way to do this? I am using Spark 1.6.1.

Here is the code that runs on the worker nodes. I tried passing the list to the constructor, but it did not work: once the code is shipped to the worker nodes, it never returns any values to the driver.
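The underlying issue can be reproduced without Spark: when a closure is serialized to a worker, the worker mutates its own deserialized copy, not the driver's object. A minimal sketch of that copy semantics (plain Java, hypothetical class names, standing in for what Spark's closure serializer does):

```java
import java.io.*;
import java.util.*;

public class SerializationCopyDemo {
    // Holder plays the role of EnrichmentIdentifiersBuilder: it wraps a list.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> values = new ArrayList<>();
    }

    // Round-trip an object through Java serialization, as Spark does when
    // shipping a task to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Holder driverSide = new Holder();
        Holder workerSide = roundTrip(driverSide); // "sent" to a worker

        workerSide.values.add("id-1"); // mutation happens on the copy

        // The driver's list is still empty; the worker-side add is invisible.
        System.out.println(driverSide.values.size() + " " + workerSide.values.size()); // prints "0 1"
    }
}
```

This is why mutating `extractedIdentifiers` inside `foreach` has no visible effect on the driver.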

import java.io.Serializable;
import java.util.*;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

import scala.runtime.BoxedUnit;

public class EnrichmentIdentifiersBuilder implements Serializable {

    private static final long serialVersionUID = 269187228897275370L;
    private List<Map<String, String>> extractedIdentifiers;

    public EnrichmentIdentifiersBuilder(List<Map<String, String>> extractedIdentifiers) {
        this.extractedIdentifiers = extractedIdentifiers;
    }

    public void addIdentifiers(DataFrame identifiers) {
        final List<String> parameters = Arrays.asList(identifiers.schema().fieldNames());

        identifiers.foreach(new MyFunction<Row, BoxedUnit>() {

            private static final long serialVersionUID = 1L;

            @Override
            public BoxedUnit apply(Row line) {
                // This mutates the deserialized copy of extractedIdentifiers
                // on the worker, not the driver's list.
                for (int i = 0; i < parameters.size(); i++) {
                    Map<String, String> identifier = new HashMap<>();
                    identifier.put(parameters.get(i), line.getString(i));
                    extractedIdentifiers.add(identifier);
                }
                return BoxedUnit.UNIT;
            }
        });
    }
}
  • `collect` probably, though you'll be losing the benefits of the distributed nature of Spark – David Jan 19 '18 at 17:38
  • You should always avoid changing the value of objects created in the driver on the workers. You may need to post related code for answers to be proposed. – ernest_k Jan 19 '18 at 17:38
  • So, essentially what you want to do is take a data frame and convert it into a list of maps made of key/values where keys are just fields in the data frame and values are their corresponding row values. Is that right? – ernest_k Jan 19 '18 at 17:55
  • @ErnestKiwele No, I am trying to take a dataframe and store its data in a list of maps, where the key of the map is the column name and the value is the corresponding value of that column in the row – Akrem Jan 19 '18 at 18:47
  • @Akrem we're saying the same thing. – ernest_k Jan 19 '18 at 19:01
  • Sorry, Yes, The same as you said @ErnestKiwele – Akrem Jan 19 '18 at 19:09

2 Answers


Instead of trying to expose the list to the workers, you can convert each row into a map and then collect the result on the driver:

this.extractedIdentifiers = identifiers.rdd().map(
    new MyFunction<Row, Map<String, String>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Map<String, String> apply(Row line) {
            // One map per row: column name -> value for that row.
            Map<String, String> identifier = new HashMap<>();

            for (int i = 0; i < parameters.size(); i++) {
                identifier.put(parameters.get(i), line.getString(i));
            }

            return identifier;
        }
    }).collect(); // This returns the list of maps to the driver

This is the correct approach, as concurrent modification of a driver-side list (were it even possible) would be problematic. This code transforms each row into a map of its values, and all the maps are then collected back to the driver as a list.
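The shape of the collected result can be illustrated without Spark. The sketch below (plain Java, with hypothetical sample data standing in for the DataFrame's field names and rows) applies the same row-to-map transformation:

```java
import java.util.*;

public class RowToMapDemo {
    // Same logic as the map() function above: one map per row,
    // keyed by column name.
    static List<Map<String, String>> extract(List<String> parameters,
                                             List<String[]> rows) {
        List<Map<String, String>> extractedIdentifiers = new ArrayList<>();
        for (String[] line : rows) {
            Map<String, String> identifier = new HashMap<>();
            for (int i = 0; i < parameters.size(); i++) {
                identifier.put(parameters.get(i), line[i]);
            }
            extractedIdentifiers.add(identifier);
        }
        return extractedIdentifiers;
    }

    public static void main(String[] args) {
        // Stand-ins for identifiers.schema().fieldNames() and the row data.
        List<String> parameters = Arrays.asList("name", "country");
        List<String[]> rows = Arrays.asList(
                new String[] {"alice", "FR"},
                new String[] {"bob", "DE"});

        // Two rows -> two maps, e.g. {name=alice, country=FR} and
        // {name=bob, country=DE} (key order may vary).
        System.out.println(extract(parameters, rows));
    }
}
```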


Thanks for the idea!

I made slight changes to make it work. Here is the code, in case anyone needs it in the future:

public List<Map<String, String>> addIdentifiers(DataFrame identifiers) {
    final List<String> parameters = Arrays.asList(identifiers.schema().fieldNames());

    // flatMap emits one single-entry map per column per row; collect()
    // brings them all back to the driver as a list.
    List<Map<String, String>> extractedIdentifiers =
        identifiers.javaRDD().flatMap(new FlatMapFunction<Row, Map<String, String>>() {

            private static final long serialVersionUID = -2369617506532322680L;

            @Override
            public List<Map<String, String>> call(Row line) throws Exception {
                List<Map<String, String>> identifier = new ArrayList<>();

                for (int i = 0; i < parameters.size(); i++) {
                    Map<String, String> keyValue = new HashMap<>();
                    keyValue.put(parameters.get(i), line.getString(i));
                    identifier.add(keyValue);
                }

                return identifier;
            }
        }).collect();

    return extractedIdentifiers;
}

Also, there is a collection accumulator that could be used with the code in the question; it can be created via `javaSparkContext.sc().accumulableCollection()`.
