2 changes: 1 addition & 1 deletion paddle/fluid/pybind/tensor.cc
@@ -903,7 +903,7 @@ void BindTensor(pybind11::module &m) { // NOLINT
const auto &device_id =
paddle::platform::GetXPUCurrentDeviceId();
auto stream = paddle::platform::get_current_stream(device_id);
xpu_wait(stream);
xpu_wait(stream->raw_stream());
Contributor comment: The return value is not checked here; this will be fixed in a follow-up PR.

int type_idx = static_cast<int>(self.type());
size_t data_size = self.numel() *
framework::SizeOfType(
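The contributor's note above refers to the unchecked `xpu_wait` call. A minimal sketch of what the follow-up fix could look like, assuming the existing `PADDLE_ENFORCE_XPU_SUCCESS` macro (from `paddle/phi/backends/xpu/enforce_xpu.h`) is available in this translation unit:

    // Hypothetical follow-up: propagate the xpu_wait error code instead of
    // silently discarding it.
    const auto &device_id = paddle::platform::GetXPUCurrentDeviceId();
    auto stream = paddle::platform::get_current_stream(device_id);
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_wait(stream->raw_stream()));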
225 changes: 209 additions & 16 deletions paddle/fluid/pybind/xpu_streams_py.cc
@@ -33,19 +33,27 @@ namespace py = pybind11;
namespace paddle {
namespace platform {
#ifdef PADDLE_WITH_XPU
XPUStream get_current_stream(int device_id) {
if (device_id == -1) {
device_id = phi::backends::xpu::GetXPUCurrentDeviceId();
}
phi::XPUStreamHandle *get_current_stream(int device_id) {
auto place = phi::XPUPlace(device_id);
auto *dev_ctx = static_cast<phi::XPUContext *>(
phi::DeviceContextPool::Instance().Get(place));
dev_ctx->Wait();
return dev_ctx->stream();
return dev_ctx->get_current_stream_handle();
}

phi::XPUStreamHandle *set_current_stream(int idx) {
int device_id = phi::backends::xpu::GetXPUCurrentDeviceId();
auto original_stream = get_current_stream(device_id);
auto place = phi::XPUPlace(device_id);
auto *dev_ctx = static_cast<phi::XPUContext *>(
phi::DeviceContextPool::Instance().Get(place));
dev_ctx->SetCurrentStream(idx);
return original_stream;
}

#endif
} // namespace platform

namespace pybind {
void BindXpuStream(py::module *m_ptr) {
auto &m = *m_ptr;
@@ -69,7 +77,7 @@ void BindXpuStream(py::module *m_ptr) {
#endif
});
m.def(
"_get_current_stream",
"_xpu_get_current_stream",
[](int device_id) {
#ifdef PADDLE_WITH_XPU
if (device_id == -1) {
Expand All @@ -79,7 +87,19 @@ void BindXpuStream(py::module *m_ptr) {
return platform::get_current_stream(device_id);
#else
PADDLE_THROW(
common::errors::Unavailable("Paddle is not compiled with CUDA. "
common::errors::Unavailable("Paddle is not compiled with XPU. "
"Cannot visit device synchronize."));
#endif
},
py::return_value_policy::reference);
m.def(
"_xpu_set_current_stream",
[](int stream_id) {
#ifdef PADDLE_WITH_XPU
return platform::set_current_stream(stream_id);
#else
PADDLE_THROW(
common::errors::Unavailable("Paddle is not compiled with XPU. "
"Cannot visit device synchronize."));
#endif
},
@@ -100,12 +120,167 @@ void BindXpuStream(py::module *m_ptr) {
#endif
});

py::class_<phi::XPUStreamHandle>(m, "XPUStream", R"DOC(
The handle of the XPU stream.

Parameters:
device(paddle.XPUPlace()|int|None, optional): The device on which to allocate the stream.
If device is None or a negative integer, the current device will be used.
If device is a positive integer, it must be less than the device count. Default: None.

Examples:
.. code-block:: python

>>> # doctest: +REQUIRES(env:XPU)
>>> import paddle
>>> s1 = paddle.device.xpu.Stream(paddle.XPUPlace(0))
>>> s2 = paddle.device.xpu.Stream(0)
>>> s3 = paddle.device.xpu.Stream()

)DOC")
#ifdef PADDLE_WITH_XPU
.def_property_readonly(
"xpu_stream",
[](phi::XPUStreamHandle &self) {
return reinterpret_cast<std::uintptr_t>(self.raw_stream());
})
.def("wait_stream",
[](phi::XPUStreamHandle &self, phi::XPUStreamHandle &other) {
auto *dev_ctx = phi::get_xpu_context();
dev_ctx->StreamWaitStreamInPool(self.id(), other.id());
})
.def("wait_event",
[](phi::XPUStreamHandle &self, phi::XPUEventHandle &other) {
self.wait_event(other.get_event());
})
.def("query",
[](phi::XPUStreamHandle &self) {
PADDLE_THROW(common::errors::Unavailable(
"Query function for XPUStream is not supported now"));
})
.def("record_event",
[](phi::XPUStreamHandle &self, phi::XPUEventHandle *event) {
if (event == nullptr) {
event = new phi::XPUEventHandle();
}
self.record_event(event->get_event());
return event;
})
.def(
"synchronize",
[](phi::XPUStreamHandle &self) { self.synchronize(); },
R"DOC(
Waits for stream tasks to complete.

Examples:
.. code-block:: python

>>> # doctest: +REQUIRES(env:XPU)
>>> import paddle
>>> s = paddle.device.xpu.Stream(paddle.XPUPlace(0), 1)
>>> s.synchronize()

)DOC")
.def_property_readonly(
"place",
[](phi::XPUStreamHandle &self) {
return phi::XPUPlace(platform::GetXPUCurrentDeviceId());
})
.def_property_readonly(
"idx", [](phi::XPUStreamHandle &self) { return self.id(); })
#endif

.def("__init__",
[](phi::XPUStreamHandle &self) {
#ifdef PADDLE_WITH_XPU
new (&self) phi::XPUStreamHandle();
self.Init();
#else
PADDLE_THROW(common::errors::Unavailable(
"Class XPUStream can only be initialized on the XPU "
"platform."));
#endif
})
.def(
"__init__",
[](phi::XPUStreamHandle &self, phi::XPUPlace *place) {
#ifdef PADDLE_WITH_XPU
if (place == nullptr) {
int curr_device_id = platform::GetXPUCurrentDeviceId();
auto place_tmp = phi::XPUPlace(curr_device_id);
new (&self) phi::XPUStreamHandle(place_tmp);
} else {
new (&self) phi::XPUStreamHandle(*place);
}
#else
PADDLE_THROW(common::errors::Unavailable(
"Class XPUStream can only be initialized on the XPU "
"platform."));
#endif
},
py::arg("device") = nullptr)
.def(
"__init__",
[](phi::XPUStreamHandle &self, int device) {
#ifdef PADDLE_WITH_XPU
if (device < 0) {
device = platform::GetXPUCurrentDeviceId();
}
auto place_tmp = phi::XPUPlace(device);
new (&self) phi::XPUStreamHandle(place_tmp);
#else
PADDLE_THROW(common::errors::Unavailable(
"Class XPUStream can only be initialized on the XPU "
"platform."));
#endif
},
py::arg("device") = -1);
py::class_<phi::XPUEventHandle>(m, "XPUEvent", R"DOC(
The handle of the XPU event.

Examples:
.. code-block:: python

>>> # doctest: +REQUIRES(env:XPU)
>>> import paddle
>>> event = paddle.device.xpu.Event()

)DOC")
#ifdef PADDLE_WITH_XPU
.def(
"record",
[](phi::XPUEventHandle &self, phi::XPUStreamHandle *stream) {
if (stream == nullptr) {
auto *dev_ctx = phi::get_xpu_context();
auto stream_handle = dev_ctx->get_current_stream_handle();
self.record(stream_handle->raw_stream());
} else {
self.record(stream->raw_stream());
}
},
py::arg("stream") = nullptr)
.def("query", [](phi::XPUEventHandle &self) { return self.query(); })
.def("elapsed_time",
[](phi::XPUEventHandle &self) {
PADDLE_THROW(common::errors::Unavailable(
"XPUEvent elapsed_time is not supported now"));
})
.def("synchronize", [](phi::XPUEventHandle &self) { self.synchronize(); })
#endif
.def("__init__", [](phi::XPUEventHandle &self) {
#ifdef PADDLE_WITH_XPU
new (&self) phi::XPUEventHandle();
#else
PADDLE_THROW(common::errors::Unavailable(
"Class XPUEvent can only be initialized on the XPU platform."));
#endif
});
#ifdef PADDLE_WITH_XPU
py::class_<XPUStream>(m, "XPUStream", R"DOC(
The handle of the CUDA stream.
py::class_<phi::XPUCUDAStream>(m, "XPUCUDAStream", R"DOC(
The handle of the XPU stream.

Parameters:
device(paddle.CUDAPlace()|int|None, optional): The device which wanted to allocate the stream.
device(paddle.XPUPlace()|int|None, optional): The device which wanted to allocate the stream.
If device is None or negative integer, device will be the current device.
If device is positive integer, it must less than the device count. Default: None.
priority(int|None, optional): The priority of stream. The priority can be 1(high) or 2(normal).
@@ -114,16 +289,16 @@
Examples:
.. code-block:: python

>>> # doctest: +REQUIRES(env:GPU)
>>> # doctest: +REQUIRES(env:XPU)
>>> import paddle
>>> s1 = paddle.device.cuda.Stream(paddle.CUDAPlace(0), 1)
>>> s2 = paddle.device.cuda.Stream(0, 1)
>>> s3 = paddle.device.cuda.Stream()
>>> s1 = paddle.device.xpu.Stream(paddle.XPUPlace(0), 1)
>>> s2 = paddle.device.xpu.Stream(0, 1)
>>> s3 = paddle.device.xpu.Stream()

)DOC")
.def(
"synchronize",
[](XPUStream &self) { xpu_wait(self); },
[](phi::XPUCUDAStream &self) { self.Synchronize(); },
R"DOC(
Waits for stream tasks to complete.

@@ -135,7 +310,25 @@
>>> s = paddle.device.cuda.Stream(paddle.CUDAPlace(0), 1)
>>> s.synchronize()

)DOC");
)DOC")
.def("__init__",
[](phi::XPUCUDAStream &self, phi::XPUPlace *place, int priority) {
if (priority != 1 && priority != 2) {
PADDLE_THROW(common::errors::InvalidArgument(
"Priority should be 1(high) or 2(normal) "));
}
auto stream_flag =
phi::XPUCUDAStream::StreamFlag::kStreamNonBlocking;
if (place == nullptr) {
int curr_device_id = platform::GetXPUCurrentDeviceId();
auto place_tmp = phi::XPUPlace(curr_device_id);
new (&self)
phi::XPUCUDAStream(place_tmp, priority - 2, stream_flag);
} else {
new (&self)
phi::XPUCUDAStream(*place, priority - 2, stream_flag);
}
});
#endif
}
} // namespace pybind
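For orientation, here is a rough C++ sketch of how the handle types bound above are meant to interact, using only calls that appear in this diff (`get_xpu_context`, `get_current_stream_handle`, `record_event`, `wait_event`, `synchronize`); treat it as an illustration rather than tested code:

    // Illustration only (XPU build): record an event on the current stream and
    // wait on it, mirroring the record_event / wait_event / synchronize flow
    // exposed to Python above. demo_event_sync is a hypothetical helper.
    void demo_event_sync() {
      auto *dev_ctx = phi::get_xpu_context();
      phi::XPUStreamHandle *stream = dev_ctx->get_current_stream_handle();

      phi::XPUEventHandle event;
      stream->record_event(event.get_event());  // mark a point in the stream
      stream->wait_event(event.get_event());    // later work waits for that point
      stream->synchronize();                    // block the host until finished
    }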
7 changes: 6 additions & 1 deletion paddle/fluid/pybind/xpu_streams_py.h
@@ -18,12 +18,16 @@
#include "pybind11/stl.h"

#ifdef PADDLE_WITH_XPU
#include "paddle/phi/backends/xpu/xpu_context.h"
#include "paddle/phi/core/xpu_cuda_stream.h"
#include "xpu/runtime.h"
#include "xpu/runtime_ex.h"

#else
namespace phi {
class XPUCUDAStream {};
class XPUStreamHandle {};
class XPUEventHandle {};
} // namespace phi
#endif

@@ -32,7 +36,8 @@ namespace py = pybind11;
namespace paddle {
namespace platform {
#ifdef PADDLE_WITH_XPU
XPUStream get_current_stream(int device_id = -1);
phi::XPUStreamHandle* get_current_stream(int device_id = -1);
phi::XPUStreamHandle* set_current_stream(int idx);
#endif
} // namespace platform
namespace pybind {
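The two declarations above enable the usual save-and-restore pattern; the `set_current_stream` definition in `xpu_streams_py.cc` already returns the previously current handle for exactly this purpose. A small hypothetical RAII guard built on them, for illustration only:

    // Hypothetical guard, not part of this PR: switch the XPUContext to a
    // stream from its pool and restore the previous stream on scope exit.
    class XPUStreamGuard {
     public:
      explicit XPUStreamGuard(int idx)
          : original_(paddle::platform::set_current_stream(idx)) {}
      ~XPUStreamGuard() {
        if (original_ != nullptr) {
          paddle::platform::set_current_stream(original_->id());
        }
      }

     private:
      phi::XPUStreamHandle *original_;  // stream that was current before the switch
    };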
9 changes: 9 additions & 0 deletions paddle/phi/api/include/tensor.h
@@ -29,6 +29,11 @@ using gpuStream_t = cudaStream_t;
using gpuStream_t = hipStream_t;
#endif

#ifdef PADDLE_WITH_XPU
#include "xpu/runtime.h"
#include "xpu/runtime_ex.h"
#endif

#ifdef PADDLE_WITH_CUSTOM_DEVICE
#include "paddle/phi/backends/stream.h"
#endif
@@ -434,6 +439,10 @@ class PADDLE_API Tensor final {
* @return gpuStream_t
*/
gpuStream_t stream() const;
#elif defined(PADDLE_WITH_XPU)

/**
 * @brief Record the XPU stream that is using this tensor's memory
 * (forwards to paddle::memory::RecordStream).
 */
void record_stream(XPUStream stream) const;

#elif defined(PADDLE_WITH_CUSTOM_DEVICE)
/**
* @brief Get the stream where the tensor is currently located
10 changes: 10 additions & 0 deletions paddle/phi/api/lib/tensor.cc
@@ -40,6 +40,8 @@ limitations under the License. */
#include "paddle/phi/core/tensor_meta.h"
#include "paddle/phi/core/tensor_utils.h"

#include "paddle/phi/core/memory/malloc.h"

namespace paddle {

using DeviceContextPool = experimental::DeviceContextPool;
@@ -397,6 +399,14 @@ Tensor Tensor::slice(int64_t begin_idx, int64_t end_idx) const {

const std::shared_ptr<phi::TensorBase> &Tensor::impl() const { return impl_; }

#ifdef PADDLE_WITH_XPU

void Tensor::record_stream(XPUStream stream) const {
paddle::memory::RecordStream(
std::dynamic_pointer_cast<phi::DenseTensor>(impl_)->Holder(), stream);
}

#endif
void Tensor::set_impl(const std::shared_ptr<phi::TensorBase> &impl) {
impl_ = impl;
}
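A short usage sketch tying the new `Tensor::record_stream` to the stream handle API added in the pybind changes; `mark_in_use` is a hypothetical helper and the tensor is assumed to already hold XPU memory:

    // Illustration: tell the allocator that `t` is used by the current XPU
    // stream, so its buffer is not reused before that stream's work completes.
    void mark_in_use(const paddle::Tensor &t) {
      int device_id = paddle::platform::GetXPUCurrentDeviceId();
      auto *handle = paddle::platform::get_current_stream(device_id);
      t.record_stream(handle->raw_stream());  // raw_stream() yields an XPUStream
    }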