1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > python 移动平均函数_python – NumPy版本的“指数加权移动平均线...

python 移动平均函数_python – NumPy版本的“指数加权移动平均线...

时间:2019-06-28 00:42:31

相关推荐

python 移动平均函数_python  –  NumPy版本的“指数加权移动平均线...

更新于08/06/

纯粹的,快速的&大输入的矢量化解决方案

用于就地计算的out参数,

dtype参数,

索引顺序参数

这个函数相当于pandas的ewm(adjust = False).mean(),但要快得多. ewm(adjust = True).mean()(pandas的默认值)可以在结果的开头产生不同的值.我正在努力为此解决方案添加adjust功能.

当输入太大时,@Divakar’s answer会导致浮点精度问题.这是因为(1-α)**(n 1) – >当n – >时为0 inf和alpha – > 1,导致在计算中弹出除零和NaN值.

这是我最快的解决方案,没有精度问题,几乎完全矢量化.它有点复杂,但性能很好,特别是对于非常大的输入.不使用就地计算(可以使用out参数,节省内存分配时间):100M元素输入向量为3.62秒,100K元素输入向量为3.2ms,相对较老的5000元素输入向量为293μs PC(结果将随着不同的alpha / row_size值而变化).

# tested with python3 & numpy 1.15.2

import numpy as np

def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):

"""

Reshapes data before calculating EWMA, then iterates once over the rows

to calculate the offset without precision issues

:param data: Input data, will be flattened.

:param alpha: scalar float in range (0,1)

The alpha parameter for the moving average.

:param row_size: int, optional

The row size to use in the computation. High row sizes need higher precision,

low values will impact performance. The optimal value depends on the

platform and the alpha being used. Higher alpha values require lower

row size. Default depends on dtype.

:param dtype: optional

Data type used for calculations. Defaults to float64 unless

data.dtype is float32, then it will use float32.

:param order: {'C', 'F', 'A'}, optional

Order to use when flattening the data. Defaults to 'C'.

:param out: ndarray, or None, optional

A location into which the result is stored. If provided, it must have

the same shape as the desired output. If not provided or `None`,

a freshly-allocated array is returned.

:return: The flattened result.

"""

data = np.array(data, copy=False)

if dtype is None:

if data.dtype == np.float32:

dtype = np.float32

else:

dtype = np.float

else:

dtype = np.dtype(dtype)

row_size = int(row_size) if row_size is not None

else get_max_row_size(alpha, dtype)

if data.size <= row_size:

# The normal function can handle this input, use that

return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)

if data.ndim > 1:

# flatten input

data = np.reshape(data, -1, order=order)

if out is None:

out = np.empty_like(data, dtype=dtype)

else:

assert out.shape == data.shape

assert out.dtype == dtype

row_n = int(data.size // row_size) # the number of rows to use

trailing_n = int(data.size % row_size) # the amount of data leftover

first_offset = data[0]

if trailing_n > 0:

# set temporary results to slice view of out parameter

out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))

data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))

else:

out_main_view = out

data_main_view = data

# get all the scaled cumulative sums with 0 offset

ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,

order='C', out=out_main_view)

scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)

last_scaling_factor = scaling_factors[-1]

# create offset array

offsets = np.empty(out_main_view.shape[0], dtype=dtype)

offsets[0] = first_offset

# iteratively calculate offset for each row

for i in range(1, out_main_view.shape[0]):

offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]

# add the offsets to the result

out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]

if trailing_n > 0:

# process trailing data in the 2nd slice of the out parameter

ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],

dtype=dtype, order='C', out=out[-trailing_n:])

return out

def get_max_row_size(alpha, dtype=float):

assert 0. <= alpha < 1.

# This will return the maximum row size possible on

# your platform for the given dtype. I can find no impact on accuracy

# at this value on my machine.

# Might not be the optimal value for speed, which is hard to predict

# due to numpy's optimizations

# Use np.finfo(dtype).eps if you are worried about accuracy

# and want to be extra safe.

epsilon = np.finfo(dtype).tiny

# If this produces an OverflowError, make epsilon larger

return int(np.log(epsilon)/np.log(1-alpha)) + 1

1D ewma功能:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):

"""

Calculates the exponential moving average over a vector.

Will fail for large inputs.

:param data: Input data

:param alpha: scalar float in range (0,1)

The alpha parameter for the moving average.

:param offset: optional

The offset for the moving average, scalar. Defaults to data[0].

:param dtype: optional

Data type used for calculations. Defaults to float64 unless

data.dtype is float32, then it will use float32.

:param order: {'C', 'F', 'A'}, optional

Order to use when flattening the data. Defaults to 'C'.

:param out: ndarray, or None, optional

A location into which the result is stored. If provided, it must have

the same shape as the input. If not provided or `None`,

a freshly-allocated array is returned.

"""

data = np.array(data, copy=False)

if dtype is None:

if data.dtype == np.float32:

dtype = np.float32

else:

dtype = np.float64

else:

dtype = np.dtype(dtype)

if data.ndim > 1:

# flatten input

data = data.reshape(-1, order)

if out is None:

out = np.empty_like(data, dtype=dtype)

else:

assert out.shape == data.shape

assert out.dtype == dtype

if data.size < 1:

# empty input, return empty array

return out

if offset is None:

offset = data[0]

alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

# scaling_factors -> 0 as len(data) gets large

# this leads to divide-by-zeros below

scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),

dtype=dtype)

# create cumulative sum array

np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],

dtype=dtype, out=out)

np.cumsum(out, dtype=dtype, out=out)

# cumsums / scaling

out /= scaling_factors[-2::-1]

if offset != 0:

offset = np.array(offset, copy=False).astype(dtype, copy=False)

# add offsets

out += offset * scaling_factors[1:]

return out

2D ewma功能:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):

"""

Calculates the exponential moving average over a given axis.

:param data: Input data, must be 1D or 2D array.

:param alpha: scalar float in range (0,1)

The alpha parameter for the moving average.

:param axis: The axis to apply the moving average on.

If axis==None, the data is flattened.

:param offset: optional

The offset for the moving average. Must be scalar or a

vector with one element for each row of data. If set to None,

defaults to the first value of each row.

:param dtype: optional

Data type used for calculations. Defaults to float64 unless

data.dtype is float32, then it will use float32.

:param order: {'C', 'F', 'A'}, optional

Order to use when flattening the data. Ignored if axis is not None.

:param out: ndarray, or None, optional

A location into which the result is stored. If provided, it must have

the same shape as the desired output. If not provided or `None`,

a freshly-allocated array is returned.

"""

data = np.array(data, copy=False)

assert data.ndim <= 2

if dtype is None:

if data.dtype == np.float32:

dtype = np.float32

else:

dtype = np.float64

else:

dtype = np.dtype(dtype)

if out is None:

out = np.empty_like(data, dtype=dtype)

else:

assert out.shape == data.shape

assert out.dtype == dtype

if data.size < 1:

# empty input, return empty array

return out

if axis is None or data.ndim < 2:

# use 1D version

if isinstance(offset, np.ndarray):

offset = offset[0]

return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,

out=out)

assert -data.ndim <= axis < data.ndim

# create reshaped data views

out_view = out

if axis < 0:

axis = data.ndim - int(axis)

if axis == 0:

# transpose data views so columns are treated as rows

data = data.T

out_view = out_view.T

if offset is None:

# use the first element of each row as the offset

offset = np.copy(data[:, 0])

elif np.size(offset) == 1:

offset = np.reshape(offset, (1,))

alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

# calculate the moving average

row_size = data.shape[1]

row_n = data.shape[0]

scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),

dtype=dtype)

# create a scaled cumulative sum array

np.multiply(

data,

np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),

dtype=dtype)

/ scaling_factors[np.newaxis, :-1],

dtype=dtype, out=out_view

)

np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)

out_view /= scaling_factors[np.newaxis, -2::-1]

if not (np.size(offset) == 1 and offset == 0):

offset = offset.astype(dtype, copy=False)

# add the offsets to the scaled cumulative sums

out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]

return out

用法:

data_n = 100000000

data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100

span = 5000 # span >= 1

alpha = 2/(span+1) # for pandas` span parameter

# com = 1000 # com >= 0

# alpha = 1/(1+com) # for pandas` center-of-mass parameter

# halflife = 100 # halflife > 0

# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter

result = ewma_vectorized_safe(data, alpha)

只是一个提示

对于给定的alpha,很容易计算“窗口大小”(技术上指数平均值具有无限“窗口”),这取决于该窗口中数据对平均值的贡献.例如,这可用于选择由于边界效应而将结果的多少开始视为不可靠.

def window_size(alpha, sum_proportion):

# Increases with increased sum_proportion and decreased alpha

# solve (1-alpha)**window_size = (1-sum_proportion) for window_size

return int(np.log(1-sum_proportion) / np.log(1-alpha))

alpha = 0.02

sum_proportion = .99 # window covers 99% of contribution to the moving average

window = window_size(alpha, sum_proportion) # = 227

sum_proportion = .75 # window covers 75% of contribution to the moving average

window = window_size(alpha, sum_proportion) # = 68

此线程中使用的alpha = 2 /(window_size 1.0)关系(来自pandas的’span’选项)是上述函数的逆的非常粗略的近似(sum_proportion~ = 0.87). alpha = 1 – np.exp(np.log(1-sum_proportion)/ window_size)更准确(pandas的’half-life’选项等于此公式,sum_proportion = 0.5).

在以下示例中,数据表示连续的噪声信号. cutoff_idx是结果中的第一个位置,其中至少99%的值依赖于数据中的单独值(即,小于1%取决于数据[0]). cutoff_idx之前的数据被排除在最终结果之外,因为它太依赖于数据中的第一个值,因此可能会使平均值偏离.

result = ewma_vectorized_safe(data, alpha, chunk_size)

sum_proportion = .99

cutoff_idx = window_size(alpha, sum_proportion)

result = result[cutoff_idx:]

为了说明上面解决的问题你可以运行几次,注意红线的经常出现的错误开始,在cutoff_idx之后跳过:

data_n = 100000

data = np.random.rand(data_n) * 100

window = 1000

sum_proportion = .99

alpha = 1 - np.exp(np.log(1-sum_proportion)/window)

result = ewma_vectorized_safe(data, alpha)

cutoff_idx = window_size(alpha, sum_proportion)

x = np.arange(start=0, stop=result.size)

import matplotlib.pyplot as plt

plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',

x[cutoff_idx:], result[cutoff_idx:], '-b')

plt.show()

请注意cutoff_idx == window,因为alpha是使用window_size()函数的反函数设置的,具有相同的sum_proportion.

这类似于pandas如何应用ewm(span = window,min_periods = window).

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。