Skip to content

Commit 28798c5

Browse files
authored
[ML] Add resource monitoring in CBucketGatherer::addEventData (#2848)
Closes #2525
1 parent df34b90 commit 28798c5

File tree

4 files changed

+79
-60
lines changed

4 files changed

+79
-60
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,8 @@
3232
=== Enhancements
3333

3434
* Update the PyTorch library to version 2.7.1. (See {ml-pull}2863[#2863].)
35-
36-
== {es} version 9.2.0
37-
38-
=== Enhancements
39-
4035
* Report the actual memory usage of the autodetect process. (See {ml-pull}2846[#2846].)
36+
* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2848[#2848].)
4137

4238
== {es} version 9.1.0
4339

include/model/CBucketGatherer.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
#include <core/CCompressedDictionary.h>
1616
#include <core/CHashing.h>
17-
#include <core/CLogger.h>
1817
#include <core/CMemoryUsage.h>
1918
#include <core/CoreTypes.h>
2019

@@ -28,7 +27,6 @@
2827

2928
#include <any>
3029
#include <cstdint>
31-
#include <functional>
3230
#include <map>
3331
#include <optional>
3432
#include <string>
@@ -172,6 +170,9 @@ class MODEL_EXPORT CBucketGatherer {
172170
//! redundant except to create a signature that will not be mistaken for
173171
//! a general purpose copy constructor.
174172
CBucketGatherer(bool isForPersistence, const CBucketGatherer& other);
173+
static bool isRecordIncomplete(const CEventData& data);
174+
bool hasValidPersonAndAttributeIds(std::size_t pid, std::size_t cid) const;
175+
bool handleExplicitNull(const CEventData& data, core_t::TTime time, TSizeSizePr pidCid);
175176

176177
virtual ~CBucketGatherer() = default;
177178
//@}
@@ -238,7 +239,7 @@ class MODEL_EXPORT CBucketGatherer {
238239
CResourceMonitor& resourceMonitor) = 0;
239240

240241
//! Record the arrival of \p data at the time it carries, consulting \p resourceMonitor before adding new influencer count entries.
241-
bool addEventData(CEventData& data);
242+
bool addEventData(const CEventData& data, const CResourceMonitor& resourceMonitor);
242243

243244
//! Roll time forwards to \p time.
244245
void timeNow(core_t::TTime time);

lib/model/CBucketGatherer.cc

Lines changed: 73 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <maths/common/COrderings.h>
2424

2525
#include <model/CDataGatherer.h>
26+
#include <model/CResourceMonitor.h>
2627

2728
#include <boost/tuple/tuple.hpp>
2829

@@ -233,7 +234,36 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o
233234
}
234235
}
235236

236-
bool CBucketGatherer::addEventData(CEventData& data) {
237+
bool CBucketGatherer::isRecordIncomplete(const CEventData& data) {
238+
return !data.personId() || !data.attributeId() || !data.count();
239+
}
240+
bool CBucketGatherer::hasValidPersonAndAttributeIds(std::size_t const pid,
241+
std::size_t const cid) const {
242+
// Has the person/attribute been deleted from the gatherer?
243+
if (!m_DataGatherer.isPersonActive(pid)) {
244+
LOG_DEBUG(<< "Not adding value for deleted person " << pid);
245+
return false;
246+
}
247+
if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) {
248+
LOG_DEBUG(<< "Not adding value for deleted attribute " << cid);
249+
return false;
250+
}
251+
return true;
252+
}
253+
bool CBucketGatherer::handleExplicitNull(const CEventData& data,
254+
core_t::TTime const time,
255+
CBucketGatherer::TSizeSizePr const pidCid) {
256+
// If record is explicit null just note that a null record has been seen
257+
// for the given (pid, cid) pair.
258+
if (data.isExplicitNull()) {
259+
TSizeSizePrUSet& bucketExplicitNulls = m_PersonAttributeExplicitNulls.get(time);
260+
bucketExplicitNulls.insert(pidCid);
261+
return true;
262+
}
263+
return false;
264+
}
265+
bool CBucketGatherer::addEventData(const CEventData& data,
266+
const CResourceMonitor& resourceMonitor) {
237267
core_t::TTime const time = data.time();
238268

239269
if (time < this->earliestBucketStartTime()) {
@@ -245,70 +275,62 @@ bool CBucketGatherer::addEventData(CEventData& data) {
245275

246276
this->timeNow(time);
247277

248-
if (!data.personId() || !data.attributeId() || !data.count()) {
249-
// The record was incomplete.
278+
if (isRecordIncomplete(data)) {
250279
return false;
251280
}
252281

253282
std::size_t const pid = *data.personId();
254283
std::size_t const cid = *data.attributeId();
255284
std::size_t const count = *data.count();
256-
if ((pid != CDynamicStringIdRegistry::INVALID_ID) &&
257-
(cid != CDynamicStringIdRegistry::INVALID_ID)) {
258-
// Has the person/attribute been deleted from the gatherer?
259-
if (!m_DataGatherer.isPersonActive(pid)) {
260-
LOG_DEBUG(<< "Not adding value for deleted person " << pid);
261-
return false;
262-
}
263-
if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) {
264-
LOG_DEBUG(<< "Not adding value for deleted attribute " << cid);
265-
return false;
266-
}
267285

268-
TSizeSizePr const pidCid = std::make_pair(pid, cid);
286+
if ((pid == CDynamicStringIdRegistry::INVALID_ID) ||
287+
(cid == CDynamicStringIdRegistry::INVALID_ID)) {
288+
return true;
289+
}
290+
291+
if (hasValidPersonAndAttributeIds(pid, cid) == false) {
292+
return false;
293+
}
269294

270-
// If record is explicit null just note that a null record has been seen
271-
// for the given (pid, cid) pair.
272-
if (data.isExplicitNull()) {
273-
TSizeSizePrUSet& bucketExplicitNulls =
274-
m_PersonAttributeExplicitNulls.get(time);
275-
bucketExplicitNulls.insert(pidCid);
276-
return true;
277-
}
295+
TSizeSizePr const pidCid = std::make_pair(pid, cid);
278296

279-
TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
280-
if (count > 0) {
281-
bucketCounts[pidCid] += count;
282-
}
297+
if (handleExplicitNull(data, time, pidCid)) {
298+
return true;
299+
}
283300

284-
const CEventData::TOptionalStrVec& influences = data.influences();
285-
auto& influencerCounts = m_InfluencerCounts.get(time);
286-
if (influences.size() != influencerCounts.size()) {
287-
LOG_ERROR(<< "Unexpected influences: " << influences << " expected "
288-
<< core::CContainerPrinter::print(this->beginInfluencers(),
289-
this->endInfluencers()));
290-
return false;
291-
}
301+
TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time);
302+
if (count > 0) {
303+
bucketCounts[pidCid] += count;
304+
}
292305

293-
TOptionalStrVec canonicalInfluences(influencerCounts.size());
294-
for (std::size_t i = 0; i < influences.size(); ++i) {
295-
const CEventData::TOptionalStr& influence = influences[i];
296-
if (influence) {
297-
const std::string& inf = *influence;
298-
canonicalInfluences[i] = inf;
299-
if (count > 0) {
300-
influencerCounts[i]
301-
.emplace(boost::unordered::piecewise_construct,
302-
boost::make_tuple(pidCid, inf),
303-
boost::make_tuple(static_cast<std::uint64_t>(0)))
304-
.first->second += count;
305-
}
306+
const CEventData::TOptionalStrVec& influences = data.influences();
307+
auto& influencerCounts = m_InfluencerCounts.get(time);
308+
if (influences.size() != influencerCounts.size()) {
309+
LOG_ERROR(<< "Unexpected influences: " << influences << " expected "
310+
<< core::CContainerPrinter::print(this->beginInfluencers(),
311+
this->endInfluencers()));
312+
return false;
313+
}
314+
315+
TOptionalStrVec canonicalInfluences(influencerCounts.size());
316+
auto updateInfluencer = [&](std::size_t i) {
317+
if (const CEventData::TOptionalStr& influence = influences[i]) {
318+
const std::string& inf = *influence;
319+
canonicalInfluences[i] = inf;
320+
if (count > 0 && resourceMonitor.areAllocationsAllowed()) {
321+
influencerCounts[i]
322+
.emplace(boost::unordered::piecewise_construct,
323+
boost::make_tuple(pidCid, inf),
324+
boost::make_tuple(static_cast<std::uint64_t>(0)))
325+
.first->second += count;
306326
}
307327
}
308-
309-
this->addValue(pid, cid, time, data.values(), count, data.stringValue(),
310-
canonicalInfluences);
328+
};
329+
for (std::size_t i = 0; i < influences.size(); ++i) {
330+
updateInfluencer(i);
311331
}
332+
333+
this->addValue(pid, cid, time, data.values(), count, data.stringValue(), canonicalInfluences);
312334
return true;
313335
}
314336

lib/model/CDataGatherer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ bool CDataGatherer::addArrival(const TStrCPtrVec& fieldValues,
304304
return false;
305305
}
306306

307-
return m_BucketGatherer->addEventData(data);
307+
return m_BucketGatherer->addEventData(data, resourceMonitor);
308308
}
309309

310310
void CDataGatherer::sampleNow(core_t::TTime sampleBucketStart) {

0 commit comments

Comments
 (0)