optical_flow.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/env python
  2. '''
  3. This sample uses the FlowNet v2 and RAFT models to calculate optical flow.
  4. FlowNet v2 Original Paper: https://arxiv.org/abs/1612.01925.
  5. FlowNet v2 Repo: https://github.com/lmb-freiburg/flownet2.
  6. Download the converted .caffemodel model from https://drive.google.com/open?id=16qvE9VNmU39NttpZwZs81Ga8VYQJDaWZ
  7. and .prototxt from https://drive.google.com/file/d/1RyNIUsan1ZOh2hpYIH36A-jofAvJlT6a/view?usp=sharing.
  8. Otherwise download original model from https://lmb.informatik.uni-freiburg.de/resources/binaries/flownet2/flownet2-models.tar.gz,
  9. convert .h5 model to .caffemodel and modify original .prototxt using .prototxt from link above.
  10. RAFT Original Paper: https://arxiv.org/pdf/2003.12039.pdf
  11. RAFT Repo: https://github.com/princeton-vl/RAFT
  12. Download the .onnx model from here https://github.com/opencv/opencv_zoo/raw/281d232cd99cd920853106d853c440edd35eb442/models/optical_flow_estimation_raft/optical_flow_estimation_raft_2023aug.onnx.
  13. '''
  14. import argparse
  15. import os.path
  16. import numpy as np
  17. import cv2 as cv
  18. class OpticalFlow(object):
  19. def __init__(self, model, height, width, proto=""):
  20. if proto:
  21. self.net = cv.dnn.readNetFromCaffe(proto, model)
  22. else:
  23. self.net = cv.dnn.readNet(model)
  24. self.net.setPreferableBackend(cv.dnn.DNN_BACKEND_OPENCV)
  25. self.height = height
  26. self.width = width
  27. def compute_flow(self, first_img, second_img):
  28. inp0 = cv.dnn.blobFromImage(first_img, size=(self.width, self.height))
  29. inp1 = cv.dnn.blobFromImage(second_img, size=(self.width, self.height))
  30. self.net.setInputsNames(["img0", "img1"])
  31. self.net.setInput(inp0, "img0")
  32. self.net.setInput(inp1, "img1")
  33. flow = self.net.forward()
  34. output = self.motion_to_color(flow)
  35. return output
  36. def motion_to_color(self, flow):
  37. arr = np.arange(0, 255, dtype=np.uint8)
  38. colormap = cv.applyColorMap(arr, cv.COLORMAP_HSV)
  39. colormap = colormap.squeeze(1)
  40. flow = flow.squeeze(0)
  41. fx, fy = flow[0, ...], flow[1, ...]
  42. rad = np.sqrt(fx**2 + fy**2)
  43. maxrad = rad.max() if rad.max() != 0 else 1
  44. ncols = arr.size
  45. rad = rad[..., np.newaxis] / maxrad
  46. a = np.arctan2(-fy / maxrad, -fx / maxrad) / np.pi
  47. fk = (a + 1) / 2.0 * (ncols - 1)
  48. k0 = fk.astype(np.int32)
  49. k1 = (k0 + 1) % ncols
  50. f = fk[..., np.newaxis] - k0[..., np.newaxis]
  51. col0 = colormap[k0] / 255.0
  52. col1 = colormap[k1] / 255.0
  53. col = (1 - f) * col0 + f * col1
  54. col = np.where(rad <= 1, 1 - rad * (1 - col), col * 0.75)
  55. output = (255.0 * col).astype(np.uint8)
  56. return output
  57. if __name__ == '__main__':
  58. parser = argparse.ArgumentParser(description='Use this script to calculate optical flow',
  59. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  60. parser.add_argument('-input', '-i', required=True, help='Path to input video file. Skip this argument to capture frames from a camera.')
  61. parser.add_argument('--height', default=320, type=int, help='Input height')
  62. parser.add_argument('--width', default=448, type=int, help='Input width')
  63. parser.add_argument('--proto', '-p', default='', help='Path to prototxt.')
  64. parser.add_argument('--model', '-m', required=True, help='Path to model.')
  65. args, _ = parser.parse_known_args()
  66. if not os.path.isfile(args.model):
  67. raise OSError("Model does not exist")
  68. if args.proto and not os.path.isfile(args.proto):
  69. raise OSError("Prototxt does not exist")
  70. winName = 'Calculation optical flow in OpenCV'
  71. cv.namedWindow(winName, cv.WINDOW_NORMAL)
  72. cap = cv.VideoCapture(args.input if args.input else 0)
  73. hasFrame, first_frame = cap.read()
  74. if args.proto:
  75. divisor = 64.
  76. var = {}
  77. var['ADAPTED_WIDTH'] = int(np.ceil(args.width/divisor) * divisor)
  78. var['ADAPTED_HEIGHT'] = int(np.ceil(args.height/divisor) * divisor)
  79. var['SCALE_WIDTH'] = args.width / float(var['ADAPTED_WIDTH'])
  80. var['SCALE_HEIGHT'] = args.height / float(var['ADAPTED_HEIGHT'])
  81. config = ''
  82. proto = open(args.proto).readlines()
  83. for line in proto:
  84. for key, value in var.items():
  85. tag = "$%s$" % key
  86. line = line.replace(tag, str(value))
  87. config += line
  88. caffemodel = open(args.model, 'rb').read()
  89. opt_flow = OpticalFlow(caffemodel, var['ADAPTED_HEIGHT'], var['ADAPTED_WIDTH'], bytearray(config.encode()))
  90. else:
  91. opt_flow = OpticalFlow(args.model, 360, 480)
  92. while cv.waitKey(1) < 0:
  93. hasFrame, second_frame = cap.read()
  94. if not hasFrame:
  95. break
  96. flow = opt_flow.compute_flow(first_frame, second_frame)
  97. first_frame = second_frame
  98. cv.imshow(winName, flow)