
Conversation

@jcrist (Member) commented Nov 7, 2017

This started off as a PR to support writing parquet using arrow, but kind of spiraled out in scope as I found bugs in the original implementations. A summary of changes here:

  • Support the engine kwarg in to_parquet (a short usage sketch follows below)
  • Add support for using pyarrow in to_parquet
  • Switch 'arrow' to 'pyarrow' for engine kwarg, deprecating old 'arrow' name. This matches the pandas interface.
  • Refactor how engines were handled, removing top-level imports in dask.dataframe.io.parquet
  • Fix several bugs in both readers and writers, mostly having to do with how indices were handled
  • Expand testing for pyarrow and fastparquet compatibility
  • Allow running the test suite with neither, one, or both of pyarrow and fastparquet installed

Fixes #2783.
Fixes #2440.
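
For reference, a minimal usage sketch of the interface this PR targets; the path and column names are illustrative only, and both backends are assumed to be installed:

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(8), 'y': list('abcdabcd')})
ddf = dd.from_pandas(pdf, npartitions=2)

# write with the pyarrow engine (new in this PR) ...
ddf.to_parquet('example.parquet', engine='pyarrow')

# ... and read back with either engine; fastparquet currently needs a glob
# for pyarrow-written datasets (see the discussion below)
result = dd.read_parquet('example.parquet/*', engine='fastparquet')
```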

@jcrist (Member Author) commented Nov 7, 2017

A few issues I ran into. None of these are blockers, and I consider the current state "good enough", but these are things that would be good to fix at some point:


  1. pandas metadata location

In pyarrow, the pandas metadata is written to each partition file, while in fastparquet the pandas metadata is written to only the summary files (_metadata). This difference made reading files written by one backend with the other backend tricky, since pyarrow will automatically set the index on files it wrote (since the pandas metadata is present), but won't on files that fastparquet wrote.
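
A quick way to see the difference (a sketch; the partition file name below is hypothetical and depends on how the dataset was written):

```python
import pyarrow.parquet as pq

# read a single partition file and check for the pandas metadata block
table = pq.read_table('example.parquet/part.0.parquet')
metadata = table.schema.metadata or {}
print(b'pandas' in metadata)  # present for pyarrow-written files, typically
                              # absent from individual fastparquet-written files
```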


  2. _metadata vs _common_metadata

Currently there is no way to write the _metadata file properly in pyarrow. From what I understand, _common_metadata should contain the schema, while _metadata should contain the schema and references to where all the row groups are. This creates a few complications:

  • fastparquet requires the row-group references in the _metadata file to read a dataset given only its directory. You can still read the dataset in that folder if a glob is passed in (e.g. `dd.read_parquet('test.parquet/*')`).
  • pyarrow ignores the row-group references in the _metadata file completely, and instead relies on a glob. It also seems to ignore the _common_metadata file, and uses the _metadata file for that purpose instead.

Currently I'm xfailing all tests where pyarrow writes and fastparquet reads for this reason. Once pyarrow can write the _metadata file, or fastparquet can read without it, we can un-xfail these tests.
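
One possible workaround, sketched here rather than implemented in this PR: after pyarrow writes the partition files, consolidate their footers into a _metadata file with fastparquet so that fastparquet can read the dataset without a glob. This assumes fastparquet.writer.merge is available in the installed version:

```python
import glob
import fastparquet

# collect the pyarrow-written partition files and write _metadata/_common_metadata
parts = sorted(glob.glob('example.parquet/*.parquet'))
fastparquet.writer.merge(parts)
```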


  3. Pandas metadata for unnamed indices

The two implementations handle unnamed indices differently in the pandas metadata: fastparquet uses the name 'index', while pyarrow uses the pandas-derived name '__index_level_0__'. This difference makes handling both versions uniformly tricky.
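
Until the libraries agree, one way to sidestep the discrepancy (an illustrative workaround, not something this PR does) is to give the index an explicit name before writing, so neither backend has to invent one:

```python
import pandas as pd

pdf = pd.DataFrame({'x': [1, 2, 3]})
pdf.index.name = 'idx'  # avoids the 'index' vs '__index_level_0__' ambiguity
```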

Ping @martindurant, @wesm for comment.

@jcrist (Member Author) commented Nov 7, 2017

Also ping @brendancol, since you needed this at some point.


try:
import pyarrow.parquet as pyarrow # noqa
import pyarrow.parquet as pa_parquet
Contributor

We normally use `import pyarrow.parquet as pq`

engine : {'auto', 'fastparquet', 'pyarrow'}, default 'auto'
Parquet library to use. If only one library is installed, it will use
that one; if both, it will use 'fastparquet'
compression : string or dict
Contributor

Not changed in this PR but out of curiosity: Should compression=None mean that the engine's default compression should be applied or no compression at all? (The default behaviour in the Parquet world is normally to have snappy enabled).

Member Author

I'd expect None to mean "no compression", since the compression kwarg is forwarded to the backend and overrides its default setting.

Perhaps we should change our default? If it's interpreted as "use the default setting for the backend", the default should maybe be compression='default' or something similar. I also wouldn't be opposed to changing it to snappy for fastparquet as well (thus matching pandas behavior), but @martindurant probably has more thoughts.

Member

I am totally fine, in theory, with having snappy be the default compression for fastparquet. The reason that it isn't is that snappy itself is only an optional dependency, one that might be tricky to build on some systems. I imagine that this location is inconvenient for checking the importability of snappy.

Member

In dask.distributed we perform a little dance to determine default compression in a way that is robust to missing imports. https://github.com/dask/distributed/blob/master/distributed/protocol/compression.py
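
The core of that dance is a guarded import (a small sketch of the pattern, not distributed's actual code):

```python
# probe for snappy at import time and fall back to no compression if missing
try:
    import snappy  # noqa: F401
    default_compression = 'snappy'
except ImportError:
    default_compression = None
```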

Member Author

Perhaps for now switch the dask kwarg default to 'default', which means the default compression for whatever backend is selected, and handle unification of this at a later point?
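
A hypothetical sketch of what that could look like; _normalize_compression is not an existing dask helper, just an illustration of the proposed 'default' semantics:

```python
def _normalize_compression(compression):
    """Turn the user-facing ``compression`` kwarg into backend kwargs."""
    if compression == 'default':
        return {}  # pass nothing through; let the backend use its own default
    # anything else, including None (no compression), overrides the backend
    return {'compression': compression}
```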

Construct directory-based partitioning by splitting on these fields'
values. Each dask partition will result in one or more datafiles,
there will be no global groupby.
times: 'int64' (default), or 'int96':
Contributor

pyarrow also supports this mode; it can be toggled with use_deprecated_int96_timestamps on pq.write_table.

Member Author

As is, this matches the pandas behavior, just forwarding the extra kwargs to the backend (you can pass times to fastparquet and use_deprecated_int96_timestamps to pyarrow).

Personally I'd like to unify on one of these names and deprecate the other (I prefer the long use_deprecated_int96_timestamps). I'd rather not unify them implicitly, though, until pandas settles on the same behavior (we like to match pandas as much as possible).
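
Concretely, with the kwargs forwarded as-is (paths here are illustrative, and ddf is assumed to be a dask DataFrame):

```python
# fastparquet spelling
ddf.to_parquet('example_fp.parquet', engine='fastparquet', times='int96')

# pyarrow spelling
ddf.to_parquet('example_pa.parquet', engine='pyarrow',
               use_deprecated_int96_timestamps=True)
```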

@xhochy (Contributor) commented Nov 7, 2017

Regarding the more lengthy issues:

  1. I would like to see fastparquet also write the Pandas metadata to all files. From a distributed execution perspective, you will always load the whole dataset and then work on that. In contrast, for local development or just for debugging purposes, you may want to simply load a single file in Pandas and analyze it there.

  2. I don't completely understand what fastparquet wants to see in the _metadata file. Should it contain all RowGroups of the dataset in the Thrift header of the file, and have the filename referenced at https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L666 set to the correct location? I can have a look at implementing this in parquet-cpp/pyarrow once I know the problem.

@martindurant (Member)
On 1), I regard it as a bug that dask doesn't write the pandas metadata to every file when using fastparquet. This was already fixed for the case where fastparquet writes multi-file data-sets directly, and it should be a simple enough fix here.

On 2), I still think it's reasonable, when passed a path which might be a directory or a file, to try to find a _metadata. Globbing for a set of filenames isn't unreasonable either, and I suppose we assume they are called .parq or .parquet; but in that case we do have to touch every one of them up front to check they are really parquet, and to grab the metadata from each. I can't think of a simple way to allow both metadata files and auto-globbing side-by-side.

On 3), I believe I was acting on advice for this choice, but I may have misunderstood. In fact, the index should probably have no name at all. Also not implemented on the fastparquet side is the columns' name (df.columns.name).

@jcrist (Member Author) commented Nov 9, 2017

Edit, accidentally hit the "close request" button :(

I regard it as a bug that dask doesn't write the pandas metadata to every file when using fastparquet.

Would this be a fix in fastparquet or how dask uses it?

I can't think of a simple way to allow both metadata files and auto-globbing side-by-side.

I think we should maybe support both, but I also think that the metadata file is nice since it contains information about the whole dataset in one place instead of several. @xhochy had a question above about what fastparquet expects in the metadata file; can you answer that?

I believe I was acting on advice for this choice, but I may have misunderstood.

We should probably standardize on how non-named indices are handled between the two libraries. I'll open an issue in fastparquet. This doesn't need to be dealt with here.

@jcrist jcrist closed this Nov 9, 2017
@jcrist jcrist reopened this Nov 9, 2017
@martindurant (Member) commented Nov 9, 2017

Would this be a fix in fastparquet or how dask uses it?

I assume the latter. fastparquet.writer.make_part_file takes a fmd= parameter; I think supplying that will be enough. Better check which version of fastparquet that requires.
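
A defensive sketch of that version check, probing the signature rather than pinning a release number:

```python
import inspect
from fastparquet import writer

# True if the installed fastparquet accepts fmd= on make_part_file
supports_fmd = 'fmd' in inspect.signature(writer.make_part_file).parameters
```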

@jcrist (Member Author) commented Nov 9, 2017

Yeah, that fixed it, thanks. Was added several releases ago, so should be fine to depend on.

elif index is None:
    if has_pandas_metadata:
        pandas_metadata = json.loads(schema.metadata[b'pandas'].decode('utf8'))
        index_cols = pandas_metadata.get('index_columns', [])
Contributor

As of this PR you will need to reconstruct indexes using pandas_metadata['columns'] to get the actual name of the index ([c['name'] for c in pandas_metadata['columns']]) along with pandas_metadata['index_columns'], which will contain the names of the columns as they are in the pyarrow.Table object, which will always match this pattern: __index_level_\d+__.

This was done to support the case of an index level name that's the same as a column name.
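
A small sketch of the convention described here, using a made-up metadata dict rather than output from a real file:

```python
# 'index_columns' holds pyarrow's storage names (always __index_level_\d+__),
# while the entries in 'columns' carry the user-facing names.
pandas_metadata = {
    'index_columns': ['__index_level_0__'],
    'columns': [
        {'name': 'x'},
        {'name': 'my_index'},  # user-facing name of the index level
    ],
}

user_facing_names = [c['name'] for c in pandas_metadata['columns']]
storage_index_names = pandas_metadata['index_columns']
```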

Contributor

We need a corresponding PR into the pandas developer docs about this

Contributor

Indeed. Incoming.

Member Author

The new metadata seems like an improvement in robustness, but from the code it looks like pyarrow can no longer read metadata written in the previous format? How do you suggest we handle this transition? We can probably write code that handles both versions, but it's mildly annoying.

Contributor

It would be good to handle both, at least for 1 release cycle. Don't we have integration tests in pandas that can help with this?

Member Author

It would be good to handle both, at least for 1 release cycle.

Agreed. Was that directed at me or at Phil? From reading the arrow code, at least, it looks like if I install arrow 0.8.0 (or whatever the next release is) it won't be able to read metadata written with 0.7.1, which makes supporting both a bit tricky for us.

Contributor

Hm, I think I can do this in a backward compatible way.

Contributor

Let me see if that's possible, and I'll report back.

Contributor

Arrow 0.8.0 isn't released yet, so this can still be sorted out in the next week or so. We can use the pandas integration tests to help validate that all is still good

__all__ = ('read_parquet', 'to_parquet')


def _meta_from_dtypes(to_read_columns, file_columns, file_dtypes, index_cols):
Contributor

What kind of support is there in fastparquet for the following pandas use cases?

  1. MultiIndex columns and indexes
  2. Columns whose name is the same as an index level
Member

@martindurant , do you know the answer to this?

Member

  1. No support for multi-index. It would not be too hard to add.
  2. I don't know, I don't imagine it's a problem, but there is no test.
jcrist added 13 commits November 9, 2017 17:28

  • No top-level imports for parquet engines. Remove dispatching in read/write methods, move to a `get_engine` function. Minimize exported names.
  • Add tests for reading and writing from both engines. Clean up bugs in the implementations of both engines that arose during testing. Can have neither, one, or both installed.
  • Change the default of `compression` in `to_parquet` to `'default'`, meaning use the default compression for the backend engine.
  • Clean up docstrings.
  • Fix a thread-safety bug in fastparquet where metadata was being mutated in a non-deterministic manner. This should be upstreamed, but fix here for now.
  • Fix part of a metadata bug in fastparquet where the total number of rows isn't consistent when writing a file.
@jcrist (Member Author) commented Nov 10, 2017

A summary of the current state here:

  • Datasets written by fastparquet can be read by both engines
  • Datasets written by pyarrow can be read by pyarrow, and by fastparquet if a glob is used (the _metadata file issue mentioned above).
  • The test suite is expanded to test all reader/writer combinations everywhere it makes sense to.

Issues mentioned above that have not been addressed:

  • New pandas metadata change for upcoming release of arrow. I'd prefer to handle this in a separate PR.
  • Handling of the central _metadata file. This sounds like it'd require work in pyarrow (ping @martindurant again to tell @xhochy what's expected here). Again, I'd prefer to handle this in a separate PR.

As is, I think this is good to go.

@cpcloud (Contributor) commented Nov 11, 2017

@jcrist Update from the pyarrow side of things: we just merged a PR (apache/arrow#1298) to ensure backwards compatible reads. What else is needed from the pyarrow side?

@jcrist (Member Author) commented Nov 13, 2017

Thanks Phil. Besides handling the missing information in the _metadata files (which should be done in a future PR, and requires @martindurant to clarify what's expected here), I think the pyarrow-side work is done. I plan to submit a subsequent PR to handle the updated pandas metadata once this PR is merged.

As is, I think this PR is good to go. It fixes a number of bugs in both readers, and adds support for writing using pyarrow. All other issues should be handled in subsequent PRs.

Barring further comments, I plan to merge this later today.
