| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- #if !defined(TORCH_STABLE_ONLY) && !defined(TORCH_TARGET_VERSION)
- #pragma once
- #include <c10/core/Stream.h>
- #include <c10/core/impl/GPUTrace.h>
- #include <c10/xpu/XPUFunctions.h>
- namespace c10::xpu {
- /*
- * Note [Stream Management]
- *
- * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel
- * can execute. Currently, there are several pools per device to manage SYCL
- * queue, and a device's pool is lazily created.
- *
- * There are two pools per device. The first pool contains "normal priority"
- * queues. The second pool is the "high priority" queues. There are 32 queues in
- * per pool per device, and when a queue is requested one of these queues is
- * returned round-robin. That is, the first queue requested is at index 0, the
- * second at index 1... to index 31, then index 0 again.
- *
- * This means that if 33 queues are requested, the first and last queues
- * requested are actually the same queue (under the covers) and kernels enqueued
- * on them cannot run concurrently.
- *
- * It is safe to enqueue a kernel on the same queue from two different
- * threads as the SYCL specification described.
- */
- static constexpr int max_compile_time_stream_priorities = 3;
- /*
- * This serves as a wrapper around c10::Stream and acts as a representation for
- * a SYCL queue, which allows asynchronous execution of XPU tasks.
- */
- class C10_XPU_API XPUStream {
- public:
- enum Unchecked { UNCHECKED };
- /// Construct a XPUStream from a Stream. This construction is checked, and
- /// will raise an error if the Stream is not, in fact, a XPU stream.
- explicit XPUStream(Stream stream) : stream_(stream) {
- TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
- }
- /// Construct a XPUStream from a Stream with no error checking.
- explicit XPUStream(Unchecked /*unused*/, Stream stream) : stream_(stream) {}
- bool operator==(const XPUStream& other) const noexcept {
- return unwrap() == other.unwrap();
- }
- bool operator!=(const XPUStream& other) const noexcept {
- return unwrap() != other.unwrap();
- }
- /// Implicit conversion to sycl::queue&.
- operator sycl::queue&() const {
- return queue();
- }
- /// Implicit conversion to sycl::queue*.
- operator sycl::queue*() const {
- return &queue();
- }
- /// Implicit conversion to Stream (a.k.a., forget that the stream is a
- /// XPU stream).
- operator Stream() const {
- return unwrap();
- }
- /// Get the XPU device type that this stream is associated with.
- DeviceType device_type() const {
- return DeviceType::XPU;
- }
- /// Get the XPU device index that this stream is associated with.
- DeviceIndex device_index() const {
- return stream_.device_index();
- }
- /// Get the full Device that this stream is associated with. The Device is
- /// guaranteed to be a XPU device.
- Device device() const {
- return Device(DeviceType::XPU, device_index());
- }
- /// Return the stream ID corresponding to this particular stream. StreamId is
- /// a int64_t representation generated by its type and index.
- StreamId id() const {
- return stream_.id();
- }
- /// Return true if all enqueued tasks in this stream have been completed,
- /// otherwise return false.
- bool query() const {
- return queue().ext_oneapi_empty();
- }
- /// Performs a blocking wait for the completion of all enqueued tasks in this
- /// stream.
- void synchronize() const {
- queue().wait_and_throw();
- const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
- if (C10_UNLIKELY(interp)) {
- (*interp)->trace_gpu_stream_synchronization(
- c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
- }
- }
- /// Return the priority that this stream is associated with. Lower numbers
- /// represent higher priority.
- int priority() const;
- /// Explicit conversion to sycl::queue&.
- sycl::queue& queue() const;
- /// Explicit conversion to Stream.
- Stream unwrap() const {
- return stream_;
- }
- /// Reversibly pack a XPUStream into a struct representation. The XPUStream
- /// can be unpacked using unpack3().
- struct c10::StreamData3 pack3() const {
- return stream_.pack3();
- }
- /// Unpack a XPUStream from the 3 fields generated by pack3().
- static XPUStream unpack3(
- StreamId stream_id,
- DeviceIndex device_index,
- DeviceType device_type) {
- return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
- }
- /// Return the range of priority **supported by PyTorch**.
- static std::tuple<int, int> priority_range() {
- // See Note [XPU Stream priorities]
- return std::make_tuple(1, -max_compile_time_stream_priorities + 2);
- }
- private:
- Stream stream_;
- };
- /**
- * Get a stream from the pool in a round-robin fashion.
- *
- * You can request a stream from the highest priority pool by setting
- * isHighPriority to true for a specific device.
- */
- C10_XPU_API XPUStream
- getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);
- /**
- * Get a stream from the pool in a round-robin fashion.
- *
- * You can request a stream by setting a priority value for a specific device.
- * The priority number lower, the priority higher.
- */
- C10_XPU_API XPUStream
- getStreamFromPool(const int priority, DeviceIndex device = -1);
- /**
- * Get an XPUStream from an external SYCL queue.
- *
- * This function allows interoperability with other libraries by enabling
- * the use of an external SYCL queue that was not created by PyTorch. This
- * can be useful for data exchange or other operations where integration
- * with non-PyTorch queues is required.
- *
- * NOTE: It is the user's responsibility to ensure that the referenced SYCL
- * queue remains alive while the corresponding XPUStream, or any c10::Stream
- * derived from it, is in use. The different SYCL queue pointers will result in
- * distinct XPUStream instances, even if the SYCL queues they dereference are
- * equivalent.
- */
- C10_XPU_API XPUStream
- getStreamFromExternal(sycl::queue* ext_queue, DeviceIndex device_index);
- /**
- * Get the current XPU stream, for the passed XPU device, or for the current
- * device if no device index is passed.
- */
- C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);
- /**
- * Set the current stream on the device of the passed in stream to be the passed
- * in stream.
- */
- C10_XPU_API void setCurrentXPUStream(XPUStream stream);
- C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);
- /**
- * Block all reserved SYCL queues in the stream pools on the device, and wait
- * for their synchronizations.
- */
- C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);
- } // namespace c10::xpu
- namespace std {
- template <>
- struct hash<c10::xpu::XPUStream> {
- size_t operator()(c10::xpu::XPUStream s) const noexcept {
- return std::hash<c10::Stream>{}(s.unwrap());
- }
- };
- } // namespace std
- #else
- #error "This file should not be included when either TORCH_STABLE_ONLY or TORCH_TARGET_VERSION is defined."
- #endif // !defined(TORCH_STABLE_ONLY) && !defined(TORCH_TARGET_VERSION)
|