| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- import uuid
- from typing import Any, Optional
- from ray.experimental.channel import ChannelContext
- from ray.experimental.channel.common import ChannelInterface
- from ray.util.annotations import PublicAPI
@PublicAPI(stability="alpha")
class IntraProcessChannel(ChannelInterface):
    """Channel connecting two tasks that run inside the same worker process.

    Values are handed over through the worker's ``_SerializationContext``:
    ``write`` stores the value there under this channel's ID and ``read``
    fetches it back. This avoids the serialization overhead and the need to
    go through shared memory.

    Note that the value is not copied by this class. If readers may mutate
    what they receive, callers should deep copy the data themselves to avoid
    side effects.

    Args:
        num_readers: The number of readers that will read from this channel.
            Readers can be the same method of the same actor.
        _channel_id: Optional pre-assigned channel ID. When omitted, a fresh
            UUID4 string is generated.
    """

    def __init__(
        self,
        num_readers,
        _channel_id: Optional[str] = None,
    ):
        self._num_readers = num_readers
        # The ID is the key under which the writer stores, and the reader
        # retrieves, data in the _SerializationContext. Only mint a new one
        # when the caller did not supply an ID (e.g. when not rebuilt through
        # __reduce__).
        self._channel_id = str(uuid.uuid4()) if _channel_id is None else _channel_id

    def ensure_registered_as_writer(self) -> None:
        """No-op: this channel performs no writer registration."""
        return None

    def ensure_registered_as_reader(self) -> None:
        """No-op: this channel performs no reader registration."""
        return None

    def __reduce__(self):
        """Pickle as a constructor call that carries over the channel ID.

        Passing ``_channel_id`` back to the constructor means the rebuilt
        object refers to the same ID instead of generating a new one.
        """
        ctor_args = (self._num_readers, self._channel_id)
        return (IntraProcessChannel, ctor_args)

    def __str__(self) -> str:
        """Return a short description that includes the channel ID."""
        return f"IntraProcessChannel(channel_id={self._channel_id})"

    def write(self, value: Any, timeout: Optional[float] = None):
        """Store ``value`` in the current serialization context.

        Args:
            value: The object to hand to the reader(s). It is stored as-is,
                which removes the serialization overhead of ``value``.
            timeout: Unused. No timeout check is needed because the
                operation is non-blocking.
        """
        self.ensure_registered_as_writer()
        # Writer and reader share a worker process, so the value lives in the
        # process-wide context rather than in this channel object.
        ChannelContext.get_current().serialization_context.set_data(
            self._channel_id,
            value,
            self._num_readers,
        )

    def read(self, timeout: Optional[float] = None, deserialize: bool = True) -> Any:
        """Fetch the value stored for this channel from the current context.

        Args:
            timeout: Unused. No timeout check is needed because the
                operation is non-blocking.
            deserialize: Must be truthy; the data was never serialized, so
                there is no serialized form to return.

        Returns:
            Whatever the serialization context returns for this channel ID.
        """
        self.ensure_registered_as_reader()
        assert deserialize, "Data passed from the actor to itself is never serialized"
        current_context = ChannelContext.get_current()
        store = current_context.serialization_context
        return store.get_data(self._channel_id)

    def close(self) -> None:
        """Reset this channel's entry in the current serialization context."""
        # NOTE(review): presumably reset_data drops any value still stored
        # under this ID -- confirm against _SerializationContext.
        store = ChannelContext.get_current().serialization_context
        store.reset_data(self._channel_id)
|