
I'm trying to start using Delta Lake with PySpark.

To be able to use Delta Lake, I invoke pyspark from the Anaconda shell prompt as:

pyspark --packages io.delta:delta-core_2.11:0.3.0

Here is the Delta Lake reference: https://docs.delta.io/latest/quick-start.html

All Delta Lake commands work fine from the Anaconda shell prompt.

In a Jupyter notebook, however, referencing a Delta Lake table gives an error. Here is the code I am running in the notebook:

df_advisorMetrics.write.mode("overwrite").format("delta").save("/DeltaLake/METRICS_F_DELTA")
spark.sql("create table METRICS_F_DELTA using delta location '/DeltaLake/METRICS_F_DELTA'")

Below is the code I am using at the start of the notebook to connect to PySpark:

import findspark
findspark.init()
findspark.find()

import pyspark
findspark.find()

Below is the error I get:

Py4JJavaError: An error occurred while calling o116.save. : java.lang.ClassNotFoundException: Failed to find data source: delta. Please find packages at http://spark.apache.org/third-party-projects.html

Any suggestions?

A. Nadjar
Mauryas

3 Answers


I have created a Google Colab/Jupyter Notebook example that shows how to run Delta Lake.

https://github.com/prasannakumar2012/spark_experiments/blob/master/examples/Delta_Lake.ipynb

It has all the steps needed to run. It uses the latest Spark and Delta versions; please change the versions to match your environment. A condensed sketch of that kind of setup is below.
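For reference, a condensed notebook setup of that kind looks roughly like the following (the package coordinate and path are placeholders; pick the Delta artifact that matches your Spark/Scala build):

from pyspark.sql import SparkSession

# placeholder coordinate - match the Delta artifact to your Spark/Scala version
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# quick smoke test: write and read back a tiny Delta table
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta_demo")
spark.read.format("delta").load("/tmp/delta_demo").show()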

Prasanna

A potential solution is to follow the techniques noted in Import PySpark packages with a regular Jupyter notebook.
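One common version of that technique is to set PYSPARK_SUBMIT_ARGS before the Spark JVM starts; a minimal sketch, using the same package version as the question:

import os
import findspark

# Must be set before findspark.init() and before any SparkSession/SparkContext is created,
# so that --packages is passed to the JVM on startup
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages io.delta:delta-core_2.11:0.3.0 pyspark-shell"

findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()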

Another potential solution is to download the delta-core JAR and place it in the $SPARK_HOME/jars folder so when you run jupyter notebook it automatically includes the Delta Lake JAR.
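If you go the JAR route, a quick sanity check that the JAR landed where Spark will look (resolving SPARK_HOME via findspark here is just one option):

import os
import glob
import findspark

spark_home = os.environ.get("SPARK_HOME") or findspark.find()

# The delta-core JAR should show up here once it has been copied in
print(glob.glob(os.path.join(spark_home, "jars", "delta-core*.jar")))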

Denny Lee

I use Delta Lake all the time from a Jupyter notebook.

Try the following in your Jupyter notebook running Python 3.x.

### import Spark libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

### spark package maven coordinates - in case you are loading more than just delta
spark_packages_list = [
    'io.delta:delta-core_2.11:0.6.1',
]
spark_packages = ",".join(spark_packages_list)

### SparkSession 
spark = (
    SparkSession.builder
    .config("spark.jars.packages", spark_packages)
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
    .getOrCreate()
)

sc = spark.sparkContext

### Python library in delta jar. 
### Must create sparkSession before import
from delta.tables import *

Assuming you have a Spark DataFrame df:
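(If you need something concrete to test with, a throwaway DataFrame like this will do; the column names, including partition_column_name, are made up for illustration.)

df = spark.createDataFrame(
    [(1, "a", "2020-01-01"), (2, "b", "2020-01-02")],
    ["id", "value", "partition_column_name"],
)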

HDFS

Save

### overwrite, change mode="append" if you prefer
(df.write.format("delta")
.save("my_delta_file", mode="overwrite", partitionBy="partition_column_name")
)

Load

df_delta = spark.read.format("delta").load("my_delta_file")
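The delta.tables import above is what enables the DeltaTable API; a small sketch against the same (hypothetical) path, in case you also want table history or vacuuming:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "my_delta_file")
delta_table.history().show()   # audit log of table versions
# delta_table.vacuum()         # remove old, unreferenced data files (use with care)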

AWS S3 ObjectStore

Initial S3 setup

### Spark S3 access
import os  # needed for os.getenv below

hdpConf = sc._jsc.hadoopConfiguration()
user = os.getenv("USER")

### Assuming you have your AWS credentials in a jceks keystore.
hdpConf.set("hadoop.security.credential.provider.path", f"jceks://hdfs/user/{user}/awskeyfile.jceks")

hdpConf.set("fs.s3a.fast.upload", "true")

### optimize s3 bucket-level parquet column selection
### un-comment to use
# hdpConf.set("fs.s3a.experimental.fadvise", "random")


### Pick one upload buffer option
hdpConf.set("fs.s3a.fast.upload.buffer", "bytebuffer") # JVM off-heap memory
# hdpConf.set("fs.s3a.fast.upload.buffer", "array") # JVM on-heap memory
# hdpConf.set("fs.s3a.fast.upload.buffer", "disk") # DEFAULT - directories listed in fs.s3a.buffer.dir

s3_bucket_path = "s3a://your-bucket-name"
s3_delta_prefix = "delta"  # or whatever

Save

### overwrite, change mode="append" if you prefer
(df.write.format("delta")
.save(f"{s3_bucket_path}/{s3_delta_prefix}/", mode="overwrite", partitionBy="partition_column_name")
)

Load

df_delta = spark.read.format("delta").load(f"{s3_bucket_path}/{s3_delta_prefix}/")

Spark Submit

Not directly answering the original question, but for completeness, you can do the following as well.

Add the following to your spark-defaults.conf file

spark.jars.packages                 io.delta:delta-core_2.11:0.6.1
spark.delta.logStore.class          org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
spark.sql.extensions                io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog     org.apache.spark.sql.delta.catalog.DeltaCatalog

Refer to the conf file in your spark-submit command:

spark-submit \
--properties-file /path/to/your/spark-defaults.conf \
--name your_spark_delta_app \
--py-files /path/to/your/supporting_pyspark_files.zip \
/path/to/your/pyspark_script.py
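
For completeness, the pyspark_script.py referenced above only needs a plain SparkSession, since the Delta settings arrive via the properties file; a hypothetical sketch of its contents:

### pyspark_script.py (hypothetical sketch)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("your_spark_delta_app").getOrCreate()

### the Delta configs come from spark-defaults.conf, so "delta" resolves as a data source
df = spark.read.format("delta").load("/path/to/your/delta_table")
df.show()
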
Clay