Source code for openem.Count

""" Module for finding keyframes """
import tensorflow as tf
import numpy as np
import cv2
import math

from openem.models import ImageModel
from openem.models import Preprocessor
from openem.image import crop

KEYFRAME_OFFSET = 32
MIN_SPACING = 1
PEAK_THRESHOLD = 0.03
AREA_THRESHOLD = 0.10

def peak_sum(array, idx, width):
    """ Sum the value at idx with the values one window-width to either
        side, when those neighbors fall inside the array """
    sum_value = array[idx]
    if idx - width >= 0:
        sum_value += array[idx - width]
    if idx + width < len(array):
        sum_value += array[idx + width]
    return sum_value
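
# A minimal usage sketch for peak_sum (illustrative values, not from the
# original source). The peak at index 2 is summed with the neighbors one
# window-width to either side:
#
#   peak_sum(np.array([0.0, 0.25, 0.5, 0.25, 0.0]), 2, MIN_SPACING)  # -> 1.0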

class KeyframeFinder:
    """ Model to find keyframes of a given species """
    def __init__(self, model_path, img_width, img_height, gpu_fraction=1.0):
        """ Initialize a keyframe finder model. Gives a list of keyframes
            for each species.

            Caveats of this model:
            - Assumes tracking 1 classification/detection per frame

        model_path : str or path-like object
            Path to the frozen protobuf of the tensorflow graph
        img_width : Width of the image input to the detector (pixels)
        img_height : Height of the image input to the detector (pixels)
        gpu_fraction : float
            Fraction of the GPU allowed to be used by this object.
        """
        # Create session first with requested gpu_fraction parameter
        config = tf.compat.v1.ConfigProto()
        config.gpu_options.allow_growth = True
        config.gpu_options.per_process_gpu_memory_fraction = gpu_fraction
        self.tf_session = tf.compat.v1.Session(config=config)

        with tf.io.gfile.GFile(model_path, 'rb') as graph_file:
            # Load graph off of disk into a graph definition
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(graph_file.read())

        self.input_tensor, self.output_tensor = tf.import_graph_def(
            graph_def,
            return_elements=['input_1:0', 'cumsum_values_1:0'])

        self.img_width = img_width
        self.img_height = img_height
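
    # Construction sketch (the model path and image dimensions here are
    # hypothetical, not from the original source):
    #
    #   finder = KeyframeFinder('keyframe_graph.pb',
    #                           img_width=720, img_height=360,
    #                           gpu_fraction=0.5)
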
    def process(self, classifications, detections):
        """ Process the list of classifications and detections, which must
            be the same length.

            The outer dimension in each parameter is a frame; the inner is
            the list of detections or classifications in a given frame.

        classifications: list of list of openem.Classify.Classification
        detections: list of list of openem.Detect.Detection
        """
        det_len = len(detections)
        if len(classifications) != len(detections):
            raise Exception("Classifications / Detections differ in length!")

        sequence_length = self.sequenceSize()
        sequence_count = math.ceil(det_len / sequence_length)

        sequences = []  # The sequences of features to run the network on
        seq_class = []  # The underlying classifications for each sequence

        # Iterate over each sequence to generate a batch request
        for sequence_idx in range(sequence_count):
            start_idx = sequence_idx * sequence_length
            # Clamp the sequence end to the number of detections
            end_idx = min((sequence_idx + 1) * sequence_length, len(detections))
            classification_sublist = classifications[start_idx:end_idx]
            detections_sublist = detections[start_idx:end_idx]
            sequences.append(self._generateSequence(classification_sublist,
                                                    detections_sublist))
            seq_class.append(classification_sublist)

        # Now that each sequence is set up, run the network on all of them
        # TODO: May have practical limitations here if we have a huge
        # video
        return self._processSequences(sequences, seq_class)
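
    # Usage sketch (hypothetical variables): classifications[i] and
    # detections[i] hold the per-frame results for frame i, each an empty
    # list when nothing was found in that frame:
    #
    #   keyframes = finder.process(classifications, detections)
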
    def sequenceSize(self):
        """ Returns the effective number of frames one can process in an
            individual sequence """
        return int(self.input_tensor.shape[1] - (KEYFRAME_OFFSET * 2))
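
    # Worked example: assuming an input tensor of shape [1, 512, N] (an
    # assumed length, not from the original source), sequenceSize() returns
    # 512 - 2 * 32 = 448 usable frames per sequence.
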
    def _normalizeDetection(self, detection):
        """ Normalize detection coordinates to relative coordinates """
        return np.array([detection[0] / self.img_width,
                         detection[1] / self.img_height,
                         detection[2] / self.img_width,
                         detection[3] / self.img_height])

    def _generateSequence(self, classifications, detections):
        """ Build the network input for an individual sequence; returns the
            padded feature matrix for that sequence's frames. """
        det_len = len(detections)

        # Convert classifications and detections to the input required by
        # the network
        seq_len = int(self.input_tensor.shape[1])
        fea_len = int(self.input_tensor.shape[2])
        input_data = np.zeros((seq_len, fea_len))

        # Add padding before and after the sequence based on KEYFRAME_OFFSET
        input_data[:KEYFRAME_OFFSET, 0] = np.ones(KEYFRAME_OFFSET)
        input_data[det_len:det_len + KEYFRAME_OFFSET, 0] = \
            np.ones(KEYFRAME_OFFSET)

        # Iterate through each frame of the data
        for idx, frame_detections in enumerate(detections):
            # We have already padded before and after
            seq_idx = idx + KEYFRAME_OFFSET

            # Skip through frames with no detections
            if len(frame_detections) == 0:
                input_data[seq_idx][0] = 1.0
                continue

            detection = frame_detections[0]
            classification = classifications[idx][0]

            # Do a size check on the input
            # We expect either 1 or 2 models per sequence
            num_species = len(classification.species)
            num_cover = len(classification.cover)
            num_loc = len(detection.location)
            num_fea = num_species + num_cover + num_loc + 2
            num_of_models = int(fea_len / num_fea)
            if num_of_models != 2 and num_of_models != 1:
                raise Exception('Bad Feature Length')

            # Layout of the feature is:
            # Species, Cover, Normalized Location, Confidence, SSD Species
            # Optional duplicate for a second model
            for model_idx in range(num_of_models):
                # Calculate indices of the vector based on model_idx
                fea_idx = model_idx * num_fea
                species_stop = fea_idx + num_species
                cover_stop = species_stop + num_cover
                loc_stop = cover_stop + num_loc
                ssd_conf = loc_stop
                ssd_species = ssd_conf + 1

                input_data[seq_idx, fea_idx:species_stop] = \
                    classification.species
                input_data[seq_idx, species_stop:cover_stop] = \
                    classification.cover
                input_data[seq_idx, cover_stop:loc_stop] = \
                    self._normalizeDetection(detection.location)
                input_data[seq_idx, ssd_conf] = detection.confidence
                input_data[seq_idx, ssd_species] = detection.species

        return input_data

    def _findKeyframeSegments(self, array, classifications):
        """ Based on a sequence result and the underlying classifications,
            find the best keyframes """
        keyframes = []
        while True:
            max_idx = np.argmax(array)
            max_value = array[max_idx]
            if max_value < PEAK_THRESHOLD:
                return keyframes

            area_sum = peak_sum(array, max_idx, MIN_SPACING)
            low_idx = max(max_idx - MIN_SPACING, 0)
            limit = min(max_idx + MIN_SPACING + 1, len(array))
            if area_sum > AREA_THRESHOLD:
                max_clear = 0.0
                clear_idx = None
                for area_idx in range(low_idx, limit):
                    # The actual frame is KEYFRAME_OFFSET away from the
                    # result vector due to padding
                    class_idx = area_idx - KEYFRAME_OFFSET
                    # If there are no detections in the frame, don't attempt
                    # to extract cover info
                    if len(classifications[class_idx]) == 0:
                        continue
                    element_cover = classifications[class_idx][0].cover[2]
                    if element_cover > max_clear:
                        max_clear = element_cover
                        clear_idx = area_idx
                if clear_idx is not None:
                    keyframes.append(clear_idx)
                    keyframes.sort()

            # Zero out the area identified so the next argmax finds a
            # new peak
            for zero_idx in range(low_idx, limit):
                array[zero_idx] = 0.0

    def _processSequences(self, sequences, seq_classes):
        """ Process the sequence results and return the list of keyframes """
        result = self.tf_session.run(
            self.output_tensor,
            feed_dict={self.input_tensor: np.array(sequences)})
        keyframes = []
        assert self.sequenceSize() == result.shape[1]
        # Process each sequence result, converting its offsets to actual
        # frame numbers before adding them to the overall list
        for seq_idx, array in enumerate(result):
            sequence_keyframes = self._findKeyframeSegments(
                array, seq_classes[seq_idx])
            for keyframe in sequence_keyframes:
                keyframes.append(keyframe + (seq_idx * self.sequenceSize()))
        return keyframes
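
# Frame-number mapping sketch (follows from _processSequences above): a
# keyframe at offset 10 within sequence seq_idx == 1 is reported as frame
# 10 + 1 * sequenceSize() in the overall video.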