gapi_async_test.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. // This file is part of OpenCV project.
  2. // It is subject to the license terms in the LICENSE file found in the top-level directory
  3. // of this distribution and at http://opencv.org/license.html.
  4. //
  5. // Copyright (C) 2019 Intel Corporation
  6. #include "test_precomp.hpp"
  7. #include <opencv2/gapi/gcomputation_async.hpp>
  8. #include <opencv2/gapi/gcompiled_async.hpp>
  9. #include <opencv2/gapi/gasync_context.hpp>
  10. #include <condition_variable>
  11. #include <stdexcept>
  12. #include <thread>
  13. namespace opencv_test
  14. {
  15. //Main idea behind these tests is to have the same test script that is parameterized in order to test all setups (GCompiled vs apply, callback vs future).
  16. //So these differences are factored into devoted helper classes (mixins) which are then used by the common test script by help of CRTP.
  17. //Actual GAPI Computation with parameters to run on is mixed into test via CRTP as well.
  18. struct SumOfSum2x2 {
  19. cv::GComputation sum_of_sum;
  20. SumOfSum2x2() : sum_of_sum([]{
  21. cv::GMat in;
  22. cv::GScalar out = cv::gapi::sum(in + in);
  23. return GComputation{in, out};
  24. })
  25. {}
  26. const cv::Size sz{2, 2};
  27. cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
  28. cv::Scalar out_sc;
  29. cv::GCompiled compile(){
  30. return sum_of_sum.compile(descr_of(in_mat));
  31. }
  32. cv::GComputation& computation(){
  33. return sum_of_sum;
  34. }
  35. cv::GCompileArgs compile_args(){
  36. return {};
  37. }
  38. cv::GRunArgs in_args(){
  39. return cv::gin(in_mat);
  40. }
  41. cv::GRunArgsP out_args(){
  42. return cv::gout(out_sc);
  43. }
  44. void verify(){
  45. EXPECT_EQ(8, out_sc[0]);
  46. }
  47. };
  48. namespace {
  49. G_TYPED_KERNEL(GThrow, <GMat(GMat)>, "org.opencv.test.throw")
  50. {
  51. static GMatDesc outMeta(GMatDesc in) { return in; }
  52. };
  53. struct gthrow_exception : std::runtime_error {
  54. using std::runtime_error::runtime_error;
  55. };
  56. GAPI_OCV_KERNEL(GThrowImpl, GThrow)
  57. {
  58. static void run(const cv::Mat& in, cv::Mat&)
  59. {
  60. //this condition is needed to avoid "Unreachable code" warning on windows inside OCVCallHelper
  61. if (!in.empty())
  62. {
  63. throw gthrow_exception{"test"};
  64. }
  65. }
  66. };
  67. //TODO: unify with callback helper code
  68. struct cancel_struct {
  69. std::atomic<int> num_tasks_to_spawn;
  70. cv::gapi::wip::GAsyncContext ctx;
  71. cancel_struct(int tasks_to_spawn) : num_tasks_to_spawn(tasks_to_spawn) {}
  72. };
  73. G_TYPED_KERNEL(GCancelationAdHoc, <GMat(GMat, cancel_struct*)>, "org.opencv.test.cancel_ad_hoc")
  74. {
  75. static GMatDesc outMeta(GMatDesc in, cancel_struct* ) { return in; }
  76. };
  77. GAPI_OCV_KERNEL(GCancelationAdHocImpl, GCancelationAdHoc)
  78. {
  79. static void run(const cv::Mat& , cancel_struct* cancel_struct_p, cv::Mat&) {
  80. auto& cancel_struct_ = * cancel_struct_p;
  81. auto num_tasks_to_spawn = -- cancel_struct_.num_tasks_to_spawn;
  82. cancel_struct_.ctx.cancel();
  83. EXPECT_GT(num_tasks_to_spawn, 0)<<"Incorrect Test setup - to small number of tasks to feed the queue \n";
  84. }
  85. };
  86. }
  87. struct ExceptionOnExecution {
  88. cv::GComputation throwing_gcomp;
  89. ExceptionOnExecution() : throwing_gcomp([]{
  90. cv::GMat in;
  91. auto gout = GThrow::on(in);
  92. return GComputation{in, gout};
  93. })
  94. {}
  95. const cv::Size sz{2, 2};
  96. cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
  97. cv::Mat out;
  98. cv::GCompiled compile(){
  99. return throwing_gcomp.compile(descr_of(in_mat), compile_args());
  100. }
  101. cv::GComputation& computation(){
  102. return throwing_gcomp;
  103. }
  104. cv::GRunArgs in_args(){
  105. return cv::gin(in_mat);
  106. }
  107. cv::GRunArgsP out_args(){
  108. return cv::gout(out);
  109. }
  110. cv::GCompileArgs compile_args(){
  111. auto pkg = cv::gapi::kernels<GThrowImpl>();
  112. return cv::compile_args(pkg);
  113. }
  114. };
  115. struct SelfCanceling {
  116. cv::GComputation self_cancel;
  117. SelfCanceling(cancel_struct* cancel_struct_p) : self_cancel([cancel_struct_p]{
  118. cv::GMat in;
  119. cv::GMat out = GCancelationAdHoc::on(in, cancel_struct_p);
  120. return GComputation{in, out};
  121. })
  122. {}
  123. const cv::Size sz{2, 2};
  124. cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)};
  125. cv::Mat out_mat;
  126. cv::GCompiled compile(){
  127. return self_cancel.compile(descr_of(in_mat), compile_args());
  128. }
  129. cv::GComputation& computation(){
  130. return self_cancel;
  131. }
  132. cv::GRunArgs in_args(){
  133. return cv::gin(in_mat);
  134. }
  135. cv::GRunArgsP out_args(){
  136. return cv::gout(out_mat);
  137. }
  138. cv::GCompileArgs compile_args(){
  139. auto pkg = cv::gapi::kernels<GCancelationAdHocImpl>();
  140. return cv::compile_args(pkg);
  141. }
  142. };
  143. template<typename crtp_final_t>
  144. struct crtp_cast {
  145. template<typename crtp_base_t>
  146. static crtp_final_t* crtp_cast_(crtp_base_t* this_)
  147. {
  148. return static_cast<crtp_final_t*>(this_);
  149. }
  150. };
  151. //Test Mixin, hiding details of callback based notification
  152. template<typename crtp_final_t>
  153. struct CallBack: crtp_cast<crtp_final_t> {
  154. std::atomic<bool> callback_called = {false};
  155. std::mutex mtx;
  156. std::exception_ptr ep;
  157. std::condition_variable cv;
  158. std::function<void(std::exception_ptr)> callback(){
  159. return [&](std::exception_ptr ep_){
  160. ep = ep_;
  161. callback_called = true;
  162. mtx.lock();
  163. mtx.unlock();
  164. cv.notify_one();
  165. };
  166. }
  167. template<typename... Args >
  168. void start_async(Args&&... args){
  169. this->crtp_cast_(this)->async(callback(), std::forward<Args>(args)...);
  170. }
  171. template<typename... Args >
  172. void start_async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args){
  173. this->crtp_cast_(this)->async(ctx, callback(), std::forward<Args>(args)...);
  174. }
  175. void wait_for_result()
  176. {
  177. std::unique_lock<std::mutex> lck{mtx};
  178. cv.wait(lck,[&]{return callback_called == true;});
  179. if (ep)
  180. {
  181. std::rethrow_exception(ep);
  182. }
  183. }
  184. };
  185. //Test Mixin, hiding details of future based notification
  186. template<typename crtp_final_t>
  187. struct Future: crtp_cast<crtp_final_t> {
  188. std::future<void> f;
  189. template<typename... Args >
  190. void start_async(Args&&... args){
  191. f = this->crtp_cast_(this)->async(std::forward<Args>(args)...);
  192. }
  193. void wait_for_result()
  194. {
  195. f.get();
  196. }
  197. };
  198. //Test Mixin, hiding details of using compiled GAPI object
  199. template<typename crtp_final_t>
  200. struct AsyncCompiled : crtp_cast<crtp_final_t>{
  201. template<typename... Args>
  202. auto async(Args&&... args) -> decltype(cv::gapi::wip::async(std::declval<cv::GCompiled&>(), std::forward<Args>(args)...)){
  203. auto gcmpld = this->crtp_cast_(this)->compile();
  204. return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)...);
  205. }
  206. template<typename... Args>
  207. auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
  208. decltype(cv::gapi::wip::async(std::declval<cv::GCompiled&>(), std::forward<Args>(args)..., std::declval<cv::gapi::wip::GAsyncContext&>()))
  209. {
  210. auto gcmpld = this->crtp_cast_(this)->compile();
  211. return cv::gapi::wip::async(gcmpld, std::forward<Args>(args)..., ctx);
  212. }
  213. };
  214. //Test Mixin, hiding details of calling apply (async_apply) on GAPI Computation object
  215. template<typename crtp_final_t>
  216. struct AsyncApply : crtp_cast<crtp_final_t> {
  217. template<typename... Args>
  218. auto async(Args&&... args) ->
  219. decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)..., std::declval<cv::GCompileArgs>()))
  220. {
  221. return cv::gapi::wip::async_apply(
  222. this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args()
  223. );
  224. }
  225. template<typename... Args>
  226. auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) ->
  227. decltype(cv::gapi::wip::async_apply(std::declval<cv::GComputation&>(), std::forward<Args>(args)... , std::declval<cv::GCompileArgs>(), std::declval<cv::gapi::wip::GAsyncContext&>()))
  228. {
  229. return cv::gapi::wip::async_apply(
  230. this->crtp_cast_(this)->computation(), std::forward<Args>(args)..., this->crtp_cast_(this)->compile_args(), ctx
  231. );
  232. }
  233. };
  234. template<typename case_t>
  235. struct normal: ::testing::Test, case_t{};
  236. TYPED_TEST_CASE_P(normal);
  237. TYPED_TEST_P(normal, basic){
  238. //Normal scenario: start function asynchronously and wait for the result, and verify it
  239. this->start_async(this->in_args(), this->out_args());
  240. this->wait_for_result();
  241. this->verify();
  242. }
  243. REGISTER_TYPED_TEST_CASE_P(normal,
  244. basic
  245. );
  246. template<typename case_t>
  247. struct exception: ::testing::Test, case_t{};
  248. TYPED_TEST_CASE_P(exception);
  249. TYPED_TEST_P(exception, basic){
  250. //Exceptional scenario: start function asynchronously and make sure exception is passed to the user
  251. this->start_async(this->in_args(), this->out_args());
  252. EXPECT_THROW(this->wait_for_result(), gthrow_exception);
  253. }
  254. REGISTER_TYPED_TEST_CASE_P(exception,
  255. basic
  256. );
  257. template<typename case_t>
  258. struct stress : ::testing::Test{};
  259. TYPED_TEST_CASE_P(stress);
  260. TYPED_TEST_P(stress, test){
  261. //Some stress testing: use a number of threads to start a bunch of async requests
  262. const std::size_t request_per_thread = 10;
  263. const std::size_t number_of_threads = 4;
  264. auto thread_body = [&](){
  265. std::vector<TypeParam> requests(request_per_thread);
  266. for (auto&& r : requests){
  267. r.start_async(r.in_args(), r.out_args());
  268. }
  269. for (auto&& r : requests){
  270. r.wait_for_result();
  271. r.verify();
  272. }
  273. };
  274. std::vector<std::thread> pool {number_of_threads};
  275. for (auto&& t : pool){
  276. t = std::thread{thread_body};
  277. }
  278. for (auto&& t : pool){
  279. t.join();
  280. }
  281. }
  282. REGISTER_TYPED_TEST_CASE_P(stress, test);
  283. template<typename case_t>
  284. struct cancel : ::testing::Test{};
  285. TYPED_TEST_CASE_P(cancel);
  286. TYPED_TEST_P(cancel, basic)
  287. {
  288. #if defined(__GNUC__) && __GNUC__ == 11
  289. // std::vector<TypeParam> requests can't handle type with ctor parameter (SelfCanceling)
  290. FAIL() << "Test code is not available due to compilation error with GCC 11";
  291. #else
  292. constexpr int num_tasks = 100;
  293. cancel_struct cancel_struct_ {num_tasks};
  294. std::vector<TypeParam> requests; requests.reserve(num_tasks);
  295. for (auto i = num_tasks; i>0; i--){
  296. requests.emplace_back(&cancel_struct_);
  297. }
  298. for (auto&& r : requests){
  299. //first request will cancel other on it's execution
  300. r.start_async(cancel_struct_.ctx, r.in_args(), r.out_args());
  301. }
  302. unsigned int canceled = 0 ;
  303. for (auto&& r : requests){
  304. try {
  305. r.wait_for_result();
  306. }catch (cv::gapi::wip::GAsyncCanceled&){
  307. ++canceled;
  308. }
  309. }
  310. ASSERT_GT(canceled, 0u);
  311. #endif
  312. }
  313. namespace {
  314. GRunArgs deep_copy_out_args(const GRunArgsP& args ){
  315. GRunArgs result; result.reserve(args.size());
  316. for (auto&& arg : args){
  317. //FIXME: replace this switch with use of visit() on variant, when it will be available
  318. switch (arg.index()){
  319. case GRunArgP::index_of<cv::UMat*>() : result.emplace_back(*util::get<cv::UMat*>(arg)); break;
  320. case GRunArgP::index_of<cv::Mat*>() : result.emplace_back(*util::get<cv::Mat*>(arg)); break;
  321. case GRunArgP::index_of<cv::Scalar*>() : result.emplace_back(*util::get<cv::Scalar*> (arg)); break;
  322. case GRunArgP::index_of<cv::detail::VectorRef>() : result.emplace_back(util::get<cv::detail::VectorRef> (arg)); break;
  323. default : ;
  324. }
  325. }
  326. return result;
  327. }
  328. GRunArgsP args_p_from_args(GRunArgs& args){
  329. GRunArgsP result; result.reserve(args.size());
  330. for (auto&& arg : args){
  331. switch (arg.index()){
  332. case GRunArg::index_of<cv::Mat>() : result.emplace_back(&util::get<cv::Mat>(arg)); break;
  333. case GRunArg::index_of<cv::UMat>() : result.emplace_back(&util::get<cv::UMat>(arg)); break;
  334. case GRunArg::index_of<cv::Scalar>() : result.emplace_back(&util::get<cv::Scalar> (arg)); break;
  335. case GRunArg::index_of<cv::detail::VectorRef>() : result.emplace_back(util::get<cv::detail::VectorRef> (arg)); break;
  336. default : ;
  337. }
  338. }
  339. return result;
  340. }
  341. }
  342. REGISTER_TYPED_TEST_CASE_P(cancel, basic);
  343. template<typename case_t>
  344. struct output_args_lifetime : ::testing::Test{
  345. static constexpr const int num_of_requests = 20;
  346. };
  347. TYPED_TEST_CASE_P(output_args_lifetime);
  348. //There are intentionally no actual checks (asserts and verify) in output_args_lifetime tests.
  349. //They are more of example use-cases than real tests. (ASAN/valgrind can still catch issues here)
  350. TYPED_TEST_P(output_args_lifetime, callback){
  351. std::atomic<int> active_requests = {0};
  352. for (int i=0; i<this->num_of_requests; i++)
  353. {
  354. TypeParam r;
  355. //As output arguments are __captured by reference__ calling code
  356. //__must__ ensure they live long enough to complete asynchronous activity.
  357. //(i.e. live at least until callback is called)
  358. auto out_args_ptr = std::make_shared<cv::GRunArgs>(deep_copy_out_args(r.out_args()));
  359. //Extend lifetime of out_args_ptr content by capturing it into a callback
  360. auto cb = [&active_requests, out_args_ptr](std::exception_ptr ){
  361. --active_requests;
  362. };
  363. ++active_requests;
  364. r.async(cb, r.in_args(), args_p_from_args(*out_args_ptr));
  365. }
  366. while(active_requests){
  367. std::this_thread::sleep_for(std::chrono::milliseconds{2});
  368. }
  369. }
  370. TYPED_TEST_P(output_args_lifetime, future){
  371. std::vector<std::future<void>> fs(this->num_of_requests);
  372. std::vector<std::shared_ptr<cv::GRunArgs>> out_ptrs(this->num_of_requests);
  373. for (int i=0; i<this->num_of_requests; i++)
  374. {
  375. TypeParam r;
  376. //As output arguments are __captured by reference__ calling code
  377. //__must__ ensure they live long enough to complete asynchronous activity.
  378. //(i.e. live at least until future.get()/wait() is returned)
  379. auto out_args_ptr = std::make_shared<cv::GRunArgs>(deep_copy_out_args(r.out_args()));
  380. //Extend lifetime of out_args_ptr content
  381. out_ptrs[i] = out_args_ptr;
  382. fs[i] = r.async(r.in_args(), args_p_from_args(*out_args_ptr));
  383. }
  384. for (auto const& ftr : fs ){
  385. ftr.wait();
  386. }
  387. }
  388. REGISTER_TYPED_TEST_CASE_P(output_args_lifetime, callback, future);
  389. //little helpers to match up all combinations of setups
  390. template<typename compute_fixture_t, template<typename> class... args_t>
  391. struct Case
  392. : compute_fixture_t,
  393. args_t<Case<compute_fixture_t, args_t...>> ...
  394. {
  395. template<typename... Args>
  396. Case(Args&&... args) : compute_fixture_t(std::forward<Args>(args)...) { }
  397. Case(Case const & ) = default;
  398. Case(Case && ) = default;
  399. Case() = default;
  400. };
  401. template<typename computation_t>
  402. using cases = ::testing::Types<
  403. Case<computation_t, CallBack, AsyncCompiled>,
  404. Case<computation_t, CallBack, AsyncApply>,
  405. Case<computation_t, Future, AsyncCompiled>,
  406. Case<computation_t, Future, AsyncApply>
  407. >;
  408. INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPINormalFlow_, normal, cases<SumOfSum2x2>);
  409. INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases<ExceptionOnExecution>);
  410. INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress, stress, cases<SumOfSum2x2>);
  411. INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPICancelation, cancel, cases<SelfCanceling>);
  412. template<typename computation_t>
  413. using explicit_wait_cases = ::testing::Types<
  414. Case<computation_t, AsyncCompiled>,
  415. Case<computation_t, AsyncApply>,
  416. Case<computation_t, AsyncCompiled>,
  417. Case<computation_t, AsyncApply>
  418. >;
  419. INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIOutArgsLifetTime, output_args_lifetime, explicit_wait_cases<SumOfSum2x2>);
  420. } // namespace opencv_test