
In my code below I try to instantiate a redis-py connection using an environment variable as the host URL. The problem is that when I use foreach or foreachPartition, the environment variable isn't recognized inside the save_on_redis method.

I also tried creating the Redis connection outside the function, but then I receive "pickle.PicklingError: Can't pickle 'lock' object", because Spark serializes the function together with everything it references (including the connection) to run it on the worker nodes.
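Roughly what that second attempt looks like (a sketch; I renamed the connection to redis_conn for clarity):

import os
from pyspark.sql import SparkSession
import redis

spark = SparkSession.builder.getOrCreate()

# Connection created once, on the driver...
redis_conn = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0)

def save_on_redis(row):
    # ...and referenced here, so Spark has to pickle it to ship this
    # function to the workers, which raises the PicklingError
    redis_conn.set("#teste#", "fagner")

df = spark.createDataFrame([(0, 1), (0, 1), (0, 2)], ["id", "score"])
df.foreach(save_on_redis)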

Question: how can I use environment variables in the method passed as an argument to foreach or foreachPartition?

import os
from pyspark.sql import SparkSession
import redis

spark = (SparkSession
        .builder
        .getOrCreate())

print("---------")
print(os.getenv("REDIS_REPORTS_URL"))  # set and printed fine here, on the driver
print("---------")

def save_on_redis(row):
    # runs on the workers, where REDIS_REPORTS_URL comes back as None
    redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0)
    print(os.getenv("REDIS_REPORTS_URL"))
    print(redis_)
    redis_.set("#teste#", "fagner")


df = spark.createDataFrame([(0, 1), (0, 1), (0, 2)], ["id", "score"])
df.foreach(save_on_redis)
seufagner

1 Answer


I suggest you read the environment variable in your driver process and pass it as a plain Python variable to the worker processes, where you can set the environment using os.putenv.

Example:

In [1]: import os

In [2]: a = sc.parallelize(range(20))

In [3]: os.getenv('MY_VAR')
Out[3]: 'some_value'

In [4]: def f(it):
   ...:     import os
   ...:     return (str(os.getenv('MY_VAR')),)
   ...:

In [5]: a.mapPartitions(f).collect()
Out[5]: ['None', 'None']

In [6]: my_var = os.getenv('MY_VAR')

In [7]: def f2(it):
   ...:     import os
   ...:     from subprocess import check_output
   ...:     os.putenv('MY_VAR', my_var)
   ...:     return (check_output('env | grep MY_VAR', shell=True), my_var)
   ...:

In [8]: a.mapPartitions(f2).collect()
Out[8]:
['MY_VAR=some_value\n',
 'some_value',
 'MY_VAR=some_value\n',
 'some_value']

P.S. According to this answer, it is preferable to modify the os.environ mapping directly rather than call os.putenv: os.putenv does not update os.environ, so the change stays invisible to os.getenv in the same process.
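Applied to the original question, a minimal sketch: read REDIS_REPORTS_URL once on the driver and let the closure carry it to the workers as a plain string (connection parameters copied from the question; the variable and function names here are mine):

import os
from pyspark.sql import SparkSession
import redis

spark = SparkSession.builder.getOrCreate()

# Read once on the driver, where the variable is set; a plain
# string pickles fine and travels with the closure.
redis_host = os.getenv("REDIS_REPORTS_URL")

def save_partition_on_redis(rows):
    # One connection per partition, created on the worker itself,
    # so no unpicklable object crosses the driver/worker boundary.
    redis_ = redis.StrictRedis(host=redis_host, port=6379, db=0)
    for row in rows:
        redis_.set("#teste#", "fagner")

df = spark.createDataFrame([(0, 1), (0, 1), (0, 2)], ["id", "score"])
df.foreachPartition(save_partition_on_redis)

Creating the connection per partition rather than per row amortizes the connection cost, which is the usual reason to prefer foreachPartition over foreach for this kind of sink.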

Timofey Chernousov