internal_kv.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. from typing import List, Optional, Union
  2. from ray._private.client_mode_hook import client_mode_hook
  3. from ray._raylet import GcsClient
  4. _initialized = False
  5. global_gcs_client = None
  6. def _internal_kv_reset():
  7. global global_gcs_client, _initialized
  8. global_gcs_client = None
  9. _initialized = False
  10. def internal_kv_get_gcs_client():
  11. return global_gcs_client
  12. def _initialize_internal_kv(gcs_client: GcsClient):
  13. """Initialize the internal KV for use in other function calls."""
  14. global global_gcs_client, _initialized
  15. assert gcs_client is not None
  16. global_gcs_client = gcs_client
  17. _initialized = True
  18. @client_mode_hook
  19. def _internal_kv_initialized():
  20. return global_gcs_client is not None
  21. @client_mode_hook
  22. def _internal_kv_get(
  23. key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
  24. ) -> bytes:
  25. """Fetch the value of a binary key."""
  26. if isinstance(key, str):
  27. key = key.encode()
  28. if isinstance(namespace, str):
  29. namespace = namespace.encode()
  30. assert isinstance(key, bytes)
  31. return global_gcs_client.internal_kv_get(key, namespace)
  32. @client_mode_hook
  33. def _internal_kv_exists(
  34. key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
  35. ) -> bool:
  36. """Check key exists or not."""
  37. if isinstance(key, str):
  38. key = key.encode()
  39. if isinstance(namespace, str):
  40. namespace = namespace.encode()
  41. assert isinstance(key, bytes)
  42. return global_gcs_client.internal_kv_exists(key, namespace)
  43. @client_mode_hook
  44. def _pin_runtime_env_uri(uri: str, *, expiration_s: int) -> None:
  45. """Pin a runtime_env URI for expiration_s."""
  46. return global_gcs_client.pin_runtime_env_uri(uri, expiration_s)
  47. @client_mode_hook
  48. def _internal_kv_put(
  49. key: Union[str, bytes],
  50. value: Union[str, bytes],
  51. overwrite: bool = True,
  52. *,
  53. namespace: Optional[Union[str, bytes]] = None
  54. ) -> bool:
  55. """Globally associates a value with a given binary key.
  56. This only has an effect if the key does not already have a value.
  57. Returns:
  58. already_exists: whether the value already exists.
  59. """
  60. if isinstance(key, str):
  61. key = key.encode()
  62. if isinstance(value, str):
  63. value = value.encode()
  64. if isinstance(namespace, str):
  65. namespace = namespace.encode()
  66. assert (
  67. isinstance(key, bytes)
  68. and isinstance(value, bytes)
  69. and isinstance(overwrite, bool)
  70. )
  71. return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
  72. @client_mode_hook
  73. def _internal_kv_del(
  74. key: Union[str, bytes],
  75. *,
  76. del_by_prefix: bool = False,
  77. namespace: Optional[Union[str, bytes]] = None
  78. ) -> int:
  79. if isinstance(key, str):
  80. key = key.encode()
  81. if isinstance(namespace, str):
  82. namespace = namespace.encode()
  83. assert isinstance(key, bytes)
  84. return global_gcs_client.internal_kv_del(key, del_by_prefix, namespace)
  85. @client_mode_hook
  86. def _internal_kv_list(
  87. prefix: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
  88. ) -> List[bytes]:
  89. """List all keys in the internal KV store that start with the prefix."""
  90. if isinstance(prefix, str):
  91. prefix = prefix.encode()
  92. if isinstance(namespace, str):
  93. namespace = namespace.encode()
  94. return global_gcs_client.internal_kv_keys(prefix, namespace)