Skip to content

Commit ff27bb1

Browse files
Revert "fix: use condition variables instead of busy waits in worker threads"
This reverts commit 4406889. Signed-off-by: Compute-Runtime-Validation <compute-runtime-validation@intel.com>
1 parent 24055f5 commit ff27bb1

23 files changed

+172
-313
lines changed

shared/source/command_stream/command_stream_receiver.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,13 @@ void CommandStreamReceiver::downloadAllocation(GraphicsAllocation &gfxAllocation
673673
}
674674
}
675675

676+
void CommandStreamReceiver::startControllingDirectSubmissions() {
677+
auto controller = this->executionEnvironment.directSubmissionController.get();
678+
if (controller) {
679+
controller->startControlling();
680+
}
681+
}
682+
676683
bool CommandStreamReceiver::enqueueWaitForPagingFence(uint64_t pagingFenceValue) {
677684
auto controller = this->executionEnvironment.directSubmissionController.get();
678685
if (this->isAnyDirectSubmissionEnabled() && controller) {

shared/source/command_stream/command_stream_receiver.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,8 @@ class CommandStreamReceiver : NEO::NonCopyableAndNonMovableClass {
352352

353353
uint32_t getRootDeviceIndex() const { return rootDeviceIndex; }
354354

355+
MOCKABLE_VIRTUAL void startControllingDirectSubmissions();
356+
355357
MOCKABLE_VIRTUAL bool isAnyDirectSubmissionEnabled() const {
356358
return this->isDirectSubmissionEnabled() || isBlitterDirectSubmissionEnabled();
357359
}

shared/source/command_stream/command_stream_receiver_hw_base.inl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,7 +1304,7 @@ SubmissionStatus CommandStreamReceiverHw<GfxFamily>::flushSmallTask(LinearStream
13041304
this->latestSentTaskCount = taskCount + 1;
13051305
auto submissionStatus = flushHandler(batchBuffer, getResidencyAllocations());
13061306
if (submissionStatus == SubmissionStatus::success) {
1307-
++taskCount;
1307+
taskCount++;
13081308
}
13091309
return submissionStatus;
13101310
}
@@ -1473,6 +1473,7 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
14731473
if (directSubmissionController) {
14741474
directSubmissionController->registerDirectSubmission(this);
14751475
}
1476+
this->startControllingDirectSubmissions();
14761477
if (this->isUpdateTagFromWaitEnabled()) {
14771478
this->overrideDispatchPolicy(DispatchMode::immediateDispatch);
14781479
}
@@ -1485,7 +1486,6 @@ inline bool CommandStreamReceiverHw<GfxFamily>::initDirectSubmission() {
14851486
}
14861487
}
14871488
}
1488-
14891489
return ret;
14901490
}
14911491

shared/source/direct_submission/direct_submission_controller.cpp

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -66,40 +66,50 @@ void DirectSubmissionController::startThread() {
6666
}
6767

6868
void DirectSubmissionController::stopThread() {
69-
{
70-
std::lock_guard<std::mutex> lock(condVarMutex);
71-
keepControlling.store(false);
72-
condVar.notify_one();
73-
}
69+
runControlling.store(false);
70+
keepControlling.store(false);
7471
if (directSubmissionControllingThread) {
7572
directSubmissionControllingThread->join();
7673
directSubmissionControllingThread.reset();
7774
}
7875
}
7976

77+
void DirectSubmissionController::startControlling() {
78+
this->runControlling.store(true);
79+
}
80+
8081
void *DirectSubmissionController::controlDirectSubmissionsState(void *self) {
8182
auto controller = reinterpret_cast<DirectSubmissionController *>(self);
8283

84+
while (!controller->runControlling.load()) {
85+
if (!controller->keepControlling.load()) {
86+
return nullptr;
87+
}
88+
std::unique_lock<std::mutex> lock(controller->condVarMutex);
89+
controller->handlePagingFenceRequests(lock, false);
90+
91+
auto isControllerNotified = controller->sleep(lock);
92+
if (isControllerNotified) {
93+
controller->handlePagingFenceRequests(lock, false);
94+
}
95+
}
96+
8397
controller->timeSinceLastCheck = controller->getCpuTimestamp();
8498
controller->lastHangCheckTime = std::chrono::high_resolution_clock::now();
85-
86-
while (controller->keepControlling.load()) {
99+
while (true) {
100+
if (!controller->keepControlling.load()) {
101+
return nullptr;
102+
}
87103
std::unique_lock<std::mutex> lock(controller->condVarMutex);
88-
controller->wait(lock);
89-
controller->handlePagingFenceRequests(lock);
90-
controller->sleep(lock);
91-
controller->handlePagingFenceRequests(lock);
104+
controller->handlePagingFenceRequests(lock, true);
105+
106+
auto isControllerNotified = controller->sleep(lock);
107+
if (isControllerNotified) {
108+
controller->handlePagingFenceRequests(lock, true);
109+
}
92110
lock.unlock();
93111
controller->checkNewSubmissions();
94112
}
95-
96-
return nullptr;
97-
}
98-
99-
void DirectSubmissionController::notifyNewSubmission(const CommandStreamReceiver *csr) {
100-
++activeSubmissionsCount;
101-
directSubmissions[const_cast<CommandStreamReceiver *>(csr)].isActive = true;
102-
condVar.notify_one();
103113
}
104114

105115
void DirectSubmissionController::checkNewSubmissions() {
@@ -111,11 +121,9 @@ void DirectSubmissionController::checkNewSubmissions() {
111121
std::lock_guard<std::mutex> lock(this->directSubmissionsMutex);
112122
bool shouldRecalculateTimeout = false;
113123
std::optional<TaskCountType> bcsTaskCount{};
114-
for (auto &[csr, state] : directSubmissions) {
115-
if (!state.isActive) {
116-
continue;
117-
}
118-
124+
for (auto &directSubmission : this->directSubmissions) {
125+
auto csr = directSubmission.first;
126+
auto &state = directSubmission.second;
119127
auto isBcs = EngineHelpers::isBcs(csr->getOsContext().getEngineType());
120128
if (timeoutMode == TimeoutElapsedMode::bcsOnly && !isBcs) {
121129
continue;
@@ -135,10 +143,8 @@ void DirectSubmissionController::checkNewSubmissions() {
135143
auto lock = csr->obtainUniqueOwnership();
136144
if (!isCsrIdleDetectionEnabled || (isDirectSubmissionIdle(csr, lock) && isCopyEngineIdle)) {
137145
csr->stopDirectSubmission(false, false);
138-
state.isActive = false;
139146
state.isStopped = true;
140147
shouldRecalculateTimeout = true;
141-
--activeSubmissionsCount;
142148
}
143149
state.taskCount = csr->peekTaskCount();
144150
} else {
@@ -276,13 +282,13 @@ void DirectSubmissionController::recalculateTimeout() {
276282
}
277283

278284
void DirectSubmissionController::enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue) {
279-
std::lock_guard lock(condVarMutex);
285+
std::lock_guard lock(this->condVarMutex);
280286
pagingFenceRequests.push({csr, pagingFenceValue});
281287
condVar.notify_one();
282288
}
283289

284290
void DirectSubmissionController::drainPagingFenceQueue() {
285-
std::lock_guard lock(condVarMutex);
291+
std::lock_guard lock(this->condVarMutex);
286292

287293
while (!pagingFenceRequests.empty()) {
288294
auto request = pagingFenceRequests.front();
@@ -291,13 +297,18 @@ void DirectSubmissionController::drainPagingFenceQueue() {
291297
}
292298
}
293299

294-
void DirectSubmissionController::handlePagingFenceRequests(std::unique_lock<std::mutex> &lock) {
300+
void DirectSubmissionController::handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions) {
295301
UNRECOVERABLE_IF(!lock.owns_lock())
296302
while (!pagingFenceRequests.empty()) {
297303
auto request = pagingFenceRequests.front();
298304
pagingFenceRequests.pop();
299305
lock.unlock();
306+
300307
request.csr->unblockPagingFenceSemaphore(request.pagingFenceValue);
308+
if (checkForNewSubmissions) {
309+
checkNewSubmissions();
310+
}
311+
301312
lock.lock();
302313
}
303314
}

shared/source/direct_submission/direct_submission_controller.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,24 @@ class DirectSubmissionController {
6161
void unregisterDirectSubmission(CommandStreamReceiver *csr);
6262

6363
void startThread();
64+
void startControlling();
6465
void stopThread();
6566

6667
static bool isSupported();
6768

6869
void enqueueWaitForPagingFence(CommandStreamReceiver *csr, uint64_t pagingFenceValue);
6970
void drainPagingFenceQueue();
70-
void notifyNewSubmission(const CommandStreamReceiver *csr);
7171

7272
protected:
7373
struct DirectSubmissionState {
7474
DirectSubmissionState(DirectSubmissionState &&other) noexcept {
75-
isActive = other.isActive.load();
7675
isStopped = other.isStopped.load();
7776
taskCount = other.taskCount.load();
7877
}
7978
DirectSubmissionState &operator=(const DirectSubmissionState &other) {
8079
if (this == &other) {
8180
return *this;
8281
}
83-
this->isActive = other.isActive.load();
8482
this->isStopped = other.isStopped.load();
8583
this->taskCount = other.taskCount.load();
8684
return *this;
@@ -92,20 +90,15 @@ class DirectSubmissionController {
9290
DirectSubmissionState(const DirectSubmissionState &other) = delete;
9391
DirectSubmissionState &operator=(DirectSubmissionState &&other) = delete;
9492

95-
std::atomic_bool isActive{false};
9693
std::atomic_bool isStopped{true};
9794
std::atomic<TaskCountType> taskCount{0};
9895
};
9996

10097
static void *controlDirectSubmissionsState(void *self);
101-
MOCKABLE_VIRTUAL void checkNewSubmissions();
98+
void checkNewSubmissions();
10299
bool isDirectSubmissionIdle(CommandStreamReceiver *csr, std::unique_lock<std::recursive_mutex> &csrLock);
103100
bool isCopyEngineOnDeviceIdle(uint32_t rootDeviceIndex, std::optional<TaskCountType> &bcsTaskCount);
104101
MOCKABLE_VIRTUAL bool sleep(std::unique_lock<std::mutex> &lock);
105-
bool waitPredicate() { return !keepControlling || !pagingFenceRequests.empty() || activeSubmissionsCount; }
106-
MOCKABLE_VIRTUAL void wait(std::unique_lock<std::mutex> &lock) {
107-
condVar.wait(lock, [&]() { return waitPredicate(); });
108-
}
109102
MOCKABLE_VIRTUAL SteadyClock::time_point getCpuTimestamp();
110103
MOCKABLE_VIRTUAL void overrideDirectSubmissionTimeouts(const ProductHelper &productHelper);
111104

@@ -114,7 +107,7 @@ class DirectSubmissionController {
114107
void updateLastSubmittedThrottle(QueueThrottle throttle);
115108
size_t getTimeoutParamsMapKey(QueueThrottle throttle, bool acLineStatus);
116109

117-
MOCKABLE_VIRTUAL void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock);
110+
void handlePagingFenceRequests(std::unique_lock<std::mutex> &lock, bool checkForNewSubmissions);
118111
MOCKABLE_VIRTUAL TimeoutElapsedMode timeoutElapsed();
119112
std::chrono::microseconds getSleepValue() const { return std::chrono::microseconds(this->timeout / this->bcsTimeoutDivisor); }
120113

@@ -125,7 +118,7 @@ class DirectSubmissionController {
125118

126119
std::unique_ptr<Thread> directSubmissionControllingThread;
127120
std::atomic_bool keepControlling = true;
128-
std::atomic_uint activeSubmissionsCount = 0;
121+
std::atomic_bool runControlling = false;
129122

130123
SteadyClock::time_point timeSinceLastCheck{};
131124
SteadyClock::time_point lastTerminateCpuTimestamp{};
@@ -143,4 +136,4 @@ class DirectSubmissionController {
143136

144137
std::queue<WaitForPagingFenceRequest> pagingFenceRequests;
145138
};
146-
} // namespace NEO
139+
} // namespace NEO

shared/source/direct_submission/direct_submission_hw.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#include "shared/source/command_stream/command_stream_receiver.h"
1111

1212
namespace NEO {
13-
DirectSubmissionInputParams::DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver) : csr(commandStreamReceiver), osContext(commandStreamReceiver.getOsContext()), rootDeviceEnvironment(commandStreamReceiver.peekRootDeviceEnvironment()), rootDeviceIndex(commandStreamReceiver.getRootDeviceIndex()) {
13+
DirectSubmissionInputParams::DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver) : osContext(commandStreamReceiver.getOsContext()), rootDeviceEnvironment(commandStreamReceiver.peekRootDeviceEnvironment()), rootDeviceIndex(commandStreamReceiver.getRootDeviceIndex()) {
1414
memoryManager = commandStreamReceiver.getMemoryManager();
1515
globalFenceAllocation = commandStreamReceiver.getGlobalFenceAllocation();
1616
workPartitionAllocation = commandStreamReceiver.getWorkPartitionAllocation();

shared/source/direct_submission/direct_submission_hw.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ class MemoryOperationsHandler;
5050

5151
struct DirectSubmissionInputParams : NonCopyableClass {
5252
DirectSubmissionInputParams(const CommandStreamReceiver &commandStreamReceiver);
53-
const CommandStreamReceiver &csr;
5453
OsContext &osContext;
5554
const RootDeviceEnvironment &rootDeviceEnvironment;
5655
MemoryManager *memoryManager = nullptr;
@@ -223,7 +222,6 @@ class DirectSubmissionHw {
223222
uint64_t gpuVaForPagingFenceSemaphore = 0u;
224223
uint64_t relaxedOrderingQueueSizeLimitValueVa = 0;
225224

226-
const CommandStreamReceiver &csr;
227225
OsContext &osContext;
228226
const uint32_t rootDeviceIndex;
229227
MemoryManager *memoryManager = nullptr;

shared/source/direct_submission/direct_submission_hw.inl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include "shared/source/command_stream/submissions_aggregator.h"
1111
#include "shared/source/debug_settings/debug_settings_manager.h"
1212
#include "shared/source/device/device.h"
13-
#include "shared/source/direct_submission/direct_submission_controller.h"
1413
#include "shared/source/direct_submission/direct_submission_hw.h"
1514
#include "shared/source/direct_submission/relaxed_ordering_helper.h"
1615
#include "shared/source/execution_environment/execution_environment.h"
@@ -41,7 +40,7 @@ namespace NEO {
4140

4241
template <typename GfxFamily, typename Dispatcher>
4342
DirectSubmissionHw<GfxFamily, Dispatcher>::DirectSubmissionHw(const DirectSubmissionInputParams &inputParams)
44-
: ringBuffers(RingBufferUse::initialRingBufferCount), csr(inputParams.csr), osContext(inputParams.osContext), rootDeviceIndex(inputParams.rootDeviceIndex), rootDeviceEnvironment(inputParams.rootDeviceEnvironment) {
43+
: ringBuffers(RingBufferUse::initialRingBufferCount), osContext(inputParams.osContext), rootDeviceIndex(inputParams.rootDeviceIndex), rootDeviceEnvironment(inputParams.rootDeviceEnvironment) {
4544
memoryManager = inputParams.memoryManager;
4645
globalFenceAllocation = inputParams.globalFenceAllocation;
4746
hwInfo = inputParams.rootDeviceEnvironment.getHardwareInfo();
@@ -589,9 +588,6 @@ template <typename GfxFamily, typename Dispatcher>
589588
bool DirectSubmissionHw<GfxFamily, Dispatcher>::submitCommandBufferToGpu(bool needStart, uint64_t gpuAddress, size_t size, bool needWait, const ResidencyContainer *allocationsForResidency) {
590589
if (needStart) {
591590
this->ringStart = this->submit(gpuAddress, size, allocationsForResidency);
592-
if (auto controller = rootDeviceEnvironment.executionEnvironment.directSubmissionController.get()) {
593-
controller->notifyNewSubmission(&csr);
594-
}
595591
return this->ringStart;
596592
} else {
597593
if (needWait) {

shared/source/memory_manager/unified_memory_manager.cpp

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
6868
return false;
6969
}
7070

71-
std::unique_lock<std::mutex> lock(this->mtx);
71+
std::lock_guard<std::mutex> lock(this->mtx);
7272
if (svmData->device ? svmData->device->shouldLimitAllocationsReuse() : memoryManager->shouldLimitAllocationsReuse()) {
7373
return false;
7474
}
@@ -100,11 +100,8 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
100100
}
101101
svmData->isSavedForReuse = true;
102102
allocations.emplace(std::lower_bound(allocations.begin(), allocations.end(), size), size, ptr, svmData, waitForCompletion);
103-
empty = false;
104-
if (auto usmReuseCleaner = this->memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner.get()) {
105-
lock.unlock();
106-
usmReuseCleaner->startThread();
107-
usmReuseCleaner->notifySvmAllocationsCacheUpdate();
103+
if (memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner) {
104+
memoryManager->peekExecutionEnvironment().unifiedMemoryReuseCleaner->startThread();
108105
}
109106
}
110107
if (enablePerformanceLogging) {
@@ -114,7 +111,6 @@ bool SVMAllocsManager::SvmAllocationCache::insert(size_t size, void *ptr, SvmAll
114111
.operationType = CacheOperationType::insert,
115112
.isSuccess = isSuccess});
116113
}
117-
118114
return isSuccess;
119115
}
120116

@@ -186,7 +182,6 @@ void *SVMAllocsManager::SvmAllocationCache::get(size_t size, const UnifiedMemory
186182
svmAllocsManager->reinsertToAllocsForIndirectAccess(*allocationIter->svmData);
187183
}
188184
allocations.erase(allocationIter);
189-
empty = allocations.empty();
190185
return allocationPtr;
191186
}
192187
}
@@ -221,7 +216,6 @@ void SVMAllocsManager::SvmAllocationCache::trim() {
221216
svmAllocsManager->freeSVMAllocImpl(cachedAllocationInfo.allocation, FreePolicyType::blocking, cachedAllocationInfo.svmData);
222217
}
223218
this->allocations.clear();
224-
empty = true;
225219
}
226220

227221
void SVMAllocsManager::SvmAllocationCache::cleanup() {
@@ -306,7 +300,6 @@ void SVMAllocsManager::SvmAllocationCache::trimOldAllocs(std::chrono::high_resol
306300
if (trimAll) {
307301
std::erase_if(allocations, SvmCacheAllocationInfo::isMarkedForDelete);
308302
}
309-
empty = allocations.empty();
310303
}
311304

312305
SvmAllocationData *SVMAllocsManager::MapBasedAllocationTracker::get(const void *ptr) {

shared/source/memory_manager/unified_memory_manager.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ class SVMAllocsManager {
221221
static bool allocUtilizationAllows(size_t requestedSize, size_t reuseCandidateSize);
222222
static bool alignmentAllows(void *ptr, size_t alignment);
223223
bool isInUse(SvmCacheAllocationInfo &cacheAllocInfo);
224-
bool isEmpty() { return empty; };
225224
void *get(size_t size, const UnifiedMemoryProperties &unifiedMemoryProperties);
226225
void trim();
227226
void trimOldAllocs(std::chrono::high_resolution_clock::time_point trimTimePoint, bool trimAll);
@@ -235,7 +234,6 @@ class SVMAllocsManager {
235234
MemoryManager *memoryManager = nullptr;
236235
bool enablePerformanceLogging = false;
237236
bool requireUpdatingAllocsForIndirectAccess = false;
238-
std::atomic_bool empty = true;
239237
};
240238

241239
enum class FreePolicyType : uint32_t {

0 commit comments

Comments
 (0)