| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- import logging
- import os
- import pathlib
- from typing import Dict, List, Optional
- import ray._private.ray_constants as ray_constants
- from ray._private.resource_isolation_config import ResourceIsolationConfig
- from ray._private.utils import get_ray_client_dependency_error
- logger = logging.getLogger(__name__)
class RayParams:
    """A class used to store the parameters used by Ray.

    Attributes:
        redis_address: The address of the Redis server to connect to. If
            this address is not provided, then this command will start Redis, a
            raylet, a plasma store, a plasma manager, and some workers.
            It will also kill these processes when Python exits.
        redis_port: The port that the primary Redis shard should listen
            to. If None, then it will fall back to
            ray._private.ray_constants.DEFAULT_PORT, or a random port if the default is
            not available.
        redis_shard_ports: A list of the ports to use for the non-primary Redis
            shards. If None, then it will fall back to the ports right after
            redis_port, or random ports if those are not available.
        num_cpus: Number of CPUs to configure the raylet with.
        num_gpus: Number of GPUs to configure the raylet with.
        resources: A dictionary mapping the name of a resource to the quantity
            of that resource available.
        labels: The key-value labels of the node.
        memory: Total available memory for workers requesting memory.
        available_memory_bytes: The memory available for use on this node in bytes.
        object_store_memory: The amount of memory (in bytes) to start the
            object store with.
        object_manager_port: The port to use for the object manager.
        node_manager_port: The port to use for the node manager.
        gcs_server_port: The port to use for the GCS server.
        node_ip_address: The IP address of the node that we are on.
        min_worker_port: The lowest port number that workers will bind
            on. If not set or set to 0, random ports will be chosen.
        max_worker_port: The highest port number that workers will bind
            on. If set, min_worker_port must also be set.
        worker_port_list: An explicit list of ports to be used for
            workers (comma-separated). Overrides min_worker_port and
            max_worker_port.
        ray_client_server_port: The port number the ray client server
            will bind on. If not set, the ray client server will not
            be started.
        redirect_output: True if stdout and stderr for non-worker
            processes should be redirected to files and false otherwise.
        log_to_stderr: If set, controls whether non-worker stdout/stderr should be
            written to stderr (True) or redirected to log files (False). This is the
            preferred replacement for the deprecated `redirect_output` field.
        external_addresses: The address of external Redis server to
            connect to, in format of "ip1:port1,ip2:port2,...". If this
            address is provided, then ray won't start Redis instances in the
            head node but use external Redis server(s) instead.
        num_redis_shards: The number of Redis shards to start in addition to
            the primary Redis shard.
        redis_max_clients: If provided, attempt to configure Redis with this
            maxclients number.
        redis_username: Prevents external clients without the username
            from connecting to Redis if provided.
        redis_password: Prevents external clients without the password
            from connecting to Redis if provided.
        plasma_directory: A directory where the Plasma memory mapped files will
            be created.
        object_spilling_directory: The path to spill objects to. The same path will
            be used as the object store fallback directory as well.
        worker_path: The path of the source code that will be run by the
            worker.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.
        huge_pages: Boolean flag indicating whether to start the Object
            Store with hugetlbfs support. Requires plasma_directory.
        include_dashboard: Boolean flag indicating whether to start the web
            UI, which displays the status of the Ray cluster. If this value is
            None, then the UI will be started if the relevant dependencies are
            present.
        dashboard_host: The host to bind the web UI server to. Can either be
            localhost (127.0.0.1) or 0.0.0.0 (available from all interfaces).
            By default, this is set to localhost to prevent access from
            external machines.
        dashboard_port: The port to bind the dashboard server to.
            Defaults to 8265.
        dashboard_agent_listen_port: The port for dashboard agents to listen on
            for HTTP requests.
            Defaults to 52365.
        runtime_env_agent_port: The port at which the runtime env agent
            listens to for HTTP.
            Defaults to random available port.
        plasma_store_socket_name: If provided, it specifies the socket
            name used by the plasma store.
        raylet_socket_name: If provided, it specifies the socket path
            used by the raylet process.
        temp_dir: If provided, it will specify the root temporary
            directory for the Ray process. Must be an absolute path.
        runtime_env_dir_name: If provided, specifies the directory that
            will be created in the session dir to hold runtime_env files.
        include_log_monitor: If True, then start a log monitor to
            monitor the log files for all processes on this node and push their
            contents to Redis.
        autoscaling_config: path to autoscaling config file.
        metrics_agent_port: The port to bind metrics agent.
        metrics_export_port: The port at which metrics are exposed
            through a Prometheus endpoint.
        no_monitor: If True, the ray autoscaler monitor for this cluster
            will not be started.
        _system_config: Configuration for overriding RayConfig
            defaults. Used to set system configuration and for experimental Ray
            core feature flags.
        enable_object_reconstruction: Enable plasma reconstruction on
            failure.
        ray_debugger_external: If true, make the Ray debugger for a
            worker available externally to the node it is running on. This will
            bind on 0.0.0.0 instead of localhost.
        env_vars: Override environment variables for the raylet.
        session_name: The current Ray session name.
        webui: The url of the UI.
        cluster_id: The cluster ID in hex string.
        resource_isolation_config: settings for cgroupv2 based isolation of ray
            system processes (defaults to no isolation if config not provided)
    """

    def __init__(
        self,
        redis_address: Optional[str] = None,
        gcs_address: Optional[str] = None,
        num_cpus: Optional[int] = None,
        num_gpus: Optional[int] = None,
        resources: Optional[Dict[str, float]] = None,
        labels: Optional[Dict[str, str]] = None,
        memory: Optional[float] = None,
        available_memory_bytes: Optional[int] = None,
        object_store_memory: Optional[float] = None,
        redis_port: Optional[int] = None,
        redis_shard_ports: Optional[List[int]] = None,
        object_manager_port: Optional[int] = None,
        node_manager_port: int = 0,
        gcs_server_port: Optional[int] = None,
        node_ip_address: Optional[str] = None,
        node_name: Optional[str] = None,
        min_worker_port: Optional[int] = None,
        max_worker_port: Optional[int] = None,
        # Comma-separated string of ports; it is parsed with ``str.split(",")``.
        worker_port_list: Optional[str] = None,
        ray_client_server_port: Optional[int] = None,
        driver_mode=None,
        redirect_output: Optional[bool] = None,
        log_to_stderr: Optional[bool] = None,
        external_addresses: Optional[List[str]] = None,
        num_redis_shards: Optional[int] = None,
        redis_max_clients: Optional[int] = None,
        redis_username: Optional[str] = ray_constants.REDIS_DEFAULT_USERNAME,
        redis_password: Optional[str] = ray_constants.REDIS_DEFAULT_PASSWORD,
        plasma_directory: Optional[str] = None,
        object_spilling_directory: Optional[str] = None,
        worker_path: Optional[str] = None,
        setup_worker_path: Optional[str] = None,
        huge_pages: Optional[bool] = False,
        include_dashboard: Optional[bool] = None,
        dashboard_host: Optional[str] = ray_constants.DEFAULT_DASHBOARD_IP,
        dashboard_port: Optional[int] = ray_constants.DEFAULT_DASHBOARD_PORT,
        dashboard_agent_listen_port: Optional[
            int
        ] = ray_constants.DEFAULT_DASHBOARD_AGENT_LISTEN_PORT,
        runtime_env_agent_port: Optional[int] = None,
        plasma_store_socket_name: Optional[str] = None,
        raylet_socket_name: Optional[str] = None,
        temp_dir: Optional[str] = None,
        runtime_env_dir_name: Optional[str] = None,
        include_log_monitor: Optional[bool] = None,
        autoscaling_config: Optional[str] = None,
        ray_debugger_external: bool = False,
        _system_config: Optional[Dict[str, str]] = None,
        enable_object_reconstruction: Optional[bool] = False,
        metrics_agent_port: Optional[int] = None,
        metrics_export_port: Optional[int] = None,
        tracing_startup_hook=None,
        no_monitor: Optional[bool] = False,
        env_vars: Optional[Dict[str, str]] = None,
        session_name: Optional[str] = None,
        webui: Optional[str] = None,
        cluster_id: Optional[str] = None,
        node_id: Optional[str] = None,
        resource_isolation_config: Optional[ResourceIsolationConfig] = None,
    ):
        """Store the given parameters and validate them.

        Every argument is stored on the instance under the same name, except
        ``enable_object_reconstruction`` which is stored as
        ``_enable_object_reconstruction``. See the class docstring for the
        meaning of the individual parameters.

        Raises:
            ValueError: If a port or path parameter is invalid (see
                ``_check_usage``).
            AssertionError: If ``resources`` contains a built-in resource name.
            DeprecationWarning: If ``redirect_output`` is provided.
        """
        self.redis_address = redis_address
        self.gcs_address = gcs_address
        self.num_cpus = num_cpus
        self.num_gpus = num_gpus
        self.memory = memory
        self.available_memory_bytes = available_memory_bytes
        self.object_store_memory = object_store_memory
        self.resources = resources
        self.redis_port = redis_port
        self.redis_shard_ports = redis_shard_ports
        self.object_manager_port = object_manager_port
        self.node_manager_port = node_manager_port
        self.gcs_server_port = gcs_server_port
        self.node_ip_address = node_ip_address
        self.node_name = node_name
        self.min_worker_port = min_worker_port
        self.max_worker_port = max_worker_port
        self.worker_port_list = worker_port_list
        self.ray_client_server_port = ray_client_server_port
        self.driver_mode = driver_mode
        self.redirect_output = redirect_output
        self.log_to_stderr = log_to_stderr
        self.external_addresses = external_addresses
        self.num_redis_shards = num_redis_shards
        self.redis_max_clients = redis_max_clients
        self.redis_username = redis_username
        self.redis_password = redis_password
        self.plasma_directory = plasma_directory
        self.object_spilling_directory = object_spilling_directory
        self.worker_path = worker_path
        self.setup_worker_path = setup_worker_path
        self.huge_pages = huge_pages
        self.include_dashboard = include_dashboard
        self.dashboard_host = dashboard_host
        self.dashboard_port = dashboard_port
        self.dashboard_agent_listen_port = dashboard_agent_listen_port
        self.runtime_env_agent_port = runtime_env_agent_port
        self.plasma_store_socket_name = plasma_store_socket_name
        self.raylet_socket_name = raylet_socket_name
        self.temp_dir = temp_dir
        self.runtime_env_dir_name = (
            runtime_env_dir_name or ray_constants.DEFAULT_RUNTIME_ENV_DIR_NAME
        )
        self.include_log_monitor = include_log_monitor
        self.autoscaling_config = autoscaling_config
        self.metrics_agent_port = metrics_agent_port
        self.metrics_export_port = metrics_export_port
        self.tracing_startup_hook = tracing_startup_hook
        self.no_monitor = no_monitor
        self.ray_debugger_external = ray_debugger_external
        self.env_vars = env_vars
        self.session_name = session_name
        self.webui = webui
        # Always a dict from here on, so later code can index it directly.
        self._system_config = _system_config or {}
        self._enable_object_reconstruction = enable_object_reconstruction
        self.labels = labels
        self._check_usage()
        self.cluster_id = cluster_id
        self.node_id = node_id
        self.resource_isolation_config = resource_isolation_config
        if not self.resource_isolation_config:
            # No config supplied: fall back to a config with isolation disabled.
            self.resource_isolation_config = ResourceIsolationConfig(
                object_store_memory=object_store_memory, enable_resource_isolation=False
            )

        # Enabling object reconstruction turns on the
        # `lineage_pinning_enabled` system config flag.
        if enable_object_reconstruction:
            self._system_config["lineage_pinning_enabled"] = True

    def update(self, **kwargs):
        """Update the settings according to the keyword arguments.

        Args:
            kwargs: The keyword arguments to set corresponding fields.

        Raises:
            ValueError: If a keyword does not name an existing attribute, or
                if the resulting settings fail validation.
        """
        for arg, value in kwargs.items():
            if hasattr(self, arg):
                setattr(self, arg, value)
            else:
                raise ValueError(f"Invalid RayParams parameter in update: {arg}")

        self._check_usage()

    def update_if_absent(self, **kwargs):
        """Update the settings when the target fields are None.

        Args:
            kwargs: The keyword arguments to set corresponding fields.

        Raises:
            ValueError: If a keyword does not name an existing attribute, or
                if the resulting settings fail validation.
        """
        for arg, value in kwargs.items():
            if hasattr(self, arg):
                # Only fill in fields that have not been set yet.
                if getattr(self, arg) is None:
                    setattr(self, arg, value)
            else:
                raise ValueError(
                    f"Invalid RayParams parameter in update_if_absent: {arg}"
                )

        self._check_usage()

    def update_pre_selected_port(self):
        """Update the pre-selected port information.

        Collects every explicitly configured port per component and stores the
        union in ``self.reserved_ports``.

        Raises:
            ValueError: If the same port is configured for more than one
                component (or more than once within a component).
        """

        def wrap_port(port):
            # 0 port means select a random port for the grpc server.
            if port is None or port == 0:
                return []
            else:
                return [port]

        # Create a dictionary of the component -> port mapping.
        pre_selected_ports = {
            "gcs": wrap_port(self.redis_port),
            "object_manager": wrap_port(self.object_manager_port),
            "node_manager": wrap_port(self.node_manager_port),
            "gcs_server": wrap_port(self.gcs_server_port),
            "client_server": wrap_port(self.ray_client_server_port),
            "dashboard": wrap_port(self.dashboard_port),
            "dashboard_agent_grpc": wrap_port(self.metrics_agent_port),
            "dashboard_agent_http": wrap_port(self.dashboard_agent_listen_port),
            "runtime_env_agent": wrap_port(self.runtime_env_agent_port),
            "metrics_export": wrap_port(self.metrics_export_port),
        }

        redis_shard_ports = self.redis_shard_ports
        if redis_shard_ports is None:
            redis_shard_ports = []
        pre_selected_ports["redis_shards"] = redis_shard_ports

        if self.worker_port_list is not None:
            # An explicit list overrides min_worker_port / max_worker_port.
            pre_selected_ports["worker_ports"] = [
                int(port) for port in self.worker_port_list.split(",")
            ]
        elif (
            self.min_worker_port is not None
            and self.min_worker_port != 0
            and self.max_worker_port is not None
        ):
            pre_selected_ports["worker_ports"] = list(
                range(self.min_worker_port, self.max_worker_port + 1)
            )
        else:
            # Nothing is reserved when random ports are requested. A
            # min_worker_port of 0 means "random" as well (see the class
            # docstring), so it must not reserve the range starting at port 0.
            pre_selected_ports["worker_ports"] = []

        # Update the pre selected port set.
        self.reserved_ports = set()
        for comp, port_list in pre_selected_ports.items():
            for port in port_list:
                if port in self.reserved_ports:
                    raise ValueError(
                        f"Ray component {comp} is trying to use "
                        f"a port number {port} that is used by other components.\n"
                        f"Port information: {self._format_ports(pre_selected_ports)}\n"
                        "If you allocate ports, please make sure the same port "
                        "is not used by multiple components."
                    )
                self.reserved_ports.add(port)

    def _check_usage(self):
        """Validate the currently stored parameters.

        As a side effect, when the ``RAY_USE_RANDOM_PORTS`` environment
        variable is set and neither worker port bound is configured, both
        bounds are set to 0.

        Raises:
            ValueError: If a port is malformed or out of range, if
                ``max_worker_port`` is set without ``min_worker_port`` or is
                not greater than it, if the Ray Client dependency is missing
                while ``ray_client_server_port`` is set, or if ``temp_dir`` is
                relative or lies inside the active virtualenv.
            AssertionError: If ``resources`` contains one of the built-in
                resource names (CPU, GPU, memory, object_store_memory).
            DeprecationWarning: If ``redirect_output`` is set.
        """
        if self.worker_port_list is not None:
            for port_str in self.worker_port_list.split(","):
                try:
                    port = int(port_str)
                except ValueError as e:
                    raise ValueError(
                        "worker_port_list must be a comma-separated "
                        f"list of integers: {e}"
                    ) from None

                if port < 1024 or port > 65535:
                    raise ValueError(
                        "Ports in worker_port_list must be "
                        f"between 1024 and 65535. Got: {port}"
                    )

        # Used primarily for testing.
        if os.environ.get("RAY_USE_RANDOM_PORTS", False):
            if self.min_worker_port is None and self.max_worker_port is None:
                self.min_worker_port = 0
                self.max_worker_port = 0

        if self.min_worker_port is not None:
            if self.min_worker_port != 0 and (
                self.min_worker_port < 1024 or self.min_worker_port > 65535
            ):
                raise ValueError(
                    "min_worker_port must be 0 or an integer between 1024 and 65535."
                )

        if self.max_worker_port is not None:
            if self.min_worker_port is None:
                raise ValueError(
                    "If max_worker_port is set, min_worker_port must also be set."
                )
            elif self.max_worker_port != 0:
                if self.max_worker_port < 1024 or self.max_worker_port > 65535:
                    raise ValueError(
                        "max_worker_port must be 0 or an integer between "
                        "1024 and 65535."
                    )
                elif self.max_worker_port <= self.min_worker_port:
                    raise ValueError(
                        "max_worker_port must be higher than min_worker_port."
                    )

        if self.ray_client_server_port is not None:
            if get_ray_client_dependency_error() is not None:
                raise ValueError(
                    "Ray Client requires pip package `ray[client]`. "
                    "If you installed the minimal Ray (e.g. `pip install ray`), "
                    "please reinstall by executing `pip install ray[client]`."
                )
            if (
                self.ray_client_server_port < 1024
                or self.ray_client_server_port > 65535
            ):
                raise ValueError(
                    "ray_client_server_port must be an integer "
                    "between 1024 and 65535."
                )

        if self.runtime_env_agent_port is not None:
            if self.runtime_env_agent_port != 0 and (
                self.runtime_env_agent_port < 1024
                or self.runtime_env_agent_port > 65535
            ):
                raise ValueError(
                    "runtime_env_agent_port must be 0 (auto-assign) or an integer "
                    "between 1024 and 65535."
                )

        if self.resources is not None:

            def build_error(resource, alternative):
                return (
                    f"{self.resources} -> `{resource}` cannot be a "
                    "custom resource because it is one of the default resources "
                    f"({ray_constants.DEFAULT_RESOURCES}). "
                    f"Use `{alternative}` instead. For example, use `ray start "
                    f"--{alternative.replace('_', '-')}=1` instead of "
                    f"`ray start --resources={{'{resource}': 1}}`"
                )

            # (built-in resource name, dedicated parameter to use instead)
            builtin_resources = (
                ("CPU", "num_cpus"),
                ("GPU", "num_gpus"),
                ("memory", "memory"),
                ("object_store_memory", "object_store_memory"),
            )
            for resource, alternative in builtin_resources:
                if resource in self.resources:
                    # Raised explicitly rather than via `assert` so the check
                    # still runs under `python -O`; the exception type is kept
                    # for callers that catch AssertionError.
                    raise AssertionError(build_error(resource, alternative))

        if self.redirect_output is not None:
            raise DeprecationWarning("The redirect_output argument is deprecated.")

        if self.temp_dir is not None and not os.path.isabs(self.temp_dir):
            raise ValueError("temp_dir must be absolute path or None.")

        virtual_env = os.getenv("VIRTUAL_ENV")
        if self.temp_dir is not None and virtual_env:
            try:
                (
                    pathlib.Path(self.temp_dir)
                    .resolve()
                    .relative_to(pathlib.Path(virtual_env).resolve())
                )
            except ValueError:
                # relative_to() raises when temp_dir is outside the virtualenv,
                # which is the acceptable case.
                pass
            else:
                raise ValueError(
                    "temp_dir must not be child directory of virtualenv root"
                )

    def _format_ports(self, pre_selected_ports):
        """Format the pre-selected ports information to be more human-readable.

        Args:
            pre_selected_ports: Mapping of component name to its list of ports.

        Returns:
            A shallow copy of the mapping where a single-port list is replaced
            by the port itself, an empty list by the string "random", and a
            multi-port "worker_ports" entry by a summary string. Other
            multi-port entries are left as lists.
        """
        ports = pre_selected_ports.copy()

        for comp, port_list in ports.items():
            if len(port_list) == 1:
                ports[comp] = port_list[0]
            elif len(port_list) == 0:
                # Nothing is selected, meaning it will be randomly selected.
                ports[comp] = "random"
            elif comp == "worker_ports":
                min_port = port_list[0]
                max_port = port_list[-1]
                # Short lists are printed in full; long ones as a range.
                if len(port_list) < 50:
                    port_range_str = str(port_list)
                else:
                    port_range_str = f"from {min_port} to {max_port}"
                ports[comp] = f"{len(port_list)} ports {port_range_str}"
        return ports
|