 
 try:
     import sqlalchemy
-except ImportError:
+except ImportError:  # pragma: no cover
     sqlalchemy = None
 
 from dask_sql.input_utils.base import BaseInputPlugin
@@ -35,9 +35,7 @@ def is_correct_input(
 
         return is_sqlalchemy_hive or is_hive_cursor or format == "hive"
 
-    def to_dc(
-        self, input_item: Any, table_name: str, format: str = None, **kwargs
-    ):  # pragma: no cover
+    def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs):
         table_name = kwargs.pop("hive_table_name", table_name)
         schema = kwargs.pop("hive_schema_name", "default")
 
@@ -65,14 +63,16 @@ def to_dc(
         if "InputFormat" in storage_information:
             format = storage_information["InputFormat"].split(".")[-1]
         # databricks format is different, see https://github.com/dask-contrib/dask-sql/issues/83
-        elif "InputFormat" in table_information:
+        elif "InputFormat" in table_information:  # pragma: no cover
             format = table_information["InputFormat"].split(".")[-1]
-        else:
+        else:  # pragma: no cover
             raise RuntimeError(
                 "Do not understand the output of 'DESCRIBE FORMATTED <table>'"
             )
 
-        if format == "TextInputFormat" or format == "SequenceFileInputFormat":
+        if (
+            format == "TextInputFormat" or format == "SequenceFileInputFormat"
+        ):  # pragma: no cover
             storage_description = storage_information.get("Storage Desc Params", {})
             read_function = partial(
                 dd.read_csv,
@@ -81,15 +81,17 @@ def to_dc(
             )
         elif format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
             read_function = dd.read_parquet
-        elif format == "OrcInputFormat":
+        elif format == "OrcInputFormat":  # pragma: no cover
             read_function = dd.read_orc
-        elif format == "JsonInputFormat":
+        elif format == "JsonInputFormat":  # pragma: no cover
             read_function = dd.read_json
-        else:
+        else:  # pragma: no cover
             raise AttributeError(f"Do not understand hive's table format {format}")
 
         def _normalize(loc):
-            if loc.startswith("dbfs:/") and not loc.startswith("dbfs://"):
+            if loc.startswith("dbfs:/") and not loc.startswith(
+                "dbfs://"
+            ):  # pragma: no cover
                 # dask (or better: fsspec) needs to have the URL in a specific form,
                 # starting with two // after the protocol
                 loc = f"dbfs://{loc.lstrip('dbfs:')}"
@@ -102,6 +104,19 @@ def _normalize(loc):
         def wrapped_read_function(location, column_information, **kwargs):
             location = _normalize(location)
             logger.debug(f"Reading in hive data from {location}")
+            if format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
+                # Hack needed for parquet files.
+                # If the folder structure is like .../col=3/...,
+                # parquet wants to read in the partition information.
+                # However, we add the partition information ourselves,
+                # which would lead to problems afterwards.
+                # Therefore tell parquet to only read in the columns
+                # we actually care about right now.
+                kwargs.setdefault("columns", list(column_information.keys()))
+            else:  # pragma: no cover
+                # A real statement is needed here; otherwise Python optimizes
+                # the branch away and coverage does not respect the pragma.
+                dummy = 0
             df = read_function(location, **kwargs)
 
             logger.debug(f"Applying column information: {column_information}")
@@ -165,7 +180,7 @@ def _parse_hive_table_description(
         schema: str,
         table_name: str,
         partition: str = None,
-    ):  # pragma: no cover
+    ):
         """
         Extract all information from the output
         of the DESCRIBE FORMATTED call, which is unfortunately
@@ -207,7 +222,7 @@ def _parse_hive_table_description(
             elif key == "# Partition Information":
                 mode = "partition"
             elif key.startswith("#"):
-                mode = None
+                mode = None  # pragma: no cover
             elif key:
                 if not value:
                     value = dict()
@@ -223,6 +238,10 @@ def _parse_hive_table_description(
                 elif mode == "partition":
                     partition_information[key] = value
                     last_field = partition_information[key]
+                else:  # pragma: no cover
+                    # A real statement is needed here; otherwise Python optimizes
+                    # the branch away and coverage does not respect the pragma.
+                    dummy = 0
             elif value and last_field is not None:
                 last_field[value] = value2
 
@@ -238,7 +257,7 @@ def _parse_hive_partition_description(
         cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
         schema: str,
         table_name: str,
-    ):  # pragma: no cover
+    ):
         """
         Extract all partition information for a given table
         """
@@ -251,7 +270,7 @@ def _fetch_all_results(
         self,
         cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
         sql: str,
-    ):  # pragma: no cover
+    ):
         """
         The pyhive.Cursor and the sqlalchemy connection behave slightly differently.
         The former has the fetchall method on the cursor,
@@ -261,5 +280,5 @@ def _fetch_all_results(
 
         try:
             return result.fetchall()
-        except AttributeError:
+        except AttributeError:  # pragma: no cover
             return cursor.fetchall()
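
For context on the parquet branch added to wrapped_read_function above: dask infers hive-style partition columns (e.g. .../col=3/...) from the directory layout of a parquet dataset, while dask-sql attaches the partition values itself, so restricting the read to the known columns avoids the conflict. A minimal sketch of that workaround, with a hypothetical location and column_information standing in for the values the plugin would pass:

    import dask.dataframe as dd

    # Hypothetical inputs; in the plugin these come from DESCRIBE FORMATTED.
    location = "/data/my_table"  # contains e.g. .../col=3/part-0.parquet
    column_information = {"a": "bigint", "b": "string"}

    # Restricting the read to the known columns keeps dask from also adding
    # the hive partition column ("col") inferred from the folder names.
    df = dd.read_parquet(location, columns=list(column_information.keys()))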