
I have read data in chunks over a pyodbc connection using something like this:

import pandas as pd
import pyodbc

conn = pyodbc.connect("Some connection Details")
sql = "SELECT * FROM TABLES;"
# chunksize makes read_sql return an iterator of 10-row DataFrames
# instead of one big DataFrame
df1 = pd.read_sql(sql, conn, chunksize=10)

Now I want to read all these chunks into a single Spark DataFrame using something like:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

The problem is that when I do a df2.count() I get the result 10, which means only the i == 0 case is working. Is this a bug with unionAll, or am I doing something wrong here?

Gaurav Dhama
1 Answer


The documentation for .unionAll() states that it returns a new DataFrame, so you have to assign the result back to df2:

i = 0
for chunk in df1:
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        # assign the result back; unionAll does not modify df2 in place
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i + 1

Furthermore, you can use enumerate() to avoid having to manage the i variable yourself:

for i, chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))

Furthermore, the documentation for .unionAll() states that .unionAll() is deprecated as of Spark 2.0 and that you should now use .union(), which acts like UNION ALL in SQL:

for i, chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.union(sqlContext.createDataFrame(chunk))
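
One caveat: like UNION ALL in SQL, .union() pairs columns up by position rather than by name, so every chunk has to arrive with the same column order. A minimal sketch (with made-up column names) of the silent mismatch you get otherwise:

a = sqlContext.createDataFrame([("1", "x")], ["id", "val"])
b = sqlContext.createDataFrame([("y", "2")], ["val", "id"])
# no error is raised, but "y" lands in the id column,
# because union matched the columns by position, not by name
a.union(b).show()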

Edit:
Furthermore, I'll stop saying furthermore, but not before I say it one last time: as @zero323 points out in the comments, each .union() in a loop grows the lineage of the DataFrame, which gets expensive. Let's instead do something like:

def unionAll(*dfs):
    ' by @zero323 from here: http://stackoverflow.com/a/33744540/42346 '
    first, *rest = dfs  # Python 3.x; on 2.x unpack manually (first = dfs[0])
    # union all the underlying RDDs in one shot, then rebuild a single
    # DataFrame from the first chunk's schema (one union instead of many)
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

df_list = []
for chunk in df1:
    df_list.append(sqlContext.createDataFrame(chunk))

df_all = unionAll(*df_list)  # note the unpacking: unionAll takes varargs, not a list
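
If you'd rather not touch the private _sc attribute that the helper above reaches into (see @zero323's comments below), here is a sketch of the public-API alternative he links to: fold .union() over the list with functools.reduce. It is still suboptimal because the lineage grows with each union, but it uses no private API:

from functools import reduce
from pyspark.sql import DataFrame

# pairwise union over the whole list: (((df0 union df1) union df2) ...)
df_all = reduce(DataFrame.union, df_list)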
mechanical_meat
  • Furthermore, you should really think twice before using `union(All)` in a loop :) Hint: there is such a thing as lineage out there. – zero323 Jul 31 '16 at 00:07
  • @zero323: I will suggest that the OP create a list and then pass that list to `.union()`... – mechanical_meat Jul 31 '16 at 00:10
  • This will work only with RDDs, so you'll need a little bit more code. You can also truncate the lineage, but it is ugly in Python. – zero323 Jul 31 '16 at 00:13
  • @zero323: please see edit and let me know what you think. Thanks. – mechanical_meat Jul 31 '16 at 00:16
  • I am pretty sure that there is no varargs version of `union`. You can check the last snippet in http://stackoverflow.com/a/33744540/1560062, which is still suboptimal but doesn't use private API. Also there is this: https://github.com/high-performance-spark/high-performance-spark-examples/blob/master/high_performance_pyspark/SQLLineage.py – zero323 Jul 31 '16 at 00:21
  • @zero323: thanks! I have incorporated your code -- with attribution and an upvote -- into my answer. – mechanical_meat Jul 31 '16 at 00:24
  • But by creating a df_list of all the chunks, won't you load all the chunks into memory at once? Chunks are primarily used so that we can load a table in small parts. – Gaurav Dhama Jul 31 '16 at 09:55