I have the following numpy random 2D array:

np.random.rand(30, 20)
I want to iterate over each grid cell in the array. If the value of a grid cell is > 0.6, I want to assign the excess to its 8 immediate neighbouring cells (corner and edge cells have correspondingly fewer neighbours).
The excess should be redistributed according to one of two user-selected rules:

- evenly among the 8 neighbours
- proportionally to the value already in each neighbour, i.e. a neighbour with a higher value receives a larger share
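For example, with a threshold of 0.6: an interior cell holding 0.8 carries an excess of 0.2. Under the even rule each of its 8 neighbours would receive 0.2 / 8 = 0.025; under the proportional rule a neighbour holding 0.4 would receive twice the share of a neighbour holding 0.2.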
Is there a way to do this in numpy without resorting to for loops?
OK, here's my take: in each step the algorithm detects all supra-threshold cells and simultaneously updates all of them and all their neighbours, either evenly or proportionally. This is fully vectorized and comes in two implementations:

- the generally faster one is based on linear convolution plus some trickery to conserve mass at the edges and corners;
- the other represents the same operator as a sparse matrix. It is usually slower, but I kept it because it can handle sparse arguments and is therefore faster when the fraction of supra-threshold cells is low.

Since this process typically doesn't converge in a single step, it is placed in a loop; for all but the smallest grids its overhead should be minimal, since the payload of each pass is substantial. The loop terminates after a user-supplied number of iterations, or as soon as no supra-threshold cells remain. Optionally, it can record the Euclidean deltas between iterations.
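As a minimal sketch of that termination logic (one_simultaneous_update here is a hypothetical stand-in for the real vectorized update defined inside level below; it merely clips the excess instead of redistributing it):

import numpy as np

def one_simultaneous_update(state, threshold=0.6):
    # hypothetical stand-in: just clips supra-threshold cells; the real
    # update in level() redistributes the excess to the neighbours instead
    em = state > threshold
    state[em] = threshold
    return em.sum()          # number of supra-threshold cells this pass

state = np.random.rand(30, 20)
for i in range(100):         # maxiter
    if one_simultaneous_update(state) == 0:
        break                # equilibrium: nothing left above threshold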
A few words about the algorithm: if it weren't for the boundaries, the even spreading operation could be described as subtracting the pattern p of redistributed mass and then adding the same pattern convolved with the ring kernel k = [1 1 1; 1 0 1; 1 1 1] / 8; similarly, redistribution in proportion to the residual mass r can be written as

(1) r (k * (p / (k * r)))

where * is the convolution operator and multiplication and division are component-wise. Parsing the formula, we see that every point of p is first normalised by the average, k * r, of the residual mass r over its 8 neighbours, before it is spread to said neighbours (the other convolution) and scaled by the residual. The prenormalisation guarantees conservation of mass. In particular, it normalises borders and corners correctly. Building on this, we see that the boundary problem of the even rule can be solved in a similar fashion: by using (1) with r replaced by a sheet of ones.
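Both claims are easy to check numerically. A minimal sketch (it assumes scipy is available for the convolution; the actual code below also has a pure numpy path):

import numpy as np
from scipy import signal

k = np.array([[1., 1., 1.],
              [1., 0., 1.],
              [1., 1., 1.]]) / 8          # the ring kernel

rng = np.random.default_rng(0)
r = rng.random((5, 6))                    # residual mass, positive everywhere
p = rng.random((5, 6)) * 0.2              # pattern of mass to be redistributed

# proportional rule, formula (1): r * (k conv (p / (k conv r)))
spread_prop = r * signal.convolve2d(
    p / signal.convolve2d(r, k, mode='same'), k, mode='same')
print(np.isclose(spread_prop.sum(), p.sum()))   # True: mass is conserved

# even rule = (1) with r replaced by a sheet of ones: each cell's excess is
# split evenly among its existing neighbours, borders included
ones = np.ones_like(p)
spread_even = signal.convolve2d(
    p / signal.convolve2d(ones, k, mode='same'), k, mode='same')
print(np.isclose(spread_even.sum(), p.sum()))   # True as well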
Funny side note: using the proportional rule one can build non-converging patterns. Here are two oscillators:
0.7  0    0.8  0    0.8  0
0    0    0    0    0    0.6
0    0.6  0    0.6  0    1.0

0.6  0    0.8  0    1.0  0
1.0  0    0    0    0    0
0    0.6  0    0.6  0    0.6
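To test a candidate pattern for oscillation, you can run level (defined below) with track_Euclidean_deltas=True and watch whether the deltas decay; a minimal sketch on a random grid:

data = np.random.random((30, 20))   # substitute a candidate oscillator here
res, conv, nnz, rt, E = level(data, rule='proportional', maxiter=50,
                              track_Euclidean_deltas=True)
print('converged:', conv)
print('last deltas:', E[-3:])       # for a true oscillator these do not decay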
The code is below; rather long and technical, I'm afraid, but I tried to explain at least the main (fastest) branch. The main function is called level, and there are a few simple test functions. There are some print statements, but I think that is the only Python 3 dependency.
import numpy as np
try:
    from scipy import signal
    HAVE_SCIPY = True
except ImportError:
    HAVE_SCIPY = False
import time

SPARSE_THRESH = 0.05
USE_SCIPY = False  # actually, the numpy workaround is a bit faster

KERNEL = np.zeros((3, 3)) + 1/8
KERNEL[1, 1] = 0

def scipy_ring(a):
    """convolve 2d array a with ring kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    """
    return signal.convolve2d(a, KERNEL, mode='same')

def numpy_ring(a):
    """convolve 2d array a with ring kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    """
    tmp = a.copy()
    tmp[:, 1:] += a[:, :-1]
    tmp[:, :-1] += a[:, 1:]
    out = tmp.copy()
    out[1:, :] += tmp[:-1, :]
    out[:-1, :] += tmp[1:, :]
    return (out - a) / 8

if USE_SCIPY and HAVE_SCIPY:
    conv_with_ring = scipy_ring
else:
    conv_with_ring = numpy_ring

# next is yet another implementation of convolution, this one including
# edge correction. what for? it is most of the time much slower than the
# above but can handle sparse inputs and consequently is faster if the
# fraction of above-threshold cells is small (~5% or less)

SPREAD_CACHE = {}

def precomp(sh):
    """precompute sparse representation of the operator mapping a ravelled
    2d array of shape sh to its boundary corrected convolution with the
    ring kernel

    1/8 1/8 1/8      1/5  0  1/5              0  1/3
    1/8  0  1/8      1/5 1/5 1/5  at edges,  1/3 1/3  at corners
    1/8 1/8 1/8

    stored are
    - a table of flat indices encoding the neighbours of the cell whose
      flat index equals the row number in the table
    - two scaled copies of the appropriate weights, which equal
      1 / neighbour count
    """
    global SPREAD_CACHE
    m, n = sh
    # m*n grid points, each with up to 8 neighbours:
    # tedious but straightforward
    indices = np.empty((m*n, 8), dtype=int)
    indices[n-1:, 1] = np.arange(m*n - (n-1))  # NE
    indices[:-(n+1), 3] = np.arange(n+1, m*n)  # SE
    indices[:-(n-1), 5] = np.arange(n-1, m*n)  # SW
    indices[n+1:, 7] = np.arange(m*n - (n+1))  # NW
    indices[n:, 0] = np.arange((m-1)*n)        # N
    indices[:n, [0, 1, 7]] = -1
    indices[:-1, 2] = np.arange(1, m*n)        # E
    indices[n-1::n, 1:4] = -1
    indices[:-n, 4] = np.arange(n, m*n)        # S
    indices[-n:, 3:6] = -1
    indices[1:, 6] = np.arange(m*n - 1)        # W
    indices[::n, 5:] = -1
    # weights are constant along rows, therefore an m*n vector suffices
    nei_count = (indices > -1).sum(axis=-1)
    weights = 1 / nei_count
    SPREAD_CACHE[sh] = indices, weights, 8 * weights.reshape(sh)
    return SPREAD_CACHE[sh]

def iadd_conv_ring(a, out):
    """add the boundary corrected convolution of 2d array a with the ring
    kernel (see precomp) to out

    IMPORTANT: out must be flat and have one spare field at its end which
    is used as a "/dev/null" by the indexing trickery

    if a is a tuple it is interpreted as a sparse representation of the
    form: (flat) indices, values, shape
""" if isinstance(a, tuple): # sparse ind, val, sh = a k_ind, k_wgt, __ \ = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh) np.add.at(out, k_ind[ind, :], k_wgt[ind, None]*val[:, None]) else: # dense sh = a.shape k_ind, k_wgt, __ \ = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh) np.add.at(out, k_ind, k_wgt[:, None]*a.ravel()[:, None]) return out # main function def level(input, threshold=0.6, rule='proportional', maxiter=1, switch_to_sparse_at=SPARSE_THRESH, use_conv_matrix=False, track_Euclidean_deltas=False): """spread supra-threshold mass to neighbours until equilibrium reached updates are simultaneous, iterations are capped at maxiter 'rule' can be 'proportional' or 'even' 'switch_to_sparse_at' and 'use_conv_matrix' influence speed but not result returns updated grid, convergence flag, vector of numbers of supratheshold cells for each iteration, runtime, [vector of Euclidean deltas] """ sh = input.shape m, n = sh nei_ind, rec_nc, rec_8 \ = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh) if rule == 'proportional': def iteration(state, state_f): em = state > threshold nnz = em.sum() if nnz == 0: # no change, send signal to quit return nnz elif nnz < em.size * switch_to_sparse_at: # use sparse code ei = np.where(em.flat)[0] excess = state_f[ei] - threshold state_f[-1] = 0 exc_nei_sum = rec_nc[ei] * state_f[nei_ind[ei, :]].sum(axis=-1) exc_nei_ind = np.unique(nei_ind[ei, :]) if exc_nei_ind[0] == -1: exc_nei_ind = exc_nei_ind[1:] nm = exc_nei_sum != 0 state_swap = state_f[exc_nei_ind] state_f[exc_nei_ind] = 1 iadd_conv_ring((ei[nm], excess[nm] / exc_nei_sum[nm], sh), state_f) state_f[exc_nei_ind] *= state_swap iadd_conv_ring((ei[~nm], excess[~nm], sh), state_f) state_f[ei] -= excess elif use_conv_matrix: excess = np.where(em, state - threshold, 0) state_f[-1] = 0 nei_sum = (rec_nc * state_f[nei_ind].sum(axis=-1)).reshape(sh) nm = nei_sum != 0 pm = em & nm exc_p = np.where(pm, excess, 0) exc_p[pm] /= nei_sum[pm] wei_nei_sum = iadd_conv_ring(exc_p, np.zeros_like(state_f)) state += state * wei_nei_sum[:-1].reshape(sh) fm = em & ~nm exc_f = np.where(fm, excess, 0) iadd_conv_ring(exc_f, state_f) state -= excess else: excess = np.where(em, state - threshold, 0) nei_sum = conv_with_ring(state) # must special case the event of all neighbours being zero nm = nei_sum != 0 # these can be distributed proportionally: pm = em & nm # select, prenormalise by sum of masses of neighbours,... 
                exc_p = np.where(pm, excess, 0)
                exc_p[pm] /= nei_sum[pm]
                # ... then spread to the neighbours and scale
                spread_p = state * conv_with_ring(exc_p)
                # these can't be distributed proportionally (because all
                # their neighbours are zero); therefore fall back to even:
                fm = em & ~nm
                exc_f = np.where(fm, excess * rec_8, 0)
                spread_f = conv_with_ring(exc_f)
                state += spread_p + spread_f - excess
            return nnz
    elif rule == 'even':
        def iteration(state, state_f):
            em = state > threshold
            nnz = em.sum()
            if nnz == 0:  # no change, send signal to quit
                return nnz
            elif nnz < em.size * switch_to_sparse_at:  # use sparse code
                ei = np.where(em.flat)[0]
                excess = state_f[ei] - threshold
                iadd_conv_ring((ei, excess, sh), state_f)
                state_f[ei] -= excess
            elif use_conv_matrix:
                excess = np.where(em, state - threshold, 0)
                iadd_conv_ring(excess, state_f)
                state -= excess
            else:
                excess = np.where(em, state - threshold, 0)
                # prenormalise by the number of neighbours, and spread
                spread = conv_with_ring(excess * rec_8)
                state += spread - excess
            return nnz
    else:
        raise ValueError('unknown rule: ' + rule)

    # master loop
    t0 = time.time()
    out_f = np.empty((m*n + 1,))
    out = out_f[:m*n]
    out[:] = input.ravel()
    out.shape = sh
    nnz = []
    if track_Euclidean_deltas:
        last = input
        E = []
    for i in range(maxiter):
        nnz.append(iteration(out, out_f))
        if nnz[-1] == 0:
            if track_Euclidean_deltas:
                return out, True, nnz, time.time() - t0, E + [0]
            return out, True, nnz, time.time() - t0
        if track_Euclidean_deltas:
            E.append(np.sqrt(((last - out)**2).sum()))
            last = out.copy()
    if track_Euclidean_deltas:
        return out, False, nnz, time.time() - t0, E
    return out, False, nnz, time.time() - t0

# tests
def check_simple():
    A = np.zeros((6, 6))
    A[[0, 1, 1, 4, 4], [0, 3, 5, 1, 5]] = 1.08
    A[5, :] = 0.1 * np.arange(6)
    print('original')
    print(A)
    for rule in ('proportional', 'even'):
        print(rule)
        for lb, ucm, st in (('convolution', False, 0.001),
                            ('matrix', True, 0.001),
                            ('sparse', True, 0.999)):
            print(lb)
            print(level(A, rule=rule, switch_to_sparse_at=st,
                        use_conv_matrix=ucm)[0])

def check_consistency(sh=(300, 400), n=20):
    print("""Running consistency checks with different solvers
{} trials each, {} x {} cells
""".format(n, *sh))
    data = np.random.random((n,) + sh)
    sums = data.sum(axis=(1, 2))
    for th, lb in ((0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense')):
        times = np.zeros((2, 3))
        for d, s in zip(data, sums):
            for i, rule in enumerate(('proportional', 'even')):
                results = []
                for j, (ucm, st) in enumerate(
                        ((False, 0.001), (True, 0.001), (True, 0.999))):
                    res, conv, nnz, elapsed = level(
                        d, rule=rule, switch_to_sparse_at=st,
                        use_conv_matrix=ucm, threshold=th)
                    results.append(res)
                    times[i, j] += elapsed
                assert np.allclose(results[0], results[1])
                assert np.allclose(results[1], results[2])
                assert np.allclose(results[2], results[0])
                assert np.allclose(s, [r.sum() for r in results])
        print("""condition {} finished, no obvious errors; runtimes [sec]:

               convolution        matrix        sparse solver
proportional {:13.7f} {:13.7f} {:13.7f}
even         {:13.7f} {:13.7f} {:13.7f}
""".format(lb, *tuple(times.ravel())))

def check_convergence(sh=(300, 400), maxiter=100):
    data = np.random.random(sh)
    res, conv, nnz, elapsed, Eucl = level(data, maxiter=maxiter,
                                          track_Euclidean_deltas=True)
    print('nnz:', nnz)
    print('delta:', Eucl)
    print('final length:', np.sqrt((res*res).sum()))
    print('ratio:', Eucl[-1] / np.sqrt((res*res).sum()))
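Finally, one plausible way to invoke it on the grid from the question (just an example invocation, not part of the test suite; swap in rule='proportional' as needed):

if __name__ == '__main__':
    A = np.random.rand(30, 20)
    out, converged, nnz, runtime = level(A, rule='even', maxiter=100)
    print('converged:', converged)
    print('mass before:', A.sum(), 'after:', out.sum())  # mass is conserved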