19

I am working in Databricks.

I have a dataframe which contains 500 rows. I would like to create two dataframes: one containing 100 rows and the other containing the remaining 400 rows.

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|

I have tried the below, but I receive an error:

df1 = df[:99]
df2 = df[100:499]


TypeError: unexpected item type: <type 'slice'>
pault
Data_101

7 Answers

17

Initially I misunderstood and thought you wanted to slice the columns. If you want to select a subset of rows, one method is to create an index column using monotonically_increasing_id(). From the docs:

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

You can use this ID to sort the dataframe and subset it using limit() to ensure you get exactly the rows you want.

For example:

import pyspark.sql.functions as f
import string

# create a dummy df with 500 rows and 2 columns
N = 500
numbers = [i%26 for i in range(N)]
letters = [string.ascii_uppercase[n] for n in numbers]

df = sqlCtx.createDataFrame(
    zip(numbers, letters),
    ('numbers', 'letters')
)

# add an index column
df = df.withColumn('index', f.monotonically_increasing_id())

# sort ascending and take first 100 rows for df1
df1 = df.sort('index').limit(100)

# sort descending and take 400 rows for df2
df2 = df.sort('index', ascending=False).limit(400)

Just to verify that this did what you wanted:

df1.count()
#100
df2.count()
#400

We can also verify that the index ranges of the two dataframes don't overlap:

df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+---+
#|min|max|
#+---+---+
#|  0| 99|
#+---+---+

df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+----------+
#|min|       max|
#+---+----------+
#|100|8589934841|
#+---+----------+
pault
  • monotonically_increasing_id() adds a monotonically increasing id to the table, BUT it is not consecutive, so if you use limit(100) you won't get exactly 100 rows – Gavin Oct 15 '18 at 18:05
  • @Gavin you will get exactly 100 rows, because `limit(n)` returns exactly `n` rows (assuming there are that many in the table; if not, it returns all of them). This only relies on `monotonically_increasing_id()` to sort the table. – pault Oct 15 '18 at 18:09
  • Got it, but I cannot take rows 200 to 300 using the index created by monotonically_increasing_id(), right? – Gavin Oct 15 '18 at 18:13
  • @Gavin That is correct, you can not. But as I explained in [my answer to your other question](https://stackoverflow.com/questions/52792762/is-there-a-way-to-slice-dataframe-based-on-index-in-pyspark/52819758#52819758), relying on the order of the data is not recommended. In this example, the OP had a DataFrame with 500 rows- this technique likely does not generalize well for larger data. – pault Oct 15 '18 at 18:15
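If an arbitrary middle slice is needed, such as the rows 200 to 300 discussed in the comments above, one hedged sketch (not part of the answer itself) is to turn the non-consecutive id into a consecutive one with row_number() over a window. The global ordering funnels all rows through a single partition, so this only suits small dataframes:

from pyspark.sql import Window
import pyspark.sql.functions as f

# assumption: df already carries the 'index' column added with monotonically_increasing_id() above
w = Window.orderBy('index')   # single-partition window; fine only for small data
df_num = df.withColumn('row_num', f.row_number().over(w))

# rows 200 to 300 (1-based, inclusive)
df_slice = df_num.filter(f.col('row_num').between(200, 300))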
13

Spark dataframes cannot be indexed the way you wrote. You can use the head method to take the top n rows. This will return a list of Row() objects and not a dataframe, so you can convert them back to a dataframe and use subtract on the original dataframe to take the rest of the rows.

# Take the top 100 rows and convert them to a dataframe.
# You also need to provide the schema to avoid errors.
df1 = sqlContext.createDataFrame(df.head(100), df.schema)

#Take the rest of the rows
df2 = df.subtract(df1)

You can also use a SparkSession instead of sqlContext if you work on Spark 2.0+. Also, if you are not interested in taking the first 100 rows and you want a random split, you can use randomSplit like this:

df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
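For reference, a minimal sketch of the same head/subtract split written against a SparkSession, assuming the usual spark variable is available (Spark 2.0+). Note that randomSplit gives approximate proportions, not exactly 100 and 400 rows:

# assumes `spark` is an existing SparkSession (Spark 2.0+)
df1 = spark.createDataFrame(df.head(100), df.schema)  # first 100 rows
df2 = df.subtract(df1)                                # remaining 400 rows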
Michail N
5

If I don't mind having the same rows in both dataframes, then I can use sample. For example, I have a dataframe with 354 rows.

>>> df.count()
354

>>> df.sample(False, 0.5, 0).count()  # approx. 50%
179

>>> df.sample(False, 0.1, 0).count()  # approx. 10%
34

Alternatively, if I want a strict split without duplicates being present, I could do

df1 = df.limit(100)     # 100 rows
df2 = df.subtract(df1)  # remaining rows
Bala
  • Using limit() is not straightforward due to lazy evaluation: https://stackoverflow.com/questions/37147032/why-does-df-limit-keep-changing-in-pyspark – Gal_M Feb 10 '21 at 17:41 (a possible workaround is sketched below)
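A hedged workaround for that non-determinism, assuming caching the 100-row dataframe is acceptable, is to materialize df1 before the subtract so both halves are derived from the same rows; this is a sketch, not something the linked question prescribes:

df1 = df.limit(100).cache()   # pin down which 100 rows limit() selected
df1.count()                   # force materialization of the cache
df2 = df.subtract(df1)        # remaining rows, consistent with the cached df1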
2

Try it this way:

df1_list = df.collect()[:99] #this will return list    
df1 = spark.createDataFrame(df1) #convert it to spark dataframe

and similarly for the second dataframe:

df2_list = df.collect()[100:499]
df2 = spark.createDataFrame(df2)
Arpit
0

In both solutions, I believe we need to change df1 to df1_list and df2 to df2_list in the second line of each snippet.

0

Here is my solution to slice a data frame by row:

def slice_df(df, start, end):
    # take the first `end` rows, then keep the last (end - start) of them, i.e. rows start..end-1;
    # DataFrame.tail() (Spark 3.0+) returns a list of Rows, so rebuild a dataframe from it
    return spark.createDataFrame(df.limit(end).tail(end - start))
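For example, a possible usage for the original 100/400 split. Note that DataFrame.tail() exists only in Spark 3.0+, collects the selected rows to the driver, and, without an explicit ordering, the split is not deterministic:

df1 = slice_df(df, 0, 100)    # first 100 rows
df2 = slice_df(df, 100, 500)  # rows 100-499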
G. Cohen
-2

Providing a much less complicated solution here, closer to what was requested:

(Works in Spark 2.4 +)

# Starting
print('Starting row count:',df.count())
print('Starting column count:',len(df.columns))

# Slice rows
df2 = df.limit(3)
print('Sliced row count:',df2.count())

# Slice columns
cols_list = df.columns[0:1]
df3 = df.select(cols_list)
print('Sliced column count:',len(df3.columns))
Statmonger