We are trying to integrate Elasticsearch (1.7.2, 4-node cluster) with Spark (1.5.1, compiled with Hive and Hadoop support against Scala 2.11, 4-node cluster). HDFS also comes into the equation (Hadoop 2.7, 4 nodes), as do the Thrift JDBC server and elasticsearch-hadoop-2.2.0-m1.jar.
This gives us two ways of executing statements against ES.

Spark SQL with Scala:
    // Spark and es-hadoop connection settings
    val conf = new SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")

    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")

    // register the ES index as a temporary table and run the aggregate through Spark SQL
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')")
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
Thrift server (code executed on Spark):
    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement

    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")

    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....
I have the following issues, and because they are connected I have decided to pack them into one question on Stack Overflow:
It seems that the Spark SQL method supports pushdown of whatever goes behind the WHERE clause (whether es.query is specified or not); the execution time is the same and is acceptable. But solution number 1 definitely does not support pushdown of aggregating functions, i.e. the presented count(*) is not executed on the ES side, but only after all data have been retrieved: ES returns the rows and Spark SQL counts them. Please confirm whether this is the correct behaviour.
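If aggregates indeed cannot be pushed down, the best I can see is to at least push the filter itself to ES so that only matching documents are scrolled back before Spark counts them. A minimal sketch of what I mean (using the DataFrameReader API; whether this actually reduces the runtime is exactly what I am unsure about):

    // Sketch: pre-filter on the ES side via es.query; count(*) is still computed by
    // Spark, but only the matching documents should travel over the wire.
    val filtered = sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("es.query", """{"query":{"term":{"transmittersID":"262021306841042"}}}""")
      .load("geo_2/kafkain")
    println(filtered.count())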
Solution number one behaves strangely in that the execution time is the same whether pushdown is passed as true or false.
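For completeness, this is how I understand pushdown is meant to be passed according to the es-hadoop documentation: as a data-source option of the relation rather than (only) a SparkConf entry. I mention it because, if the SparkConf key is simply not picked up, that would explain the identical timings. Sketch only (the table name is just for illustration):

    // Sketch: declare pushdown per table instead of via SparkConf, to rule out the
    // SparkConf key being ignored; repeat with 'false' for the comparison run.
    sqlContext.sql("CREATE TEMPORARY TABLE geoTabPd USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'geo_2/kafkain', pushdown 'true')")
    sqlContext.sql("SELECT count(*) FROM geoTabPd WHERE transmittersID='262021306841042'").show()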
Solution number 2 seems to support no pushdown at all. No matter how I try to specify the sub-query, be it as part of the table definition or in the WHERE clause of the statement, it seems to just fetch the whole huge index and then do the maths on it. Is it the case that thrift-hive is not able to do pushdown on ES?
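To separate the thrift/Hive layer from es-hadoop itself, the only cross-check I can think of is running the same term query through the plain RDD API, where the query is handed to the scan request and should therefore be applied on the ES side (sketch, timings not yet compared):

    import org.elasticsearch.spark._   // brings esRDD into scope on SparkContext

    // Sketch: same term query through the low-level RDD API; if this returns quickly
    // while the thrift/Hive variant does not, the index is most likely being fetched
    // wholesale in case 2.
    val query = """{"query":{"term":{"transmittersID":"262021306841042"}}}"""
    println(sc.esRDD("geo_2/kafkain", query).count())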
I'd like to trace queries in Elasticsearch, so I set the following:
    # logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file

    additivity:
      index.search.slowlog: true
      index.indexing.slowlog: true
All of index.search.slowlog.threshold.query, index.search.slowlog.threshold.fetch and even index.indexing.slowlog.threshold.index are set to 0ms, and I do see the ordinary statements executed from Sense in the slowlog file (so the setup works). But I don't see the Spark SQL or thrift statements executed against ES. I suppose these are scan&scroll requests, because if I execute a scan&scroll from Sense it is also not logged. Is it possible to somehow trace scan&scroll on the ES side?
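If scan&scroll cannot be traced in the slowlog at all, the fallback I would try is tracing on the connector side instead. This is just a sketch and assumes log4j 1.x as the logging backend on the Spark driver/executors; as far as I understand the es-hadoop docs, raising its REST layer to TRACE makes it log the requests (scroll queries included) that it sends to ES:

    import org.apache.log4j.{Level, Logger}

    // Sketch (assumes log4j 1.x): log the HTTP requests es-hadoop issues against ES,
    // scan&scroll included, so the query can at least be inspected on the Spark side.
    Logger.getLogger("org.elasticsearch.hadoop.rest").setLevel(Level.TRACE)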