Source code for openem.Detect.SSD

from openem.models import ImageModel
from openem.models import Preprocessor

import cv2
import numpy as np
import tensorflow as tf

from collections import namedtuple

from openem.Detect import Detection

class SSDDetector(ImageModel):
    """Object detector that post-processes Single Shot Detector (SSD) output.

    Images are queued with :meth:`addImage` and evaluated together by
    :meth:`process`, which decodes the raw network rows into ``Detection``
    records.
    """

    # Preprocessing applied to every image before it joins the batch.
    # NOTE(review): the arguments look like (scale, per-channel offset,
    # channel-swap flag), the offsets being negated BGR channel means --
    # confirm against openem.models.Preprocessor.
    preprocessor = Preprocessor(1.0,
                                np.array([-103.939, -116.779, -123.68]),
                                False)

    # Non-maximum-suppression settings used by process().
    _NMS_MAX_DETECTIONS = 200     # keep at most this many boxes per image
    _NMS_IOU_THRESHOLD = 0.45     # overlap above which the weaker box is dropped
    _NMS_SCORE_THRESHOLD = 0.01   # boxes scoring at or below this are ignored

    def addImage(self, image, cookie=None):
        """Preprocess an image and add it to the current batch.

        image: np.array of the underlying image (not pre-processed) to add
               to the model's current batch.
        cookie: Optional dict carried alongside the image. It is updated in
                place with a "size" entry holding ``image.shape``; a new
                dict is created when omitted.

        Returns whatever the underlying ``_addImage`` call returns.
        """
        if cookie is None:
            cookie = {}
        # process() needs the original dimensions to scale boxes to pixels.
        cookie.update({"size": image.shape})
        return self._addImage(image, self.preprocessor, cookie)

    def process(self):
        """Run the network on the batched images and decode its detections.

        Returns None if the batch is empty. Otherwise returns a list with
        one entry per batched image; each entry is a list of ``Detection``
        sorted by descending confidence. ``location`` is the row produced
        by ``decodeBoxes`` (``[x, y, width, height]``) and ``species`` is
        the winning class index, where index 0 is the background class.
        """
        batch_result, cookies = super(SSDDetector, self).process()
        if batch_result is None:
            return batch_result

        batch_detections = []
        # Split out the tensor into bounding boxes per image
        for image_idx, image_result in enumerate(batch_result):
            image_dims = cookies[image_idx]["size"]

            # Column layout of every row:
            # [4 box offsets | class confidences | 4 anchor | 4 variance]
            pred_stop = 4
            conf_stop = image_result.shape[1] - 8
            anc_stop = conf_stop + 4
            var_stop = anc_stop + 4

            loc = image_result[:, :pred_stop]
            conf = image_result[:, pred_stop:conf_stop]
            anchors = image_result[:, conf_stop:anc_stop]
            variances = image_result[:, anc_stop:var_stop]

            boxes = decodeBoxes(loc, anchors, variances, image_dims)

            # Best non-background class per box. Column 0 is the background
            # class, hence the slice and the +1 on the winning index. Both
            # arrays stay float64, as they were when filled element-wise.
            foreground = conf[:, 1:]
            scores = foreground.max(axis=1).astype(np.float64)
            class_index = (foreground.argmax(axis=1) + 1).astype(np.float64)

            # TensorFlow NMS reads each row as two diagonal corners
            # ([y1, x1, y2, x2]), whereas `boxes` holds [x, y, width,
            # height]. Build a corner-form copy for suppression only.
            corners = np.empty_like(boxes)
            corners[:, 0] = boxes[:, 1]
            corners[:, 1] = boxes[:, 0]
            corners[:, 2] = boxes[:, 1] + boxes[:, 3]
            corners[:, 3] = boxes[:, 0] + boxes[:, 2]

            # Thresholds are passed by keyword: positionally the fourth
            # argument is the IoU threshold and the fifth the score
            # threshold, which is easy to get backwards.
            # NOTE(review): this adds a new op to the graph for every image
            # processed -- consider building it once with placeholders.
            indices = tf.image.non_max_suppression(
                corners,
                scores,
                max_output_size=self._NMS_MAX_DETECTIONS,
                iou_threshold=self._NMS_IOU_THRESHOLD,
                score_threshold=self._NMS_SCORE_THRESHOLD)

            detections = [
                Detection(location=boxes[idx],
                          confidence=scores[idx],
                          species=class_index[idx],
                          frame=None,
                          video_id=None)
                for idx in indices.eval(session=self.tf_session)
            ]
            detections.sort(key=lambda found: found.confidence, reverse=True)
            batch_detections.append(detections)
        return batch_detections
def decodeBoxes(loc, anchors, variances, img_size):
    """Turn SSD box regressions into pixel-space bounding boxes.

    loc: Nx4 array of predicted offsets, one row per box, in the order
         (centre-x shift, centre-y shift, log width scale, log height scale).
    anchors: Nx4 array of anchor boxes as (x0, y0, x1, y1); presumably
             expressed as fractions of the image size, since the decoded
             values are multiplied by the image dimensions.
    variances: Nx4 array of per-box scaling factors applied to `loc`.
    img_size: Sequence whose first two entries are image height and width.

    Returns an Nx4 float array of (x, y, width, height) rows. Coordinates
    are clamped below at 0 (there is no upper clamp) and the sizes use the
    inclusive ``x1 - x0 + 1`` convention.
    """
    img_h = img_size[0]
    img_w = img_size[1]

    # Anchor corners -> anchor size and centre.
    left, top = anchors[:, 0], anchors[:, 1]
    right, bottom = anchors[:, 2], anchors[:, 3]
    prior_w = right - left
    prior_h = bottom - top
    prior_cx = 0.5 * (right + left)
    prior_cy = 0.5 * (bottom + top)

    # Centre offsets are relative to the anchor size; sizes are log-scaled.
    centre_x = loc[:, 0] * prior_w * variances[:, 0]
    centre_x += prior_cx
    centre_y = loc[:, 1] * prior_h * variances[:, 1]
    centre_y += prior_cy
    box_w = np.exp(loc[:, 2] * variances[:, 2]) * prior_w
    box_h = np.exp(loc[:, 3] * variances[:, 3]) * prior_h

    # Scale the corners to pixels, never letting them go negative.
    half_w = 0.5 * box_w
    half_h = 0.5 * box_h
    x_min = np.maximum((centre_x - half_w) * img_w, 0)
    y_min = np.maximum((centre_y - half_h) * img_h, 0)
    x_max = np.maximum((centre_x + half_w) * img_w, 0)
    y_max = np.maximum((centre_y + half_h) * img_h, 0)

    result = np.zeros((len(x_min), 4))
    result[:, 0] = x_min
    result[:, 1] = y_min
    result[:, 2] = x_max - x_min + 1
    result[:, 3] = y_max - y_min + 1
    return result
"""
#Using tensorflow nms instead --- this one required a function that did
#not work in opencv so well.
def nmsBoxes(bboxes, scores, score_threshold, nms_threshold, top_k = 0):
    \"""
    Performs non-maximum suppression on a series of overlapping bounding boxes
    bboxes: list of bounding boxes
    score_threshold a threshold used to filter boxes by score.
    nms_threshold a threshold used in non maximum suppression.
    indices the kept indices of bboxes after NMS.
    top_k if `>0`, keep at most @p top_k picked indices.
    \"""
    # Sort the scores in descending order, create a new list keeping
    # indices and value in a tuple value
    score_sort_list=[]
    for idx,score in enumerate(scores):
        score_sort_list.append((idx,score))

    # inline-function to get second element of tuple
    def second_element(val):
        return val[1]

    # Sort the new list by the 2nd element (score) in descending order
    score_sort_list.sort(key = second_element, reverse = True)

    # If only keeping top_k, truncate the list
    if top_k > 0:
        score_sort_list[:top_k]

    # Iterate over the scores/boxes and pick winners
    winning_indices=[]
    for idx_score in score_sort_list:
        keep = True
        for winner in winning_indices:
            if not keep:
                break
            overlap = 1.0 - cv2.jaccardDistance(idx_score[0], winner)
            keep = overlap <= nms_threshold
        if keep:
            winning_indices.append(idx_score[0])
    return winning_indices
"""