deploy_utils.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import hashlib
  2. import json
  3. import logging
  4. import time
  5. from typing import Any, Dict, Optional, Union
  6. import ray
  7. import ray.util.serialization_addons
  8. from ray.serve._private.common import DeploymentID
  9. from ray.serve._private.config import DeploymentConfig, ReplicaConfig
  10. from ray.serve._private.constants import SERVE_LOGGER_NAME
  11. from ray.serve._private.deployment_info import DeploymentInfo
  12. from ray.serve.schema import ServeApplicationSchema
# Module-level logger, looked up by the SERVE_LOGGER_NAME constant.
logger = logging.getLogger(SERVE_LOGGER_NAME)
  14. def get_deploy_args(
  15. name: str,
  16. replica_config: ReplicaConfig,
  17. ingress: bool = False,
  18. deployment_config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
  19. version: Optional[str] = None,
  20. route_prefix: Optional[str] = None,
  21. serialized_autoscaling_policy_def: Optional[bytes] = None,
  22. serialized_request_router_cls: Optional[bytes] = None,
  23. ) -> Dict:
  24. """
  25. Takes a deployment's configuration, and returns the arguments needed
  26. for the controller to deploy it.
  27. """
  28. if deployment_config is None:
  29. deployment_config = {}
  30. if isinstance(deployment_config, dict):
  31. deployment_config = DeploymentConfig.parse_obj(deployment_config)
  32. elif not isinstance(deployment_config, DeploymentConfig):
  33. raise TypeError("config must be a DeploymentConfig or a dictionary.")
  34. deployment_config.version = version
  35. controller_deploy_args = {
  36. "deployment_name": name,
  37. "deployment_config_proto_bytes": deployment_config.to_proto_bytes(),
  38. "replica_config_proto_bytes": replica_config.to_proto_bytes(),
  39. "route_prefix": route_prefix,
  40. "deployer_job_id": ray.get_runtime_context().get_job_id(),
  41. "ingress": ingress,
  42. "serialized_autoscaling_policy_def": serialized_autoscaling_policy_def,
  43. "serialized_request_router_cls": serialized_request_router_cls,
  44. }
  45. return controller_deploy_args
  46. def deploy_args_to_deployment_info(
  47. deployment_name: str,
  48. deployment_config_proto_bytes: bytes,
  49. replica_config_proto_bytes: bytes,
  50. deployer_job_id: Union[str, bytes],
  51. app_name: Optional[str] = None,
  52. ingress: bool = False,
  53. route_prefix: Optional[str] = None,
  54. **kwargs,
  55. ) -> DeploymentInfo:
  56. """Takes deployment args passed to the controller after building an application and
  57. constructs a DeploymentInfo object.
  58. """
  59. deployment_config = DeploymentConfig.from_proto_bytes(deployment_config_proto_bytes)
  60. version = deployment_config.version
  61. replica_config = ReplicaConfig.from_proto_bytes(
  62. replica_config_proto_bytes, deployment_config.needs_pickle()
  63. )
  64. # Java API passes in JobID as bytes
  65. if isinstance(deployer_job_id, bytes):
  66. deployer_job_id = ray.JobID.from_int(
  67. int.from_bytes(deployer_job_id, "little")
  68. ).hex()
  69. return DeploymentInfo(
  70. actor_name=DeploymentID(
  71. name=deployment_name, app_name=app_name
  72. ).to_replica_actor_class_name(),
  73. version=version,
  74. deployment_config=deployment_config,
  75. replica_config=replica_config,
  76. deployer_job_id=deployer_job_id,
  77. start_time_ms=int(time.time() * 1000),
  78. route_prefix=route_prefix,
  79. ingress=ingress,
  80. )
  81. def get_app_code_version(app_config: ServeApplicationSchema) -> str:
  82. """Returns the code version of an application.
  83. Args:
  84. app_config: The application config.
  85. Returns: a hash of the import path and (application level) runtime env representing
  86. the code version of the application.
  87. """
  88. request_router_configs = [
  89. deployment.request_router_config
  90. for deployment in app_config.deployments
  91. if isinstance(deployment.request_router_config, dict)
  92. ]
  93. deployment_autoscaling_policies = [
  94. deployment_config.autoscaling_config.get("policy", None)
  95. for deployment_config in app_config.deployments
  96. if isinstance(deployment_config.autoscaling_config, dict)
  97. ]
  98. encoded = json.dumps(
  99. {
  100. "import_path": app_config.import_path,
  101. "runtime_env": app_config.runtime_env,
  102. "args": app_config.args,
  103. # NOTE: trigger a change in the code version when
  104. # application level autoscaling policy is changed or
  105. # any one of the deployment level autoscaling policy is changed
  106. "autoscaling_policy": app_config.autoscaling_policy,
  107. "deployment_autoscaling_policies": deployment_autoscaling_policies,
  108. "request_router_configs": request_router_configs,
  109. },
  110. sort_keys=True,
  111. ).encode("utf-8")
  112. return hashlib.sha256(encoded).hexdigest()