81

E.g.

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc is an existing SparkContext

sample = sqlContext.sql("select name, age, city from user")
sample.show()

The above statement prints the entire table to the terminal, but I want to access each row in that table with a for or while loop to perform further calculations.

Arti Berde

7 Answers

77

You simply cannot. DataFrames, like other distributed data structures, are not iterable and can be accessed only through dedicated higher-order functions and/or SQL methods.
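For example, a typical per-row calculation can usually stay inside the DataFrame API (a minimal sketch against the question's `sample` dataframe; the age threshold and aggregation are just placeholders):

from pyspark.sql import functions as F

# Filter, derive a column, and aggregate without ever iterating
# rows on the driver.
summary = (sample
           .where(F.col("age") >= 18)
           .withColumn("age_next_year", F.col("age") + 1)
           .groupBy("city")
           .agg(F.avg("age").alias("avg_age")))
summary.show()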

You can of course collect

for row in df.rdd.collect():  # brings the entire dataset back to the driver
    do_something(row)

or convert to a local iterator with toLocalIterator

for row in df.rdd.toLocalIterator():  # streams one partition at a time to the driver
    do_something(row)

and iterate locally as shown above, but it defeats the whole purpose of using Spark.

zero323
  • Newbie question: As iterating an already collected dataframe "defeats the purpose", how should I pick the rows I need for further processing from a dataframe? – Jari Turkia Jun 23 '21 at 09:08
  • Did some reading, and it looks like forming a new dataframe with `where()` would be the Spark way of doing it properly (see the sketch below). – Jari Turkia Jun 23 '21 at 10:16
  • "it defeats the whole purpose of using Spark" is pretty strong and subjective language. The `collect()` method exists for a reason, and there are many valid use cases for it. Once Spark is done processing the data, iterating through the final results might be the only way to integrate with/write to external APIs or legacy systems. – Marco Roy May 07 '22 at 08:58
70

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):
    return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function would then be applied to every row of the dataframe. Note that sample2 will be an RDD, not a dataframe.
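If you need a dataframe again afterwards, the mapped RDD can be converted back by naming its columns (a minimal sketch; the column names are taken from the question):

# toDF turns the RDD of tuples back into a dataframe with named columns.
sample2_df = sample2.toDF(["name", "age", "city"])
sample2_df.show()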

Map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use withColumn, which returns a dataframe.

sample3 = sample.withColumn('age2', sample.age + 2)
David
  • Can you please tell me how to actually use the customFunction so that the return values could be used inside a loop for further processing? I have a collect()-based approach, but my data is too large and it causes PySpark (v. 3) to fail. Thank you! (One option is sketched below.) – IrfanClemson Jun 01 '23 at 19:49
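Regarding that comment: when the data is too large to collect, one option is to run the per-row work on the executors with foreachPartition (a sketch, not from the original answer; write_to_external_system is a hypothetical sink function):

# Runs on the executors, so nothing is collected to the driver.
def handle_partition(rows):
    for row in rows:
        # write_to_external_system is a hypothetical placeholder,
        # e.g. an API client or database writer.
        write_to_external_system(row.name, row.age, row.city)

sample.foreachPartition(handle_partition)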
14

Using a list comprehension in Python, you can collect an entire column of values into a list in just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of the tables in the database 'default', but the same approach can be adapted by replacing the query passed to sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{'name': x["name"], 'age': x["age"], 'city': x["city"]}
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))
aaronsteers
9

Give it a try like this:

result = spark.createDataFrame([('SpeciesId', 'int'), ('SpeciesName', 'string')],
                               ["col_name", "data_type"])
for f in result.collect():
    print(f.col_name)
Bala cse
8

It might not be best practice, but you can simply target a specific column using collect(), export it as a list of Rows, and loop through the list.

Assume this is your df:

+----------+----------+-------------------+-----------+-----------+------------------+
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03|             51148|
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04|            -35252|
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04|          20963548|
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21|          22173148|
+----------+----------+-------------------+-----------+-----------+------------------+

To loop through the rows in the Date column:

rows = df3.select('Date').collect()  # list of Row objects

final_list = []
for i in rows:
    final_list.append(i[0])  # each Row holds a single 'Date' value

print(final_list)
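Equivalently, the loop above can be collapsed into a single list comprehension (same df3 as above):

# Pull the 'Date' value out of each collected Row in one line.
final_list = [row['Date'] for row in df3.select('Date').collect()]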
SRH
2

If you want to do something to each row in a DataFrame object, use map. This will allow you to perform further calculations on each row. It's the equivalent of looping across the entire dataset from 0 to len(dataset)-1.

Note that this will return a PipelinedRDD, not a DataFrame.
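A minimal sketch of that approach, reusing the question's `sample` dataframe (the doubled age is just a placeholder calculation):

# map applies the function to every row; the result is an RDD, not a DataFrame.
doubled = sample.rdd.map(lambda row: (row.name, row.age * 2))
print(doubled.take(5))  # bring back just a few results for inspection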

Katya Willard
1

In the list-comprehension answer above, note that the dictionary keys must be quoted strings:

tupleList = [{'name': x["name"], 'age': x["age"], 'city': x["city"]}

because name, age, and city are not variables but simply keys of the dictionary.

ten2the6