ref: ddd80abd3f634326ffa8190a095db0015ea83713
dir: /tools/3D-Reconstruction/MotionEST/HornSchunck.py/
#!/usr/bin/env python # coding: utf-8 import numpy as np import numpy.linalg as LA from scipy.ndimage.filters import gaussian_filter from scipy.sparse import csc_matrix from scipy.sparse.linalg import inv from MotionEST import MotionEST """Horn & Schunck Model""" class HornSchunck(MotionEST): """ constructor: cur_f: current frame ref_f: reference frame blk_sz: block size alpha: smooth constrain weight sigma: gaussian blur parameter """ def __init__(self, cur_f, ref_f, blk_sz, alpha, sigma, max_iter=100): super(HornSchunck, self).__init__(cur_f, ref_f, blk_sz) self.cur_I, self.ref_I = self.getIntensity() #perform gaussian blur to smooth the intensity self.cur_I = gaussian_filter(self.cur_I, sigma=sigma) self.ref_I = gaussian_filter(self.ref_I, sigma=sigma) self.alpha = alpha self.max_iter = max_iter self.Ix, self.Iy, self.It = self.intensityDiff() """ Build Frame Intensity """ def getIntensity(self): cur_I = np.zeros((self.num_row, self.num_col)) ref_I = np.zeros((self.num_row, self.num_col)) #use average intensity as block's intensity for i in xrange(self.num_row): for j in xrange(self.num_col): r = i * self.blk_sz c = j * self.blk_sz cur_I[i, j] = np.mean(self.cur_yuv[r:r + self.blk_sz, c:c + self.blk_sz, 0]) ref_I[i, j] = np.mean(self.ref_yuv[r:r + self.blk_sz, c:c + self.blk_sz, 0]) return cur_I, ref_I """ Get First Order Derivative """ def intensityDiff(self): Ix = np.zeros((self.num_row, self.num_col)) Iy = np.zeros((self.num_row, self.num_col)) It = np.zeros((self.num_row, self.num_col)) sz = self.blk_sz for i in xrange(self.num_row - 1): for j in xrange(self.num_col - 1): """ Ix: (i ,j) <--- (i ,j+1) (i+1,j) <--- (i+1,j+1) """ count = 0 for r, c in {(i, j + 1), (i + 1, j + 1)}: if 0 <= r < self.num_row and 0 < c < self.num_col: Ix[i, j] += ( self.cur_I[r, c] - self.cur_I[r, c - 1] + self.ref_I[r, c] - self.ref_I[r, c - 1]) count += 2 Ix[i, j] /= count """ Iy: (i ,j) (i ,j+1) ^ ^ | | (i+1,j) (i+1,j+1) """ count = 0 for r, c in {(i + 1, j), (i + 1, j + 1)}: if 0 < r < self.num_row and 0 <= c < self.num_col: Iy[i, j] += ( self.cur_I[r, c] - self.cur_I[r - 1, c] + self.ref_I[r, c] - self.ref_I[r - 1, c]) count += 2 Iy[i, j] /= count count = 0 #It: for r in xrange(i, i + 2): for c in xrange(j, j + 2): if 0 <= r < self.num_row and 0 <= c < self.num_col: It[i, j] += (self.ref_I[r, c] - self.cur_I[r, c]) count += 1 It[i, j] /= count return Ix, Iy, It """ Get weighted average of neighbor motion vectors for evaluation of laplacian """ def averageMV(self): avg = np.zeros((self.num_row, self.num_col, 2)) """ 1/12 --- 1/6 --- 1/12 | | | 1/6 --- -1/8 --- 1/6 | | | 1/12 --- 1/6 --- 1/12 """ for i in xrange(self.num_row): for j in xrange(self.num_col): for r, c in {(-1, 0), (1, 0), (0, -1), (0, 1)}: if 0 <= i + r < self.num_row and 0 <= j + c < self.num_col: avg[i, j] += self.mf[i + r, j + c] / 6.0 for r, c in {(-1, -1), (-1, 1), (1, -1), (1, 1)}: if 0 <= i + r < self.num_row and 0 <= j + c < self.num_col: avg[i, j] += self.mf[i + r, j + c] / 12.0 return avg def motion_field_estimation(self): count = 0 """ u_{n+1} = ~u_n - Ix(Ix.~u_n+Iy.~v+It)/(IxIx+IyIy+alpha^2) v_{n+1} = ~v_n - Iy(Ix.~u_n+Iy.~v+It)/(IxIx+IyIy+alpha^2) """ denom = self.alpha**2 + np.power(self.Ix, 2) + np.power(self.Iy, 2) while count < self.max_iter: avg = self.averageMV() self.mf[:, :, 1] = avg[:, :, 1] - self.Ix * ( self.Ix * avg[:, :, 1] + self.Iy * avg[:, :, 0] + self.It) / denom self.mf[:, :, 0] = avg[:, :, 0] - self.Iy * ( self.Ix * avg[:, :, 1] + self.Iy * avg[:, :, 0] + self.It) / denom count += 1 self.mf *= self.blk_sz def motion_field_estimation_mat(self): row_idx = [] col_idx = [] data = [] N = 2 * self.num_row * self.num_col b = np.zeros((N, 1)) for i in xrange(self.num_row): for j in xrange(self.num_col): """(IxIx+alpha^2)u+IxIy.v-alpha^2~u IxIy.u+(IyIy+alpha^2)v-alpha^2~v""" u_idx = i * 2 * self.num_col + 2 * j v_idx = u_idx + 1 b[u_idx, 0] = -self.Ix[i, j] * self.It[i, j] b[v_idx, 0] = -self.Iy[i, j] * self.It[i, j] #u: (IxIx+alpha^2)u row_idx.append(u_idx) col_idx.append(u_idx) data.append(self.Ix[i, j] * self.Ix[i, j] + self.alpha**2) #IxIy.v row_idx.append(u_idx) col_idx.append(v_idx) data.append(self.Ix[i, j] * self.Iy[i, j]) #v: IxIy.u row_idx.append(v_idx) col_idx.append(u_idx) data.append(self.Ix[i, j] * self.Iy[i, j]) #(IyIy+alpha^2)v row_idx.append(v_idx) col_idx.append(v_idx) data.append(self.Iy[i, j] * self.Iy[i, j] + self.alpha**2) #-alpha^2~u #-alpha^2~v for r, c in {(-1, 0), (1, 0), (0, -1), (0, 1)}: if 0 <= i + r < self.num_row and 0 <= j + c < self.num_col: u_nb = (i + r) * 2 * self.num_col + 2 * (j + c) v_nb = u_nb + 1 row_idx.append(u_idx) col_idx.append(u_nb) data.append(-1 * self.alpha**2 / 6.0) row_idx.append(v_idx) col_idx.append(v_nb) data.append(-1 * self.alpha**2 / 6.0) for r, c in {(-1, -1), (-1, 1), (1, -1), (1, 1)}: if 0 <= i + r < self.num_row and 0 <= j + c < self.num_col: u_nb = (i + r) * 2 * self.num_col + 2 * (j + c) v_nb = u_nb + 1 row_idx.append(u_idx) col_idx.append(u_nb) data.append(-1 * self.alpha**2 / 12.0) row_idx.append(v_idx) col_idx.append(v_nb) data.append(-1 * self.alpha**2 / 12.0) M = csc_matrix((data, (row_idx, col_idx)), shape=(N, N)) M_inv = inv(M) uv = M_inv.dot(b) for i in xrange(self.num_row): for j in xrange(self.num_col): self.mf[i, j, 0] = uv[i * 2 * self.num_col + 2 * j + 1, 0] * self.blk_sz self.mf[i, j, 1] = uv[i * 2 * self.num_col + 2 * j, 0] * self.blk_sz