
I have an Azure system divided into three parts:

  1. Azure Data Lake Storage, where I have a CSV file.
  2. Azure Databricks, where I need to do some processing: specifically, convert that CSV file to Redis hash format.
  3. Azure Redis Cache, where I should put the converted data.

After mounting the storage in the Databricks filesystem, the data needs to be processed. How do I convert CSV data located in the Databricks filesystem to Redis hash format and put it into Redis correctly? Specifically, I'm not sure how to write a correct mapping in the code below. Or maybe there is some way to transfer the data through a SQL table first that I cannot find.
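For context, a Redis hash stores a set of field/value pairs under a single key, so converting a CSV row means picking one column as the key and turning the remaining columns into hash fields. A minimal plain-Scala sketch of that mapping (no Spark or Redis involved; the column names and the choice of key column are illustrative assumptions, not from any library API):

```scala
// Sketch: map one CSV record (header + row) to a Redis hash entry.
// keyCol and table are hypothetical choices for illustration.
object HashMappingSketch {
  def toRedisHash(header: Seq[String],
                  row: Seq[String],
                  keyCol: String,
                  table: String): (String, Map[String, String]) = {
    val record = header.zip(row).toMap          // column name -> value
    val key = s"$table:${record(keyCol)}"       // Redis key, e.g. "data:1"
    (key, record - keyCol)                      // remaining columns become hash fields
  }

  def main(args: Array[String]): Unit = {
    val (key, fields) =
      toRedisHash(Seq("Prop1", "Prop2", "Prop3"), Seq("1", "2", "3"), "Prop1", "data")
    println(key)    // prints "data:1"
    println(fields) // Prop2 and Prop3 as hash fields
  }
}
```

This is only the shape of the data; the actual write to Redis is what the connector code below attempts.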

Here is my example code, written in Scala:

import com.redislabs.provider.redis._

val redisServerDnsAddress = "HOST"
val redisPortNumber = 6379
val redisPassword = "Password"
val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))


val data = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/mnt/staging/data/file.csv")

// What is the right way of mapping?
val ds = table("data").select("Prop1", "Prop2", "Prop3", "Prop4", "Prop5" ).distinct.na.drop().map{x =>
  (x.getString(0), x.getString(1), x.getString(2), x.getString(3), x.getString(4))
}

sc.toRedisHASH(ds, "data")

The error:

error: type mismatch;
 found   : org.apache.spark.sql.Dataset[(String, String)]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisHASH(ds, "data")

If I write the last line of code this way:

sc.toRedisHASH(ds.rdd, "data")

The error:

org.apache.spark.sql.AnalysisException: Table or view not found: data;
  • That error occurs when you try to query a table or view that isn't registered. You have built your dataframe from the CSV; transform it to an RDD according to the Redis connector docs. – Emiliano Martinez Nov 09 '20 at 22:57

1 Answer


Prepare some sample data to mimic loading the data from a CSV file:

    import org.apache.spark.sql.{Row, SaveMode}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "2", "3", "4", "5", "6", "7")))
    val structType = StructType(
      Seq(
        StructField("Prop1", StringType),
        StructField("Prop2", StringType),
        StructField("Prop3", StringType),
        StructField("Prop4", StringType),
        StructField("Prop5", StringType),
        StructField("Prop6", StringType),
        StructField("Prop7", StringType)
      )
    )
    val data = spark.createDataFrame(rdd, structType)

Transformation:

    val transformedData = data.select("Prop1", "Prop2", "Prop3", "Prop4", "Prop5").distinct.na.drop()

Write the dataframe to Redis, using `Prop1` as the key column and `data` as the Redis table name (see the spark-redis docs):

    transformedData
      .write
      .format("org.apache.spark.sql.redis")
      .option("key.column", "Prop1")
      .option("table", "data")
      .mode(SaveMode.Overwrite)
      .save()

Check data in Redis:

127.0.0.1:6379> keys data:*
1) "data:1"

127.0.0.1:6379> hgetall data:1
1) "Prop5"
2) "5"
3) "Prop2"
4) "2"
5) "Prop4"
6) "4"
7) "Prop3"
8) "3"
  • Thank you a lot for this answer. It seems everything should work, but I cannot find in the docs where to put my Redis connection config for your code example. As I understand it, without this config the following error occurs: `redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool` – Maksim Nov 10 '20 at 08:52
  • You should put them as Spark config options, e.g. `val spark = SparkSession.builder().appName("redis-df").master("local[*]").config("spark.redis.host", "localhost").config("spark.redis.port", "6379").getOrCreate()`. Another way is to override these connection settings directly with dataframe options, see https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md#dataframe-options – fe2s Nov 10 '20 at 13:24
  • In Databricks the spark config options can be changed like described here https://stackoverflow.com/questions/58688544/azure-databricks-how-to-add-spark-configuration-in-databricks-cluster – fe2s Nov 10 '20 at 13:28
  • Yeah, with the overridden dataframe options it works well. Thanks again! – Maksim Nov 10 '20 at 16:13
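The connection settings discussed in the comments can be supplied in two places. A hedged sketch of both, under the assumption that the host, port, and password values are placeholders and that `transformedData` is the dataframe built above (this is a connection-config fragment that needs a live Spark cluster and Redis instance to actually run):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Option A: put the connection settings in the Spark session config
// (spark.redis.* keys per the spark-redis docs; values are placeholders).
val spark = SparkSession.builder()
  .appName("redis-df")
  .config("spark.redis.host", "HOST")
  .config("spark.redis.port", "6379")
  .config("spark.redis.auth", "Password")
  .getOrCreate()

// Option B: override the connection settings per write with dataframe options,
// as resolved in the comment thread above.
transformedData.write
  .format("org.apache.spark.sql.redis")
  .option("host", "HOST")
  .option("port", "6379")
  .option("auth", "Password")
  .option("key.column", "Prop1")
  .option("table", "data")
  .mode(SaveMode.Overwrite)
  .save()
```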