Support arrow in to_parquet. Several other parquet cleanups. #2868
Conversation
| A few issues I ran into. None of these are blockers, and I consider the current state "good enough", but these are things that would be good to fix at some point:

- In fastparquet, the pandas metadata isn't written to every file in a multi-file dataset.
- Currently there is no way to write the `_metadata` file with pyarrow.
- Currently I'm xfailing all tests where pyarrow writes and fastparquet reads for this reason. Once pyarrow can write the `_metadata` file, these xfails can be removed.
- Both implementations seem to handle unnamed indices differently in the pandas metadata.

Ping @martindurant, @wesm for comment. |
| Also ping @brendancol, since you needed this at some point. |
|
| try:
| import pyarrow.parquet as pyarrow # noqa
| import pyarrow.parquet as pa_parquet
We normally use `import pyarrow.parquet as pq`.
dask/dataframe/io/parquet.py
| engine : {'auto', 'fastparquet', 'pyarrow'}, default 'auto'
| Parquet library to use. If only one library is installed, it will use
| that one; if both, it will use 'fastparquet'
| compression : string or dict
Not changed in this PR but out of curiosity: Should compression=None mean that the engine's default compression should be applied or no compression at all? (The default behaviour in the Parquet world is normally to have snappy enabled).
I'd expect None to mean "no compression", as forwarding the compression overrides the default setting.
Perhaps we should change our default? If it's interpreted as "use the default setting for the backend", the default should maybe be `compression='default'` or something similar. I also wouldn't be opposed to changing it to snappy for fastparquet as well (thus matching pandas behavior), but @martindurant probably has more thoughts.
I am totally fine, in theory, with having snappy be the default compression for fastparquet. The reason that it isn't is that snappy itself is only an optional dependency, one that might be tricky to build on some systems. I imagine that this location is inconvenient for checking the importability of snappy.
In dask.distributed we perform a little dance to determine default compression in a way that is robust to missing imports. https://github.com/dask/distributed/blob/master/distributed/protocol/compression.py
Perhaps for now switch the dask kwarg default to 'default', which means the default compression for whatever backend is selected, and handle unification of this at a later point?
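A minimal sketch of that idea, in the spirit of the dask.distributed approach linked above (the helper name is hypothetical, and detecting snappy is the only case handled here):

```python
def resolve_compression(compression):
    """Map the 'default' sentinel to a concrete codec (hypothetical helper)."""
    if compression != 'default':
        # An explicit value (including None, meaning "no compression") is
        # forwarded to the backend untouched.
        return compression
    try:
        import snappy  # noqa: F401  -- python-snappy is an optional dependency
        return 'snappy'
    except ImportError:
        # snappy isn't importable, so fall back to writing uncompressed files.
        return None
```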
| Construct directory-based partitioning by splitting on these fields'
| values. Each dask partition will result in one or more datafiles,
| there will be no global groupby.
| times: 'int64' (default), or 'int96':
pyarrow also supports this mode; it can be toggled with `use_deprecated_int96_timestamps` on `pq.write_table`.
As is, this matches the pandas behavior, just forwarding the extra kwargs to the backend (you can pass `times` to fastparquet and `use_deprecated_int96_timestamps` to pyarrow).
Personally I'd like to unite on one of these names and deprecate the other (I prefer the long `use_deprecated_int96_timestamps`). I'd rather not implicitly unite on them though until pandas settles on the same behavior (we like to match pandas as much as possible).
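For illustration, forwarding the engine-specific spelling through `to_parquet` might look like this (the paths are made up):

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({'when': pd.date_range('2017-01-01', periods=4, freq='H')}),
    npartitions=2,
)

# fastparquet spells the option `times`:
df.to_parquet('ts-fastparquet.parq', engine='fastparquet', times='int96')

# pyarrow spells it `use_deprecated_int96_timestamps`:
df.to_parquet('ts-pyarrow.parq', engine='pyarrow',
              use_deprecated_int96_timestamps=True)
```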
| Regarding the more lengthy issues:

On 1), I regard it as a bug that dask doesn't write the pandas metadata to every file when using fastparquet. This was already fixed when fastparquet writes multi-file data-sets directly, and should be a simple enough fix.

On 2), I still think it's reasonable, when passed a path which might be a directory or a file, to try to find a `_metadata` file.

On 3), I believe I was acting on advice for this choice, but I may have misunderstood. In fact, the index should probably have no name at all. Also not implemented on the fastparquet side is the name of the columns (df.columns.name). |
| Edit, accidentally hit the "close request" button :(
Would this be a fix in fastparquet or how dask uses it?
I think we should maybe support both, but I also think that the metadata file is nice since it contains information about the whole dataset in one place instead of several. @xhochy had a question above about what fastparquet expects in the metadata file, can you answer that?
We should probably standardize on how non-named indices are handled between the two libraries. I'll open an issue in fastparquet. This doesn't need to be dealt with here. |
I assume the latter. |
| Yeah, that fixed it, thanks. Was added several releases ago, so should be fine to depend on. |
| elif index is None:
| if has_pandas_metadata:
| pandas_metadata = json.loads(schema.metadata[b'pandas'].decode('utf8'))
| index_cols = pandas_metadata.get('index_columns', [])
As of this PR you will need to reconstruct indexes using `pandas_metadata['columns']` to get the actual name of the index (`[c['name'] for c in pandas_metadata['columns']]`) along with `pandas_metadata['index_columns']`, which will contain the names of the columns as they are in the `pyarrow.Table` object; those will always match the pattern `__index_level_\d+__`.
This was done to support the case of an index level name that's the same as a column name.
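A minimal sketch of that reconstruction, assuming a pyarrow `Schema` carrying the `pandas` metadata; the `field_name` fallback follows the later pandas metadata spec and is an assumption here:

```python
import json

def reconstruct_index_names(schema):
    """Recover user-facing index names from a pyarrow Schema's 'pandas' metadata."""
    pandas_metadata = json.loads(schema.metadata[b'pandas'].decode('utf8'))
    # Serialized index column names, e.g. ['__index_level_0__'].
    index_cols = pandas_metadata.get('index_columns', [])
    # Map each serialized name back to the user-facing name recorded in 'columns'.
    # 'field_name' is assumed; fall back to 'name' when it is missing.
    name_by_field = {c.get('field_name', c['name']): c['name']
                     for c in pandas_metadata['columns']}
    return [name_by_field.get(col, col) for col in index_cols]
```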
We need a corresponding PR into the pandas developer docs about this
Indeed. Incoming.
PR here: pandas-dev/pandas#18201
The new metadata seems like an improvement in robustness, but from the code it looks like pyarrow can no longer read metadata written using the previous format? How do you suggest we handle this transition? We can probably write code that handles both versions, but it's mildly annoying.
It would be good to handle both, at least for 1 release cycle. Don't we have integration tests in pandas that can help with this?
> It would be good to handle both, at least for 1 release cycle.

Agreed. Was that directed at me or at Phil? From at least reading the arrow code it looks like if I install arrow 0.8.0 (or whatever the next release is) it won't be able to read metadata written with 0.7.1, which makes supporting both a bit tricky.
Hm, I think I can do this in a backward compatible way.
Let me see if that's possible, and I'll report back.
Arrow 0.8.0 isn't released yet, so this can still be sorted out in the next week or so. We can use the pandas integration tests to help validate that all is still good
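If dask does end up reading files written by both pyarrow releases during the transition, one option is to branch on whether the entries in `index_columns` look like serialized placeholders. A rough sketch (the helper name and the `field_name` fallback are assumptions):

```python
import json
import re

_SERIALIZED_INDEX = re.compile(r'^__index_level_\d+__$')

def index_names_any_version(raw_pandas_metadata):
    """Resolve index names from either metadata layout.

    Older files store the user-facing names directly in 'index_columns';
    newer files store '__index_level_N__' placeholders that are mapped back
    through the 'columns' entries.
    """
    md = json.loads(raw_pandas_metadata.decode('utf8'))
    lookup = {c.get('field_name', c['name']): c['name'] for c in md['columns']}
    return [lookup.get(col, col) if _SERIALIZED_INDEX.match(col) else col
            for col in md.get('index_columns', [])]
```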
| __all__ = ('read_parquet', 'to_parquet')
|
|
| def _meta_from_dtypes(to_read_columns, file_columns, file_dtypes, index_cols):
What kind of support is there in fastparquet for the following pandas use cases?
- MultiIndex columns and indexes
- Columns whose name is the same as an index level
@martindurant , do you know the answer to this?
- No support for multi-index. It would not be too hard to add.
- I don't know, I don't imagine it's a problem, but there is no test.
- No top-level imports for parquet engines. Remove dispatching in read/write methods, move to `get_engine` function. Minimize exported names. Can have neither, one, or both installed.
- Change default of `compression` in `to_parquet` to `'default'`, meaning use the default compression for the backend engine.
- Cleanup docstrings.
- Fix thread-safety bug in fastparquet where metadata was being mutated in a non-deterministic manner. This should be upstreamed, but fix here for now.
- Fix part of a metadata bug in fastparquet where the total number of rows isn't consistent when writing a file.
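The first commit above moves engine dispatch into a `get_engine` function; a rough sketch of what such auto-selection could look like under the documented "prefer fastparquet when both are installed" rule (this body is an illustration, not the PR's actual code):

```python
def get_engine(engine='auto'):
    """Pick a parquet backend, importing lazily so neither is required up front."""
    if engine == 'auto':
        try:
            import fastparquet  # noqa: F401
            return 'fastparquet'
        except ImportError:
            pass
        try:
            import pyarrow.parquet  # noqa: F401
            return 'pyarrow'
        except ImportError:
            raise RuntimeError("Please install either fastparquet or pyarrow")
    if engine not in ('fastparquet', 'pyarrow'):
        raise ValueError("Unsupported engine: %r" % engine)
    return engine
```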
| A summary of the current state here:
Issues mentioned above that have not been addressed:
As is, I think this is good to go. |
| @jcrist Update from the pyarrow side of things: we just merged a PR (apache/arrow#1298) to ensure backwards compatible reads. What else is needed from the pyarrow side? |
| Thanks Phil. Besides handling the missing information in the pandas metadata, I don't think anything else is needed from the pyarrow side right now.

As is, I think this PR is good to go. It fixes a number of bugs in both readers, and adds support for writing using pyarrow. All other issues should be handled in subsequent PRs. Barring further comments, I plan to merge this later today. |
This started off as a PR to support writing parquet using arrow, but kind of spiraled out in scope as I found bugs in the original implementations. A summary of changes here:
- Add an `engine` kwarg to `to_parquet`
- Support `pyarrow` in `to_parquet`
- Rename `'arrow'` to `'pyarrow'` for the engine kwarg, deprecating the old `'arrow'` name. This matches the pandas interface.
- Several cleanups in `dask.dataframe.io.parquet`
- Improve `pyarrow` and `fastparquet` compatibility
- Tests run with neither, one, or both of `pyarrow` and `fastparquet` installed

Fixes #2783.
Fixes #2440.
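For context, usage after this PR looks roughly like the following (paths and data are illustrative):

```python
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({'x': [1, 2, 3, 4], 'y': list('abcd')}),
    npartitions=2,
)

# Write with an explicit backend via the unified engine kwarg...
ddf.to_parquet('example-pyarrow.parq', engine='pyarrow')
ddf.to_parquet('example-fastparquet.parq', engine='fastparquet')

# ...or let engine='auto' (the default) pick whichever library is installed.
back = dd.read_parquet('example-pyarrow.parq')
print(back.compute())
```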