| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- from typing import List, Optional, Union
- from ray._private.client_mode_hook import client_mode_hook
- from ray._raylet import GcsClient
- _initialized = False
- global_gcs_client = None
- def _internal_kv_reset():
- global global_gcs_client, _initialized
- global_gcs_client = None
- _initialized = False
- def internal_kv_get_gcs_client():
- return global_gcs_client
- def _initialize_internal_kv(gcs_client: GcsClient):
- """Initialize the internal KV for use in other function calls."""
- global global_gcs_client, _initialized
- assert gcs_client is not None
- global_gcs_client = gcs_client
- _initialized = True
- @client_mode_hook
- def _internal_kv_initialized():
- return global_gcs_client is not None
- @client_mode_hook
- def _internal_kv_get(
- key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
- ) -> bytes:
- """Fetch the value of a binary key."""
- if isinstance(key, str):
- key = key.encode()
- if isinstance(namespace, str):
- namespace = namespace.encode()
- assert isinstance(key, bytes)
- return global_gcs_client.internal_kv_get(key, namespace)
- @client_mode_hook
- def _internal_kv_exists(
- key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
- ) -> bool:
- """Check key exists or not."""
- if isinstance(key, str):
- key = key.encode()
- if isinstance(namespace, str):
- namespace = namespace.encode()
- assert isinstance(key, bytes)
- return global_gcs_client.internal_kv_exists(key, namespace)
- @client_mode_hook
- def _pin_runtime_env_uri(uri: str, *, expiration_s: int) -> None:
- """Pin a runtime_env URI for expiration_s."""
- return global_gcs_client.pin_runtime_env_uri(uri, expiration_s)
- @client_mode_hook
- def _internal_kv_put(
- key: Union[str, bytes],
- value: Union[str, bytes],
- overwrite: bool = True,
- *,
- namespace: Optional[Union[str, bytes]] = None
- ) -> bool:
- """Globally associates a value with a given binary key.
- This only has an effect if the key does not already have a value.
- Returns:
- already_exists: whether the value already exists.
- """
- if isinstance(key, str):
- key = key.encode()
- if isinstance(value, str):
- value = value.encode()
- if isinstance(namespace, str):
- namespace = namespace.encode()
- assert (
- isinstance(key, bytes)
- and isinstance(value, bytes)
- and isinstance(overwrite, bool)
- )
- return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
- @client_mode_hook
- def _internal_kv_del(
- key: Union[str, bytes],
- *,
- del_by_prefix: bool = False,
- namespace: Optional[Union[str, bytes]] = None
- ) -> int:
- if isinstance(key, str):
- key = key.encode()
- if isinstance(namespace, str):
- namespace = namespace.encode()
- assert isinstance(key, bytes)
- return global_gcs_client.internal_kv_del(key, del_by_prefix, namespace)
- @client_mode_hook
- def _internal_kv_list(
- prefix: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
- ) -> List[bytes]:
- """List all keys in the internal KV store that start with the prefix."""
- if isinstance(prefix, str):
- prefix = prefix.encode()
- if isinstance(namespace, str):
- namespace = namespace.encode()
- return global_gcs_client.internal_kv_keys(prefix, namespace)
|