OpenEM and Tator Pipelines

Tator is a web-based media management and curation project. Part of media management is executing algorithms or workflows on a set of media. OpenEM can be run within the confines of a Tator workflow. Currently, RetinaNet-based detection is supported for inference within a workflow.

Using the Reference Detection Workflow

The reference workflow can be used by modifying scripts/tator/detection_workflow.yaml to match the types and parameters of the given project.

Generating a data image

At run time, the reference workflow pulls a docker image containing the network graph and weights. To generate a weights image, one can use scripts/make_pipeline_image.py in a manner similar to the following:

python3 make_pipeline_image.py --graph-pb <trained.pb> --train-ini <path_to_train.ini> --publish <docker_hub_user>/<image_name>

Note the values of <docker_hub_user> and <image_name> for use in the next section.

The referenced train.ini can be a subset of a full train.ini; a minimal configuration such as the following is acceptable for the requirements of uploadToTator.py:

[Data]
# Names of species, separated by commas.
Species=Fish
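
For a project with more than one class, the comma-separated list simply grows; a hypothetical three-class configuration (the class names here are illustrative) might be:

[Data]
Species=Fish,Shark,Skate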

Using the reference workflow definition

A reference workflow YAML is provided in the repository and can be modified to reflect project-specific requirements. img_max_side, img_min_side, batch_size, and keep_threshold map directly to the corresponding arguments of infer.py.

This workflow executes RetinaNet-based detection on a video dataset using TensorRT-enabled hardware.

Nominally, the only parameters that require changing are the TATOR_PIPELINE_ARGS for each stage of the workflow.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: openem-workflow-
spec:
  entrypoint: pipeline
  ttlSecondsAfterFinished: 3600
  volumeClaimTemplates:
  - metadata:
      name: workdir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 20Gi
  volumes:
  - name: dockersock
    hostPath:
      path: /var/run/docker.sock
  templates:
  - name: pipeline
    steps:
    - - name: setup
        template: setup
    - - name: worker
        template: worker
    - - name: teardown
        template: teardown
  - name: setup
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          cpu: 250m
          memory: 1024Mi
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{\"data_image\" : \"<docker_hub_user>/<data_image>\"}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      - name: dockersock
        mountPath: /var/run/docker.sock
      command: [python3]
      args: ["/scripts/tator/setup.py"]
  - name: worker
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          nvidia.com/gpu: 1
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{"img_max_side\": <max>, \"img_min_side\": <min>, \"keep_threshold\": <keep>, \"media_type\": \"video\", \"img_ext\": \"mp4\", \"batch_size\" : <batch_size>}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      command: [python3]
      args: ["/scripts/tator/detection_entry.py"]
  - name: teardown
    container:
      image: cvisionai/openem_lite:latest
      resources:
        limits:
          cpu: 250m
          memory: 1024Mi
      env:
      - name: TATOR_MEDIA_IDS
        value: "{{workflow.parameters.media_ids}}"
      - name: TATOR_API_SERVICE
        value: "{{workflow.parameters.rest_url}}"
      - name: TATOR_AUTH_TOKEN
        value: "{{workflow.parameters.rest_token}}"
      - name: TATOR_PROJECT_ID
        value: "{{workflow.parameters.project_id}}"
      - name: TATOR_WORK_DIR
        value: "/work"
      - name: TATOR_PIPELINE_ARGS
        value: "{\"type_id\": <box_type_id>, \"media_type\": \"pipeline\", \"img_ext\": \"mp4\"}"
      volumeMounts:
      - name: workdir
        mountPath: /work
      command: [python3]
      args: ["/scripts/tator/teardown.py"]

Detailed Mechanics

This section walks through the mechanics of the reference workflow so that users can build more elaborate workflows on OpenEM technology.

A Tator workflow is specified no differently than a regular Argo <https://argoproj.github.io/argo/> workflow, other than the expectation that the Tator REST API is used to access media files and supply results to a project.

A canonical Tator workflow has three parts: setup, execution, and teardown. More advanced workflows can replace the execution stage with multiple stages using the directed acyclic graph capabilities of Argo, as sketched below.
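
As a sketch only, the pipeline template above could swap its sequential steps for a DAG that fans out to two parallel workers (the task names here are hypothetical):

  - name: pipeline
    dag:
      tasks:
      - name: setup
        template: setup
      - name: worker-a
        template: worker
        dependencies: [setup]
      - name: worker-b
        template: worker
        dependencies: [setup]
      - name: teardown
        template: teardown
        dependencies: [worker-a, worker-b]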

Project setup

A project for using this workflow has a media type (either a video type or an image type) represented by a <media_type_id>. The project also has a localization box type represented by <box_type_id>. The <media_type_id> has the following required attributes:

Object Detector Processed
A string attribute type that is set to the date and time when the object detector finishes processing the media file.

The <box_type_id> requires the following attributes:

Species
A string representing the name for an object class. If ‘Species’ is not an appropriate name for the class, it can be customized via the species_attr_name key in the pipeline argument object to the teardown stage. It defaults to ‘Species’ if not specified.
Confidence
A float attribute representing the score of the detection. If ‘Confidence’ is not a desired name, it can be customized via the confidence_attr_name key in the pipeline argument object to the teardown stage. It defaults to ‘Confidence’ if not specified.
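
As an illustration, a project whose box type uses ‘Label’ and ‘Score’ for these attributes could pass a teardown argument object like the following (the type ID here is hypothetical):

{
  "type_id": 42,
  "media_type": "pipeline",
  "img_ext": "mp4",
  "species_attr_name": "Label",
  "confidence_attr_name": "Score"
}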

Acquiring media

The example setup.py provides a canonical way to download media for a given workflow.

#!/usr/bin/env python3

import pytator
import docker
import tarfile
import os
import sys
import json
import pandas as pd
import requests
import time
import math

if __name__ == '__main__':
    media_ids = os.getenv('TATOR_MEDIA_IDS')
    print(f"processing = {media_ids})")
    media_ids = [int(m) for m in media_ids.split(',')]
    rest_svc = os.getenv('TATOR_API_SERVICE')
    work_dir = os.getenv('TATOR_WORK_DIR')
    token=os.getenv('TATOR_AUTH_TOKEN')
    project_id=os.getenv('TATOR_PROJECT_ID')
    pipeline_args_str = os.getenv('TATOR_PIPELINE_ARGS')
    if pipeline_args_str:
        pipeline_args = json.loads(pipeline_args_str)
    else:
        pipeline_args = {}
    tator=pytator.Tator(rest_svc, token, project_id)
    
    work_filepath=os.path.join(work_dir, "work.csv")
    try:
        os.remove(work_filepath)
    except FileNotFoundError:
        pass

    # Download the network coefficients
    # Image stores coefficients in the "/network" folder
    client=docker.from_env()
    image=client.images.pull(pipeline_args['data_image'])
    container=client.containers.create(pipeline_args['data_image'])
    bits, stats = container.get_archive("/network")
    network_tar = os.path.join(work_dir, "network.tar") 
    with open(network_tar, 'wb') as tar_file:
        for chunk in bits:
            tar_file.write(chunk)

    with tarfile.TarFile(network_tar) as tar_file:
        print(tar_file.getmembers())
        tar_file.extract("network/graph.pb", work_dir)
        tar_file.extract("network/train.ini", work_dir)

    container.remove()
    # First write CSV header
    cols=['media']
    work_frame=pd.DataFrame(columns=cols)
    work_frame.to_csv(work_filepath, index=False)

    media_elements=[]
    count=0
    chunk=100
    # Fetch media elements in chunks to keep the query string manageable
    for _ in range(math.ceil(len(media_ids)/chunk)):
        chunk_ids=media_ids[count:count+chunk]
        str_ids = [str(x) for x in chunk_ids]
        media_elements.extend(tator.Media.filter({"media_id":
                                                  ','.join(str_ids)}))
        count+=chunk
        
    print(f"Starting on {work_filepath}")
    for media_id,media in zip(media_ids,media_elements):
        media_unique_name = f"{media['id']}_{media['name']}"
        media_filepath = os.path.join(work_dir,media_unique_name)
        data={'media': media_filepath}
        print(f"Downloading {media['name']} to {media_filepath}")
        tator.Media.downloadFile(media, media_filepath)
        if not os.path.exists(media_filepath):
            print("File did not download!")
            sys.exit(255)
        work_frame=pd.DataFrame(columns=cols, data=[data])
        work_frame.to_csv(work_filepath, index=False, header=False, mode='a')

Executing Work

The heart of the reference workflow is infer.py from the openem_lite docker image. However, it is useful to have a layer of scripting above that CLI utility to translate the workflow definition into arguments for the underlying utility.
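
A minimal sketch of such an entry script is shown below, assuming infer.py accepts flags named after the TATOR_PIPELINE_ARGS keys; the exact flag names, the location of infer.py, and its CSV input/output conventions should be verified against the actual script.

#!/usr/bin/env python3

# Minimal sketch of a worker entry script. The infer.py flag names and
# paths below are assumptions; verify them against the actual CLI.

import json
import os
import subprocess
import sys

if __name__ == '__main__':
    work_dir = os.getenv('TATOR_WORK_DIR')
    pipeline_args = json.loads(os.getenv('TATOR_PIPELINE_ARGS', '{}'))

    args = ["python3", "/scripts/infer.py",
            # Network files extracted by setup.py
            "--graph-pb", os.path.join(work_dir, "network", "graph.pb"),
            # Parameters forwarded from the workflow definition
            "--img-max-side", str(pipeline_args['img_max_side']),
            "--img-min-side", str(pipeline_args['img_min_side']),
            "--keep-threshold", str(pipeline_args['keep_threshold']),
            "--batch-size", str(pipeline_args['batch_size']),
            # Results CSV consumed later by the teardown stage
            "--output-csv", os.path.join(work_dir, "results.csv"),
            # Work list written by setup.py
            os.path.join(work_dir, "work.csv")]
    print(f"Inference command = '{' '.join(args)}'")
    p = subprocess.Popen(args)
    sys.exit(p.wait())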

Submitting results

infer.py generates a CSV with inference results, so another utility must interpret these results and submit them to the underlying Tator web service. A script called uploadToTator.py is located in scripts; as with infer.py, inserting a layer above the raw script can be helpful to manage the environment.

#!/usr/bin/env python3

import pytator
import json
import os
import sys
import time
import subprocess

if __name__ == '__main__':
    rest_svc = os.getenv('TATOR_API_SERVICE')
    work_dir = os.getenv('TATOR_WORK_DIR')
    token = os.getenv('TATOR_AUTH_TOKEN')
    project_id = os.getenv('TATOR_PROJECT_ID')
    pipeline_args_str = os.getenv('TATOR_PIPELINE_ARGS')
    if pipeline_args_str:
        pipeline_args = json.loads(pipeline_args_str)
    else:
        print("ERROR: No pipeline arguments specified!")
        sys.exit(-1)
    box_type_id = pipeline_args['type_id']
    img_ext = pipeline_args.get('img_ext', None)
    media_type = pipeline_args.get('media_type', None)
    species_attr_name = pipeline_args.get('species_attr_name','Species')
    confidence_attr_name = pipeline_args.get('confidence_attr_name','Confidence')
    optional_args=[]
    version_number = pipeline_args.get('version_number', None)
    if version_number:
        optional_args.extend(['--version-number', str(version_number)])

    args = ["python3",
            "/scripts/uploadToTator.py",
            "--url", rest_svc,
            "--project", str(project_id),
            "--token", token,
            "--img-base-dir", "/work",
            "--localization-type-id", str(box_type_id),
            "--media-type-id", str(0),
            '--media-type', media_type,
            *optional_args,
            '--img-ext', img_ext,
            '--species-attr-name', species_attr_name,
            '--confidence-attr-name', confidence_attr_name,
            '--train-ini', '/work/network/train.ini',
            '/work/results.csv']
    cmd = " ".join(args)
    print(f"Upload Command = '{cmd}'")
    p=subprocess.Popen(args)
    p.wait()
    sys.exit(p.returncode)