
I would like to find tables that have a specific column in a database on Databricks, using PySpark SQL.

I tried the code from the following post, but it does not work: https://medium.com/@rajnishkumargarg/find-all-the-tables-by-column-name-in-hive-51caebb94832

On SQL server my code:

   SELECT Table_Name, Column_Name 
   FROM INFORMATION_SCHEMA.COLUMNS
   WHERE TABLE_CATALOG = 'YOUR_DATABASE'
   AND COLUMN_NAME LIKE '%YOUR_COLUMN%'

but I cannot figure out how to do the same thing with PySpark SQL.

thanks

user3448011
4 Answers


The SparkSession has a property catalog. This catalog's method listTables returns a list of all tables known to the SparkSession. With this list, you can query all columns for each table with listColumns.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()

spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
spark.sql("CREATE TABLE tab2 (name STRING, age INT) USING parquet")
spark.sql("CREATE TABLE tab3 (street STRING, age INT) USING parquet")

for table in spark.catalog.listTables():
    for column in spark.catalog.listColumns(table.name):
        if column.name == 'name':
            print('Found column {} in table {}'.format(column.name, table.name))

prints

Found column name in table tab1
Found column name in table tab2

Both methods, listTables and listColumns, accept a database name as an optional argument if you want to restrict the search to a single database.
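
For example, restricting the search to a single database could look like this (a minimal sketch; the database name my_db is only a placeholder):

    # Minimal sketch: "my_db" is a placeholder database name, not taken from the question.
    for table in spark.catalog.listTables("my_db"):
        for column in spark.catalog.listColumns(table.name, "my_db"):
            if column.name == 'name':
                print('Found column {} in table my_db.{}'.format(column.name, table.name))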

werner

I had a similar problem to the OP: I needed to find all columns, including nested columns, that match a LIKE clause.

I wrote a post about it here https://medium.com/helmes-people/how-to-view-all-databases-tables-and-columns-in-databricks-9683b12fee10

But you can find the full code below.

The benefit of this solution, compared with the previous answers, is that it works when you need to search for columns with LIKE '%%', as the OP asked. It also lets you search for a name in nested fields. Finally, it creates a SQL-like view, similar to the INFORMATION_SCHEMA views.

from pyspark.sql.types import StructType

# get field name from schema (recursive for getting nested values)
def get_schema_field_name(field, parent=None):
  if type(field.dataType) == StructType:
    if parent == None:
      prt = field.name
    else:
      prt = parent+"."+field.name # using dot notation
    res = []
    for i in field.dataType.fields:
      res.append(get_schema_field_name(i, prt))
    return res
  else:
    if parent==None:
      res = field.name
    else:
      res = parent+"."+field.name
    return res
  
# flatten list, from https://stackoverflow.com/a/12472564/4920394
def flatten(S):
  if S == []:
    return S
  if isinstance(S[0], list):
    return flatten(S[0]) + flatten(S[1:])
  return S[:1] + flatten(S[1:])

# list of databases
db_list = [x[0] for x in spark.sql("SHOW DATABASES").rdd.collect()]

for i in db_list:
  spark.sql("SHOW TABLES IN {}".format(i)).createOrReplaceTempView(str(i)+"TablesList")

# create a query for fetching all tables from all databases
union_string = "SELECT database, tableName FROM "
for idx, item in enumerate(db_list):
  if idx == 0:
    union_string += str(item)+"TablesList WHERE isTemporary = 'false'"
  else:
    union_string += " UNION ALL SELECT database, tableName FROM {}".format(str(item)+"TablesList WHERE isTemporary = 'false'")
spark.sql(union_string).createOrReplaceTempView("allTables")

# full list = schema, table, column
full_list = []
for i in spark.sql("SELECT * FROM allTables").collect():
  table_name = i[0]+"."+i[1]
  table_schema = spark.sql("SELECT * FROM {}".format(table_name))
  column_list = []
  for j in table_schema.schema:
    column_list.append(get_schema_field_name(j))
  column_list = flatten(column_list)
  for k in column_list:
    full_list.append([i[0],i[1],k])
spark.createDataFrame(full_list, schema = ['database', 'tableName', 'columnName']).createOrReplaceTempView("allColumns")
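
Once the allColumns view exists, it can be queried much like an INFORMATION_SCHEMA view, for example (the '%name%' pattern is only a placeholder):

    # Example query against the allColumns view created above; '%name%' is a placeholder pattern.
    spark.sql("SELECT database, tableName, columnName FROM allColumns WHERE columnName LIKE '%name%'").show()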
Kristo_R
# The following code creates a TempView containing all the tables
# and all their columns, along with their types, for a specified database.
cls = []
spark.sql("Drop view if exists allTables")
spark.sql("Drop view if exists allColumns")
for table in spark.catalog.listTables("TYPE_IN_YOUR_DB_NAME_HERE"):
    for column in spark.catalog.listColumns(table.name, table.database):
        cls.append([table.database,table.name, column.name, column.dataType])    
spark.createDataFrame(cls, schema=['databaseName', 'tableName', 'columnName',
                                   'columnDataType']).createOrReplaceTempView("allColumns")
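
With the allColumns view in place, finding the tables that contain a given column is a simple filter, for example (the column name 'age' is only a placeholder):

    # Example: list all tables in the chosen database that have a column named "age" (placeholder).
    spark.sql("SELECT tableName, columnDataType FROM allColumns WHERE columnName = 'age'").show()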

SparkSession really does have a catalog property, as werner mentioned.

If I understand you correctly, you want to get the tables that have a specific column. You can try this code (sorry for Scala code instead of Python):

  import spark.implicits._

  val databases = spark.catalog.listDatabases().select($"name".as("db_name")).as("databases")
  val tables = spark.catalog.listTables().select($"name".as("table_name"), $"database").as("tables")
  val tablesWithDatabase = databases.join(tables, $"databases.db_name" === $"tables.database", "inner").collect()
  tablesWithDatabase.foreach(row => {
    val dbName = row.get(0).asInstanceOf[String]
    val tableName = row.get(1).asInstanceOf[String]
    val columns = spark.catalog.listColumns(dbName, tableName)
    columns.foreach(column=>{
      if (column.name == "Your column")
        // Do your logic here
        null
    })
  })

Notice that I am doing a collect, so if you have a lot of tables/databases it can cause an OOM error. The reason I am collecting is that, in contrast to listTables and listDatabases, which can be called without any arguments, listColumns needs a dbName and tableName, and there is no unique column id matching a column to its table.

So the search for the column will be done locally on the driver.
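
If you want to stay in Python, a rough PySpark sketch of the same driver-side idea could look like the following ('your_column' is only a placeholder):

    # Rough PySpark sketch of the same idea: iterate databases and tables on the driver,
    # calling listColumns per table. "your_column" is a placeholder column name.
    for db in spark.catalog.listDatabases():
        for table in spark.catalog.listTables(db.name):
            for column in spark.catalog.listColumns(table.name, db.name):
                if column.name == "your_column":
                    print("Found column in {}.{}".format(db.name, table.name))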

Hope that helps.

ShemTov