@@ -53,6 +53,7 @@ class Dialect(BaseDialect):
5353 "TIMESTAMP_NTZ" : Timestamp ,
5454 # Text
5555 "STRING" : Text ,
56+ "VARCHAR" : Text ,
5657 # Boolean
5758 "BOOLEAN" : Boolean ,
5859 }
@@ -138,25 +139,47 @@ def create_connection(self):
138139 raise ConnectionError (* e .args ) from e
139140
140141 def query_table_schema (self , path : DbPath ) -> Dict [str , tuple ]:
141- # Databricks has INFORMATION_SCHEMA only for Databricks Runtime, not for Databricks SQL.
142- # https://docs.databricks.com/spark/latest/spark-sql/language-manual/information-schema/columns.html
143- # So, to obtain information about schema, we should use another approach.
144-
145142 conn = self .create_connection ()
143+ table_schema = {}
146144
147- catalog , schema , table = self ._normalize_table_path (path )
148- with conn .cursor () as cursor :
149- cursor .columns (catalog_name = catalog , schema_name = schema , table_name = table )
150- try :
151- rows = cursor .fetchall ()
152- finally :
153- conn .close ()
154- if not rows :
155- raise RuntimeError (f"{ self .name } : Table '{ '.' .join (path )} ' does not exist, or has no columns" )
156-
157- d = {r .COLUMN_NAME : (r .COLUMN_NAME , r .TYPE_NAME , r .DECIMAL_DIGITS , None , None ) for r in rows }
158- assert len (d ) == len (rows )
159- return d
145+ try :
146+ table_schema = super ().query_table_schema (path )
147+ except :
148+ logging .warning ("Failed to get schema from information_schema, falling back to legacy approach." )
149+
150+ if not table_schema :
151+ # This legacy approach can cause bugs. e.g. VARCHAR(255) -> VARCHAR(255)
152+ # and not the expected VARCHAR
153+
154+ # I don't think we'll fall back to this approach, but if so, see above
155+ catalog , schema , table = self ._normalize_table_path (path )
156+ with conn .cursor () as cursor :
157+ cursor .columns (catalog_name = catalog , schema_name = schema , table_name = table )
158+ try :
159+ rows = cursor .fetchall ()
160+ finally :
161+ conn .close ()
162+ if not rows :
163+ raise RuntimeError (f"{ self .name } : Table '{ '.' .join (path )} ' does not exist, or has no columns" )
164+
165+ table_schema = {r .COLUMN_NAME : (r .COLUMN_NAME , r .TYPE_NAME , r .DECIMAL_DIGITS , None , None ) for r in rows }
166+ assert len (table_schema ) == len (rows )
167+ return table_schema
168+ else :
169+ return table_schema
170+
171+ def select_table_schema (self , path : DbPath ) -> str :
172+ """Provide SQL for selecting the table schema as (name, type, date_prec, num_prec)"""
173+ database , schema , name = self ._normalize_table_path (path )
174+ info_schema_path = ["information_schema" , "columns" ]
175+ if database :
176+ info_schema_path .insert (0 , database )
177+
178+ return (
179+ "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale "
180+ f"FROM { '.' .join (info_schema_path )} "
181+ f"WHERE table_name = '{ name } ' AND table_schema = '{ schema } '"
182+ )
160183
161184 def _process_table_schema (
162185 self , path : DbPath , raw_schema : Dict [str , tuple ], filter_columns : Sequence [str ], where : str = None
0 commit comments