OpenEM and Tator Pipelines¶
Tator is a web-based media management and curation project. Part of media management is executing algorithms or workflows on a set of media. OpenEM can run within the confines of a Tator workflow. Currently, RetinaNet-based detection is supported for inference within a workflow.
Using the Reference Detection Workflow¶
The reference workflow can be used by modifying scripts/tator/detection_workflow.yaml to match the specifics of a given project.
Generating a data image¶
At run time, the reference workflow pulls a docker image containing the network graph and coefficients. To generate a weights image, use scripts/make_pipeline_image.py in a manner similar to below:
python3 make_pipeline_image.py --graph-pb <trained.pb> --train-ini <path_to_train.ini> --publish <docker_hub_user>/<image_name>
Note the values of <docker_hub_user> and <image_name> for use in the next section.
The referenced train.ini can be a subset of a full train.ini; a minimal configuration such as the following is sufficient for the requirements of uploadToTator.py:
[Data]
# Names of species, separated by commas.
Species=Fish
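To illustrate how a minimal train.ini like the one above is consumed, the following sketch parses the comma-separated Species value with Python's standard configparser module (the exact parsing done by uploadToTator.py may differ):

```python
import configparser

# Parse the minimal train.ini shown above; the comma-separated
# Species value becomes an ordered list of class names.
config = configparser.ConfigParser()
config.read_string("""
[Data]
# Names of species, separated by commas.
Species=Fish
""")
species = [s.strip() for s in config['Data']['Species'].split(',')]
print(species)  # -> ['Fish']
```

A multi-class project would simply list more names, e.g. `Species=Fish,Shark,Ray`.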
Using the reference workflow definition¶
A reference workflow YAML file is in the repository and can be modified to reflect project-specific requirements. img_max_side, img_min_side, batch_size, and keep_threshold map directly to the arguments of infer.py.
This workflow executes RetinaNet-based detection on a video dataset using TensorRT-enabled hardware.
Nominally, the only parameters that require changing are the TATOR_PIPELINE_ARGS for each stage of the workflow.
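Because TATOR_PIPELINE_ARGS is a JSON object embedded in a quoted YAML string, its inner quotes must be escaped. The sketch below, using hypothetical placeholder values, shows how json.dumps produces the inner JSON and how the escaped YAML value is derived from it:

```python
import json

# Hypothetical worker-stage arguments; substitute project-specific values.
worker_args = {
    "img_max_side": 1333,
    "img_min_side": 800,
    "keep_threshold": 0.5,
    "media_type": "video",
    "img_ext": "mp4",
    "batch_size": 1,
}

# json.dumps produces the inner JSON; the YAML value is that string
# wrapped in quotes with the embedded quotes escaped, matching the
# form seen in the workflow definition below.
inner = json.dumps(worker_args)
yaml_value = '"' + inner.replace('"', '\\"') + '"'
print(yaml_value)
```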
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: openem-workflow-
spec:
  entrypoint: pipeline
  ttlSecondsAfterFinished: 3600
  volumeClaimTemplates:
  - metadata:
      name: workdir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 20Gi
  volumes:
  - name: dockersock
    hostPath:
      path: /var/run/docker.sock
  templates:
  - name: pipeline
    steps:
    - - name: setup
        template: setup
    - - name: worker
        template: worker
    - - name: teardown
        template: teardown
  - name: setup
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          cpu: 250m
          memory: 1024Mi
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{\"data_image\" : \"<docker_hub_user>/<data_image>\"}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      - name: dockersock
        mountPath: /var/run/docker.sock
      command: [python3]
      args: ["/scripts/tator/setup.py"]
  - name: worker
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          nvidia.com/gpu: 1
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{\"img_max_side\": <max>, \"img_min_side\": <min>, \"keep_threshold\": <keep>, \"media_type\": \"video\", \"img_ext\": \"mp4\", \"batch_size\" : <batch_size>}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      command: [python3]
      args: ["/scripts/tator/detection_entry.py"]
  - name: teardown
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          cpu: 250m
          memory: 1024Mi
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{\"type_id\": <box_type_id>, \"media_type\": \"pipeline\", \"img_ext\": \"mp4\"}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      command: [python3]
      args: ["/scripts/tator/teardown.py"]
Detailed Mechanics¶
This section walks through the mechanics of the reference workflow so that users can build more elaborate workflows on OpenEM technology.
A Tator workflow is specified no differently than a regular Argo (https://argoproj.github.io/argo/) workflow, other than an expectation that the Tator REST API is used to access media files and supply results to a project.
A canonical Tator workflow has three parts: setup, execution, and teardown. More advanced workflows can replace the execution stage with multiple stages using the directed acyclic graph capabilities of Argo.
Project setup¶
A project using this workflow has a media type (either a video type or an image type) represented by a <media_type_id>. The project also has a localization box type represented by a <box_type_id>. The <media_type_id> has the following required attribute:
- Object Detector Processed
  - A string attribute type that is set to the date and time when the object detector finishes processing the media file.
The <box_type_id> requires the following attributes:
- Species
  - A string representing the name of an object class. If 'Species' is not an appropriate name for the class, it can be customized via the species_attr_name key in the pipeline argument object to the teardown stage. It defaults to 'Species' if not specified.
- Confidence
  - A float attribute representing the score of the detection. If 'Confidence' is not a desired name, it can be customized via the confidence_attr_name key in the pipeline argument object to the teardown stage. It defaults to 'Confidence' if not specified.
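As a sketch of how those overrides travel to the teardown stage, the following uses hypothetical attribute names ("Label", "Score") and a placeholder type_id; the `.get(..., default)` fallbacks mirror the defaulting behavior described above:

```python
import json
import os

# Hypothetical teardown-stage arguments overriding the default
# attribute names; the type_id value is a placeholder.
teardown_args = {
    "type_id": 42,
    "media_type": "pipeline",
    "img_ext": "mp4",
    "species_attr_name": "Label",
    "confidence_attr_name": "Score",
}
os.environ['TATOR_PIPELINE_ARGS'] = json.dumps(teardown_args)

# A teardown script falls back to the defaults when the keys are absent.
args = json.loads(os.environ['TATOR_PIPELINE_ARGS'])
species_attr = args.get('species_attr_name', 'Species')
confidence_attr = args.get('confidence_attr_name', 'Confidence')
print(species_attr, confidence_attr)  # -> Label Score
```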
Acquiring media¶
The example setup.py provides a canonical way to download media for a given workflow.
#!/usr/bin/env python3
import pytator
import docker
import tarfile
import os
import sys
import json
import pandas as pd
import requests
import time
import math

if __name__ == '__main__':
    media_ids = os.getenv('TATOR_MEDIA_IDS')
    print(f"processing = {media_ids}")
    media_ids = [int(m) for m in media_ids.split(',')]
    rest_svc = os.getenv('TATOR_API_SERVICE')
    work_dir = os.getenv('TATOR_WORK_DIR')
    token = os.getenv('TATOR_AUTH_TOKEN')
    project_id = os.getenv('TATOR_PROJECT_ID')
    pipeline_args_str = os.getenv('TATOR_PIPELINE_ARGS')
    if pipeline_args_str:
        pipeline_args = json.loads(pipeline_args_str)
    else:
        pipeline_args = {}
    tator = pytator.Tator(rest_svc, token, project_id)
    work_filepath = os.path.join(work_dir, "work.csv")
    try:
        os.remove(work_filepath)
    except OSError:
        pass

    # Download the network coefficients
    # Image stores coefficients in the "/network" folder
    client = docker.from_env()
    image = client.images.pull(pipeline_args['data_image'])
    container = client.containers.create(pipeline_args['data_image'])
    bits, stats = container.get_archive("/network")
    network_tar = os.path.join(work_dir, "network.tar")
    with open(network_tar, 'wb') as tar_file:
        for chunk in bits:
            tar_file.write(chunk)

    with tarfile.TarFile(network_tar) as tar_file:
        print(tar_file.getmembers())
        tar_file.extract("network/graph.pb", work_dir)
        tar_file.extract("network/train.ini", work_dir)

    container.remove()

    # First write CSV header
    cols = ['media']
    work_frame = pd.DataFrame(columns=cols)
    work_frame.to_csv(work_filepath, index=False)

    # Fetch media elements in chunks of 100 ids
    media_elements = []
    count = 0
    chunk = 100
    for _ in range(math.ceil(len(media_ids) / chunk)):
        chunk_ids = media_ids[count:count + chunk]
        str_ids = [str(x) for x in chunk_ids]
        media_elements.extend(tator.Media.filter({"media_id":
                                                  ','.join(str_ids)}))
        count += chunk

    print(f"Starting on {work_filepath}")
    for media_id, media in zip(media_ids, media_elements):
        media_unique_name = f"{media['id']}_{media['name']}"
        media_filepath = os.path.join(work_dir, media_unique_name)
        data = {'media': media_filepath}
        print(f"Downloading {media['name']} to {media_filepath}")
        tator.Media.downloadFile(media, media_filepath)
        if not os.path.exists(media_filepath):
            print("File did not download!")
            sys.exit(255)
        work_frame = pd.DataFrame(columns=cols, data=[data])
        work_frame.to_csv(work_filepath, index=False, header=False, mode='a')
Executing Work¶
The heart of the reference workflow is infer.py from the openem_lite docker image. However, it is useful to have a layer of scripting above that CLI utility to translate workflow definitions into arguments for the underlying utility.
Submitting results¶
infer.py generates a CSV with inference results, so another utility must interpret those results and submit them to the underlying Tator web service. A script called uploadToTator.py is located in scripts; as with infer.py, inserting a scripting layer above the raw script is helpful for managing the environment.
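Such a translation layer can be sketched as follows. The flag spellings below are assumed from the parameter names in the workflow definition (img_max_side, img_min_side, keep_threshold, batch_size map directly to infer.py arguments); consult infer.py's own help output for the authoritative names:

```python
import json

# Sketch: translate TATOR_PIPELINE_ARGS into an infer.py command line.
# Flag names are assumptions derived from the parameter names; the
# values here are hypothetical placeholders.
pipeline_args = json.loads(
    '{"img_max_side": 1333, "img_min_side": 800, '
    '"keep_threshold": 0.5, "batch_size": 1}')

cmd = ["python3", "/scripts/infer.py",
       "--img-max-side", str(pipeline_args["img_max_side"]),
       "--img-min-side", str(pipeline_args["img_min_side"]),
       "--keep-threshold", str(pipeline_args["keep_threshold"]),
       "--batch-size", str(pipeline_args["batch_size"]),
       "/work/work.csv"]  # the work.csv produced by setup.py
print(" ".join(cmd))
```

detection_entry.py in the reference workflow plays this role, building the invocation and running it via subprocess.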
#!/usr/bin/env python3
import pytator
import json
import os
import sys
import time
import subprocess

if __name__ == '__main__':
    rest_svc = os.getenv('TATOR_API_SERVICE')
    work_dir = os.getenv('TATOR_WORK_DIR')
    token = os.getenv('TATOR_AUTH_TOKEN')
    project_id = os.getenv('TATOR_PROJECT_ID')
    pipeline_args_str = os.getenv('TATOR_PIPELINE_ARGS')
    if pipeline_args_str:
        pipeline_args = json.loads(pipeline_args_str)
    else:
        print("ERROR: No pipeline arguments specified!")
        sys.exit(-1)
    box_type_id = pipeline_args['type_id']
    media_type = pipeline_args.get('media_type', None)
    image_ext = pipeline_args.get('img_ext', None)
    species_attr_name = pipeline_args.get('species_attr_name', 'Species')
    confidence_attr_name = pipeline_args.get('confidence_attr_name', 'Confidence')
    optional_args = []
    version_number = pipeline_args.get('version_number', None)
    if version_number:
        optional_args.extend(['--version-number', str(version_number)])
    args = ["python3",
            "/scripts/uploadToTator.py",
            "--url", rest_svc,
            "--project", str(project_id),
            "--token", token,
            "--img-base-dir", "/work",
            "--localization-type-id", str(box_type_id),
            "--media-type-id", str(0),
            '--media-type', media_type,
            *optional_args,
            '--img-ext', image_ext,
            '--species-attr-name', species_attr_name,
            '--confidence-attr-name', confidence_attr_name,
            '--train-ini', '/work/network/train.ini',
            '/work/results.csv']
    cmd = " ".join(args)
    print(f"Upload Command = '{cmd}'")
    p = subprocess.Popen(args)
    p.wait()
    sys.exit(p.returncode)