2

I have a data frame in PySpark called df. I have registered this df as a temptable like below.

df.registerTempTable('mytempTable')

date=datetime.now().strftime('%Y-%m-%d %H:%M:%S')

Now from this temp table I will get certain values, like max_id of a column id

min_id = sqlContext.sql("select nvl(min(id),0) as minval from mytempTable").collect()[0].asDict()['minval']

max_id = sqlContext.sql("select nvl(max(id),0) as maxval from mytempTable").collect()[0].asDict()['maxval']

Now I will collect all these values like below.

test = ("{},{},{}".format(date,min_id,max_id))

I found that test is not a data frame but it is a str string

>>> type(test)
<type 'str'>

Now I want save this test as a file in HDFS. I would also like to append data to the same file in hdfs.

How can I do that using PySpark?

FYI I am using Spark 1.6 and don't have access to Databricks spark-csv package.

zero323
  • 322,348
  • 103
  • 959
  • 935

1 Answers1

1

Here you go, you'll just need to concat your data with concat_ws and right it as a text:

query = """select concat_ws(',', date, nvl(min(id), 0), nvl(max(id), 0))
from mytempTable"""

sqlContext.sql(query).write("text").mode("append").save("/tmp/fooo")

Or even a better alternative :

from pyspark.sql import functions as f

(sqlContext
    .table("myTempTable")
    .select(f.concat_ws(",", f.first(f.lit(date)), f.min("id"), f.max("id")))
    .coalesce(1)
    .write.format("text").mode("append").save("/tmp/fooo"))
eliasah
  • 39,588
  • 11
  • 124
  • 154
  • this appends to the same directory /tmp/fooo is a directory path and not a file – eliasah Jun 01 '17 at 19:26
  • there is no append to the same file thing with hadoop/spark – eliasah Jun 01 '17 at 19:26
  • I believe there is the option using `hdfs dfs -appendToFile` –  Jun 01 '17 at 19:28
  • it uses the hdfs merge api but isn't available in spark. You can always use it afterwards with hadoop – eliasah Jun 01 '17 at 19:29
  • 1
    https://stackoverflow.com/questions/5700068/merge-output-files-after-reduce-phase – eliasah Jun 01 '17 at 19:30
  • No module error means that the library isn't available. I'm not sure I can be much help if you don't post a question with what you are trying to do and the error stack message. – eliasah Jun 09 '17 at 16:27
  • I noticed that in your answer, the first option doesn't work –  Jun 11 '17 at 14:30
  • what do you mean ? – eliasah Jun 11 '17 at 14:31
  • in this `query = """select concat_ws(',', date, nvl(min(id), 0), nvl(max(id), 0)) from mytempTable"""` date is not in the `temptable`. According `OP` it is in the script –  Jun 11 '17 at 14:38
  • 1
    Are you sure we are reading the same question ? Is this a practical joke ? – eliasah Jun 11 '17 at 14:41