Skip to content

Commit c493295

Browse files
authored
[SYCL] Implement queue::ext_oneapi_empty() API to get queue status (#7583)
1 parent 2359d94 commit c493295

16 files changed

+256
-11
lines changed

sycl/doc/extensions/proposed/sycl_ext_oneapi_queue_empty.asciidoc renamed to sycl/doc/extensions/supported/sycl_ext_oneapi_queue_empty.asciidoc

+8-5
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ SYCL specification refer to that revision.
4242

4343
== Status
4444

45-
This is a proposed extension specification, intended to gather community
46-
feedback. Interfaces defined in this specification may not be implemented yet
47-
or may be in a preliminary state. The specification itself may also change in
48-
incompatible ways before it is finalized. *Shipping software products should
49-
not rely on APIs defined in this specification.*
45+
This extension is supported by {dpcpp} on all backends except OpenCL.
46+
[NOTE]
47+
====
48+
Currently support for OpenCL backend is limited, API introduced by this extension
49+
can be called only for in-order queues which doesn't have `discard_events` property.
50+
Exception is thrown if new API is called on other type of queue. OpenCL currently
51+
doesn't have an API to get queue status.
52+
====
5053

5154

5255
== Overview

sycl/include/sycl/detail/pi.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@
5858
// piDeviceGetInfo.
5959
// 11.17 Added new PI_EXT_ONEAPI_QUEUE_PRIORITY_LOW and
6060
// PI_EXT_ONEAPI_QUEUE_PRIORITY_HIGH queue properties.
61+
// 11.18 Add new parameter name PI_EXT_ONEAPI_QUEUE_INFO_EMPTY to
62+
// _pi_queue_info.
6163

6264
#define _PI_H_VERSION_MAJOR 11
63-
#define _PI_H_VERSION_MINOR 16
65+
#define _PI_H_VERSION_MINOR 18
6466

6567
#define _PI_STRING_HELPER(a) #a
6668
#define _PI_CONCAT(a, b) _PI_STRING_HELPER(a.b)
@@ -331,7 +333,10 @@ typedef enum {
331333
PI_QUEUE_INFO_DEVICE_DEFAULT = 0x1095,
332334
PI_QUEUE_INFO_PROPERTIES = 0x1093,
333335
PI_QUEUE_INFO_REFERENCE_COUNT = 0x1092,
334-
PI_QUEUE_INFO_SIZE = 0x1094
336+
PI_QUEUE_INFO_SIZE = 0x1094,
337+
// Return 'true' if all commands previously submitted to the queue have
338+
// completed, otherwise return 'false'.
339+
PI_EXT_ONEAPI_QUEUE_INFO_EMPTY = 0x2096
335340
} _pi_queue_info;
336341

337342
typedef enum {

sycl/include/sycl/feature_test.hpp.in

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ __SYCL_INLINE_VER_NAMESPACE(_V1) {
6767
#define SYCL_EXT_ONEAPI_BACKEND_LEVEL_ZERO 3
6868
#define SYCL_EXT_ONEAPI_USM_DEVICE_READ_ONLY 1
6969
#define SYCL_EXT_ONEAPI_KERNEL_PROPERTIES 1
70+
#define SYCL_EXT_ONEAPI_QUEUE_EMPTY 1
7071
#define SYCL_EXT_ONEAPI_USER_DEFINED_REDUCTIONS 1
7172
#cmakedefine01 SYCL_BUILD_PI_CUDA
7273
#if SYCL_BUILD_PI_CUDA

sycl/include/sycl/queue.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,12 @@ class __SYCL_EXPORT queue {
12721272
/// \return the backend associated with this queue.
12731273
backend get_backend() const noexcept;
12741274

1275+
/// Allows to check status of the queue (completed vs noncompleted).
1276+
///
1277+
/// \return returns true if all enqueued commands in the queue have been
1278+
/// completed, otherwise returns false.
1279+
bool ext_oneapi_empty() const;
1280+
12751281
private:
12761282
pi_native_handle getNative() const;
12771283

sycl/plugins/cuda/pi_cuda.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -2543,6 +2543,27 @@ pi_result cuda_piQueueGetInfo(pi_queue command_queue, pi_queue_info param_name,
25432543
case PI_QUEUE_INFO_PROPERTIES:
25442544
return getInfo(param_value_size, param_value, param_value_size_ret,
25452545
command_queue->properties_);
2546+
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
2547+
try {
2548+
bool IsReady = command_queue->all_of([](CUstream s) -> bool {
2549+
const CUresult ret = cuStreamQuery(s);
2550+
if (ret == CUDA_SUCCESS)
2551+
return true;
2552+
2553+
if (ret == CUDA_ERROR_NOT_READY)
2554+
return false;
2555+
2556+
PI_CHECK_ERROR(ret);
2557+
return false;
2558+
});
2559+
return getInfo(param_value_size, param_value, param_value_size_ret,
2560+
IsReady);
2561+
} catch (pi_result err) {
2562+
return err;
2563+
} catch (...) {
2564+
return PI_ERROR_OUT_OF_RESOURCES;
2565+
}
2566+
}
25462567
default:
25472568
__SYCL_PI_HANDLE_UNKNOWN_PARAM_NAME(param_name);
25482569
}

sycl/plugins/cuda/pi_cuda.hpp

+22
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,28 @@ struct _pi_queue {
499499
return is_last_command && !has_been_synchronized(stream_token);
500500
}
501501

502+
template <typename T> bool all_of(T &&f) {
503+
{
504+
std::lock_guard compute_guard(compute_stream_mutex_);
505+
unsigned int end =
506+
std::min(static_cast<unsigned int>(compute_streams_.size()),
507+
num_compute_streams_);
508+
if (!std::all_of(compute_streams_.begin(), compute_streams_.begin() + end,
509+
f))
510+
return false;
511+
}
512+
{
513+
std::lock_guard transfer_guard(transfer_stream_mutex_);
514+
unsigned int end =
515+
std::min(static_cast<unsigned int>(transfer_streams_.size()),
516+
num_transfer_streams_);
517+
if (!std::all_of(transfer_streams_.begin(),
518+
transfer_streams_.begin() + end, f))
519+
return false;
520+
}
521+
return true;
522+
}
523+
502524
template <typename T> void for_each_stream(T &&f) {
503525
{
504526
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);

sycl/plugins/hip/pi_hip.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -2420,6 +2420,21 @@ pi_result hip_piQueueGetInfo(pi_queue command_queue, pi_queue_info param_name,
24202420
case PI_QUEUE_INFO_PROPERTIES:
24212421
return getInfo(param_value_size, param_value, param_value_size_ret,
24222422
command_queue->properties_);
2423+
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
2424+
bool IsReady = command_queue->all_of([](hipStream_t s) -> bool {
2425+
const hipError_t ret = hipStreamQuery(s);
2426+
if (ret == hipSuccess)
2427+
return true;
2428+
2429+
if (ret == hipErrorNotReady)
2430+
return false;
2431+
2432+
PI_CHECK_ERROR(ret);
2433+
return false;
2434+
});
2435+
return getInfo(param_value_size, param_value, param_value_size_ret,
2436+
IsReady);
2437+
}
24232438
default:
24242439
__SYCL_PI_HANDLE_UNKNOWN_PARAM_NAME(param_name);
24252440
}

sycl/plugins/hip/pi_hip.hpp

+22
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,28 @@ struct _pi_queue {
478478
return is_last_command && !has_been_synchronized(stream_token);
479479
}
480480

481+
template <typename T> bool all_of(T &&f) {
482+
{
483+
std::lock_guard compute_guard(compute_stream_mutex_);
484+
unsigned int end =
485+
std::min(static_cast<unsigned int>(compute_streams_.size()),
486+
num_compute_streams_);
487+
if (!std::all_of(compute_streams_.begin(), compute_streams_.begin() + end,
488+
f))
489+
return false;
490+
}
491+
{
492+
std::lock_guard transfer_guard(transfer_stream_mutex_);
493+
unsigned int end =
494+
std::min(static_cast<unsigned int>(transfer_streams_.size()),
495+
num_transfer_streams_);
496+
if (!std::all_of(transfer_streams_.begin(),
497+
transfer_streams_.begin() + end, f))
498+
return false;
499+
}
500+
return true;
501+
}
502+
481503
template <typename T> void for_each_stream(T &&f) {
482504
{
483505
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);

sycl/plugins/level_zero/pi_level_zero.cpp

+79
Original file line numberDiff line numberDiff line change
@@ -3660,6 +3660,83 @@ pi_result piQueueGetInfo(pi_queue Queue, pi_queue_info ParamName,
36603660
case PI_QUEUE_INFO_DEVICE_DEFAULT:
36613661
die("PI_QUEUE_INFO_DEVICE_DEFAULT in piQueueGetInfo not implemented\n");
36623662
break;
3663+
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
3664+
// We can exit early if we have in-order queue.
3665+
if (Queue->isInOrderQueue()) {
3666+
if (!Queue->LastCommandEvent)
3667+
return ReturnValue(pi_bool{true});
3668+
3669+
// We can check status of the event only if it isn't discarded otherwise
3670+
// it may be reset (because we are free to reuse such events) and
3671+
// zeEventQueryStatus will hang.
3672+
// TODO: use more robust way to check that ZeEvent is not owned by
3673+
// LastCommandEvent.
3674+
if (!Queue->LastCommandEvent->IsDiscarded) {
3675+
ze_result_t ZeResult = ZE_CALL_NOCHECK(
3676+
zeEventQueryStatus, (Queue->LastCommandEvent->ZeEvent));
3677+
if (ZeResult == ZE_RESULT_NOT_READY) {
3678+
return ReturnValue(pi_bool{false});
3679+
} else if (ZeResult != ZE_RESULT_SUCCESS) {
3680+
return mapError(ZeResult);
3681+
}
3682+
return ReturnValue(pi_bool{true});
3683+
}
3684+
// For immediate command lists we have to check status of the event
3685+
// because immediate command lists are not associated with level zero
3686+
// queue. Conservatively return false in this case because last event is
3687+
// discarded and we can't check its status.
3688+
if (Queue->Device->useImmediateCommandLists())
3689+
return ReturnValue(pi_bool{false});
3690+
}
3691+
3692+
// If we have any open command list which is not empty then return false
3693+
// because it means that there are commands which are not even submitted for
3694+
// execution yet.
3695+
using IsCopy = bool;
3696+
if (Queue->hasOpenCommandList(IsCopy{true}) ||
3697+
Queue->hasOpenCommandList(IsCopy{false}))
3698+
return ReturnValue(pi_bool{false});
3699+
3700+
for (const auto &QueueGroup :
3701+
{Queue->ComputeQueueGroup, Queue->CopyQueueGroup}) {
3702+
if (Queue->Device->useImmediateCommandLists()) {
3703+
// Immediate command lists are not associated with any Level Zero queue,
3704+
// that's why we have to check status of events in each immediate
3705+
// command list. Start checking from the end and exit early if some
3706+
// event is not completed.
3707+
for (const auto &ImmCmdList : QueueGroup.ImmCmdLists) {
3708+
if (ImmCmdList == Queue->CommandListMap.end())
3709+
continue;
3710+
3711+
auto EventList = ImmCmdList->second.EventList;
3712+
for (auto It = EventList.crbegin(); It != EventList.crend(); It++) {
3713+
ze_result_t ZeResult =
3714+
ZE_CALL_NOCHECK(zeEventQueryStatus, ((*It)->ZeEvent));
3715+
if (ZeResult == ZE_RESULT_NOT_READY) {
3716+
return ReturnValue(pi_bool{false});
3717+
} else if (ZeResult != ZE_RESULT_SUCCESS) {
3718+
return mapError(ZeResult);
3719+
}
3720+
}
3721+
}
3722+
} else {
3723+
for (const auto &ZeQueue : QueueGroup.ZeQueues) {
3724+
if (!ZeQueue)
3725+
continue;
3726+
// Provide 0 as the timeout parameter to immediately get the status of
3727+
// the Level Zero queue.
3728+
ze_result_t ZeResult = ZE_CALL_NOCHECK(zeCommandQueueSynchronize,
3729+
(ZeQueue, /* timeout */ 0));
3730+
if (ZeResult == ZE_RESULT_NOT_READY) {
3731+
return ReturnValue(pi_bool{false});
3732+
} else if (ZeResult != ZE_RESULT_SUCCESS) {
3733+
return mapError(ZeResult);
3734+
}
3735+
}
3736+
}
3737+
}
3738+
return ReturnValue(pi_bool{true});
3739+
}
36633740
default:
36643741
zePrint("Unsupported ParamName in piQueueGetInfo: ParamName=%d(0x%x)\n",
36653742
ParamName, ParamName);
@@ -6727,6 +6804,8 @@ pi_result _pi_queue::synchronize() {
67276804
ZE_CALL(zeHostSynchronize, (ZeQueue));
67286805
}
67296806

6807+
LastCommandEvent = nullptr;
6808+
67306809
// With the entire queue synchronized, the active barriers must be done so we
67316810
// can remove them.
67326811
for (pi_event &BarrierEvent : ActiveBarriers)

sycl/plugins/opencl/pi_opencl.cpp

+23-1
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,28 @@ pi_result piQueueCreate(pi_context context, pi_device device,
505505
return cast<pi_result>(ret_err);
506506
}
507507

508+
pi_result piQueueGetInfo(pi_queue queue, pi_queue_info param_name,
509+
size_t param_value_size, void *param_value,
510+
size_t *param_value_size_ret) {
511+
if (queue == nullptr) {
512+
return PI_ERROR_INVALID_QUEUE;
513+
}
514+
515+
switch (param_name) {
516+
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY:
517+
// OpenCL doesn't provide API to check the status of the queue.
518+
return PI_ERROR_INVALID_VALUE;
519+
default:
520+
cl_int CLErr = clGetCommandQueueInfo(
521+
cast<cl_command_queue>(queue), cast<cl_command_queue_info>(param_name),
522+
param_value_size, param_value, param_value_size_ret);
523+
if (CLErr != CL_SUCCESS) {
524+
return cast<pi_result>(CLErr);
525+
}
526+
}
527+
return PI_SUCCESS;
528+
}
529+
508530
pi_result piextQueueCreateWithNativeHandle(pi_native_handle nativeHandle,
509531
pi_context, pi_device,
510532
bool ownNativeHandle,
@@ -1549,7 +1571,7 @@ pi_result piPluginInit(pi_plugin *PluginInit) {
15491571
_PI_CL(piextContextCreateWithNativeHandle, piextContextCreateWithNativeHandle)
15501572
// Queue
15511573
_PI_CL(piQueueCreate, piQueueCreate)
1552-
_PI_CL(piQueueGetInfo, clGetCommandQueueInfo)
1574+
_PI_CL(piQueueGetInfo, piQueueGetInfo)
15531575
_PI_CL(piQueueFinish, clFinish)
15541576
_PI_CL(piQueueFlush, clFlush)
15551577
_PI_CL(piQueueRetain, clRetainCommandQueue)

sycl/source/detail/queue_impl.cpp

+42
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,48 @@ pi_native_handle queue_impl::getNative() const {
400400
return Handle;
401401
}
402402

403+
bool queue_impl::ext_oneapi_empty() const {
404+
// If we have in-order queue where events are not discarded then just check
405+
// the status of the last event.
406+
if (isInOrder() && !MDiscardEvents) {
407+
std::lock_guard Lock(MLastEventMtx);
408+
return MLastEvent.get_info<info::event::command_execution_status>() ==
409+
info::event_command_status::complete;
410+
}
411+
412+
// Check the status of the backend queue if this is not a host queue.
413+
if (!is_host()) {
414+
pi_bool IsReady = false;
415+
getPlugin().call<PiApiKind::piQueueGetInfo>(
416+
MQueues[0], PI_EXT_ONEAPI_QUEUE_INFO_EMPTY, sizeof(pi_bool), &IsReady,
417+
nullptr);
418+
if (!IsReady)
419+
return false;
420+
}
421+
422+
// We may have events like host tasks which are not submitted to the backend
423+
// queue so we need to get their status separately.
424+
std::lock_guard Lock(MMutex);
425+
for (event Event : MEventsShared)
426+
if (Event.get_info<info::event::command_execution_status>() !=
427+
info::event_command_status::complete)
428+
return false;
429+
430+
for (auto EventImplWeakPtrIt = MEventsWeak.begin();
431+
EventImplWeakPtrIt != MEventsWeak.end(); ++EventImplWeakPtrIt)
432+
if (std::shared_ptr<event_impl> EventImplSharedPtr =
433+
EventImplWeakPtrIt->lock())
434+
if (EventImplSharedPtr->is_host() &&
435+
EventImplSharedPtr
436+
->get_info<info::event::command_execution_status>() !=
437+
info::event_command_status::complete)
438+
return false;
439+
440+
// If we didn't exit early above then it means that all events in the queue
441+
// are completed.
442+
return true;
443+
}
444+
403445
} // namespace detail
404446
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
405447
} // namespace sycl

sycl/source/detail/queue_impl.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,8 @@ class queue_impl {
476476
MStreamsServiceEvents.push_back(Event);
477477
}
478478

479+
bool ext_oneapi_empty() const;
480+
479481
protected:
480482
// template is needed for proper unit testing
481483
template <typename HandlerType = handler>
@@ -580,7 +582,7 @@ class queue_impl {
580582
void addEvent(const event &Event);
581583

582584
/// Protects all the fields that can be changed by class' methods.
583-
std::mutex MMutex;
585+
mutable std::mutex MMutex;
584586

585587
DeviceImplPtr MDevice;
586588
const ContextImplPtr MContext;
@@ -611,7 +613,7 @@ class queue_impl {
611613
// This event is employed for enhanced dependency tracking with in-order queue
612614
// Access to the event should be guarded with MLastEventMtx
613615
event MLastEvent;
614-
std::mutex MLastEventMtx;
616+
mutable std::mutex MLastEventMtx;
615617
// Used for in-order queues in pair with MLastEvent
616618
// Host tasks are explicitly synchronized in RT, pi tasks - implicitly by
617619
// backend. Using type to setup explicit sync between host and pi tasks.

sycl/source/queue.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ bool queue::is_in_order() const {
203203

204204
backend queue::get_backend() const noexcept { return getImplBackend(impl); }
205205

206+
bool queue::ext_oneapi_empty() const { return impl->ext_oneapi_empty(); }
207+
206208
pi_native_handle queue::getNative() const { return impl->getNative(); }
207209

208210
buffer<detail::AssertHappened, 1> &queue::getAssertHappenedBuffer() {

0 commit comments

Comments
 (0)