testing_refleaks.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # Protocol Buffers - Google's data interchange format
  2. # Copyright 2008 Google Inc. All rights reserved.
  3. #
  4. # Use of this source code is governed by a BSD-style
  5. # license that can be found in the LICENSE file or at
  6. # https://developers.google.com/open-source/licenses/bsd
  7. """A subclass of unittest.TestCase which checks for reference leaks.
  8. To use:
  9. - Decorate your unittest.TestCase subclass with @testing_refleaks.TestCase
  10. - Configure and compile Python with --with-pydebug
  11. If sys.gettotalrefcount() is not available (because Python was built without
  12. the Py_DEBUG option), then this module is a no-op and tests will run normally.
  13. """
  14. import copyreg
  15. import gc
  16. import sys
  17. import unittest
  18. class LocalTestResult(unittest.TestResult):
  19. """A TestResult which forwards events to a parent object, except for Skips."""
  20. def __init__(self, parent_result):
  21. unittest.TestResult.__init__(self)
  22. self.parent_result = parent_result
  23. def addError(self, test, error):
  24. self.parent_result.addError(test, error)
  25. def addFailure(self, test, error):
  26. self.parent_result.addFailure(test, error)
  27. def addSkip(self, test, reason):
  28. pass
  29. def addDuration(self, test, duration):
  30. pass
  31. class ReferenceLeakCheckerMixin(object):
  32. """A mixin class for TestCase, which checks reference counts."""
  33. NB_RUNS = 3
  34. def run(self, result=None):
  35. testMethod = getattr(self, self._testMethodName)
  36. expecting_failure_method = getattr(testMethod, "__unittest_expecting_failure__", False)
  37. expecting_failure_class = getattr(self, "__unittest_expecting_failure__", False)
  38. if expecting_failure_class or expecting_failure_method:
  39. return
  40. # python_message.py registers all Message classes to some pickle global
  41. # registry, which makes the classes immortal.
  42. # We save a copy of this registry, and reset it before we could references.
  43. self._saved_pickle_registry = copyreg.dispatch_table.copy()
  44. # Run the test twice, to warm up the instance attributes.
  45. super(ReferenceLeakCheckerMixin, self).run(result=result)
  46. super(ReferenceLeakCheckerMixin, self).run(result=result)
  47. local_result = LocalTestResult(result)
  48. num_flakes = 0
  49. refcount_deltas = []
  50. # Observe the refcount, then create oldrefcount which actually makes the
  51. # refcount 1 higher than the recorded value immediately
  52. oldrefcount = self._getRefcounts()
  53. while len(refcount_deltas) < self.NB_RUNS:
  54. oldrefcount = self._getRefcounts()
  55. super(ReferenceLeakCheckerMixin, self).run(result=local_result)
  56. newrefcount = self._getRefcounts()
  57. # If the GC was able to collect some objects after the call to run() that
  58. # it could not collect before the call, then the counts won't match.
  59. if newrefcount < oldrefcount and num_flakes < 2:
  60. # This result is (probably) a flake -- garbage collectors aren't very
  61. # predictable, but a lower ending refcount is the opposite of the
  62. # failure we are testing for. If the result is repeatable, then we will
  63. # eventually report it, but not after trying to eliminate it.
  64. num_flakes += 1
  65. continue
  66. num_flakes = 0
  67. refcount_deltas.append(newrefcount - oldrefcount)
  68. print(refcount_deltas, self)
  69. try:
  70. self.assertEqual(refcount_deltas, [0] * self.NB_RUNS)
  71. except Exception: # pylint: disable=broad-except
  72. result.addError(self, sys.exc_info())
  73. def _getRefcounts(self):
  74. if hasattr(sys, "_clear_internal_caches"): # Since 3.13
  75. sys._clear_internal_caches() # pylint: disable=protected-access
  76. else:
  77. sys._clear_type_cache() # pylint: disable=protected-access
  78. copyreg.dispatch_table.clear()
  79. copyreg.dispatch_table.update(self._saved_pickle_registry)
  80. # It is sometimes necessary to gc.collect() multiple times, to ensure
  81. # that all objects can be collected.
  82. gc.collect()
  83. gc.collect()
  84. gc.collect()
  85. return sys.gettotalrefcount()
  86. if hasattr(sys, 'gettotalrefcount'):
  87. def TestCase(test_class):
  88. new_bases = (ReferenceLeakCheckerMixin,) + test_class.__bases__
  89. new_class = type(test_class)(
  90. test_class.__name__, new_bases, dict(test_class.__dict__))
  91. return new_class
  92. SkipReferenceLeakChecker = unittest.skip
  93. else:
  94. # When PyDEBUG is not enabled, run the tests normally.
  95. def TestCase(test_class):
  96. return test_class
  97. def SkipReferenceLeakChecker(reason):
  98. del reason # Don't skip, so don't need a reason.
  99. def Same(func):
  100. return func
  101. return Same