| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- # -------------------------------------------------------------------------
- # Copyright (c) Microsoft Corporation. All rights reserved.
- # Licensed under the MIT License.
- # --------------------------------------------------------------------------
- from __future__ import annotations
- import argparse
- import os
- import sys
- from timeit import default_timer as timer
- import numpy as np
- import onnxruntime as onnxrt
- float_dict = {
- "tensor(float16)": "float16",
- "tensor(float)": "float32",
- "tensor(double)": "float64",
- }
- integer_dict = {
- "tensor(int32)": "int32",
- "tensor(int8)": "int8",
- "tensor(uint8)": "uint8",
- "tensor(int16)": "int16",
- "tensor(uint16)": "uint16",
- "tensor(int64)": "int64",
- "tensor(uint64)": "uint64",
- }
- def generate_feeds(sess, symbolic_dims: dict | None = None):
- feeds = {}
- symbolic_dims = symbolic_dims or {}
- for input_meta in sess.get_inputs():
- # replace any symbolic dimensions
- shape = []
- for dim in input_meta.shape:
- if not dim:
- # unknown dim
- shape.append(1)
- elif isinstance(dim, str):
- # symbolic dim. see if we have a value otherwise use 1
- if dim in symbolic_dims:
- shape.append(int(symbolic_dims[dim]))
- else:
- shape.append(1)
- else:
- shape.append(dim)
- if input_meta.type in float_dict:
- feeds[input_meta.name] = np.random.rand(*shape).astype(float_dict[input_meta.type])
- elif input_meta.type in integer_dict:
- feeds[input_meta.name] = np.random.uniform(high=1000, size=tuple(shape)).astype(
- integer_dict[input_meta.type]
- )
- elif input_meta.type == "tensor(bool)":
- feeds[input_meta.name] = np.random.randint(2, size=tuple(shape)).astype("bool")
- else:
- print(f"unsupported input type {input_meta.type} for input {input_meta.name}")
- sys.exit(-1)
- return feeds
- # simple test program for loading onnx model, feeding all inputs and running the model num_iters times.
- def run_model(
- model_path,
- num_iters=1,
- debug=None,
- profile=None,
- symbolic_dims=None,
- feeds=None,
- override_initializers=True,
- ):
- symbolic_dims = symbolic_dims or {}
- if debug:
- print(f"Pausing execution ready for debugger to attach to pid: {os.getpid()}")
- print("Press key to continue.")
- sys.stdin.read(1)
- sess_options = None
- if profile:
- sess_options = onnxrt.SessionOptions()
- sess_options.enable_profiling = True
- sess_options.profile_file_prefix = os.path.basename(model_path)
- sess = onnxrt.InferenceSession(
- model_path,
- sess_options=sess_options,
- providers=onnxrt.get_available_providers(),
- )
- meta = sess.get_modelmeta()
- if not feeds:
- feeds = generate_feeds(sess, symbolic_dims)
- if override_initializers:
- # Starting with IR4 some initializers provide default values
- # and can be overridden (available in IR4). For IR < 4 models
- # the list would be empty
- for initializer in sess.get_overridable_initializers():
- shape = [dim if dim else 1 for dim in initializer.shape]
- if initializer.type in float_dict:
- feeds[initializer.name] = np.random.rand(*shape).astype(float_dict[initializer.type])
- elif initializer.type in integer_dict:
- feeds[initializer.name] = np.random.uniform(high=1000, size=tuple(shape)).astype(
- integer_dict[initializer.type]
- )
- elif initializer.type == "tensor(bool)":
- feeds[initializer.name] = np.random.randint(2, size=tuple(shape)).astype("bool")
- else:
- print(f"unsupported initializer type {initializer.type} for initializer {initializer.name}")
- sys.exit(-1)
- start = timer()
- for _i in range(num_iters):
- outputs = sess.run([], feeds) # fetch all outputs
- end = timer()
- print(f"model: {meta.graph_name}")
- print(f"version: {meta.version}")
- print(f"iterations: {num_iters}")
- print(f"avg latency: {((end - start) * 1000) / num_iters} ms")
- if profile:
- trace_file = sess.end_profiling()
- print(f"trace file written to: {trace_file}")
- return 0, feeds, num_iters > 0 and outputs
- def main():
- parser = argparse.ArgumentParser(description="Simple ONNX Runtime Test Tool.")
- parser.add_argument("model_path", help="model path")
- parser.add_argument(
- "num_iters",
- nargs="?",
- type=int,
- default=1000,
- help="model run iterations. default=1000",
- )
- parser.add_argument(
- "--debug",
- action="store_true",
- help="pause execution to allow attaching a debugger.",
- )
- parser.add_argument("--profile", action="store_true", help="enable chrome timeline trace profiling.")
- parser.add_argument(
- "--symbolic_dims",
- default={},
- type=lambda s: dict(x.split("=") for x in s.split(",")),
- help="Comma separated name=value pairs for any symbolic dimensions in the model input. "
- "e.g. --symbolic_dims batch=1,seqlen=5. "
- "If not provided, the value of 1 will be used for all symbolic dimensions.",
- )
- args = parser.parse_args()
- exit_code, _, _ = run_model(args.model_path, args.num_iters, args.debug, args.profile, args.symbolic_dims)
- sys.exit(exit_code)
- if __name__ == "__main__":
- main()
|