
[FEEDBACK] User experience with arrow strings #10139

Open
j-bennet opened this issue Apr 4, 2023 · 13 comments
Labels
community, convert-string, discussion, needs attention

Comments

@j-bennet
Contributor

j-bennet commented Apr 4, 2023

This is an issue where you can report your experience with arrow strings.

Please let us know what works well, but also what doesn't. We'll help as best we can.

@phofl
Collaborator

phofl commented Apr 4, 2023

Should we pin this issue? Pinning would keep it visible instead of letting it get pushed down.

@aiudirog

Please let me know if I should open a separate ticket for this, but it appears the conversion to PyArrow strings is a bit too greedy and is picking up other objects as well:

In [10]: import dask.dataframe as ddf

In [11]: import pandas as pd

In [12]: df = pd.DataFrame({'lists': pd.Series([['a', 'b'], ['c', 'd']])})

In [13]: df
Out[13]: 
    lists
0  [a, b]
1  [c, d]

In [14]: df.dtypes
Out[14]: 
lists    object
dtype: object

In [15]: ddf.from_pandas(df, npartitions=1).compute()
Out[15]: 
        lists
0  ['a', 'b']
1  ['c', 'd']

In [16]: ddf.from_pandas(df, npartitions=1).compute().dtypes
Out[16]: 
lists    string[pyarrow]
dtype: object

While users should generally avoid the object dtype where possible, I personally feel it's a regression to assume that all object columns contain strings.

@j-bennet
Contributor Author

@aiudirog Thank you for your feedback. Yes, this is a known problem. Since Dask (unlike pandas) is lazy, it doesn't load and parse your data until compute time, so it has no way to determine whether columns marked with the object dtype contain strings or something else. Storing complex data in an object column is relatively uncommon, so Dask assumes such columns contain strings.

For people who do store complex data in an object column, it's advisable to set the dataframe.convert-string configuration option to False and, for columns that do contain strings, to explicitly provide the string[pyarrow] dtype when reading the data.
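For example, a minimal sketch of disabling the conversion globally (assuming a Dask version recent enough to have the dataframe.convert-string option; the example column is just illustrative):

import dask
import dask.dataframe as dd
import pandas as pd

# Turn off the automatic object -> string[pyarrow] conversion
dask.config.set({"dataframe.convert-string": False})

pdf = pd.DataFrame({"lists": [["a", "b"], ["c", "d"]]})
ddf = dd.from_pandas(pdf, npartitions=1)
print(ddf.compute().dtypes)  # "lists" stays object instead of string[pyarrow]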

I hope this helps.

@GenevieveBuckley
Contributor

GenevieveBuckley commented Aug 4, 2023

We ran into a small problem with the new dask arrow strings.
I say "we" but really I mean @m-albert and @jakirkham - I didn't have anything to do with this, but I'm copying over the relevant issue here for visibility.

dask/dask-image#335

(Edited by @m-albert)

With pyarrow installed, dask by default assumes dataframe columns of object dtype to be pyarrow strings (see dask/dask#10139 (comment)).

This creates problems revealed by failing tests (e.g. test_dask_image/test_ndmeasure/test_find_objects.py::test_3d_find_objects)

https://github.com/dask/dask-image/blob/67540af25597f84e4a642d644ba30dce7aebe753/dask_image/ndmeasure/_utils/_find_objects.py#L68-L70

dd.from_delayed(df1, meta=meta).compute().dtypes

Working install:

0 object
1 object
2 object
dtype: object

Failing install:

0 string[pyarrow]
1 string[pyarrow]
2 string[pyarrow]
dtype: object

The failing test had come up when releasing v2023.08.0 in conda-forge/dask-image-feedstock#14.

@jakirkham found that pyarrow is installed with the conda distribution of dask, but not when installing via pip, where it is just part of the [complete] extra.

@jakirkham also found that the conflicting behaviour described above can be turned off via the dask configuration.

He did this for the tests performed by the dask-image conda feedstock on v2023.08.0.
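For reference, one way to scope this in a test suite is dask.config.set used as a context manager (a sketch of the general approach, not necessarily the exact mechanism used in the feedstock):

import dask

# Temporarily disable the object -> string[pyarrow] conversion for a block of code
with dask.config.set({"dataframe.convert-string": False}):
    ...  # run the affected tests or computations here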

@jakirkham
Member

We also saw some kind of issue in Dask-SQL related to this change. It popped up when we were trying to wrap up a few fixes and make a release. Unfortunately we don't have more details, since we haven't had time to investigate yet, so we disabled it for now (dask-contrib/dask-sql#1206). Just wanted to add that data point here as well.

@phofl
Collaborator

phofl commented Aug 4, 2023

@GenevieveBuckley I don't think I'm following completely. The option only takes effect if a compatible Arrow version is installed, so differences in behaviour are expected. Is there anything else not working as expected that we should be aware of?

@m-albert

m-albert commented Aug 4, 2023

@phofl Trying to clarify: I see how different behaviors are expected depending on whether the correct arrow version is installed. Our problem was exactly about that, namely that pyarrow>7 is not a regular dependency of the dask distribution on pip, but it is on conda. CI testing was performed on pip-installed packages, and errors only showed up when testing a conda installation during the release process.

So in a way the problem we had was not related to the direct use of arrow strings, but the current mismatch between the pip and conda dask distributions.

@phofl
Collaborator

phofl commented Aug 4, 2023

Ah, got it. @jrbourbeau can provide more context on the packaging requirements.

@jakirkham
Member

jakirkham commented Aug 5, 2023

The dask conda-forge package requires pyarrow 7 or later, which means it will pick up the latest pyarrow in conda-forge (currently pyarrow 12.0.1). How would this affect the behavior we are seeing?

Edit: Also noting that pip install dask[complete] likely would have similar behavior (as this would install the latest pyarrow wheel, which is currently 12.0.1), since pyarrow 7 or later is required by dask[complete].

@zmbc

zmbc commented Nov 15, 2023

string[pyarrow] imposes a 2GB limit, which I have run into when manipulating very large dataframes. Is this expected and if not, is it basically an upstream issue Dask has no control over? I know PyArrow has a large_string type but it doesn't seem that Pandas supports it.

Edit: if useful, here is an example stacktrace:

File .../python3.10/site-packages/dask/dataframe/multi.py:289, in merge_chunk()
    286             else:
    287                 rhs = rhs.assign(**{col: right.astype(dtype)})
--> 289 out = lhs.merge(rhs, *args, **kwargs)
    291 # Workaround for pandas bug where if the left frame of a merge operation is
    292 # empty, the resulting dataframe can have columns in the wrong order.
    293 # https://github.com/pandas-dev/pandas/issues/9937
    294 if len(lhs) == 0:

File .../python3.10/site-packages/pyarrow/table.pxi:1043, in pyarrow.lib.ChunkedArray.take()
   1041     ]
   1042     """
-> 1043     return _pc().take(self, indices)
   1044 
   1045 def drop_null(self):

File .../python3.10/site-packages/pyarrow/compute.py:486, in take()
    446 """
    447 Select values (or records) from array- or table-like data given integer
    448 selection indices.
   (...)
    483 ]
    484 """
    485 options = TakeOptions(boundscheck=boundscheck)
--> 486 return call_function('take', [data, indices], options, memory_pool)

File .../python3.10/site-packages/pyarrow/_compute.pyx:590, in pyarrow._compute.call_function()
    588 """
    589 func = _global_func_registry.get_function(name)
--> 590 return func.call(args, options=options, memory_pool=memory_pool,
    591                  length=length)
    592 

File .../python3.10/site-packages/pyarrow/_compute.pyx:385, in pyarrow._compute.Function.call()
    383 else:
    384     with nogil:
--> 385         result = GetResultValue(
    386             self.base_func.Execute(c_batch.values, c_options,
    387                                    &c_exec_ctx)

File .../python3.10/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()
    152 cdef api int pyarrow_internal_check_status(const CStatus& status) \
    153         except -1 nogil:
--> 154     return check_status(status)
    155 
    156 cdef api object pyarrow_internal_convert_status(const CStatus& status):

File .../python3.10/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()
     89     return -1
     90 
---> 91 raise convert_status(status)
     92 
     93 

ArrowInvalid: offset overflow while concatenating arrays

@phofl
Collaborator

phofl commented Nov 15, 2023

I know PyArrow has a large_string type but it doesn't seem that Pandas supports it.

pandas does support it, you can do:

import pyarrow as pa
import pandas as pd

dtype = pd.ArrowDtype(pa.large_string())
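
For instance, a minimal usage sketch:

import pyarrow as pa
import pandas as pd

dtype = pd.ArrowDtype(pa.large_string())
s = pd.Series(["spam", "eggs"], dtype=dtype)
print(s.dtype)  # large_string[pyarrow]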

@zmbc

zmbc commented Nov 15, 2023

@phofl Good point! I thought it wouldn't be possible for Pandas to load a Parquet file with that dtype, but I just tested it, and it works.

Note: The intended limitation of string[pyarrow] is that it can only hold 2GB per Arrow chunk. Generally Arrow chunks should be invisible to the user (Dask, in this case) unless they are using low-level Arrow APIs. However, the issue I linked above (and this related one) are bugs that cause overflows to happen in practice.

I tried converting to large_string[pyarrow] using a map_partitions within Dask, and it worked but I got an error during later processing -- I don't have the error message handy, but could retrieve it. But again, I'm not sure if this is surprising since Dask hasn't been designed to work with this type.
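
For context, the conversion was roughly along these lines (a rough sketch with a made-up column name, not my exact code):

import pandas as pd
import pyarrow as pa
import dask.dataframe as dd

large = pd.ArrowDtype(pa.large_string())

pdf = pd.DataFrame({"col": ["x", "y"]})
ddf = dd.from_pandas(pdf, npartitions=1)

# Cast the string column to large_string[pyarrow] partition by partition
ddf = ddf.map_partitions(lambda part: part.astype({"col": large}))
print(ddf.dtypes)  # "col" becomes large_string[pyarrow]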

@zmbc

zmbc commented Nov 15, 2023

I tried converting to large_string[pyarrow] using a map_partitions within Dask, and it worked but I got an error during later processing

Looked into this a bit more, and it definitely seems there is a bug somewhere -- either in Dask or in PyArrow. The error I got in my real application was ValueError: ArrowStringArray requires a PyArrow (chunked) array of string type with this stacktrace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File .../lib/python3.10/site-packages/distributed/shuffle/_core.py:405, in handle_unpack_errors()
    404 try:
--> 405     yield
    406 except Reschedule as e:

File .../lib/python3.10/site-packages/distributed/shuffle/_shuffle.py:87, in shuffle_unpack()
     86 with handle_unpack_errors(id):
---> 87     return get_worker_plugin().get_output_partition(
     88         id, barrier_run_id, output_partition
     89     )

File .../lib/python3.10/site-packages/distributed/shuffle/_worker_plugin.py:431, in get_output_partition()
    430 key = thread_state.key
--> 431 return sync(
    432     self.worker.loop,
    433     shuffle_run.get_output_partition,
    434     partition_id=partition_id,
    435     key=key,
    436     meta=meta,
    437 )

File .../lib/python3.10/site-packages/distributed/utils.py:434, in sync()
    433 if error is not None:
--> 434     raise error
    435 else:

File .../lib/python3.10/site-packages/distributed/utils.py:408, in f()
    407     future = asyncio.ensure_future(awaitable)
--> 408     result = yield future
    409 except Exception as exception:

File .../lib/python3.10/site-packages/tornado/gen.py:767, in run()
    766 try:
--> 767     value = future.result()
    768 except Exception as e:
    769     # Save the exception for later. It's important that
    770     # gen.throw() not be called inside this try/except block
    771     # because that makes sys.exc_info behave unexpectedly.

File .../lib/python3.10/site-packages/distributed/shuffle/_core.py:276, in get_output_partition()
    275 await self.flush_receive()
--> 276 return await self._get_output_partition(partition_id, key, **kwargs)

File .../lib/python3.10/site-packages/distributed/shuffle/_shuffle.py:519, in _get_output_partition()
    517         return convert_shards(data, meta)
--> 519     out = await self.offload(_, partition_id, self.meta)
    520 except KeyError:

File .../lib/python3.10/site-packages/distributed/shuffle/_core.py:163, in offload()
    162 with self.time("cpu"):
--> 163     return await asyncio.get_running_loop().run_in_executor(
    164         self.executor, partial(func, *args, **kwargs)
    165     )

File .../lib/python3.10/concurrent/futures/thread.py:58, in run()
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:

File .../lib/python3.10/site-packages/distributed/shuffle/_shuffle.py:517, in _()
    516 data = self._read_from_disk((partition_id,))
--> 517 return convert_shards(data, meta)

File .../lib/python3.10/site-packages/distributed/shuffle/_arrow.py:56, in convert_shards()
     54 table = pa.concat_tables(shards)
---> 56 df = from_pyarrow_table_dispatch(meta, table, self_destruct=True)
     57 return df.astype(meta.dtypes, copy=False)

File .../lib/python3.10/site-packages/dask/utils.py:642, in __call__()
    641 meth = self.dispatch(type(arg))
--> 642 return meth(arg, *args, **kwargs)

File .../lib/python3.10/site-packages/dask/dataframe/backends.py:243, in get_pandas_dataframe_from_pyarrow()
    242 types_mapper = kwargs.pop("types_mapper", default_types_mapper)
--> 243 return table.to_pandas(types_mapper=types_mapper, **kwargs)

File .../lib/python3.10/site-packages/pyarrow/array.pxi:884, in pyarrow.lib._PandasConvertible.to_pandas()
    883 )
--> 884 return self._to_pandas(options, categories=categories,
    885                        ignore_metadata=ignore_metadata,

File .../lib/python3.10/site-packages/pyarrow/table.pxi:4192, in pyarrow.lib.Table._to_pandas()
   4191 from pyarrow.pandas_compat import table_to_blockmanager
-> 4192 mgr = table_to_blockmanager(
   4193     options, self, categories,

File .../lib/python3.10/site-packages/pyarrow/pandas_compat.py:774, in table_to_blockmanager()
    773 columns = _deserialize_column_index(table, all_columns, column_indexes)
--> 774 blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
    776 axes = [columns, index]

File .../lib/python3.10/site-packages/pyarrow/pandas_compat.py:1124, in _table_to_blocks()
   1122 result = pa.lib.table_to_blocks(options, block_table, categories,
   1123                                 list(extension_columns.keys()))
-> 1124 return [_reconstruct_block(item, columns, extension_columns)
   1125         for item in result]

File .../lib/python3.10/site-packages/pyarrow/pandas_compat.py:1124, in <listcomp>()
   1122 result = pa.lib.table_to_blocks(options, block_table, categories,
   1123                                 list(extension_columns.keys()))
-> 1124 return [_reconstruct_block(item, columns, extension_columns)
   1125         for item in result]

File .../lib/python3.10/site-packages/pyarrow/pandas_compat.py:736, in _reconstruct_block()
    734     raise ValueError("This column does not support to be converted "
    735                      "to a pandas ExtensionArray")
--> 736 pd_ext_arr = pandas_dtype.__from_arrow__(arr)
    737 block = _int.make_block(pd_ext_arr, placement=placement)

File .../lib/python3.10/site-packages/pandas/core/arrays/string_.py:212, in __from_arrow__()
    210     from pandas.core.arrays.string_arrow import ArrowStringArray
--> 212     return ArrowStringArray(array)
    213 elif self.storage == "pyarrow_numpy":

File .../lib/python3.10/site-packages/pandas/core/arrays/string_arrow.py:129, in __init__()
    125 if not pa.types.is_string(self._pa_array.type) and not (
    126     pa.types.is_dictionary(self._pa_array.type)
    127     and pa.types.is_string(self._pa_array.type.value_type)
    128 ):
--> 129     raise ValueError(
    130         "ArrowStringArray requires a PyArrow (chunked) array of string type"
    131     )

ValueError: ArrowStringArray requires a PyArrow (chunked) array of string type

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)

... application code calling set_index on a large dataframe ...

File .../lib/python3.10/site-packages/dask/dataframe/core.py:5435, in DataFrame.set_index(***failed resolving arguments***)
   5432 else:
   5433     from dask.dataframe.shuffle import set_index
-> 5435     return set_index(
   5436         self,
   5437         other,
   5438         drop=drop,
   5439         npartitions=npartitions,
   5440         divisions=divisions,
   5441         sort=sort,
   5442         **kwargs,
   5443     )

File .../lib/python3.10/site-packages/dask/dataframe/shuffle.py:242, in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, sort, **kwargs)
    239     index2 = index
    241 if divisions is None:
--> 242     divisions, mins, maxes, presorted = _calculate_divisions(
    243         df, index2, repartition, npartitions, upsample, partition_size
    244     )
    246     if presorted and npartitions == df.npartitions:
    247         divisions = mins + [maxes[-1]]

File .../lib/python3.10/site-packages/dask/dataframe/shuffle.py:54, in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size, ascending)
     51 maxes = partition_col.map_partitions(M.max)
     53 try:
---> 54     divisions, sizes, mins, maxes = compute(divisions, sizes, mins, maxes)
     55 except TypeError as e:
     56     # When there are nulls and a column is non-numeric, a TypeError is sometimes raised as a result of
     57     # 1) computing mins/maxes above, 2) every null being switched to NaN, and 3) NaN being a float.
     58     # Also, Pandas ExtensionDtypes may cause TypeErrors when dealing with special nulls such as pd.NaT or pd.NA.
     59     # If this happens, we hint the user about eliminating nulls beforehand.
     60     if not is_numeric_dtype(partition_col.dtype):

File .../lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File .../lib/python3.10/site-packages/distributed/shuffle/_shuffle.py:86, in shuffle_unpack()
     83 def shuffle_unpack(
     84     id: ShuffleId, output_partition: int, barrier_run_id: int
     85 ) -> pd.DataFrame:
---> 86     with handle_unpack_errors(id):
     87         return get_worker_plugin().get_output_partition(
     88             id, barrier_run_id, output_partition
     89         )

File .../lib/python3.10/contextlib.py:153, in __exit__()
    151     value = typ()
    152 try:
--> 153     self.gen.throw(typ, value, traceback)
    154 except StopIteration as exc:
    155     # Suppress StopIteration *unless* it's the same exception that
    156     # was passed to throw().  This prevents a StopIteration
    157     # raised inside the "with" statement from being suppressed.
    158     return exc is not value

File .../lib/python3.10/site-packages/distributed/shuffle/_core.py:411, in handle_unpack_errors()
    409     raise Reschedule()
    410 except Exception as e:
--> 411     raise RuntimeError(f"P2P shuffling {id} failed during unpack phase") from e

RuntimeError: P2P shuffling 9889e954166aa37ea8c6e3c4b76043ec failed during unpack phase

I can reproduce the error like this:

import pyarrow as pa
import pandas as pd
dtype = pd.ArrowDtype(pa.large_string())
other_dtype = pd.StringDtype("pyarrow")

df = pd.DataFrame({'foo': ['bar', 'baz'], 'bar': ['baz', 'quux']})
df['foo'] = df.foo.astype(dtype)
df['bar'] = df.bar.astype(other_dtype)

# Lightly modified from https://github.com/dask/dask/blob/b2f11d026d2c6f806036c050ff5dbd59d6ceb6ec/dask/dataframe/backends.py#L232-L240
def default_types_mapper(pyarrow_dtype: pa.DataType) -> object:
    # Avoid converting strings from `string[pyarrow]` to
    # `string[python]` if we have *any* `string[pyarrow]`
    if (
        pyarrow_dtype in {pa.large_string(), pa.string()}
        and pd.StringDtype("pyarrow") in df.dtypes.values
    ):
        return pd.StringDtype("pyarrow")
    return None

# Emulating https://github.com/dask/dask/blob/b2f11d026d2c6f806036c050ff5dbd59d6ceb6ec/dask/dataframe/backends.py#L243
pa.Table.from_pandas(df).to_pandas(types_mapper=default_types_mapper)

It seems that the failure depends on there being other strings in the DataFrame that are not large_string[pyarrow]. Dask then uses a types_mapper that converts large_string[pyarrow] to string[pyarrow], which I don't want and which also fails.
