NumPy 编写自定义数组容器
在 numpy v1.16 版本中引入的 Numpy 调度机制是编写与 numpy API 兼容并提供 numpy 功能的自定义实现的自定义 N 维数组容器的推荐方法。应用程序包括dask数组(分布在多个节点上的 N 维数组)和cupy数组(GPU 上的 N 维数组)。
为了感受如何编写自定义数组容器,我们将从一个简单的示例开始,该示例具有相当狭窄的实用性,但说明了所涉及的概念。
>>> import numpy as np >>> class DiagonalArray: ... def __init__(self, N, value): ... self._N = N ... self._i = value ... def __repr__(self): ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})" ... def __array__(self, dtype=None): ... return self._i * np.eye(self._N, dtype=dtype)
我们的自定义数组可以像这样实例化:
>>> arr = DiagonalArray(5, 1) >>> arr DiagonalArray(N=5, value=1)
我们可以使用numpy.array
or 转换成一个 numpy 数组numpy.asarray
,它会调用它的__array__
方法来获取一个标准的numpy.ndarray
.
>>> np.asarray(arr) array([[1., 0., 0., 0., 0.], [0., 1., 0., 0., 0.], [0., 0., 1., 0., 0.], [0., 0., 0., 1., 0.], [0., 0., 0., 0., 1.]])
如果我们使用arr
numpy 函数进行操作,numpy 将再次使用该 __array__
接口将其转换为数组,然后以通常的方式应用该函数。
>>> np.multiply(arr, 2) array([[2., 0., 0., 0., 0.], [0., 2., 0., 0., 0.], [0., 0., 2., 0., 0.], [0., 0., 0., 2., 0.], [0., 0., 0., 0., 2.]])
请注意,返回类型是标准的numpy.ndarray
.
>>> type(np.multiply(arr, 2)) numpy.ndarray
我们如何通过这个函数传递我们的自定义数组类型?Numpy 允许一个类通过接口__array_ufunc__
和__array_function__
. 让我们一次一个,从_array_ufunc__
. 此方法涵盖 通用函数 (ufunc),这是一类函数,例如包括 numpy.multiply
和numpy.sin
。
该__array_ufunc__
接收:
ufunc
,函数如numpy.multiply
method
, 一个字符串,区分numpy.multiply(...)
和 变体,如numpy.multiply.outer
、numpy.multiply.accumulate
等。对于常见情况,numpy.multiply(...)
, 。method == '__call__'
inputs
,这可能是不同类型的混合kwargs
, 传递给函数的关键字参数
对于这个例子,我们将只处理方法 __call__
>>> from numbers import Number >>> class DiagonalArray: ... def __init__(self, N, value): ... self._N = N ... self._i = value ... def __repr__(self): ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})" ... def __array__(self, dtype=None): ... return self._i * np.eye(self._N, dtype=dtype) ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): ... if method == '__call__': ... N = None ... scalars = [] ... for input in inputs: ... if isinstance(input, Number): ... scalars.append(input) ... elif isinstance(input, self.__class__): ... scalars.append(input._i) ... if N is not None: ... if N != self._N: ... raise TypeError("inconsistent sizes") ... else: ... N = self._N ... else: ... return NotImplemented ... return self.__class__(N, ufunc(*scalars, **kwargs)) ... else: ... return NotImplemented
现在我们的自定义数组类型通过 numpy 函数。
>>> arr = DiagonalArray(5, 1) >>> np.multiply(arr, 3) DiagonalArray(N=5, value=3) >>> np.add(arr, 3) DiagonalArray(N=5, value=4) >>> np.sin(arr) DiagonalArray(N=5, value=0.8414709848078965)
此时不起作用。arr + 3
>>> arr + 3 TypeError: unsupported operand type(s) for *: 'DiagonalArray' and 'int'
为了支持它,我们需要定义 Python 接口__add__
、__lt__
等以分派到相应的 ufunc。我们可以通过从 mixin 继承来方便地实现这一点 NDArrayOperatorsMixin
。
>>> import numpy.lib.mixins >>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin): ... def __init__(self, N, value): ... self._N = N ... self._i = value ... def __repr__(self): ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})" ... def __array__(self, dtype=None): ... return self._i * np.eye(self._N, dtype=dtype) ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): ... if method == '__call__': ... N = None ... scalars = [] ... for input in inputs: ... if isinstance(input, Number): ... scalars.append(input) ... elif isinstance(input, self.__class__): ... scalars.append(input._i) ... if N is not None: ... if N != self._N: ... raise TypeError("inconsistent sizes") ... else: ... N = self._N ... else: ... return NotImplemented ... return self.__class__(N, ufunc(*scalars, **kwargs)) ... else: ... return NotImplemented
>>> arr = DiagonalArray(5, 1) >>> arr + 3 DiagonalArray(N=5, value=4) >>> arr > 0 DiagonalArray(N=5, value=True)
现在让我们解决__array_function__
. 我们将创建 dict 将 numpy 函数映射到我们的自定义变体。
>>> HANDLED_FUNCTIONS = {} >>> class DiagonalArray(numpy.lib.mixins.NDArrayOperatorsMixin): ... def __init__(self, N, value): ... self._N = N ... self._i = value ... def __repr__(self): ... return f"{self.__class__.__name__}(N={self._N}, value={self._i})" ... def __array__(self, dtype=None): ... return self._i * np.eye(self._N, dtype=dtype) ... def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): ... if method == '__call__': ... N = None ... scalars = [] ... for input in inputs: ... # In this case we accept only scalar numbers or DiagonalArrays. ... if isinstance(input, Number): ... scalars.append(input) ... elif isinstance(input, self.__class__): ... scalars.append(input._i) ... if N is not None: ... if N != self._N: ... raise TypeError("inconsistent sizes") ... else: ... N = self._N ... else: ... return NotImplemented ... return self.__class__(N, ufunc(*scalars, **kwargs)) ... else: ... return NotImplemented ... def __array_function__(self, func, types, args, kwargs): ... if func not in HANDLED_FUNCTIONS: ... return NotImplemented ... # Note: this allows subclasses that don't override ... # __array_function__ to handle DiagonalArray objects. ... if not all(issubclass(t, self.__class__) for t in types): ... return NotImplemented ... return HANDLED_FUNCTIONS[func](*args, **kwargs) ...
一个方便的模式是定义一个implements
可用于向HANDLED_FUNCTIONS
.
>>> def implements(np_function): ... "Register an __array_function__ implementation for DiagonalArray objects." ... def decorator(func): ... HANDLED_FUNCTIONS[np_function] = func ... return func ... return decorator ...
现在我们编写 numpy 函数的实现DiagonalArray
。为了完整起见,为了支持用法,arr.sum()
添加一个sum
调用的方法,numpy.sum(self)
对于mean
.
>>> @implements(np.sum) ... def sum(arr): ... "Implementation of np.sum for DiagonalArray objects" ... return arr._i * arr._N ... >>> @implements(np.mean) ... def mean(arr): ... "Implementation of np.mean for DiagonalArray objects" ... return arr._i / arr._N ... >>> arr = DiagonalArray(5, 1) >>> np.sum(arr) 5 >>> np.mean(arr) 0.2
如果用户尝试使用 中未包含的任何 numpy 函数 HANDLED_FUNCTIONS
,TypeError
则 numpy 将引发a ,表示不支持此操作。例如,连接两个 DiagonalArrays
不会产生另一个对角数组,因此不支持。
>>> np.concatenate([arr, arr]) TypeError: no implementation found for 'numpy.concatenate' on types that implement __array_function__: [<class '__main__.DiagonalArray'>]
此外,我们的sum
和mean
实现不接受 numpy 的实现所做的可选参数。
>>> np.sum(arr, axis=0) TypeError: sum() got an unexpected keyword argument 'axis'
用户总是具有转换为正常的选择numpy.ndarray
与 numpy.asarray
和使用标准numpy的从那里。
>>> np.concatenate([np.asarray(arr), np.asarray(arr)]) array([[1., 0., 0., 0., 0.], [0., 1., 0., 0., 0.], [0., 0., 1., 0., 0.], [0., 0., 0., 1., 0.], [0., 0., 0., 0., 1.], [1., 0., 0., 0., 0.], [0., 1., 0., 0., 0.], [0., 0., 1., 0., 0.], [0., 0., 0., 1., 0.], [0., 0., 0., 0., 1.]])
有关自定义数组容器的更完整示例,请参阅dask 源代码和 cupy 源代码。 另见NEP 18。
更多建议: