
Our data currently lives in SQL Server, and we are trying to move it to our S3 bucket as Parquet files. The intention is to analyse this S3 data in AWS EMR (mainly Spark, Hive & Presto). We don't want to store the data in HDFS.

  1. What are the choices here? As far as we know, we can use either Spark or Sqoop for this import. Although Sqoop is faster than Spark here because of parallelism (parallel DB connections), it seems that writing Parquet files from Sqoop directly to S3 is not possible (Sqoop + S3 + Parquet results in a "Wrong FS" error). The workaround is to write to HDFS first and then copy to S3, which seems inefficient. How about using Spark SQL to pull this data from SQL Server and write it as Parquet to S3?

  2. Once we have loaded this data as Parquet in the following layout:

    s3://mybucket/table_a/day_1/(parquet files 1 ... n).
    s3://mybucket/table_a/day_2/(parquet files 1 ... n).
    s3://mybucket/table_a/day_3/(parquet files 1 ... n).
    

How can we combine them into a single table and query it using Hive? I understand that we can create a Hive external table pointing to S3, but can one table point to multiple files?

Thanks.

EDIT: Adding this as requested.

    org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
        at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
        at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
        at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

ds_user

4 Answers


Though I am a little late, for future reference: in our project we are doing exactly this, and I would prefer Sqoop over Spark.

Reason: I used Glue to read data from MySQL into S3, and the reads are not parallel (I had AWS Support look at it, and that is simply how Glue, which uses PySpark, works; the write to S3 once the read is complete is parallel). This is not efficient and it is slow: reading 100 GB of data and writing it to S3 took 1.5 hours.

So I used Sqoop on EMR with the Glue Data Catalog turned on (so the Hive metastore is on AWS), and I am able to write to S3 directly from Sqoop, which is much faster: reading the same 100 GB takes 20 minutes.

You will have to set hive.metastore.warehouse.dir to an s3:// location, and you should see your data being written to S3 when you do a hive-import or just a direct write.

Krish

Spark's read.jdbc can pull the data over multiple connections. Here is the relevant API documentation:

def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame

Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table will be retrieved in parallel based on the parameters passed to this function.

Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

url
JDBC database url of the form jdbc:subprotocol:subname.

table
Name of the table in the external database.

columnName
the name of a column of integral type that will be used for partitioning.

lowerBound
the minimum value of columnName used to decide partition stride.

upperBound
the maximum value of columnName used to decide partition stride.

numPartitions
the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1.

connectionProperties
JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
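
For illustration, here is a minimal PySpark sketch of that parallel read followed by a direct Parquet write to S3. The server, database, table, column name, bounds and credentials below are placeholders, not values from the question.

    from pyspark.sql import SparkSession

    # Minimal sketch: parallel JDBC read from SQL Server, then Parquet to S3.
    # Server, database, table, column names and credentials are placeholders.
    spark = SparkSession.builder.appName("sqlserver-to-s3").getOrCreate()

    jdbc_url = "jdbc:sqlserver://myserver:1433;databaseName=mydb"

    df = (spark.read
          .format("jdbc")
          .option("url", jdbc_url)
          .option("dbtable", "dbo.table_a")
          .option("user", "etl_user")
          .option("password", "secret")
          .option("partitionColumn", "id")   # integral column used for striding
          .option("lowerBound", 1)           # roughly min(id)
          .option("upperBound", 10000000)    # roughly max(id)
          .option("numPartitions", 8)        # 8 parallel DB connections
          .option("fetchsize", 10000)        # rows fetched per round trip
          .load())

    # Write straight to S3 as Parquet; no HDFS staging needed.
    df.write.mode("overwrite").parquet("s3://mybucket/table_a/")

The SQL Server JDBC driver jar needs to be on the Spark classpath (for example via spark-submit --jars).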

Create a Hive table with date as the partition column and specify the location as follows:

    create table table_name (
      id                int,
      dtDontQuery       string,
      name              string
    )
    partitioned by (date string)
    stored as parquet
    location 's3://mybucket/table_name/';

Add a column called date to your data and populate it with the current date. You don't need to add the column if it is not required, since we can just populate the location; but it can also serve as an audit column for your analytics. Then write with Spark: df.write.partitionBy("date").parquet("s3://mybucket/table_name/").

Perform MSCK REPAIR on the Hive table daily so that the new partition is added to the table.
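
A sketch of that daily flow, continuing from the df read above (table and bucket names are assumed):

    from pyspark.sql import functions as F

    # Sketch: add a "date" audit column, write partitioned by it to S3,
    # then register the new partition in the Hive/Glue metastore.
    df_with_date = df.withColumn("date", F.current_date().cast("string"))

    (df_with_date.write
        .mode("append")
        .partitionBy("date")
        .parquet("s3://mybucket/table_name/"))

    # Run daily so Hive picks up the new date=YYYY-MM-DD directory.
    spark.sql("MSCK REPAIR TABLE table_name")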

To apply numPartitions to non-numeric columns, hash that column into the number of connections you want and partition on the hashed value.
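
One way to do that from PySpark is to push a hash expression down as JDBC predicates, one per connection. This is only a sketch: CHECKSUM is SQL Server specific, and the column name and connection details (reused from the sketch above) are placeholders.

    # Sketch: parallelise the JDBC read on a non-numeric column by hashing it
    # into N buckets on the database side (CHECKSUM is T-SQL; adjust elsewhere).
    num_connections = 8
    predicates = [
        "ABS(CHECKSUM(some_string_col)) % {n} = {i}".format(n=num_connections, i=i)
        for i in range(num_connections)
    ]

    df = spark.read.jdbc(
        url=jdbc_url,                        # as defined in the sketch above
        table="dbo.table_a",
        predicates=predicates,               # one WHERE clause per connection
        properties={"user": "etl_user", "password": "secret"},
    )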

loneStar
  • Thanks, but where is multiple connection here? And where is that baselocation? – ds_user Feb 03 '18 at 02:39
  • Thank you. But how can we do multiple connections here? – ds_user Feb 03 '18 at 03:02
  • numPartitions is number of connections. – loneStar Feb 03 '18 at 03:04
  • Thank you. Is it possible to use datetime column of sql server as partition column and specify one day as one partition? How can we decide the numPartitions? What will happen if we have 30 days, but we specify numPartitions = 4. ? – ds_user Feb 03 '18 at 05:50
  • The problem is that I dont have incremental id column, but have many date columns – ds_user Feb 03 '18 at 06:16
  • Hi. If i have a table with 10000 records and i specify 5 numpartitions with lowerbound as 1000 and upperbound as 6000, does that mean the rows below 1000 and above 6000 will be discarded? – ds_user Feb 08 '18 at 01:27
  • @ds_user yes it will skip the records, If you dont have incremental id use the hash of a field as incremental id. – loneStar Feb 08 '18 at 13:10

Spark is a pretty good utility tool. You can easily connect to a JDBC data source, and you can write to S3 by specifying credentials and an S3 path (e.g. Pyspark Save dataframe to S3).
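
As a rough sketch (all keys, URLs and names below are placeholders, and on EMR the instance role usually makes the explicit credentials unnecessary), that could look like:

    from pyspark.sql import SparkSession

    # Sketch: read over JDBC and write Parquet to S3, supplying credentials
    # explicitly via the Hadoop configuration (typically only needed off-EMR).
    spark = SparkSession.builder.appName("jdbc-to-s3").getOrCreate()

    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

    df = spark.read.jdbc(
        url="jdbc:sqlserver://myserver:1433;databaseName=mydb",
        table="dbo.table_a",
        properties={"user": "etl_user", "password": "secret"},
    )
    df.write.mode("overwrite").parquet("s3a://mybucket/table_a/")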

If you're using AWS, your best bet for Spark, Presto and Hive is to use the AWS Glue Metastore. This is a data catalog that registers your s3 objects as tables within databases, and provides an API for locating those objects.

The answer to your Q2 is yes, you can have a table that refers to multiple files. You'd normally want to do this if you have partitioned data.

Kirk Broadhurst
  • Thanks. For your answer to first question, spark is good for processing, but how good it is for load compared to sqoop which supports parallelism. I have more than 100 tables each with millions of records. Next thing, answer to second question, can you point to any link or document to create a table that refers to multiple files. – ds_user Feb 03 '18 at 02:45

You can create the Hive external table as follows:

create external table table_a (
 siteid                    string,
 nodeid                    string,
 aggregation_type          string
 )
 PARTITIONED BY (day string)
 STORED AS PARQUET
 LOCATION 's3://mybucket/table_a';

Then you can run the following command to register the partition files stored under each day's directory in the Hive metastore:

 MSCK REPAIR TABLE table_a;

Now you can access your files through Hive queries. We have used this approach in our project and it works well. After the above command, you can run a query such as:

 select * from table_a where day='day_1';

Hope this helps.

-Ravi

Ravikumar
  • Thanks. But can you answer first question, load data from sql server to s3? – ds_user Feb 03 '18 at 02:59
  • Use Spark SQL to read the data from sql server and save the results into s3 table. spark.read.jdbc(url=jdbcUrl, table='tablename', column='partition_column', lowerBound=1, upperBound=100000, numPartitions=100) – Ravikumar Feb 03 '18 at 03:07
  • The number of partitions is important, you should carefully choose the parallelism to avoid the database lock. – Ravikumar Feb 03 '18 at 03:13
  • Thank you. Is it possible to use datetime column of sql server as partition column and specify one day as one partition? How can we decide the numPartitions? What will happen if we have 30 days, but we specify numPartitions = 4. ? – ds_user Feb 03 '18 at 05:50
  • The problem is that I dont have incremental id column, but have many date columns – ds_user Feb 03 '18 at 06:16
  • If you know the date range upfront, you can build the queries by setting the date range and build your logic in iterative manner to get all the dates. query= "(select * from employees where date < '2018-02-01' and date < '2018-02-03') table_rec" val df = spark.read.jdbc(url=jdbcUrl, dbtable=query, properties=connectionProperties) df.partitionBy(date).write.parquet.location(s3://mybucket/table_name/) – Ravikumar Feb 07 '18 at 20:57
  • Thanks, i managed to do that. But when i run MSCK REPAIR TABLE table_a through spark.sql, i am getting an error - MSCK REPAIR TABLE only works on partitioned tables: – ds_user Feb 07 '18 at 21:00
  • Yes, this is for partitioned tables only. Hope you have defined the s3 tables with PARTITIONED BY (day string), am I missing something – Ravikumar Feb 07 '18 at 22:31
  • Sorry confused with that, I am already specifying numPartitions as 12 and it creates 12 files in my s3 bucket, doesnt it mean it is already partitioned? Do I have to specify PARTITIONED BY ? – ds_user Feb 07 '18 at 22:35
  • Yes, you need to specify PARTITIONED BY to make the table as partitioned table. The numPartitions is used to control the number of executors running parallel. This is spark partitions and not the hive table partitions. The hive table partitions to be define when you create the schema of the table. If you want to optimize query performance, define hive table with partitioned. – Ravikumar Feb 07 '18 at 22:43
  • Thanks, I created an external table with that, however when i run MSCK REPAIR TABLE now, getting this error - org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380) ) – ds_user Feb 07 '18 at 22:55
  • could you please provide full stack trace – Ravikumar Feb 08 '18 at 22:53
  • Added it in the question. – ds_user Feb 08 '18 at 22:56
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/164793/discussion-between-ravikumar-and-ds-user). – Ravikumar Feb 08 '18 at 23:06