16

From a PySpark SQL dataframe like

name age city
abc   20  A
def   30  B

How to get the last row? (Like with df.limit(1) I can get the first row of the dataframe into a new dataframe.)

And how can I access the dataframe rows by index, like row no. 12 or 200?

In pandas I can do

df.tail(1)       # last row
df.iloc[rowno]   # by integer position (df.ix is deprecated)
df.loc[label]    # by label

I am just curious how to access a PySpark dataframe in such ways, or in alternative ways.

Thanks

ZygD
Satya

4 Answers

9

How to get the last row.

If you have a column that you can use to order the dataframe, for example "index", then one easy way to get the last record is with SQL: 1) order your table in descending order and 2) take the first value from that order

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec)  # spark: your SparkSession
latest_rec.show()

And how can I access the dataframe rows by index, like row no. 12 or 200?

In a similar way you can get the record at any given line:

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (SELECT * FROM table_df ORDER BY index ASC LIMIT {0}) ord_lim ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec.format(row_number))  # spark: your SparkSession
latest_rec.show()

If you do not have an "index" column, you can create one using

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())
Danylo Zherebetskyy
  • Many thanks for the well-explained answer, great to know a new approach. – Satya Feb 26 '18 at 11:48
  • `monotonically_increasing_id()` [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id) ***"The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits."*** Therefore, this will not work as you think for large DataFrames that may be stored across different partitions. You will not be able to reference the last row of your DataFrame, unless it is all in one partition. – Clay Feb 27 '18 at 20:17
  • @Clay The last part was more supplementary. But if a large DataFrame is really HUGE, i.e. does not fit `monotonically_increasing_id()` 's assumption **"data frame has less than 1 billion partitions, and each partition has less than 8 billion records"**, then one could use sql `ROW_NUMBER() OVER (PARTITION BY xxx ORDER BY yyy)` as an alternative. – Danylo Zherebetskyy Mar 01 '18 at 20:11
  • @DanyloZherebetskyy I was not referring to the case where each partition has >= 8 billion records. `monotonically_increasing_id()` does not guarantee ***consecutive*** indices. Therefore, you cannot use it to create an "index" column. If you do, you might use `filter()` to view the 20,000th row, and there may not be the number 20,000 in the "index" column you created with `monotonically_increasing_id()`. I have seen the output of this function jump from 526 to 28,622 as it moves from the end of the first partition to the beginning of the second partition. – Clay Mar 01 '18 at 20:28
  • @Clay Sorry, but I do not agree with you. The documentation, which you cite, explicitly says that this function works across partitions. In your practical example, I would assume that your data frame was ordered with respect to some other column or there was a bug on the server, etc. otherwise report the issue to the Spark-team – Danylo Zherebetskyy Mar 03 '18 at 01:41
  • @DanyloZherebetskyy The [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id) I cited explicitly says that the partition id is first 31 bits of the output 64 bit integer. Furthermore, the actual example shown in the [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id) shows the integers in the 'id' column created with `monotonically_increasing_id()` jump from *2* to *8589934592* '...`Row(id=2), Row(id=8589934592)`...' – Clay Mar 03 '18 at 12:23
  • @Clay, probably I see the point of confusion. `monotonically_increasing_id()` does what it says. The beauty of the provided SQL solutions within the context of the question is that they do not require the created `index` to be without `jumps` or to start from 1, but rather just to increase monotonically (the last index will have the largest value anyway, the 12th smallest index will have the 12th smallest value, and `limit` takes care of it). If one needs to create an index over the whole DataFrame, where the index starts from 1 and increases by 1, then use SQL `row_number() over (order by ...)`. – Danylo Zherebetskyy Mar 04 '18 at 01:48
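A minimal sketch of the `row_number() over (order by ...)` alternative mentioned in the comment above; "some_col" is a hypothetical column assumed to define the ordering:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# row_number() over an ordering window yields consecutive 1-based indices,
# unlike monotonically_increasing_id().
# Caveat: a window with no PARTITION BY pulls all rows into a single partition.
w = Window.orderBy("some_col")
df_indexed = df.withColumn("index", F.row_number().over(w))

df_indexed.orderBy(F.desc("index")).limit(1).show()   # last row
df_indexed.where(F.col("index") == 12).show()         # row no. 12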
8

How to get the last row.

A long and ugly way, which assumes that all columns are orderable:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max_(struct("_id", *df.columns)).alias("tmp"))
    .select(col("tmp.*"))
    .drop("_id"))

If not all columns can be ordered you can try:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")

Note: there is a `last` function in `pyspark.sql.functions` / `o.a.s.sql.functions`, but considering the description of the corresponding expressions it is not a good choice here.

how can I access the dataframe rows by index, like row no. 12 or 200?

You cannot. A Spark DataFrame is not accessible by index. You can add indices using zipWithIndex and filter on them later. Just keep in mind that this is an O(N) operation.
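A minimal sketch of that zipWithIndex approach (the `_idx` column name is just illustrative):

from pyspark.sql.functions import col

# Append a consecutive 0-based index via the RDD API, then filter on it.
with_index = (df.rdd
    .zipWithIndex()                       # (Row, index) pairs
    .map(lambda x: x[0] + (x[1],))        # Row behaves like a tuple, so append the index
    .toDF(df.columns + ["_idx"]))

with_index.where(col("_idx") == 11).drop("_idx").show()   # row no. 12 (0-based index 11)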

zero323
  • Hi, currently I am handling the last row by adding an auto-increment ID column, or for a small df I was using toPandas().tail(1). Anyways, thanks for answering. And the index access of dataframes that I asked about is because sometimes I have to replace a column value (by some column-value equality condition, and to do that I was taking the help of a UDF). But if I want to replace only one instance (a particular index-numbered row), then I had no way to do that. Now I can use "zipWithIndex" as suggested. Thanks. – Satya Sep 17 '16 at 19:30
8
from pyspark.sql import functions as F

expr = [F.last(col).alias(col) for col in df.columns]

df.agg(*expr)
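A brief usage note (my caveat, not from the answer): `F.last` depends on row order, which Spark does not guarantee after a shuffle, so this is only reliable when the ordering of `df` is well defined:

last_row_df = df.agg(*expr)   # single-row DataFrame holding the last value of each column
last_row_df.show()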

Just a tip: it looks like you still have the mindset of someone who is working with pandas or R. Spark is a different paradigm in the way we work with data. You don't access data inside individual cells anymore; now you work with whole chunks of it. If you keep collecting stuff and doing actions, like you just did, you lose the whole concept of parallelism that Spark provides. Take a look at the concept of transformations vs actions in Spark.

Henrique Florencio
  • That tip makes me think that spark is not a great tool for working with time-series data then. Correct? – yeliabsalohcin Dec 27 '19 at 15:56
  • @yeliabsalohcin Spark is a great tool for working with time series. But, in the question, the user wants to know a way to access data using indexes, similar to pandas, and what I meant was that you should be aware that Spark works differently than pandas or R. – Henrique Florencio Jan 06 '20 at 15:27
0

Use the following to get an index column that contains monotonically increasing, unique, and consecutive integers, which is not how monotonically_increasing_id() works. The indexes will be ascending in the same order as colName of your DataFrame.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)

df = df\
 .withColumn('int', F.lit(1))\
 .withColumn('index', F.sum('int').over(window))\
 .drop('int')

Use the following code to look at the tail, i.e. the last rownums rows, of the DataFrame.

rownums = 10
df.where(F.col('index')>df.count()-rownums).show()

Use the following code to look at rows from start_row to end_row of the DataFrame.

start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()  # note: both bounds are exclusive

zipWithIndex() is an RDD method that does return monotonically increasing, unique, and consecutive integers, but it appears to be much slower to implement in a way that gets you back to your original DataFrame amended with an id column.

Clay