2 changes: 2 additions & 0 deletions doc/source/whatsnew/v3.0.0.rst
@@ -1165,6 +1165,8 @@ Groupby/resample/rolling
- Bug in :meth:`DataFrameGroupby.transform` and :meth:`SeriesGroupby.transform` with a reducer and ``observed=False`` that coerces dtype to float when there are unobserved categories. (:issue:`55326`)
- Bug in :meth:`Rolling.apply` for ``method="table"`` where column order was not being respected due to the columns getting sorted by default. (:issue:`59666`)
- Bug in :meth:`Rolling.apply` where the applied function could be called on fewer than ``min_period`` periods if ``method="table"``. (:issue:`58868`)
+- Bug in :meth:`Rolling.skew` incorrectly computing skewness for windows following outliers due to numerical instability. The calculation now properly handles catastrophic cancellation by recomputing affected windows. (:issue:`47461`)
+- Bug in :meth:`Rolling.skew` incorrectly returning ``0.0`` for constant-value windows. It now returns ``NaN``, matching the statistical definition of skewness for degenerate distributions. (:issue:`62864`)
- Bug in :meth:`Series.resample` could raise when the date range ended shortly before a non-existent time. (:issue:`58380`)

Reshaping
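For illustration (not part of the diff), the behavior change described by the two new entries can be seen directly; this sketch assumes a build that includes the fix:

    import pandas as pd

    # Constant-value windows: rolling skew previously forced the result to
    # 0.0; with the fix it yields NaN, since skewness is 0/0 at zero variance.
    s = pd.Series([1.1] * 5)
    print(s.rolling(3).skew())  # expect NaN for every window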
199 changes: 77 additions & 122 deletions pandas/_libs/window/aggregations.pyx
@@ -1,6 +1,7 @@
# cython: boundscheck=False, wraparound=False, cdivision=True

from libc.math cimport (
+    fabs,
round,
signbit,
sqrt,
@@ -482,196 +483,150 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,


cdef float64_t calc_skew(int64_t minp, int64_t nobs,
-                         float64_t x, float64_t xx, float64_t xxx,
-                         int64_t num_consecutive_same_value
+                         float64_t mean, float64_t m2, float64_t m3,
) noexcept nogil:
cdef:
float64_t result, dnobs
-        float64_t A, B, C, R
+        float64_t moments_ratio, correction
Comment (Member): Thank you for using descriptive names for these variables!

if nobs >= minp:
dnobs = <float64_t>nobs
-        A = x / dnobs
-        B = xx / dnobs - A * A
-        C = xxx / dnobs - A * A * A - 3 * A * B

if nobs < 3:
result = NaN
-        # GH 42064 46431
-        # uniform case, force result to be 0
-        elif num_consecutive_same_value >= nobs:
-            result = 0.0
-        # #18044: with uniform distribution, floating issue will
-        # cause B != 0. and cause the result is a very
+        # #18044: with degenerate distribution, floating issue will
+        # cause m2 != 0. and cause the result is a very
# large number.
#
# in core/nanops.py nanskew/nankurt call the function
# _zero_out_fperr(m2) to fix floating error.
# if the variance is less than 1e-14, it could be
# treat as zero, here we follow the original
-        # skew/kurt behaviour to check B <= 1e-14
-        elif B <= 1e-14:
+        # skew/kurt behaviour to check m2 < n * (eps * eps * mean * mean)
+        elif m2 < dnobs * (1e-28 * mean * mean if fabs(mean) > 1e-14 else 1e-14):
result = NaN
else:
-            R = sqrt(B)
-            result = ((sqrt(dnobs * (dnobs - 1.)) * C) /
-                      ((dnobs - 2) * R * R * R))
+            moments_ratio = m3 / (m2 * sqrt(m2))
+            correction = dnobs * sqrt((dnobs - 1)) / (dnobs - 2)
+            result = moments_ratio * correction
else:
result = NaN

return result
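For reference, calc_skew's closed form in plain Python: m2 and m3 are running sums of squared and cubed deviations from the mean, and the correction factor de-biases the estimate. A sketch (the simplified degeneracy check stands in for the m2-threshold test above; scipy is assumed for the cross-check):

    import numpy as np
    from scipy.stats import skew

    def skew_from_moments(x):
        # Mirrors calc_skew: m2/m3 are sums of centered powers; the factor
        # n * sqrt(n - 1) / (n - 2) converts g1 into the unbiased estimator.
        x = np.asarray(x, dtype=float)
        n = len(x)
        d = x - x.mean()
        m2, m3 = (d ** 2).sum(), (d ** 3).sum()
        if n < 3 or m2 <= n * 1e-14:  # degenerate window -> NaN
            return np.nan
        return (m3 / (m2 * np.sqrt(m2))) * (n * np.sqrt(n - 1) / (n - 2))

    x = [2.0, 0.5, 3.5, 7.0, 1.0]
    assert np.isclose(skew_from_moments(x), skew(x, bias=False))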


cdef void add_skew(float64_t val, int64_t *nobs,
-                   float64_t *x, float64_t *xx,
-                   float64_t *xxx,
-                   float64_t *compensation_x,
-                   float64_t *compensation_xx,
-                   float64_t *compensation_xxx,
-                   int64_t *num_consecutive_same_value,
-                   float64_t *prev_value,
+                   float64_t *mean, float64_t *m2,
+                   float64_t *m3,
+                   bint *numerically_unstable
) noexcept nogil:
""" add a value from the skew calc """
cdef:
-        float64_t y, t
+        float64_t n, delta, delta_n, term1, m3_update, new_m3

# Not NaN
if val == val:
-        nobs[0] = nobs[0] + 1
+        nobs[0] += 1
+        n = <float64_t>(nobs[0])
+        delta = val - mean[0]
+        delta_n = delta / n
+        term1 = delta * delta_n * (n - 1.0)

-        y = val - compensation_x[0]
-        t = x[0] + y
-        compensation_x[0] = t - x[0] - y
-        x[0] = t
-        y = val * val - compensation_xx[0]
-        t = xx[0] + y
-        compensation_xx[0] = t - xx[0] - y
-        xx[0] = t
-        y = val * val * val - compensation_xxx[0]
-        t = xxx[0] + y
-        compensation_xxx[0] = t - xxx[0] - y
-        xxx[0] = t
+        m3_update = delta_n * (term1 * (n - 2.0) - 3.0 * m2[0])
+        new_m3 = m3[0] + m3_update
+        if fabs(m3_update) + fabs(m3[0]) > 1e10 * fabs(new_m3):
Comment (Member): How did you come up with the 1e10 threshold?

Reply (Member Author): The subtraction loses significant digits when $|A| + |B| \gg |A - B|$, so I thought that a change of 10 significant digits in base 10 should be considered a catastrophic cancellation.
+            # possible catastrophic cancellation
+            numerically_unstable[0] = True

-        # GH#42064, record num of same values to remove floating point artifacts
-        if val == prev_value[0]:
-            num_consecutive_same_value[0] += 1
-        else:
-            # reset to 1 (include current value itself)
-            num_consecutive_same_value[0] = 1
-        prev_value[0] = val
+        m3[0] = new_m3
+        m2[0] += term1
+        mean[0] += delta_n
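In plain Python, one add_skew step looks roughly as follows; this is a sketch of the streaming (Welford-style) third-moment update with the cancellation guard, verified against a direct two-pass computation:

    import numpy as np

    def add_value(val, n, mean, m2, m3):
        # Update count, mean, and the centered 2nd/3rd moment sums; flag
        # the step if the m3 change cancels roughly 10 decimal digits.
        n += 1
        delta = val - mean
        delta_n = delta / n
        term1 = delta * delta_n * (n - 1.0)
        m3_update = delta_n * (term1 * (n - 2.0) - 3.0 * m2)
        new_m3 = m3 + m3_update
        unstable = abs(m3_update) + abs(m3) > 1e10 * abs(new_m3)
        return n, mean + delta_n, m2 + term1, new_m3, unstable

    vals = np.random.default_rng(0).normal(size=50)
    n, mean, m2, m3 = 0, 0.0, 0.0, 0.0
    for v in vals:
        n, mean, m2, m3, _ = add_value(v, n, mean, m2, m3)
    d = vals - vals.mean()
    assert np.allclose([mean, m2, m3], [vals.mean(), (d ** 2).sum(), (d ** 3).sum()])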


cdef void remove_skew(float64_t val, int64_t *nobs,
-                      float64_t *x, float64_t *xx,
-                      float64_t *xxx,
-                      float64_t *compensation_x,
-                      float64_t *compensation_xx,
-                      float64_t *compensation_xxx) noexcept nogil:
+                      float64_t *mean, float64_t *m2,
+                      float64_t *m3,
+                      bint *numerically_unstable) noexcept nogil:
""" remove a value from the skew calc """
cdef:
-        float64_t y, t
+        float64_t n, delta, delta_n, term1, m3_update, new_m3

# Not NaN
if val == val:
-        nobs[0] = nobs[0] - 1
+        nobs[0] -= 1
+        n = <float64_t>(nobs[0])
+        delta = val - mean[0]
+        delta_n = delta / n
+        term1 = delta_n * delta * (n + 1.0)
Comment (Member): Might be interesting to use fused multiply-add (fma) where applicable.

Reply (Member Author): I couldn't figure out an effective way to reformulate the computation of term1 to use fma, but I managed to change the update functions for the 3rd moment. Now I am using fma(term1, n + 2.0, -3.0 * m2[0]).

Reply (Member Author): In the latest commit, I had a performance regression against the main branch. asv find reports that the regression was caused by ae1f3f9, where I use fma. Should I revert this change, keep it as is, or is there an improvement that won't hinder performance?

-        y = - val - compensation_x[0]
-        t = x[0] + y
-        compensation_x[0] = t - x[0] - y
-        x[0] = t
-        y = - val * val - compensation_xx[0]
-        t = xx[0] + y
-        compensation_xx[0] = t - xx[0] - y
-        xx[0] = t
-        y = - val * val * val - compensation_xxx[0]
-        t = xxx[0] + y
-        compensation_xxx[0] = t - xxx[0] - y
-        xxx[0] = t
+        m3_update = delta_n * (term1 * (n + 2.0) - 3.0 * m2[0])
+        new_m3 = m3[0] - m3_update

+        if fabs(m3_update) + fabs(m3[0]) > 1e10 * fabs(new_m3):
+            # possible catastrophic cancellation
+            numerically_unstable[0] = True

+        m3[0] = new_m3
+        m2[0] -= term1
+        mean[0] -= delta_n
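To illustrate the fused multiply-add idea from the thread above: fma(a, b, c) evaluates a * b + c with a single rounding instead of two. A sketch using math.fma, which assumes Python 3.13+ (the Cython code would call the C fma from libc.math):

    import math

    term1, n, m2 = 0.5, 10.0, 2.0
    plain = term1 * (n + 2.0) - 3.0 * m2         # two roundings
    fused = math.fma(term1, n + 2.0, -3.0 * m2)  # one rounding
    assert math.isclose(plain, fused)
    # fma can trim rounding error in the m3 update, but may be slower on
    # platforms without a hardware FMA instruction (the regression noted above).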


def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
Py_ssize_t i, j
-        float64_t val, min_val, mean_val, sum_val = 0
-        float64_t compensation_xxx_add, compensation_xxx_remove
-        float64_t compensation_xx_add, compensation_xx_remove
-        float64_t compensation_x_add, compensation_x_remove
-        float64_t x, xx, xxx
-        float64_t prev_value
-        int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
-        int64_t s, e, num_consecutive_same_value
-        ndarray[float64_t] output, values_copy
bint is_monotonic_increasing_bounds
+        float64_t val
+        float64_t mean, m2, m3
+        int64_t nobs = 0, N = len(start)
+        int64_t s, e
+        ndarray[float64_t] output
+        bint requires_recompute, numerically_unstable = False

minp = max(minp, 3)
is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
start, end
)
output = np.empty(N, dtype=np.float64)
-    min_val = np.nanmin(values)
-    values_copy = np.copy(values)

with nogil:
-        for i in range(0, V):
-            val = values_copy[i]
-            if val == val:
-                nobs_mean += 1
-                sum_val += val
-        mean_val = sum_val / nobs_mean
-        # Other cases would lead to imprecision for smallest values
-        if min_val - mean_val > -1e5:
-            mean_val = round(mean_val)
-            for i in range(0, V):
-                values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):

s = start[i]
e = end[i]

-            # Over the first window, observations can only be added
-            # never removed
-            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

-                prev_value = values[s]
-                num_consecutive_same_value = 0

-                compensation_xxx_add = compensation_xxx_remove = 0
-                compensation_xx_add = compensation_xx_remove = 0
-                compensation_x_add = compensation_x_remove = 0
-                x = xx = xxx = 0
-                nobs = 0
-                for j in range(s, e):
-                    val = values_copy[j]
-                    add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
-                             &compensation_xx_add, &compensation_xxx_add,
-                             &num_consecutive_same_value, &prev_value)

-            else:
+            requires_recompute = (
+                i == 0
+                or not is_monotonic_increasing_bounds
+                or s >= end[i - 1]
+                or numerically_unstable
+            )

# After the first window, observations can both be added
# and removed
# calculate deletes
+            if not requires_recompute:
for j in range(start[i - 1], s):
-                    val = values_copy[j]
-                    remove_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_remove,
-                                &compensation_xx_remove, &compensation_xxx_remove)
+                    val = values[j]
+                    remove_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)

# calculate adds
for j in range(end[i - 1], e):
-                    val = values_copy[j]
-                    add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
-                             &compensation_xx_add, &compensation_xxx_add,
-                             &num_consecutive_same_value, &prev_value)
+                    val = values[j]
+                    add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)

+            if requires_recompute or numerically_unstable:
+                mean = m2 = m3 = 0.0
+                nobs = 0

+                for j in range(s, e):
+                    val = values[j]
+                    add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)

+                numerically_unstable = False

-            output[i] = calc_skew(minp, nobs, x, xx, xxx, num_consecutive_same_value)
+            output[i] = calc_skew(minp, nobs, mean, m2, m3)

if not is_monotonic_increasing_bounds:
nobs = 0
-                x = 0.0
-                xx = 0.0
-                xxx = 0.0
+                mean = 0.0
+                m2 = 0.0
+                m3 = 0.0

return output
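A usage sketch of the recompute path (illustrative; mirrors the new test data below): a pair of large outliers used to poison the running sums for every later window, whereas the instability flag now forces affected windows to be re-accumulated and match a direct per-window computation:

    import pandas as pd

    s = pd.Series([1e6, -1e6, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
    streaming = s.rolling(4).skew()
    direct = s.rolling(4).apply(lambda w: w.skew(), raw=False)  # reference
    print(pd.concat({"streaming": streaming, "direct": direct}, axis=1))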

36 changes: 27 additions & 9 deletions pandas/tests/window/test_rolling.py
@@ -1172,7 +1172,9 @@ def test_rolling_decreasing_indices(method):
increasing = getattr(df.rolling(window=5), method)()
decreasing = getattr(df_reverse.rolling(window=5), method)()

-    assert np.abs(decreasing.values[::-1][:-4] - increasing.values[4:]).max() < 1e-12
+    tm.assert_almost_equal(
+        decreasing.values[::-1][:-4], increasing.values[4:], atol=1e-12
+    )


@pytest.mark.parametrize(
@@ -1438,17 +1440,30 @@ def test_rolling_skew_kurt_numerical_stability(method):


@pytest.mark.parametrize(
("method", "values"),
("method", "data", "values"),
[
("skew", [2.0, 0.854563, 0.0, 1.999984]),
("kurt", [4.0, -1.289256, -1.2, 3.999946]),
(
"skew",
[3000000, 1, 1, 2, 3, 4, 999],
[np.nan] * 3 + [2.0, 0.854563, 0.0, 1.999984],
),
(
"skew",
[1e6, -1e6, 1, 2, 3, 4, 5, 6],
[np.nan] * 3 + [-5.51135192e-06, -2.0, 0.0, 0.0, 0.0],
),
(
"kurt",
[3000000, 1, 1, 2, 3, 4, 999],
[np.nan] * 3 + [4.0, -1.289256, -1.2, 3.999946],
),
],
)
-def test_rolling_skew_kurt_large_value_range(method, values):
+def test_rolling_skew_kurt_large_value_range(method, data, values):
# GH: 37557
-    s = Series([3000000, 1, 1, 2, 3, 4, 999])
+    s = Series(data)
result = getattr(s.rolling(4), method)()
-    expected = Series([np.nan] * 3 + values)
+    expected = Series(values)
tm.assert_series_equal(result, expected)


@@ -1828,15 +1843,18 @@ def test_rolling_mean_sum_floating_artifacts():
assert (result[-3:] == 0).all()


+@pytest.mark.xfail(reason="Series.rolling.kurt computes -3 to degenerate distribution")
def test_rolling_skew_kurt_floating_artifacts():
# GH 42064 46431

sr = Series([1 / 3, 4, 0, 0, 0, 0, 0])
r = sr.rolling(4)
result = r.skew()
-    assert (result[-2:] == 0).all()
+    expected = Series([np.nan, np.nan, np.nan, 1.9619045191072484, 2.0, np.nan, np.nan])
+    tm.assert_series_equal(result, expected)
result = r.kurt()
-    assert (result[-2:] == -3).all()
+    expected = Series([np.nan, np.nan, np.nan, 3.8636048803878786, 4.0, np.nan, np.nan])
+    tm.assert_series_equal(result, expected)
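The hard-coded expectations above can be cross-checked independently; a sketch assuming scipy is available (bias=False gives the same bias-corrected estimators pandas uses):

    import numpy as np
    from scipy.stats import kurtosis, skew

    # Windows 4 and 5 of Series([1/3, 4, 0, 0, 0, 0, 0]) with window=4:
    assert np.isclose(skew([1 / 3, 4, 0, 0], bias=False), 1.9619045191072484)
    assert np.isclose(skew([4, 0, 0, 0], bias=False), 2.0)
    assert np.isclose(kurtosis([1 / 3, 4, 0, 0], bias=False), 3.8636048803878786)
    assert np.isclose(kurtosis([4, 0, 0, 0], bias=False), 4.0)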


def test_numeric_only_frame(arithmetic_win_operators, numeric_only):
9 changes: 4 additions & 5 deletions pandas/tests/window/test_rolling_skew_kurt.py
@@ -170,11 +170,11 @@ def test_center_reindex_frame(frame, roll_func):


def test_rolling_skew_edge_cases(step):
-    expected = Series([np.nan] * 4 + [0.0])[::step]
+    expected = Series([np.nan] * 5)[::step]
# yields all NaN (0 variance)
d = Series([1] * 5)
x = d.rolling(window=5, step=step).skew()
-    # index 4 should be 0 as it contains 5 same obs
+    # index 4 should be NaN as it contains 5 same obs (variance 0)
tm.assert_series_equal(expected, x)

expected = Series([np.nan] * 5)[::step]
@@ -213,10 +213,9 @@ def test_rolling_kurt_edge_cases(step):

def test_rolling_skew_eq_value_fperr(step):
# #18804 all rolling skew for all equal values should return Nan
-    # #46717 update: all equal values should return 0 instead of NaN
a = Series([1.1] * 15).rolling(window=10, step=step).skew()
-    assert (a[a.index >= 9] == 0).all()
-    assert a[a.index < 9].isna().all()
+    expected = Series([np.nan] * 15)[::step]
+    tm.assert_series_equal(expected, a)


def test_rolling_kurt_eq_value_fperr(step):