deduperreload_patching.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from __future__ import annotations
  2. import ctypes
  3. import sys
  4. from typing import Any
  5. NOT_FOUND: object = object()
  6. NULL: object = object()
  7. _MAX_FIELD_SEARCH_OFFSET = 50
  8. POINTER_N_BYTES = ctypes.sizeof(ctypes.c_void_p)
  9. class DeduperReloaderPatchingMixin:
  10. @staticmethod
  11. def infer_field_offset(
  12. obj: object,
  13. field: str,
  14. ) -> int:
  15. field_value = getattr(obj, field, NOT_FOUND)
  16. if field_value is NOT_FOUND:
  17. return -1
  18. obj_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(obj)).value
  19. field_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(field_value)).value
  20. if obj_addr is None or field_addr is None:
  21. return -1
  22. ret = -1
  23. for offset in range(1, _MAX_FIELD_SEARCH_OFFSET):
  24. if (
  25. ctypes.cast(
  26. obj_addr + POINTER_N_BYTES * offset, ctypes.POINTER(ctypes.c_void_p)
  27. ).contents.value
  28. == field_addr
  29. ):
  30. ret = offset
  31. break
  32. return ret
  33. @classmethod
  34. def try_write_readonly_attr(
  35. cls,
  36. obj: object,
  37. field: str,
  38. new_value: object,
  39. offset: int | None = None,
  40. ) -> None:
  41. prev_value = getattr(obj, field, NOT_FOUND)
  42. if prev_value is NOT_FOUND:
  43. return
  44. if offset is None:
  45. offset = cls.infer_field_offset(obj, field)
  46. if offset == -1:
  47. return
  48. obj_addr = ctypes.c_void_p.from_buffer(ctypes.py_object(obj)).value
  49. if new_value is NULL:
  50. new_value_addr: int | None = 0
  51. else:
  52. new_value_addr = ctypes.c_void_p.from_buffer(
  53. ctypes.py_object(new_value)
  54. ).value
  55. if obj_addr is None or new_value_addr is None:
  56. return
  57. if prev_value is not None:
  58. ctypes.pythonapi.Py_DecRef(ctypes.py_object(prev_value))
  59. if new_value not in (None, NULL):
  60. ctypes.pythonapi.Py_IncRef(ctypes.py_object(new_value))
  61. ctypes.cast(
  62. obj_addr + POINTER_N_BYTES * offset, ctypes.POINTER(ctypes.c_void_p)
  63. ).contents.value = new_value_addr
  64. @classmethod
  65. def try_patch_readonly_attr(
  66. cls,
  67. old: object,
  68. new: object,
  69. field: str,
  70. new_is_value: bool = False,
  71. offset: int = -1,
  72. ) -> None:
  73. old_value = getattr(old, field, NOT_FOUND)
  74. new_value = new if new_is_value else getattr(new, field, NOT_FOUND)
  75. if old_value is NOT_FOUND or new_value is NOT_FOUND:
  76. return
  77. elif old_value is new_value:
  78. return
  79. elif old_value is not None and offset < 0:
  80. offset = cls.infer_field_offset(old, field)
  81. elif offset < 0:
  82. assert not new_is_value
  83. assert new_value is not None
  84. offset = cls.infer_field_offset(new, field)
  85. cls.try_write_readonly_attr(old, field, new_value, offset=offset)
  86. @classmethod
  87. def try_patch_attr(
  88. cls,
  89. old: object,
  90. new: object,
  91. field: str,
  92. new_is_value: bool = False,
  93. offset: int = -1,
  94. ) -> None:
  95. try:
  96. setattr(old, field, new if new_is_value else getattr(new, field))
  97. except (AttributeError, TypeError, ValueError):
  98. cls.try_patch_readonly_attr(old, new, field, new_is_value, offset)
  99. @classmethod
  100. def patch_function(
  101. cls, to_patch_to: Any, to_patch_from: Any, is_method: bool
  102. ) -> None:
  103. new_closure = []
  104. for freevar, closure_val in zip(
  105. to_patch_from.__code__.co_freevars or [], to_patch_from.__closure__ or []
  106. ):
  107. if (
  108. callable(closure_val.cell_contents)
  109. and freevar in to_patch_to.__code__.co_freevars
  110. ):
  111. new_closure.append(
  112. to_patch_to.__closure__[
  113. to_patch_to.__code__.co_freevars.index(freevar)
  114. ]
  115. )
  116. else:
  117. new_closure.append(closure_val)
  118. # lambdas may complain if there is more than one freevar
  119. cls.try_patch_attr(to_patch_to, to_patch_from, "__code__")
  120. offset = -1
  121. if to_patch_to.__closure__ is None and to_patch_from.__closure__ is not None:
  122. offset = cls.infer_field_offset(to_patch_from, "__closure__")
  123. if to_patch_to.__closure__ is not None or to_patch_from.__closure__ is not None:
  124. cls.try_patch_readonly_attr(
  125. to_patch_to,
  126. tuple(new_closure) or NULL,
  127. "__closure__",
  128. new_is_value=True,
  129. offset=offset,
  130. )
  131. for attr in ("__defaults__", "__kwdefaults__", "__doc__", "__dict__"):
  132. cls.try_patch_attr(to_patch_to, to_patch_from, attr)
  133. if is_method:
  134. cls.try_patch_readonly_attr(to_patch_to, to_patch_from, "__self__")