test_func_inspect.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. """
  2. Test the func_inspect module.
  3. """
  4. # Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
  5. # Copyright (c) 2009 Gael Varoquaux
  6. # License: BSD Style, 3 clauses.
  7. import functools
  8. from joblib.func_inspect import (
  9. _clean_win_chars,
  10. filter_args,
  11. format_signature,
  12. get_func_code,
  13. get_func_name,
  14. )
  15. from joblib.memory import Memory
  16. from joblib.test.common import with_numpy
  17. from joblib.testing import fixture, parametrize, raises
  18. ###############################################################################
  19. # Module-level functions and fixture, for tests
  20. def f(x, y=0):
  21. pass
  22. def g(x):
  23. pass
  24. def h(x, y=0, *args, **kwargs):
  25. pass
  26. def i(x=1):
  27. pass
  28. def j(x, y, **kwargs):
  29. pass
  30. def k(*args, **kwargs):
  31. pass
  32. def m1(x, *, y):
  33. pass
  34. def m2(x, *, y, z=3):
  35. pass
  36. @fixture(scope="module")
  37. def cached_func(tmpdir_factory):
  38. # Create a Memory object to test decorated functions.
  39. # We should be careful not to call the decorated functions, so that
  40. # cache directories are not created in the temp dir.
  41. cachedir = tmpdir_factory.mktemp("joblib_test_func_inspect")
  42. mem = Memory(cachedir.strpath)
  43. @mem.cache
  44. def cached_func_inner(x):
  45. return x
  46. return cached_func_inner
  47. class Klass(object):
  48. def f(self, x):
  49. return x
  50. ###############################################################################
  51. # Tests
  52. @parametrize(
  53. "func,args,filtered_args",
  54. [
  55. (f, [[], (1,)], {"x": 1, "y": 0}),
  56. (f, [["x"], (1,)], {"y": 0}),
  57. (f, [["y"], (0,)], {"x": 0}),
  58. (f, [["y"], (0,), {"y": 1}], {"x": 0}),
  59. (f, [["x", "y"], (0,)], {}),
  60. (f, [[], (0,), {"y": 1}], {"x": 0, "y": 1}),
  61. (f, [["y"], (), {"x": 2, "y": 1}], {"x": 2}),
  62. (g, [[], (), {"x": 1}], {"x": 1}),
  63. (i, [[], (2,)], {"x": 2}),
  64. ],
  65. )
  66. def test_filter_args(func, args, filtered_args):
  67. assert filter_args(func, *args) == filtered_args
  68. def test_filter_args_method():
  69. obj = Klass()
  70. assert filter_args(obj.f, [], (1,)) == {"x": 1, "self": obj}
  71. @parametrize(
  72. "func,args,filtered_args",
  73. [
  74. (h, [[], (1,)], {"x": 1, "y": 0, "*": [], "**": {}}),
  75. (h, [[], (1, 2, 3, 4)], {"x": 1, "y": 2, "*": [3, 4], "**": {}}),
  76. (h, [[], (1, 25), {"ee": 2}], {"x": 1, "y": 25, "*": [], "**": {"ee": 2}}),
  77. (h, [["*"], (1, 2, 25), {"ee": 2}], {"x": 1, "y": 2, "**": {"ee": 2}}),
  78. ],
  79. )
  80. def test_filter_varargs(func, args, filtered_args):
  81. assert filter_args(func, *args) == filtered_args
# Extra cases for the functions with keyword-only parameters (m1, m2).
# This is plain data, not a test: it is appended to the parametrization of
# test_filter_kwargs.
test_filter_kwargs_extra_params = [
    (m1, [[], (1,), {"y": 2}], {"x": 1, "y": 2}),
    (m2, [[], (1,), {"y": 2}], {"x": 1, "y": 2, "z": 3}),
]
  86. @parametrize(
  87. "func,args,filtered_args",
  88. [
  89. (k, [[], (1, 2), {"ee": 2}], {"*": [1, 2], "**": {"ee": 2}}),
  90. (k, [[], (3, 4)], {"*": [3, 4], "**": {}}),
  91. ]
  92. + test_filter_kwargs_extra_params,
  93. )
  94. def test_filter_kwargs(func, args, filtered_args):
  95. assert filter_args(func, *args) == filtered_args
  96. def test_filter_args_2():
  97. assert filter_args(j, [], (1, 2), {"ee": 2}) == {"x": 1, "y": 2, "**": {"ee": 2}}
  98. ff = functools.partial(f, 1)
  99. # filter_args has to special-case partial
  100. assert filter_args(ff, [], (1,)) == {"*": [1], "**": {}}
  101. assert filter_args(ff, ["y"], (1,)) == {"*": [1], "**": {}}
  102. @parametrize("func,funcname", [(f, "f"), (g, "g"), (cached_func, "cached_func")])
  103. def test_func_name(func, funcname):
  104. # Check that we are not confused by decoration
  105. # here testcase 'cached_func' is the function itself
  106. assert get_func_name(func)[1] == funcname
  107. def test_func_name_on_inner_func(cached_func):
  108. # Check that we are not confused by decoration
  109. # here testcase 'cached_func' is the 'cached_func_inner' function
  110. # returned by 'cached_func' fixture
  111. assert get_func_name(cached_func)[1] == "cached_func_inner"
  112. def test_func_name_collision_on_inner_func():
  113. # Check that two functions defining and caching an inner function
  114. # with the same do not cause (module, name) collision
  115. def f():
  116. def inner_func():
  117. return # pragma: no cover
  118. return get_func_name(inner_func)
  119. def g():
  120. def inner_func():
  121. return # pragma: no cover
  122. return get_func_name(inner_func)
  123. module, name = f()
  124. other_module, other_name = g()
  125. assert name == other_name
  126. assert module != other_module
  127. def test_func_inspect_errors():
  128. # Check that func_inspect is robust and will work on weird objects
  129. assert get_func_name("a".lower)[-1] == "lower"
  130. assert get_func_code("a".lower)[1:] == (None, -1)
  131. ff = lambda x: x # noqa: E731
  132. assert get_func_name(ff, win_characters=False)[-1] == "<lambda>"
  133. assert get_func_code(ff)[1] == __file__.replace(".pyc", ".py")
  134. # Simulate a function defined in __main__
  135. ff.__module__ = "__main__"
  136. assert get_func_name(ff, win_characters=False)[-1] == "<lambda>"
  137. assert get_func_code(ff)[1] == __file__.replace(".pyc", ".py")
  138. def func_with_kwonly_args(a, b, *, kw1="kw1", kw2="kw2"):
  139. pass
  140. def func_with_signature(a: int, b: int) -> None:
  141. pass
  142. def test_filter_args_edge_cases():
  143. assert filter_args(func_with_kwonly_args, [], (1, 2), {"kw1": 3, "kw2": 4}) == {
  144. "a": 1,
  145. "b": 2,
  146. "kw1": 3,
  147. "kw2": 4,
  148. }
  149. # filter_args doesn't care about keyword-only arguments so you
  150. # can pass 'kw1' into *args without any problem
  151. with raises(ValueError) as excinfo:
  152. filter_args(func_with_kwonly_args, [], (1, 2, 3), {"kw2": 2})
  153. excinfo.match("Keyword-only parameter 'kw1' was passed as positional parameter")
  154. assert filter_args(
  155. func_with_kwonly_args, ["b", "kw2"], (1, 2), {"kw1": 3, "kw2": 4}
  156. ) == {"a": 1, "kw1": 3}
  157. assert filter_args(func_with_signature, ["b"], (1, 2)) == {"a": 1}
  158. def test_bound_methods():
  159. """Make sure that calling the same method on two different instances
  160. of the same class does resolv to different signatures.
  161. """
  162. a = Klass()
  163. b = Klass()
  164. assert filter_args(a.f, [], (1,)) != filter_args(b.f, [], (1,))
  165. @parametrize(
  166. "exception,regex,func,args",
  167. [
  168. (
  169. ValueError,
  170. "ignore_lst must be a list of parameters to ignore",
  171. f,
  172. ["bar", (None,)],
  173. ),
  174. (
  175. ValueError,
  176. r"Ignore list: argument \'(.*)\' is not defined",
  177. g,
  178. [["bar"], (None,)],
  179. ),
  180. (ValueError, "Wrong number of arguments", h, [[]]),
  181. ],
  182. )
  183. def test_filter_args_error_msg(exception, regex, func, args):
  184. """Make sure that filter_args returns decent error messages, for the
  185. sake of the user.
  186. """
  187. with raises(exception) as excinfo:
  188. filter_args(func, *args)
  189. excinfo.match(regex)
  190. def test_filter_args_no_kwargs_mutation():
  191. """None-regression test against 0.12.0 changes.
  192. https://github.com/joblib/joblib/pull/75
  193. Make sure filter args doesn't mutate the kwargs dict that gets passed in.
  194. """
  195. kwargs = {"x": 0}
  196. filter_args(g, [], [], kwargs)
  197. assert kwargs == {"x": 0}
  198. def test_clean_win_chars():
  199. string = r"C:\foo\bar\main.py"
  200. mangled_string = _clean_win_chars(string)
  201. for char in ("\\", ":", "<", ">", "!"):
  202. assert char not in mangled_string
  203. @parametrize(
  204. "func,args,kwargs,sgn_expected",
  205. [
  206. (g, [list(range(5))], {}, "g([0, 1, 2, 3, 4])"),
  207. (k, [1, 2, (3, 4)], {"y": True}, "k(1, 2, (3, 4), y=True)"),
  208. ],
  209. )
  210. def test_format_signature(func, args, kwargs, sgn_expected):
  211. # Test signature formatting.
  212. path, sgn_result = format_signature(func, *args, **kwargs)
  213. assert sgn_result == sgn_expected
  214. def test_format_signature_long_arguments():
  215. shortening_threshold = 1500
  216. # shortening gets it down to 700 characters but there is the name
  217. # of the function in the signature and a few additional things
  218. # like dots for the ellipsis
  219. shortening_target = 700 + 10
  220. arg = "a" * shortening_threshold
  221. _, signature = format_signature(h, arg)
  222. assert len(signature) < shortening_target
  223. nb_args = 5
  224. args = [arg for _ in range(nb_args)]
  225. _, signature = format_signature(h, *args)
  226. assert len(signature) < shortening_target * nb_args
  227. kwargs = {str(i): arg for i, arg in enumerate(args)}
  228. _, signature = format_signature(h, **kwargs)
  229. assert len(signature) < shortening_target * nb_args
  230. _, signature = format_signature(h, *args, **kwargs)
  231. assert len(signature) < shortening_target * 2 * nb_args
@with_numpy
def test_format_signature_numpy():
    """Test the format signature formatting with numpy."""
    # NOTE(review): the body holds no statements besides the docstring, so
    # this test currently asserts nothing -- TODO confirm whether numpy
    # cases were meant to be added here.
  235. def test_special_source_encoding():
  236. from joblib.test.test_func_inspect_special_encoding import big5_f
  237. func_code, source_file, first_line = get_func_code(big5_f)
  238. assert first_line == 5
  239. assert "def big5_f():" in func_code
  240. assert "test_func_inspect_special_encoding" in source_file
  241. def _get_code():
  242. from joblib.test.test_func_inspect_special_encoding import big5_f
  243. return get_func_code(big5_f)[0]
  244. def test_func_code_consistency():
  245. from joblib.parallel import Parallel, delayed
  246. codes = Parallel(n_jobs=2)(delayed(_get_code)() for _ in range(5))
  247. assert len(set(codes)) == 1