| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770 |
- import asyncio
- import collections
- import copy
- import errno
- import importlib
- import inspect
- import logging
- import random
- import re
- import time
- import uuid
- from decimal import ROUND_HALF_UP, Decimal
- from enum import Enum
- from functools import wraps
- from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
- import requests
- import ray
- import ray.util.serialization_addons
- from ray._common.constants import HEAD_NODE_RESOURCE_NAME
- from ray._common.utils import get_random_alphanumeric_string, import_attr
- from ray._private.worker import LOCAL_MODE, SCRIPT_MODE
- from ray._raylet import MessagePackSerializer
- from ray.actor import ActorHandle
- from ray.serve._private.common import RequestMetadata, ServeComponentType
- from ray.serve._private.constants import HTTP_PROXY_TIMEOUT, SERVE_LOGGER_NAME
- from ray.types import ObjectRef
- from ray.util.serialization import StandaloneSerializationContext
- try:
- import pandas as pd
- except ImportError:
- pd = None
- try:
- import numpy as np
- except ImportError:
- np = None
- FILE_NAME_REGEX = r"[^\x20-\x7E]|[<>:\"/\\|?*]"
- MESSAGE_PACK_OFFSET = 9
- def asyncio_grpc_exception_handler(loop, context):
- """Exception handler to filter out false positive BlockingIOErrors from gRPC."""
- exc = context.get("exception")
- msg = context.get("message")
- if (
- exc
- and isinstance(exc, BlockingIOError)
- and exc.errno == errno.EAGAIN
- and "PollerCompletionQueue._handle_events" in msg
- ):
- return
- loop.default_exception_handler(context)
- def validate_ssl_config(
- ssl_certfile: Optional[str], ssl_keyfile: Optional[str]
- ) -> None:
- """Validate SSL configuration for HTTPS support.
- Args:
- ssl_certfile: Path to SSL certificate file
- ssl_keyfile: Path to SSL private key file
- Raises:
- ValueError: If only one of ssl_certfile or ssl_keyfile is provided
- """
- if (ssl_certfile and not ssl_keyfile) or (ssl_keyfile and not ssl_certfile):
- raise ValueError(
- "Both ssl_keyfile and ssl_certfile must be provided together "
- "to enable HTTPS."
- )
- GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR = RuntimeError(
- "Streaming deployment handle results cannot be passed to "
- "downstream handle calls. If you have a use case requiring "
- "this feature, please file a feature request on GitHub."
- )
- # Use a global singleton enum to emulate default options. We cannot use None
- # for those option because None is a valid new value.
- class DEFAULT(Enum):
- VALUE = 1
- class DeploymentOptionUpdateType(str, Enum):
- # Nothing needs to be done other than setting the target state.
- LightWeight = "LightWeight"
- # Each DeploymentReplica instance (tracked in DeploymentState) uses certain options
- # from the deployment config. These values need to be updated in DeploymentReplica.
- NeedsReconfigure = "NeedsReconfigure"
- # Options that are sent to the replica actor. If changed, reconfigure() on the actor
- # needs to be called to update these values.
- NeedsActorReconfigure = "NeedsActorReconfigure"
- # If changed, restart all replicas.
- HeavyWeight = "HeavyWeight"
- # Type alias: objects that can be DEFAULT.VALUE have type Default[T]
- T = TypeVar("T")
- Default = Union[DEFAULT, T]
- logger = logging.getLogger(SERVE_LOGGER_NAME)
- # Format for component files
- FILE_FMT = "{component_name}_{component_id}{suffix}"
- class _ServeCustomEncoders:
- """Group of custom encoders for common types that's not handled by FastAPI."""
- @staticmethod
- def encode_np_array(obj):
- assert isinstance(obj, np.ndarray)
- if obj.dtype.kind == "f": # floats
- obj = obj.astype(float)
- if obj.dtype.kind in {"i", "u"}: # signed and unsigned integers.
- obj = obj.astype(int)
- return obj.tolist()
- @staticmethod
- def encode_np_scaler(obj):
- assert isinstance(obj, np.generic)
- return obj.item()
- @staticmethod
- def encode_exception(obj):
- assert isinstance(obj, Exception)
- return str(obj)
- @staticmethod
- def encode_pandas_dataframe(obj):
- assert isinstance(obj, pd.DataFrame)
- return obj.to_dict(orient="records")
- serve_encoders = {Exception: _ServeCustomEncoders.encode_exception}
- if np is not None:
- serve_encoders[np.ndarray] = _ServeCustomEncoders.encode_np_array
- serve_encoders[np.generic] = _ServeCustomEncoders.encode_np_scaler
- if pd is not None:
- serve_encoders[pd.DataFrame] = _ServeCustomEncoders.encode_pandas_dataframe
- @ray.remote(num_cpus=0)
- def block_until_http_ready(
- http_endpoint,
- backoff_time_s=1,
- check_ready=None,
- timeout=HTTP_PROXY_TIMEOUT,
- ):
- http_is_ready = False
- start_time = time.time()
- while not http_is_ready:
- try:
- resp = requests.get(http_endpoint)
- assert resp.status_code == 200
- if check_ready is None:
- http_is_ready = True
- else:
- http_is_ready = check_ready(resp)
- except Exception:
- pass
- if 0 < timeout < time.time() - start_time:
- raise TimeoutError("HTTP proxy not ready after {} seconds.".format(timeout))
- time.sleep(backoff_time_s)
- def get_random_string(length: int = 8):
- return get_random_alphanumeric_string(length)
- def format_actor_name(actor_name, *modifiers):
- name = actor_name
- for modifier in modifiers:
- name += "-{}".format(modifier)
- return name
- CLASS_WRAPPER_METADATA_ATTRS = (
- "__name__",
- "__qualname__",
- "__module__",
- "__doc__",
- "__annotations__",
- )
- def copy_class_metadata(wrapper_cls, target_cls) -> None:
- """Copy common class-level metadata onto a wrapper class."""
- for attr in CLASS_WRAPPER_METADATA_ATTRS:
- if attr == "__annotations__":
- target_annotations = getattr(target_cls, "__annotations__", None)
- if target_annotations:
- merged_annotations = dict(
- wrapper_cls.__dict__.get("__annotations__", {})
- )
- for key, value in target_annotations.items():
- merged_annotations.setdefault(key, value)
- wrapper_cls.__annotations__ = merged_annotations
- continue
- if hasattr(target_cls, attr):
- setattr(wrapper_cls, attr, getattr(target_cls, attr))
- wrapper_cls.__wrapped__ = target_cls
- def ensure_serialization_context():
- """Ensure the serialization addons on registered, even when Ray has not
- been started."""
- ctx = StandaloneSerializationContext()
- ray.util.serialization_addons.apply(ctx)
- def msgpack_serialize(obj):
- ctx = ray._private.worker.global_worker.get_serialization_context()
- buffer = ctx.serialize(obj)
- serialized = buffer.to_bytes()
- return serialized
- def msgpack_deserialize(data):
- # todo: Ray does not provide a msgpack deserialization api.
- try:
- obj = MessagePackSerializer.loads(data[MESSAGE_PACK_OFFSET:], None)
- except Exception:
- raise
- return obj
- def merge_dict(dict1, dict2):
- if dict1 is None and dict2 is None:
- return None
- if dict1 is None:
- dict1 = dict()
- if dict2 is None:
- dict2 = dict()
- result = dict()
- for key in dict1.keys() | dict2.keys():
- result[key] = sum([e.get(key, 0) for e in (dict1, dict2)])
- return result
- def parse_import_path(import_path: str):
- """
- Takes in an import_path of form:
- [subdirectory 1].[subdir 2]...[subdir n].[file name].[attribute name]
- Parses this path and returns the module name (everything before the last
- dot) and attribute name (everything after the last dot), such that the
- attribute can be imported using "from module_name import attr_name".
- """
- nodes = import_path.split(".")
- if len(nodes) < 2:
- raise ValueError(
- f"Got {import_path} as import path. The import path "
- f"should at least specify the file name and "
- f"attribute name connected by a dot."
- )
- return ".".join(nodes[:-1]), nodes[-1]
- def override_runtime_envs_except_env_vars(parent_env: Dict, child_env: Dict) -> Dict:
- """Creates a runtime_env dict by merging a parent and child environment.
- This method is not destructive. It leaves the parent and child envs
- the same.
- The merge is a shallow update where the child environment inherits the
- parent environment's settings. If the child environment specifies any
- env settings, those settings take precdence over the parent.
- - Note: env_vars are a special case. The child's env_vars are combined
- with the parent.
- Args:
- parent_env: The environment to inherit settings from.
- child_env: The environment with override settings.
- Returns: A new dictionary containing the merged runtime_env settings.
- Raises:
- TypeError: If a dictionary is not passed in for parent_env or child_env.
- """
- if not isinstance(parent_env, Dict):
- raise TypeError(
- f'Got unexpected type "{type(parent_env)}" for parent_env. '
- "parent_env must be a dictionary."
- )
- if not isinstance(child_env, Dict):
- raise TypeError(
- f'Got unexpected type "{type(child_env)}" for child_env. '
- "child_env must be a dictionary."
- )
- defaults = copy.deepcopy(parent_env)
- overrides = copy.deepcopy(child_env)
- default_env_vars = defaults.get("env_vars", {})
- override_env_vars = overrides.get("env_vars", {})
- defaults.update(overrides)
- default_env_vars.update(override_env_vars)
- defaults["env_vars"] = default_env_vars
- return defaults
- class JavaActorHandleProxy:
- """Wraps actor handle and translate snake_case to camelCase."""
- def __init__(self, handle: ActorHandle):
- self.handle = handle
- self._available_attrs = set(dir(self.handle))
- def __getattr__(self, key: str):
- if key in self._available_attrs:
- camel_case_key = key
- else:
- components = key.split("_")
- camel_case_key = components[0] + "".join(x.title() for x in components[1:])
- return getattr(self.handle, camel_case_key)
- def require_packages(packages: List[str]):
- """Decorator making sure function run in specified environments
- Examples:
- >>> from ray.serve._private.utils import require_packages
- >>> @require_packages(["numpy", "package_a"]) # doctest: +SKIP
- ... def func(): # doctest: +SKIP
- ... import numpy as np # doctest: +SKIP
- ... ... # doctest: +SKIP
- >>> func() # doctest: +SKIP
- ImportError: func requires ["numpy", "package_a"] but
- ["package_a"] are not available, please pip install them.
- """
- def decorator(func):
- def check_import_once():
- if not hasattr(func, "_require_packages_checked"):
- missing_packages = []
- for package in packages:
- try:
- importlib.import_module(package)
- except ModuleNotFoundError:
- missing_packages.append(package)
- if len(missing_packages) > 0:
- raise ImportError(
- f"{func} requires packages {packages} to run but "
- f"{missing_packages} are missing. Please "
- "`pip install` them or add them to "
- "`runtime_env`."
- )
- func._require_packages_checked = True
- if inspect.iscoroutinefunction(func):
- @wraps(func)
- async def wrapped(*args, **kwargs):
- check_import_once()
- return await func(*args, **kwargs)
- elif inspect.isroutine(func):
- @wraps(func)
- def wrapped(*args, **kwargs):
- check_import_once()
- return func(*args, **kwargs)
- else:
- raise ValueError("Decorator expect callable functions.")
- return wrapped
- return decorator
- def in_interactive_shell():
- # Taken from:
- # https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook
- import __main__ as main
- return not hasattr(main, "__file__")
- def snake_to_camel_case(snake_str: str) -> str:
- """Convert a snake case string to camel case."""
- words = snake_str.strip("_").split("_")
- return words[0] + "".join(word[:1].upper() + word[1:] for word in words[1:])
- def check_obj_ref_ready_nowait(obj_ref: ObjectRef) -> bool:
- """Check if ray object reference is ready without waiting for it."""
- finished, _ = ray.wait([obj_ref], timeout=0)
- return len(finished) == 1
- def extract_self_if_method_call(args: List[Any], func: Callable) -> Optional[object]:
- """Check if this is a method rather than a function.
- Does this by checking to see if `func` is the attribute of the first
- (`self`) argument under `func.__name__`. Unfortunately, this is the most
- robust solution to this I was able to find. It would also be preferable
- to do this check when the decorator runs, rather than when the method is.
- Returns the `self` object if it's a method call, else None.
- Arguments:
- args: arguments to the function/method call.
- func: the unbound function that was called.
- """
- if len(args) > 0:
- method = getattr(args[0], func.__name__, False)
- if method:
- wrapped = getattr(method, "__wrapped__", False)
- if wrapped and wrapped == func:
- return args[0]
- return None
- def call_function_from_import_path(import_path: str) -> Any:
- """Call the function given import path.
- Args:
- import_path: The import path of the function to call.
- Raises:
- ValueError: If the import path is invalid.
- TypeError: If the import path is not callable.
- RuntimeError: if the function raise exeception during execution.
- Returns:
- The result of the function call.
- """
- try:
- callback_func = import_attr(import_path)
- except Exception as e:
- raise ValueError(f"The import path {import_path} cannot be imported: {e}")
- if not callable(callback_func):
- raise TypeError(f"The import path {import_path} is not callable.")
- try:
- return callback_func()
- except Exception as e:
- raise RuntimeError(f"The function {import_path} raised an exception: {e}")
- def get_head_node_id() -> str:
- """Get the head node id.
- Iterate through all nodes in the ray cluster and return the node id of the first
- alive node with head node resource.
- """
- head_node_id = None
- for node in ray.nodes():
- if HEAD_NODE_RESOURCE_NAME in node["Resources"] and node["Alive"]:
- head_node_id = node["NodeID"]
- break
- assert head_node_id is not None, "Cannot find alive head node."
- return head_node_id
- def calculate_remaining_timeout(
- *,
- timeout_s: Optional[float],
- start_time_s: float,
- curr_time_s: float,
- ) -> Optional[float]:
- """Get the timeout remaining given an overall timeout, start time, and curr time.
- If the timeout passed in was `None` or negative, will always return that timeout
- directly.
- If the timeout is >= 0, the returned remaining timeout always be >= 0.
- """
- if timeout_s is None or timeout_s < 0:
- return timeout_s
- time_since_start_s = curr_time_s - start_time_s
- return max(0, timeout_s - time_since_start_s)
- def get_all_live_placement_group_names() -> List[str]:
- """Fetch and parse the Ray placement group table for live placement group names.
- Placement groups are filtered based on their `scheduling_state`; any placement
- group not in the "REMOVED" state is considered live.
- """
- placement_group_table = ray.util.placement_group_table()
- live_pg_names = []
- for entry in placement_group_table.values():
- pg_name = entry.get("name", "")
- if (
- pg_name
- and entry.get("stats", {}).get("scheduling_state", "UNKNOWN") != "REMOVED"
- ):
- live_pg_names.append(pg_name)
- return live_pg_names
- def get_current_actor_id() -> str:
- """Gets the ID of the calling actor.
- If this is called in a driver, returns "DRIVER."
- If otherwise called outside of an actor, returns an empty string.
- This function hangs when GCS is down due to the `ray.get_runtime_context()`
- call.
- """
- worker_mode = ray.get_runtime_context().worker.mode
- if worker_mode in {SCRIPT_MODE, LOCAL_MODE}:
- return "DRIVER"
- else:
- try:
- actor_id = ray.get_runtime_context().get_actor_id()
- if actor_id is None:
- return ""
- else:
- return actor_id
- except Exception:
- return ""
- def is_running_in_asyncio_loop() -> bool:
- try:
- asyncio.get_running_loop()
- return True
- except RuntimeError:
- return False
- def get_capacity_adjusted_num_replicas(
- num_replicas: int, target_capacity: Optional[float]
- ) -> int:
- """Return the `num_replicas` adjusted by the `target_capacity`.
- The output will only ever be 0 if `target_capacity` is 0 or `num_replicas` is
- 0 (to support autoscaling deployments using scale-to-zero).
- Rather than using the default `round` behavior in Python, which rounds half to
- even, uses the `decimal` module to round half up (standard rounding behavior).
- """
- if target_capacity is None or target_capacity == 100:
- return num_replicas
- if target_capacity == 0 or num_replicas == 0:
- return 0
- adjusted_num_replicas = Decimal(num_replicas * target_capacity) / Decimal(100.0)
- rounded_adjusted_num_replicas = adjusted_num_replicas.to_integral_value(
- rounding=ROUND_HALF_UP
- )
- return max(1, int(rounded_adjusted_num_replicas))
- def generate_request_id() -> str:
- # NOTE(edoakes): we use random.getrandbits because it reduces CPU overhead
- # significantly. This is less cryptographically secure but should be ok for
- # request ID generation.
- # See https://bugs.python.org/issue45556 for discussion.
- return str(uuid.UUID(int=random.getrandbits(128), version=4))
- def inside_ray_client_context() -> bool:
- return ray.util.client.ray.is_connected()
- def get_component_file_name(
- component_name: str,
- component_id: str,
- component_type: Optional[ServeComponentType],
- suffix: str = "",
- ) -> str:
- """Get the component's file name. Replaces special characters with underscores."""
- component_name = re.sub(FILE_NAME_REGEX, "_", component_name)
- # For DEPLOYMENT component type, we want to log the deployment name
- # instead of adding the component type to the component name.
- component_log_file_name = component_name
- if component_type is not None:
- component_log_file_name = f"{component_type.value}_{component_name}"
- if component_type != ServeComponentType.REPLICA:
- component_name = f"{component_type}_{component_name}"
- file_name = FILE_FMT.format(
- component_name=component_log_file_name,
- component_id=component_id,
- suffix=suffix,
- )
- return file_name
- def validate_route_prefix(route_prefix: Union[DEFAULT, None, str]):
- if route_prefix is DEFAULT.VALUE or route_prefix is None:
- return
- if not route_prefix.startswith("/"):
- raise ValueError(
- f"Invalid route_prefix '{route_prefix}', "
- "must start with a forward slash ('/')."
- )
- if route_prefix != "/" and route_prefix.endswith("/"):
- raise ValueError(
- f"Invalid route_prefix '{route_prefix}', "
- "may not end with a trailing '/'."
- )
- if "{" in route_prefix or "}" in route_prefix:
- raise ValueError(
- f"Invalid route_prefix '{route_prefix}', may not contain wildcards."
- )
- async def await_deployment_response(deployment_response):
- return await deployment_response
- async def resolve_deployment_response(obj: Any, request_metadata: RequestMetadata):
- """Resolve `DeploymentResponse` objects to underlying object references.
- This enables composition without explicitly calling `_to_object_ref`.
- """
- from ray.serve.handle import DeploymentResponse, DeploymentResponseGenerator
- if isinstance(obj, DeploymentResponseGenerator):
- raise GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR
- elif isinstance(obj, DeploymentResponse):
- if request_metadata._by_reference and obj.by_reference:
- # If sending requests by reference, launch async task to
- # convert DeploymentResponse to an object ref
- return asyncio.create_task(obj._to_object_ref())
- else:
- # Otherwise, resolve DeploymentResponse directly to result
- return asyncio.create_task(await_deployment_response(obj))
- elif not request_metadata._by_reference and isinstance(obj, ray.ObjectRef):
- # If the router is sending requests by value (i.e. using gRPC),
- # resolve all Ray objects to mirror Ray behavior
- return asyncio.wrap_future(obj.future())
- def wait_for_interrupt() -> None:
- try:
- while True:
- # Block, letting Ray print logs to the terminal.
- time.sleep(10)
- except KeyboardInterrupt:
- logger.warning("Got KeyboardInterrupt, exiting...")
- # We need to re-raise KeyboardInterrupt, so serve components can be shutdown
- # from the main script.
- raise
- def is_grpc_enabled(grpc_config) -> bool:
- return grpc_config.port > 0 and len(grpc_config.grpc_servicer_functions) > 0
- class Semaphore:
- """Based on asyncio.Semaphore.
- This is a semaphore that can be used to limit the number of concurrent requests.
- Its maximum value is dynamic and is determined by the `get_value_fn` function.
- """
- def __init__(self, get_value_fn: Callable[[], int]):
- self._waiters = None
- self._value = 0
- self._get_value_fn = get_value_fn
- def __repr__(self):
- res = super().__repr__()
- extra = "locked" if self.locked() else f"unlocked, value:{self._value}"
- if self._waiters:
- extra = f"{extra}, waiters:{len(self._waiters)}"
- return f"<{res[1:-1]} [{extra}]>"
- async def __aenter__(self):
- await self.acquire()
- # We have no use for the "as ..." clause in the with
- # statement for locks.
- return None
- async def __aexit__(self, exc_type, exc, tb):
- self.release()
- def get_max_value(self):
- return self._get_value_fn()
- def locked(self):
- """Returns True if semaphore cannot be acquired immediately."""
- return self._value >= self.get_max_value() or (
- any(not w.cancelled() for w in (self._waiters or ()))
- )
- async def acquire(self):
- """Acquire a semaphore.
- If the internal counter is larger than zero on entry,
- decrement it by one and return True immediately. If it is
- zero on entry, block, waiting until some other coroutine has
- called release() to make it larger than 0, and then return
- True.
- """
- if not self.locked():
- self._value += 1
- return True
- if self._waiters is None:
- self._waiters = collections.deque()
- fut = asyncio.Future()
- self._waiters.append(fut)
- # Finally block should be called before the CancelledError
- # handling as we don't want CancelledError to call
- # _wake_up_first() and attempt to wake up itself.
- try:
- try:
- await fut
- finally:
- self._waiters.remove(fut)
- except asyncio.CancelledError:
- if not fut.cancelled():
- self._value -= 1
- self._wake_up_next()
- raise
- if self._value < self.get_max_value():
- self._wake_up_next()
- return True
- def release(self):
- """Release a semaphore, incrementing the internal counter by one.
- When it was zero on entry and another coroutine is waiting for it to
- become larger than zero again, wake up that coroutine.
- """
- self._value -= 1
- self._wake_up_next()
- def _wake_up_next(self):
- """Wake up the first waiter that isn't done."""
- if not self._waiters:
- return
- for fut in self._waiters:
- if not fut.done():
- self._value += 1
- fut.set_result(True)
- return
|