detectedOBJprofiler のソースコード

#!
# coding: utf-8
# Copyright (C) 2016 TOPS SYSTEMS
### @file detectedOBJprofiler.py
### @brief detected OBJ profiler
###
### STMRFの移動物体検出結果について
### ①クラスタリング前のブロック毎検出マップを画像にオーバレイして表示する
### ②クラスタリング後の識別オブジェクト毎に画像を切り出す
### ③②で切り出した画像についてHOG計算を行って出力する
### ④DNN用の固定サイズ画像を出力
###   DNN用の固定サイズ画像を出力する場合にHOG系のログ出力を抑止し、DNN用の情報を出力するオプションを追加
###
### Contact: izumida@topscom.co.jp
###
### @author: M.Izumida
### @date: January 18, 2016
###
## v01r03 NA mode
## v01r04 BB mode
## v01r05 AI mode (Fixed size picture extraction mode, extended.) -F flag
## v01r06 AI mode -a flag
##
# Written for Python 2.7 (NOT FOR 3.x)
#=======================================================================
# インポート宣言
from __future__ import division
import sys
import re
import argparse
import os
from datetime import datetime
import math
import pickle
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
#=======================================================================
# バージョン文字列
versionSTR = "detectedOBJprofiler.py v01r06 Tops Systems (pgm by mpi)"
#=======================================================================
# 共通サブルーチン
#-------------------------------------------------------------------
def errPrint(mes):
    """errPrint.

    エラー出力へのメッセージ表示
    後の始末はその後で別に書くこと
    """
    sys.stderr.write(mes)
    sys.stderr.write('\n')

#-------------------------------------------------------------------
def stdExceptionHandler(mes):
    """standard Exception Handler.

    エラーメッセージを送出し、デバッグのための情報を出力する
    """
    errPrint("Exception Captured: " + str(mes))
    errPrint("0:" + str(sys.exc_info()[0]))
    errPrint("1:" + str(sys.exc_info()[1]))
    errPrint("2:" + str(sys.exc_info()[2]))
#-------------------------------------------------------------------
# B2Dメソッド
def b2d(LL, LH, HL, HH):
    """Byte 2 Dword
    
    バイト整数4個を受け取り、Dワード整数に合成して返却する
    """
    return LL + LH * 256 + HL * 65536 + HH * 16777216

#-------------------------------------------------------------------
# ダンプパラメータ抽出メソッド
def dump2data(st, opt):
    """Dump string to data convertor
    
    以下形式の16進ダンプについて
    60083400  63 63 e8 e8 e8 e8 e8 00 08 f8 f8 e8 e8 e8 00 00
    opt=0ならば符号付きバイト配列
    opt=1ならば符号なしワード配列
    opt=2ならば符号なしバイト配列
    それ以外ならば符号なしDワード配列
    として解釈し、その結果(整数)をリストとして返す。
    なお、リストの要素0はアドレスであり、要素1以降にデータが入る
    解釈不能な場合は空リストを返す
    """
    work = []
    p0 = re.match('([0-9a-fA-F]{8})  ([0-9a-fA-F ]+)', st)
    try:
        if p0:
            adr = int(p0.group(1), 16)
            dat = p0.group(2).split()
            dLen = len(dat)
            if (opt==1) and ((dLen % 2) == 1):
                return []
            work.append(adr)
            if opt==0:
                for i in range(0,dLen):
                    temp = int(dat[i], 16)
                    temp = temp if temp < 127 else temp - 256
                    work.append(temp)
            elif opt==1:
                for i in range(0,(dLen // 2)):
                    temp = int(dat[i*2], 16) + int(dat[i*2+1], 16) * 256
                    work.append(temp)
            elif opt==2:
                for i in range(0,dLen):
                    temp = int(dat[i], 16)
                    work.append(temp)
            else:
                for i in range(0,(dLen // 4)):
                    temp = b2d(int(dat[i*2], 16), int(dat[i*2+1], 16), int(dat[i*2+2], 16), int(dat[i*2+3], 16))
                    work.append(temp)
    except:
        return []
    return work

#-------------------------------------------------------------------
def tryIntParse(st, dval, radix=10):
    """try Parse string to Integer

    文字列stをパースして整数化できれば値を返す、できなければデフォルト値dvalを返す
    """
    try:
        work = int(st, radix)
    except:
        return dval
    return work

#-------------------------------------------------------------------
def repExt(fname, newext):
    """Relpace Extension

    fnameの拡張子部分をnewextでリプレースした文字列を返す
    """
    tempName, ext = os.path.splitext(os.path.basename(fname))
    return tempName + newext

#-------------------------------------------------------------------
def satByte(fnum):
    """Saturation BYTE

    byte範囲に飽和させる
    """
    inum = int(fnum)
    if inum < 0:
        inum = 0
    elif inum > 255:
        inum = 255
    return inum

#=======================================================================
# LOGファイルリーダクラス
[ドキュメント]class logReader: """LOG File Reader Class. LOGファイルを読み取って numpy 配列形式で記録するクラス 配列サイズを dWidth, dHeightで与える """ #-------------------------------------------------------------------- def __init__(self, fnam, dWidth, dHeight, namode, bbmode): """LOG File Reader Constructor コンストラクタ """ self.logFname = fnam self.dWidth = dWidth if dWidth > 0 else 1 self.dHeight = dHeight if dHeight > 0 else 1 # 検出されたBLOCKをマークする配列 self.blkArray = np.zeros([self.dHeight, self.dWidth]) # クラスタリング済オブジェクトを保持する self.numObj = 0 self.objData = dict() #オブジェクト番号に対してその座標をリストで保持する # self.OBJLIS_S = 0x600FDC90 self.OBJLIS_E = 0x600FDD90 # OBJLIS_S = 0x600FDC88 # OBJLIS_E = 0x600FDD88 self.namode = namode # self.bbmode = bbmode if self.bbmode: self.BM_S = 0x600FD640 self.BM_E = 0x600FDC80 else: self.BM_S = 0x600FD000 self.BM_E = 0x600FD640 #デバッグ self.debug = False self.verbose = False #--------------------------------------------------------------------
[ドキュメント] def read(self): """read method ファイルから行をリードして処理するメソッド """ try: with open(self.logFname, 'r') as f: for line in f: if self.rLine(line): break except: stdExceptionHandler("ERROR: in file reading = " + self.logFname) return False return True
#--------------------------------------------------------------------
[ドキュメント] def rLine(self, lin): """Read Line method 読み取った1行を処理するメソッド 常にFalseを返す。 (エラーが定義されたらTrueを返す) """ lis = dump2data(lin, 2) if len(lis) < 2: return False #ダンプ形式でない行は無視して先に進める。 # BLOCKMATCH_BUFであればBLOCK配列として取り出す if (lis[0] >= self.BM_S) and (lis[0] < self.BM_E): ix = (lis[0] - self.BM_S) % 64 iy = (lis[0] - self.BM_S) // 64 for idx in range(1, len(lis)): self.blkArray[iy, ix+(idx-1)] = 1 if lis[idx] != 0 else 0 if self.debug: if self.bbmode: print "BLOCK_BUF_B=({0}, {1}), numData={2}".format(ix, iy, len(lis)-1) else: print "BLOCK_BUF_A=({0}, {1}), numData={2}".format(ix, iy, len(lis)-1) # OBJECT_BUF_Aであればオブジェクト配列として取り出す if (lis[0] == 0x600FDC80) and (len(lis) > 16): self.numObj = b2d(lis[5], lis[6], lis[7], lis[8]) if self.debug: print "NUM_OBJ={0}".format(self.numObj) if self.namode: self.saveX = b2d(lis[9], lis[10], lis[11], lis[12]) // 3 self.saveY = b2d(lis[13], lis[14], lis[15], lis[16]) // 3 if (lis[0] >= self.OBJLIS_S) and (lis[0] < self.OBJLIS_E) and (len(lis) > 16): bNumber = (lis[0] - self.OBJLIS_S) // 16 if self.namode: startX = self.saveX startY = self.saveY endX = b2d(lis[1], lis[2], lis[3], lis[4]) // 3 endY = b2d(lis[5], lis[6], lis[7], lis[8]) // 3 self.saveX = b2d(lis[9], lis[10], lis[11], lis[12]) // 3 self.saveY = b2d(lis[13], lis[14], lis[15], lis[16]) // 3 else: startX = b2d(lis[1], lis[2], lis[3], lis[4]) // 3 startY = b2d(lis[5], lis[6], lis[7], lis[8]) // 3 endX = b2d(lis[9], lis[10], lis[11], lis[12]) // 3 endY = b2d(lis[13], lis[14], lis[15], lis[16]) // 3 self.objData[bNumber] = [startX, startY, endX, endY] if self.debug: print "OBJ_COD({0}, {1}, {2}, {3})".format(startX, startY, endX, endY) return False
#--------------------------------------------------------------------
[ドキュメント] def dumpBlockMatchBuf(self): """dump Block Match Buffer ブロックマッチバッファをダンプする """ for iy in range(0, self.dHeight): for ix in range(0, self.dWidth): if self.blkArray[iy, ix] == 0: sys.stdout.write("_") else: sys.stdout.write("*") print
#--------------------------------------------------------------------
[ドキュメント] def saveDetectedOBJs(self, fnam): """save Detected Object cord. 検出したクラスタリング済オブジェクトの座標をpickleフォーマットで出力する。 """ work = [self.numObj, self.objData] try: with open(fnam, 'w') as f: pickle.dump(work, f) except: stdExceptionHandler("Error: in file writing = " + fnam) sys.exit(1)
#======================================================================= # ブロックHOGクラス
[ドキュメント]class BlockHog: """BlockHog Class. 1ブロック(8x8ピクセル)分のHOG(9方向)ヒストグラムを求めるクラス x軸方向上下π/8をビン1とし、 π/4毎左まわりに2,3,4、5(x軸負方向)、6、7、8 と数える。ビン0はフラットで勾配が計算できないものとする。 opt=0: atan2を使って計算する opt=1: 整数演算の範囲で求める """ #-------------------------------------------------------------------- def __init__(self, img, stX, stY, opt=0): """Block Hog Constructor コンストラクタ """ self.img = img self.stX = stX self.stY = stY self.opt = opt # self.hog = dict() # 要素0: 勾配なし、要素1~8:勾配あり #block data store self.block = np.zeros([8, 8, 2]) #Y, X, 0=gy, 1=gx self.bAVG = 0 # for i in range(0, 9): self.hog[i] = 0 # self.th1 = math.pi / 8 self.th2 = math.pi* 3 / 8 self.th3 = math.pi* 5 / 8 self.th4 = math.pi* 7 / 8 #デバッグ self.debug = False self.verbose = False #--------------------------------------------------------------------
[ドキュメント] def getBinA(self, gx, gy): """get Bin (atan2) gx, gyからBINを決める """ if gx == 0: if gy > 0: return 3 elif gy < 0: return 7 else: return 0 if gy == 0: if gx > 0: return 1 else: return 5 radIdx = math.atan2(gy, gx) if radIdx >= 0: if radIdx < self.th1: return 1 elif radIdx < self.th2: return 2 elif radIdx < self.th3: return 3 elif radIdx < self.th4: return 4 else: return 5 else: radIdx = - radIdx if radIdx < self.th1: return 1 elif radIdx < self.th2: return 8 elif radIdx < self.th3: return 7 elif radIdx < self.th4: return 6 else: return 5
#--------------------------------------------------------------------
[ドキュメント] def getBinB(self, gx, gy): """get Bin (table) gx, gyからBINを決める """ # gx, gyの正規化 px = abs(gx) py = abs(gy) px = px if px > self.bAVG else 0 py = py if py > self.bAVG else 0 temp = max(px, py) if temp > 64: px >>= 4 py >>= 4 elif temp > 32: px >>= 3 py >>= 3 elif temp > 16: px >>= 2 py >>= 2 elif temp > 8: px >>= 1 py >>= 1 if px == 0: if py == 0: return 0 elif gy > 0: return 3 elif gy < 0: return 7 if py == 0: if gx > 0: return 1 else: return 5 if (gx >= 0) and (gy >= 0): if py > 4: if px < 3: return 3 else: return 2 elif py > 2: if px < 2: return 3 else: return 2 elif py > 1: if px < 5: return 2 else: return 1 else: if px < 3: return 2 else: return 1 elif (gx < 0) and (gy >= 0): if py > 4: if px < 3: return 3 else: return 4 elif py > 2: if px < 2: return 3 else: return 4 elif py > 1: if px < 5: return 4 else: return 5 else: if px < 3: return 4 else: return 5 elif (gx < 0) and (gy < 0): if py > 4: if px < 3: return 7 else: return 6 elif py > 2: if px < 2: return 7 else: return 6 elif py > 1: if px < 5: return 6 else: return 5 else: if px < 3: return 6 else: return 5 else: if py > 4: if px < 3: return 7 else: return 8 elif py > 2: if px < 2: return 7 else: return 8 elif py > 1: if px < 5: return 8 else: return 1 else: if px < 3: return 8 else: return 1
#--------------------------------------------------------------------
[ドキュメント] def getHog(self): """get Hog HOGを求める """ temp_sum = 0 for py in range(1, 7): for px in range(1, 7): self.block[py, px, 0] = self.img[self.stX + px, self.stY + py + 1] - self.img[self.stX + px, self.stY + py - 1] self.block[py, px, 1] = self.img[self.stX + px + 1, self.stY + py] - self.img[self.stX + px - 1, self.stY + py] temp_sum += abs(self.block[py, px, 0]) + abs(self.block[py, px, 1]) self.bAVG = int( temp_sum / 128 ) # 周辺を0とした平均 for py in range(1, 7): for px in range(1, 7): if self.opt == 0: bin = self.getBinA(int(self.block[py, px, 1]), int(self.block[py, px, 0])) else: bin = self.getBinB(int(self.block[py, px, 1]), int(self.block[py, px, 0])) self.hog[bin] = self.hog[bin] + 1
#--------------------------------------------------------------------
[ドキュメント] def dumpHog(self): """dump Hog Hog 集計結果をダンプする """ for bin in range(0, 9): print "{0:2d}".format(self.hog[bin]), print
#--------------------------------------------------------------------
[ドキュメント] def typeHog(self): """type Hog Hog 集計結果を分類する 0 : フラット 1 : 縦優勢 2 : 横優勢 3 : 斜め優勢 4 : 混沌 """ if self.hog[0] > 18: return 0 vElem = self.hog[3] + self.hog[7] hElem = self.hog[1] + self.hog[5] oElem = self.hog[2] + self.hog[4] + self.hog[6] + self.hog[8] if vElem > (hElem + oElem): return 1 if hElem > (vElem + oElem): return 2 if oElem > (vElem + hElem): return 3 return 4
#======================================================================= # イメージファイルリーダライタクラス
[ドキュメント]class ImageFileRWriter: """Image File Reader/Writer Class. イメージファイルを読み込み、ブロック配列にしたがって加工した後 ファイルに書き戻すためのクラス """ #-------------------------------------------------------------------- def __init__(self, ifnam, basName, currName, blkArray, dWidth, dHeight, numObj, objData): """Image File Reader/Writer Constructor コンストラクタ """ self.iFname = ifnam self.basName = basName self.currName = currName self.oFname = currName + "_BLK.png" self.blkArray = blkArray self.dWidth = dWidth self.dHeight = dHeight self.numObj = numObj self.objData = objData #デバッグ self.debug = False self.verbose = False # self.typeH = False # self.DNN = False #--------------------------------------------------------------------
[ドキュメント] def load(self): """Image File Reader イメージファイルのリーダ """ try: self.im = Image.open(self.iFname) self.iformat = self.im.format self.isize = self.im.size self.imode = self.im.mode self.iPix = self.im.load() # self.cm = Image.open(self.currName) self.cformat = self.cm.format self.csize = self.cm.size self.cmode = self.cm.mode self.cPix = self.cm.load() # self.ready = True # self.oXc = self.isize[0] self.oYc = self.isize[1] self.oPic = Image.new("L",(self.oXc, self.oYc)) self.oPix = self.oPic.load() except: stdExceptionHandler("Error: reading image file = " + self.iFname) return False return True
#--------------------------------------------------------------------
[ドキュメント] def save(self): """Image File Writer イメージファイルのライタ """ try: self.oPic.save(self.oFname) except: stdExceptionHandler("Error: writing image file = " + self.oFname) return False return True
#--------------------------------------------------------------------
[ドキュメント] def draw(self): """Drawing image method ブロック配列に従ってイメージを加工する curr ->加工-> oPic """ for py in range(0, self.oYc): for px in range(0, self.oXc): self.oPix[px, py] = self.cPix[px, py] for iy in range(0, self.dHeight): for ix in range(0, self.dWidth): if self.blkArray[iy, ix] != 0: xStart = ix * 8 + 64 yStart = iy * 8 + 80 for py in range(yStart, yStart+8): for px in range(xStart, xStart+8): self.oPix[px, py] = satByte(self.cPix[px, py]*2) if self.verbose: print "BLOCK = ({0}, {1})".format(ix, iy) hog = BlockHog(self.cPix, xStart, yStart, 1) hog.getHog() hog.dumpHog()
#--------------------------------------------------------------------
[ドキュメント] def extract(self): """Extract Objects オブジェクト配列に従ってイメージを抽出する iPix ->抽出->tempPix """ for objNumber in range(0, self.numObj): oName = self.basName + "_OBJ_" + str(objNumber) + ".png" sX, sY, eX, eY = self.objData[objNumber] xW = eX - sX yH = eY - sY tempPic = Image.new("L",(xW, yH)) tempPix = tempPic.load() for py in range(0, yH): for px in range(0, xW): tempPix[px, py] = self.iPix[px+sX, py+sY] if self.typeH: print "{0} ({1}, {2})".format(oName, xW, yH), else: print "{0} ({1}, {2})".format(oName, xW, yH) self.hogStat(tempPix, xW, yH) try: tempPic.save(oName) except: stdExceptionHandler("Error: writing image file = " + oName) return False return True
#--------------------------------------------------------------------
[ドキュメント] def extractF(self, opt): """Extract Fixed Size Objects オブジェクト配列に従ってイメージを抽出するが、固定サイズ 選択領域ほぼ中央部の opt=0 32pix x 64pix opt=1 64pix x 64pix iPix ->抽出->tempPix """ if opt==0: xW = 32 yH = 64 yH2 = 32 else: xW = 64 yH = 64 yH2 = 32 for objNumber in range(0, self.numObj): oName = self.basName + "_FIX_" + str(objNumber) + ".png" sX, sY, eX, eY = self.objData[objNumber] if self.DNN: print "{0} : {1}, {2}, {3}, {4}".format(oName, sX, sY, eX, eY) centerX = ((eX - sX) // 2) + sX centerY = ((eY - sY) // 2) + sY sX = (centerX // 32) * 32 sY = centerY - yH2 tempPic = Image.new("L",(xW, yH)) tempPix = tempPic.load() for py in range(0, yH): for px in range(0, xW): tempPix[px, py] = self.iPix[px+sX, py+sY] if not self.DNN: if self.typeH: print "{0} ({1}, {2})".format(oName, xW, yH), else: print "{0} ({1}, {2})".format(oName, xW, yH) self.hogStat(tempPix, xW, yH) try: tempPic.save(oName) except: stdExceptionHandler("Error: writing image file = " + oName) return False return True
#--------------------------------------------------------------------
[ドキュメント] def hogStat(self, img, xW, yH): """hog Statisticss 抽出したイメージの各ブロックについてhogを求めさらに統計処理する """ stat = dict() for i in range(0, 5): stat[i] = 0 bW = xW // 8 bH = yH // 8 typeImage = [] for iy in range(0, bH): typeX = [] for ix in range(0, bW): hog = BlockHog(img, ix*8, iy*8, 1) hog.getHog() if self.typeH: tHog = hog.typeHog() stat[tHog] += 1 typeX.append(self.typeHog2char(tHog)) else: hog.dumpHog() typeImage.append(typeX) if self.typeH: for i in range(0, 5): print "{0:3d}".format(stat[i]), print for iy in range(0, bH): xLis = typeImage[iy] for ix in range(0, bW): sys.stdout.write(xLis[ix]) print
#--------------------------------------------------------------------
[ドキュメント] def typeHog2char(self, tHog): """Hog type 2 character tHog値を図形キャラクタに変換する """ if tHog==0: return "□" elif tHog==1: return "―" # 勾配が縦なら画像は横縞の筈 elif tHog==2: return "|" # 勾配が横なら画像は縦縞の筈 elif tHog==3: return "/" return "■"
#======================================================================= # メインプログラム def main(): """main. メインプログラム """ #----------------------------------------------------------------------- # コマンドラインオプション処理 # parser = argparse.ArgumentParser(description='detectedOBJprofiler.') parser.add_argument('--LOG', nargs=1, help='Specify LOG file name.') parser.add_argument('--PREV', nargs=1, help='Specify PREV PICTURE base name for DISP/HOG.') parser.add_argument('--CURR', nargs=1, help='Specify CURR PICTURE base name for BLOCKMATCH.') parser.add_argument('--W', nargs=1, help='Data Width (default=64).') parser.add_argument('--H', nargs=1, help='Data Height (default=25).') parser.add_argument('-b', dest='bbMode', help='DRAW BLOCKMATCH BUF B mode.', action='store_true', default=False) parser.add_argument('-n', dest='naMode', help='NON-ADAPTIVE mode.', action='store_true', default=False) parser.add_argument('-t', dest='type', help='Print Type of Hog.', action='store_true', default=False) parser.add_argument('-f', dest='fixed', help='Fixed size Object mode(32x64).', action='store_true', default=False) parser.add_argument('-F', dest='fx64', help='Fixed size Object mode(64x64).', action='store_true', default=False) parser.add_argument('-a', dest='DNN', help='DNN log mode, must be used with -F.', action='store_true', default=False) parser.add_argument('-d', dest='debug', help='Debug mode, print debug information.', action='store_true', default=False) parser.add_argument('-v', dest='verbose', help='Verbose mode.', action='store_true', default=False) parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False) args = parser.parse_args() #----------------------------------------------------------------------- # Version 表示 # print versionSTR if args.VERSION: sys.exit(0) #----------------------------------------------------------------------- # ファイル名処理 # if args.PREV is None: errPrint('ERROR: NO PREV image file!!!') sys.exit(1) else: basName = args.PREV[0] iFname = basName + ".pgm" if not os.path.isfile(iFname): errPrint('ERROR: PREV PGM file, NOT EXIST.') sys.exit(1) if args.CURR is None: errPrint('ERROR: NO CURR image file!!!') sys.exit(1) else: currName = args.CURR[0] cFname = currName + ".pgm" if not os.path.isfile(cFname): errPrint('ERROR: CURR PGM file, NOT EXIST.') sys.exit(1) if args.LOG is None: errPrint('ERROR: NO input LOG file!!!') sys.exit(1) else: logFname = args.LOG[0] if not os.path.isfile(logFname): errPrint('ERROR: LOG file, NOT EXIST.') sys.exit(1) #----------------------------------------------------------------------- # パラメータ処理 if args.W is None: dWidth = 64 else: dWidth = tryIntParse(args.W[0], 64) if args.H is None: dHeight = 25 else: dHeight = tryIntParse(args.H[0], 25) #----------------------------------------------------------------------- # 実処理 # data = logReader(logFname, dWidth, dHeight, args.naMode, args.bbMode) data.verbose = args.verbose data.debug = args.debug if data.read(): data.saveDetectedOBJs(basName + "_obj.pickle") if args.verbose: data.dumpBlockMatchBuf() image = ImageFileRWriter(iFname, basName, cFname, data.blkArray, data.dWidth, data.dHeight, data.numObj, data.objData) image.verbose = args.verbose image.debug = args.debug image.typeH = args.type image.DNN = args.DNN if image.load(): if args.fixed: image.extractF(0) elif args.fx64: image.extractF(1) else: image.draw() image.save() image.extract() else: sys.exit(1) #終了メッセージ today = datetime.today() print " " print today.strftime("FINISH: %Y/%m/%d %H:%M:%S") #----------------------------------------------------------------------- # 正常終了 # sys.exit(0) #======================================================================= # メインプログラムの起動 if __name__ == "__main__": main()