initializers.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import warnings
  2. def _viztracer_init(init_kwargs):
  3. """Initialize viztracer's profiler in worker processes"""
  4. from viztracer import VizTracer
  5. tracer = VizTracer(**init_kwargs)
  6. tracer.register_exit()
  7. tracer.start()
  8. def _make_viztracer_initializer_and_initargs():
  9. try:
  10. import viztracer
  11. tracer = viztracer.get_tracer()
  12. if tracer is not None and getattr(tracer, "enable", False):
  13. # Profiler is active: introspect its configuration to
  14. # initialize the workers with the same configuration.
  15. return _viztracer_init, (tracer.init_kwargs,)
  16. except ImportError:
  17. # viztracer is not installed: nothing to do
  18. pass
  19. except Exception as e:
  20. # In case viztracer's API evolve, we do not want to crash loky but
  21. # we want to know about it to be able to update loky.
  22. warnings.warn(f"Unable to introspect viztracer state: {e}")
  23. return None, ()
  24. class _ChainedInitializer:
  25. """Compound worker initializer
  26. This is meant to be used in conjunction with _chain_initializers to
  27. produce the necessary chained_args list to be passed to __call__.
  28. """
  29. def __init__(self, initializers):
  30. self._initializers = initializers
  31. def __call__(self, *chained_args):
  32. for initializer, args in zip(self._initializers, chained_args):
  33. initializer(*args)
  34. def _chain_initializers(initializer_and_args):
  35. """Convenience helper to combine a sequence of initializers.
  36. If some initializers are None, they are filtered out.
  37. """
  38. filtered_initializers = []
  39. filtered_initargs = []
  40. for initializer, initargs in initializer_and_args:
  41. if initializer is not None:
  42. filtered_initializers.append(initializer)
  43. filtered_initargs.append(initargs)
  44. if not filtered_initializers:
  45. return None, ()
  46. elif len(filtered_initializers) == 1:
  47. return filtered_initializers[0], filtered_initargs[0]
  48. else:
  49. return _ChainedInitializer(filtered_initializers), filtered_initargs
  50. def _prepare_initializer(initializer, initargs):
  51. if initializer is not None and not callable(initializer):
  52. raise TypeError(
  53. f"initializer must be a callable, got: {initializer!r}"
  54. )
  55. # Introspect runtime to determine if we need to propagate the viztracer
  56. # profiler information to the workers:
  57. return _chain_initializers(
  58. [
  59. (initializer, initargs),
  60. _make_viztracer_initializer_and_initargs(),
  61. ]
  62. )