13

I have a Feather format file, sales.feather, that I am using to exchange data between Python and R.

In R I use the following command:

df = arrow::read_feather("sales.feather", as_data_frame=TRUE)

In Python I use this:

df = pandas.read_feather("sales.feather")

What is the best way to load the data from that file into memory in a Spark instance operated from pyspark? I would also like to control the pyspark.StorageLevel for the data read from Feather.

I don't want to use pandas to load the data because it segfaults for my 19GB Feather file, created from a 45GB CSV.

jangorecki
  • Is it required to directly load it into memory, or is it allowed to store it into Hive before loading it into Spark? – cronoik Nov 23 '20 at 10:45
  • @cronoik Directly load into memory, or possibly mmap the Arrow file directly from Spark with a StorageLevel option. No Hive in the middle. – jangorecki Nov 23 '20 at 10:54
  • 1
    Could you please try the following: `import pyarrow.feather as feather` `df = spark.createDataframe(feather.read_feather('sales.feather'))`. In case this is not working, can you please tell us how many columns your file has and if you have an index column (something like a primary key)? – cronoik Nov 26 '20 at 01:49
  • @cronoik if it's expected to work then you should post it as an answer. Will try. 9 columns, no index. – jangorecki Nov 26 '20 at 07:45
  • I am actually not sure whether this will also lead to a segfault. I currently don't have an infrastructure that I can crash. The proper approach would be to write a custom [converter](http://spark.apache.org/docs/latest/rdd-programming-guide.html#external-datasets) (check the python tab) on the JVM side, but I am currently a bit stuck. – cronoik Nov 26 '20 at 09:21
  • @cronoik it raises the exception `module 'pandas' has no attribute '__version__'` in `read_feather: _check_pandas_version()`, which suggests it tries to go via pandas – jangorecki Nov 26 '20 at 10:22
  • Yes, it will use pandas. It is also currently more like a test because I don't expect it to work. But `read_feather` can also slice your file, which will reduce the memory usage (currently working on an answer). Which version of pandas are you using? Can you upgrade it? – cronoik Nov 26 '20 at 10:26
  • @cronoik I do not use pandas in my pyspark environment. Neither pyspark nor pyarrow installs pandas as a dependency. Maybe they should? – jangorecki Nov 26 '20 at 10:28
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/225155/discussion-between-cronoik-and-jangorecki). – cronoik Nov 26 '20 at 10:36

4 Answers

0

Ugly hack - use mapInArrow.

import pyarrow as pa


def read_arrow(spark, filename, schema=None):

    # Runs on the executors: each incoming batch holds record-batch indices
    # produced by spark.range(); yield the matching record batches from the
    # memory-mapped Arrow file.
    def mapper(iterator):
        with pa.memory_map(filename, "rb") as source:
            f = pa.ipc.open_file(source)
            for batch in iterator:
                for i in batch['id']:
                    yield f.get_batch(i.as_py())

    # Peek at the file once on the driver to count the record batches and,
    # if no schema was given, infer it from a single row.
    tmp_reader = pa.ipc.open_file(filename)
    num_batches = tmp_reader.num_record_batches
    if schema is None:
        # read the first batch and convert just one row to pandas
        tmp_row = tmp_reader.get_batch(0)[:1]
        schema = spark.createDataFrame(tmp_row.to_pandas()).schema
    return spark.range(num_batches).mapInArrow(mapper, schema)


df = read_arrow(spark, "some-data.arrow")
df.show()

(BONUS: zero-copy memory-mapped values in Spark! yahoo!)
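mapInArrow is available from Spark 3.3, and the file has to be readable at the same path on every executor for the memory map to work. To also control the storage level asked about in the question, one option is to persist the resulting DataFrame; a minimal sketch, assuming an active spark session and the read_arrow helper above:

from pyspark import StorageLevel

df = read_arrow(spark, "some-data.arrow")
# keep partitions deserialized in memory, spilling to disk if they do not fit
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # materialize the cache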

ei-grad
-1

Perhaps you can consider switching to the Parquet format? It looks more suitable for your use case; see What are the differences between feather and parquet?
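If you go that route, here is a minimal conversion sketch with pyarrow (file names taken from the question; the one-off conversion still loads the whole table into Arrow memory on one machine, though without going through pandas), after which Spark can read the Parquet file natively and in parallel, assuming an active SparkSession named spark:

import pyarrow.feather as feather
import pyarrow.parquet as pq

# one-off conversion: Feather -> Parquet, staying in Arrow memory (no pandas)
table = feather.read_table("sales.feather")
pq.write_table(table, "sales.parquet")

# Spark reads Parquet in parallel
df = spark.read.parquet("sales.parquet")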

dkapitan
-2

You can convert a pandas DataFrame to a Spark DataFrame as follows.

from pyspark.sql import SQLContext, Row

# sc is an existing SparkContext
sqlContext = SQLContext(sc)
spark_df = sqlContext.createDataFrame(pandas_df)
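On current Spark versions SQLContext is a legacy entry point; a minimal sketch of the same conversion through SparkSession (assuming pandas_df has already been loaded, e.g. with pyarrow.feather):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(pandas_df)
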
mayank agrawal
Vikram Patil
  • 1
    this solution fails with out of memory error for 19GB feather file even on 120GB memory machine due to memory inefficiency, that is why I asked for direct way of reading to spark. – jangorecki Dec 22 '18 at 05:39
-2

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "App Name")
sql = SQLContext(sc)

then use createDataFrame as shown below:

spark_df = sql.createDataFrame(pandas_df)
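For completeness, a sketch of how pandas_df could come from the Feather file in the question, assuming pyarrow is installed (note that, as the question points out, fully materializing a 19GB file in pandas may not be feasible):

import pyarrow.feather as feather

# loads the whole file into a pandas DataFrame; needs enough driver memory
pandas_df = feather.read_feather("sales.feather")
spark_df = sql.createDataFrame(pandas_df)
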
Rubin bhandari