
I am importing a Postgres database into Spark. I know that I can partition on import, but that requires that I have a numeric column (I don't want to use the value column because it's all over the place and doesn't maintain order):

df = spark.read.format('jdbc').options(url=url, dbtable='tableName', properties=properties).load()
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- key: string (nullable = false)
 |-- value: double (nullable = false)

Instead, I am converting the dataframe into an rdd (of enumerated tuples) and trying to partition that instead:

rdd = df.rdd.flatMap(lambda x: enumerate(x)).partitionBy(20)

Note that I used 20 because I have 5 workers with 4 cores each in my cluster, and 5 * 4 = 20.

Unfortunately, the following command still takes forever to execute:

result = rdd.first()

Therefore I am wondering if my logic above makes sense? Am I doing anything wrong? From the web GUI, it looks like the workers are not being used:

[Screenshot of the Spark web UI showing the workers sitting idle]

FullStack

1 Answer


Since you already know you can partition by a numeric column, this is probably what you should do. Here is the trick. First, let's find the minimum and maximum epoch:

url = ...
properties = ...

min_max_query = """(
    SELECT
        CAST(min(extract(epoch FROM timestamp)) AS bigint), 
        CAST(max(extract(epoch FROM timestamp)) AS bigint)
    FROM tablename
) tmp"""

min_epoch, max_epoch = spark.read.jdbc(
    url=url, table=min_max_query, properties=properties
).first()

and use it to query the table:

numPartitions = ...

query = """(
    SELECT *, CAST(extract(epoch FROM timestamp) AS bigint) AS epoch
    FROM tablename) AS tmp"""

spark.read.jdbc(
    url=url, table=query,
    lowerBound=min_epoch, upperBound=max_epoch + 1, 
    column="epoch", numPartitions=numPartitions, properties=properties
).drop("epoch")

Since this splits the data into equal-size ranges, it is relatively sensitive to data skew, so you should use it with caution.
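To see why skew matters, it helps to know roughly how the bounds are used. The sketch below is a simplified reconstruction of the idea (the real logic lives inside Spark's JDBC data source and differs in details): the `[lowerBound, upperBound)` range is cut into `numPartitions` equal-width slices, and each slice becomes a `WHERE` clause for one partition's query.

```python
def partition_bounds(lower, upper, num_partitions, column="epoch"):
    """Simplified sketch of how Spark turns lowerBound/upperBound/numPartitions
    into per-partition WHERE clauses for a numeric column.

    The first partition is open-ended below (and catches NULLs), the last is
    open-ended above, so every row lands in exactly one partition.
    """
    stride = (upper - lower) // num_partitions
    clauses = []
    current = lower
    for i in range(num_partitions):
        if i == 0:
            clauses.append(f"{column} < {current + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            clauses.append(f"{column} >= {current}")
        else:
            clauses.append(f"{column} >= {current} AND {column} < {current + stride}")
        current += stride
    return clauses

for clause in partition_bounds(0, 100, 4):
    print(clause)
```

Because the slices are equal-width on the column's value range, not on row counts, a burst of rows in one time window all lands in the same partition.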

You could also provide a list of disjoint predicates as the predicates argument:

predicates= [
    "id BETWEEN 'a' AND 'c'",
    "id BETWEEN 'd' AND 'g'",
    ...   # Continue to get full coverage and the desired number of predicates
]

spark.read.jdbc(
    url=url, table="tablename", properties=properties, 
    predicates=predicates
)

The latter approach is much more flexible and can address certain issues with non-uniform data distribution, but it requires more knowledge about the data.
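If you know the epoch range, you don't have to write the predicates by hand. A small helper like the following (hypothetical, not part of Spark) builds disjoint, fully covering range predicates on the timestamp column, using Postgres's extract(epoch FROM ...):

```python
def epoch_range_predicates(min_epoch, max_epoch, n, ts_col="timestamp"):
    """Build n disjoint predicates covering [min_epoch, max_epoch] on a
    timestamp column. Each predicate is a half-open epoch range, so the
    ranges never overlap and together cover the whole span."""
    step = (max_epoch - min_epoch) // n + 1
    preds = []
    for start in range(min_epoch, max_epoch + 1, step):
        end = start + step
        preds.append(
            f"CAST(extract(epoch FROM {ts_col}) AS bigint) >= {start} "
            f"AND CAST(extract(epoch FROM {ts_col}) AS bigint) < {end}"
        )
    return preds

predicates = epoch_range_predicates(1420070400, 1451606400, 5)
```

You would then pass the result as the predicates argument of spark.read.jdbc as above. If your data is skewed, you can instead size each range by row count (e.g. from a GROUP BY histogram) rather than by equal epoch width.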

Using partitionBy fetches the data first and then performs a full shuffle to get the desired number of partitions, so it is relatively expensive.

zero323
  • Cool, I didn't know we had these options. However, although `rdd.getNumPartitions()` returned `20` as expected when partitioning by the epoch, I am still stuck on the `rdd.first()` step and the slaves not being utilized. I should start another question about that. Thanks – FullStack Sep 20 '16 at 21:04
  • The first method partitions data into uniform size ranges. If data itself is not uniformly distributed this can result in a skew. The second approach is more flexible but requires some knowledge about data distribution. – zero323 Sep 20 '16 at 21:11
  • For my data, partitioning by timestamp should be quite uniform. Any way to get the size of each partition without performing another exhaustive iteration? I'm mapping over partitions and counting now, but it's taking forever. – FullStack Sep 20 '16 at 21:41
  • You can split range into uniform pieces and submit queries for each range directly against the database. – zero323 Sep 21 '16 at 11:09
  • Really useful answer! – frb Feb 18 '19 at 20:33