
I need to save dataframes iteratively in libsvm format. My code is something like this:

im_df = im_table.select("m_id", "fsz", "fnm")
all_recs_df = None
fake_df = None
firstRec = True
for eachRec in (im_df.rdd.zipWithIndex().map(lambda ((mi, fs, fn), i): (mi, fs, fn)).collect()):
    m_id = eachRec[0]
    fsz = eachRec[1]
    fnm = eachRec[2]

    volume_df = volume_table.select("id","m_id").filter(volume_table['m_id']==m_id)
    m_bytes = 0
    for eachVolRec in (volume_df.rdd.zipWithIndex().map(lambda ((id), i): (id)).collect()):
        each_v_id = eachVolRec[0]
        volume_m_id = eachVolRec[1]
        vsnp_df = vsnp_table.select("v_id","ssb").filter(vsnp_table['v_id']==each_v_id)
        vsnp_sum_df = vsnp_df.groupBy("v_id").agg(sum("ssb").alias("ssb_sum"))
        v_bytes = vsnp_sum_df.rdd.zipWithIndex().map(lambda ((vi, vb), i): (vi, vb)).collect()[0][1]
        print "\t total = %s" %(v_bytes)
        m_bytes += v_bytes

    print "im.fnm = %s, im.fsz = %s , total_snaphot_size_bytes: %s" %(fnm, fsz, m_bytes)
    if firstRec:
        firstRec = False
        all_recs_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
        fake_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, 1000 * m_bytes), label=1.0)]))
        all_recs_df = all_recs_df.unionAll(fake_df)
        all_recs_df.registerTempTable("temp_table")
    else:
        each_rec_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
        all_recs_df = sqlContext.sql("select * from temp_table")
        all_recs_df = all_recs_df.unionAll(each_rec_df)
        all_recs_df.registerTempTable("temp_table")

Now, running the command all_recs_df = sqlContext.sql("select * from temp_table") after the loop gives the error no such table temp_table,

and running all_recs_df.collect() gives the error 'NoneType' object has no attribute 'collect'.

Apparently all_recs_df and temp_table are out of scope once the program exits the for loop.

QUESTION: So what is the alternative for saving dataframes in libsvm format iteratively?

I tried to save the dataframes to disk right away, but I can't append data to the same file:

MLUtils.saveAsLibSVMFile(d, "/tmp/test1")

Here d is a LabeledPoint RDD. Running the above command in a for loop gives Output directory file:/tmp/test1 already exists.

QUESTION: Is there a way to append data to an existing libsvm format file?


1 Answer


I tried to save the dataframes to disk right away, but I can't append data to the same file:

MLUtils.saveAsLibSVMFile(d, "/tmp/test1")

Here d is a LabeledPoint RDD. Running the above command in a for loop gives Output directory file:/tmp/test1 already exists.

QUESTION: Is there a way to append data to an existing libsvm format file?

You can save and overwrite files with other Spark writers, but overwriting is not handled by MLUtils.saveAsLibSVMFile().

With MLUtils.saveAsLibSVMFile() I think you can't overwrite existing files.
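
For illustration only (this snippet is not part of the original answer): overwriting is available through the DataFrame writer's mode("overwrite"); the DataFrame, column names and output path below are made-up examples:

from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

# Illustrative one-row DataFrame; columns and path are assumptions for the example
df = sqlContext.createDataFrame([Row(features=Vectors.dense(1.0, 2.0), label=0.0)])

# mode("overwrite") replaces the output directory instead of failing when it exists
df.write.mode("overwrite").parquet("/tmp/example_overwrite")

MLUtils.saveAsLibSVMFile() only takes the RDD and an output directory, so there is no equivalent overwrite or append option there.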

So, the following code doesn't append data to an existing libsvm file. Instead, it's a loop that combines the data you get at every cycle with the data from the previous cycles, so at the end you save a single file:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

yourRDD = sc.emptyRDD()  # start with an empty RDD

for elem in xrange(0, 3):  # your loop
    # build the new data for this cycle in an auxiliary RDD (just an example)
    rdd_aux = sc.parallelize([LabeledPoint(elem, [elem * 2, elem * 3])])
    # combine rdd_aux with the RDD that grows at every cycle
    yourRDD = yourRDD.union(rdd_aux)

# yourRDD.take(3)
# [LabeledPoint(0.0, [0.0,0.0]), LabeledPoint(1.0, [2.0,3.0]), LabeledPoint(2.0, [4.0,6.0])]

MLUtils.saveAsLibSVMFile(yourRDD, "/your/path")

In this way you can append each new RDD to your previous RDD and then save a single file, instead of appending new data to an existing file.
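
As a rough sketch of how this could map onto the question's loop (assuming fsz and m_bytes are computed as in the question's code; the pairs and the output path below are stand-in values, not real data):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

all_points = sc.emptyRDD()

# Stand-in (fsz, m_bytes) pairs; in the real code these would come from
# the question's loop over im_df / volume_df / vsnp_df
for fsz, m_bytes in [(1024, 2048), (4096, 8192)]:
    all_points = all_points.union(
        sc.parallelize([LabeledPoint(0.0, [fsz, m_bytes])]))

# a single save at the end, to a directory that does not exist yet
MLUtils.saveAsLibSVMFile(all_points, "/tmp/test1_all")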
