2

I'm making my first steps into Spark after using Hive EMR for a while.

I want to read an Hive tabled that was saved to S3 in the following format:
s3://<bucket>/<rootpath>/date=<date>/fileNames

I can use the answer in this question but then I lose the connection between the dataRows to the date, that is because I didn't saved the date inside the files.

Is there a simple way to have the file name which each row of data?

Community
  • 1
  • 1
Roee Gavirel
  • 18,955
  • 12
  • 67
  • 94

2 Answers2

0

You can use the wholeTextFiles to read the rdd. This will read in each file with the filename as the key, and the entire content of the file as the value. From there, you should be able to use the flatMapValues to seperate each record into its own k/v pair.

val input = sc.wholeTextFiles(s3://...)
val inputFlat = input.flatMapValues(line => line.split("\n"))

For this example, if your path was /user/hive/date=December/part-0000 and the contents of part-0000 were

Joe December-28 Something
Ryan December-29 AnotherThing

The output would look like this:

input.take(1)
(/user/hive/date=December/part-0000, Joe December-28 Something\n Ryan December-29 AnotherThing)

inputFlat.take(2)
(/user/hive/date=December/part-0000, Joe December-28 Something)
(/user/hive/date=December/part-0000, Ryan December-29 AnotherThing)

I suppose you could try the following. It would be a bit slow to read the records, but after the repartition you can maxmize the parallel processing

inputFlat.flatMapValues(//some split).repartition(numWorkers)

One other potential thing we could try is using this: In hive, you can retrieve the file the record was found in using the virtual column named INPUT__FILE__NAME, for example:

select INPUT__FILE__NAME, id, name from users where ...;

I'm not sure it would work, but you could try using that in your .sql api. You would have to make sure your sqlContext has hive-site.xml.

Joe Widen
  • 2,378
  • 1
  • 15
  • 21
0

If all you need from the file name is the date as it sounds, you don't need to get the file name as spark and hive do that for you automatically if you create the table correctly. Let me demonstrate:

# in hive
hive> create table t1 ( name string) partitioned by  (date string) STORED AS TEXTFILE LOCATION 'your s3 path';
# data.txt contains 'john' and 'jay' in two different lines
hive> load data local inpath 'data.txt' into table t1 PARTITION(date='2015-12-30');
hive> select * from t1;
OK
john    2015-12-30
jay 2015-12-30
# in spark-shell
scala> sqlContext.sql("select * from t1").foreach(println);
[john,2015-12-30]
[jay,2015-12-30]

which I think it's what you want. It has the other advantage that your data can take the performance benefits of partitioning when querying.

Roberto Congiu
  • 5,123
  • 1
  • 27
  • 37
  • Not really sure how to implement it on Java. – Roee Gavirel Dec 30 '15 at 13:04
  • Which part ? Creating the table is just a hive query, and to add all the existing partitions you only need to issue "ALTER TABLE yourtable RECOVER PARTITIONS". After that you can just use your table normally. None of the above is scala-specific. – Roberto Congiu Dec 30 '15 at 18:47