Skip to content

Commit 41fcf55

Browse files
r1viollet and jbachorik authored
Thread filter optim (#238)
* Thread filter optim
  - Reserve padded slots for thread context to avoid false sharing.
  - Introduce registerThread / unregisterThread operations to acquire and release slot IDs.
  - Manage a free list of slot IDs.
  - Store the slot of a given thread within the ProfiledThread object.

* Exterminate the last remnants of false sharing
  The free list was causing false sharing.

* Adjust ThreadEnd hook
  If the TLS cleanup fires before the JVMTI hook, we want to ensure that we don't crash while retrieving the ProfiledThread.

* Thread filter bench
  - Start the profiler to ensure we have valid thread objects.
  - Add asserts around missing thread object.

Many thanks for the reviews from Zhengyu.

---------

Co-authored-by: Jaroslav Bachorik <j.bachorik@gmail.com>
1 parent fab8fdf commit 41fcf55

File tree

17 files changed

+1340
-361
lines changed

17 files changed

+1340
-361
lines changed

ddprof-lib/src/main/cpp/flightRecorder.cpp

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ char *Recording::_jvm_flags = NULL;
319319
char *Recording::_java_command = NULL;
320320

321321
Recording::Recording(int fd, Arguments &args)
322-
: _fd(fd), _thread_set(), _method_map() {
322+
: _fd(fd), _method_map() {
323323

324324
args.save(_args);
325325
_chunk_start = lseek(_fd, 0, SEEK_END);
@@ -331,6 +331,8 @@ Recording::Recording(int fd, Arguments &args)
331331
_bytes_written = 0;
332332

333333
_tid = OS::threadId();
334+
_active_index.store(0, std::memory_order_relaxed);
335+
334336
VM::jvmti()->GetAvailableProcessors(&_available_processors);
335337

336338
writeHeader(_buf);
@@ -1060,11 +1062,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
10601062
}
10611063

10621064
void Recording::writeThreads(Buffer *buf) {
1063-
addThread(_tid);
1064-
std::vector<int> threads;
1065-
threads.reserve(_thread_set.size());
1066-
_thread_set.collect(threads);
1067-
_thread_set.clear();
1065+
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
1066+
// After flip: new samples go into the new active set
1067+
// We flush from old_index (the previous active set)
1068+
1069+
std::unordered_set<int> threads;
1070+
threads.insert(_tid);
1071+
1072+
for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
1073+
// Collect thread IDs from the fixed-size table into the main set
1074+
_thread_ids[i][old_index].collect(threads);
1075+
_thread_ids[i][old_index].clear();
1076+
}
10681077

10691078
Profiler *profiler = Profiler::instance();
10701079
ThreadInfo *t_info = &profiler->_thread_info;
@@ -1073,15 +1082,15 @@ void Recording::writeThreads(Buffer *buf) {
10731082

10741083
buf->putVar64(T_THREAD);
10751084
buf->putVar64(threads.size());
1076-
for (int i = 0; i < threads.size(); i++) {
1085+
for (auto tid : threads) {
10771086
const char *thread_name;
10781087
jlong thread_id;
1079-
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
1088+
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
10801089
if (info.first) {
10811090
thread_name = info.first->c_str();
10821091
thread_id = info.second;
10831092
} else {
1084-
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
1093+
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
10851094
thread_name = name_buf;
10861095
thread_id = 0;
10871096
}
@@ -1091,9 +1100,9 @@ void Recording::writeThreads(Buffer *buf) {
10911100
(thread_id == 0 ? length + 1 : 2 * length) -
10921101
3 * 10; // 3x max varint length
10931102
flushIfNeeded(buf, required);
1094-
buf->putVar64(threads[i]);
1103+
buf->putVar64(tid);
10951104
buf->putUtf8(thread_name, length);
1096-
buf->putVar64(threads[i]);
1105+
buf->putVar64(tid);
10971106
if (thread_id == 0) {
10981107
buf->put8(0);
10991108
} else {
@@ -1443,7 +1452,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
14431452
flushIfNeeded(buf);
14441453
}
14451454

1446-
void Recording::addThread(int tid) { _thread_set.add(tid); }
1455+
// Assumes the caller already holds the lock identified by lock_index.
1456+
void Recording::addThread(int lock_index, int tid) {
1457+
int active = _active_index.load(std::memory_order_acquire);
1458+
_thread_ids[lock_index][active].insert(tid);
1459+
}
14471460

14481461
Error FlightRecorder::start(Arguments &args, bool reset) {
14491462
const char *file = args.file();
@@ -1600,7 +1613,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id,
16001613
break;
16011614
}
16021615
_rec->flushIfNeeded(buf);
1603-
_rec->addThread(tid);
1616+
_rec->addThread(lock_index, tid);
16041617
}
16051618
}
16061619

ddprof-lib/src/main/cpp/flightRecorder.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#define _FLIGHTRECORDER_H
99

1010
#include <map>
11+
#include <unordered_set>
1112

1213
#include <limits.h>
1314
#include <string.h>
@@ -24,6 +25,7 @@
2425
#include "mutex.h"
2526
#include "objectSampler.h"
2627
#include "threadFilter.h"
28+
#include "threadIdTable.h"
2729
#include "vmEntry.h"
2830

2931
const u64 MAX_JLONG = 0x7fffffffffffffffULL;
@@ -117,9 +119,13 @@ class Recording {
117119
static char *_java_command;
118120

119121
RecordingBuffer _buf[CONCURRENCY_LEVEL];
122+
// we have several tables to avoid lock contention
123+
// we have a second dimension to allow a switch in the active table
124+
ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2];
125+
std::atomic<int> _active_index{0}; // 0 or 1 globally
126+
120127
int _fd;
121128
off_t _chunk_start;
122-
ThreadFilter _thread_set;
123129
MethodMap _method_map;
124130

125131
Arguments _args;
@@ -248,7 +254,8 @@ class Recording {
248254
LockEvent *event);
249255
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
250256
float machine_total);
251-
void addThread(int tid);
257+
258+
void addThread(int lock_index, int tid);
252259
};
253260

254261
class Lookup {

ddprof-lib/src/main/cpp/javaApi.cpp

Lines changed: 89 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <assert.h>
1818

19+
#include "arch_dd.h"
1920
#include "context.h"
2021
#include "counters.h"
2122
#include "engine.h"
@@ -124,19 +125,103 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
124125
return (jlong)Profiler::instance()->total_samples();
125126
}
126127

128+
// Some duplication between add and remove is deliberate: it avoids an extra branch in the hot path.
129+
extern "C" DLLEXPORT void JNICALL
130+
Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env,
131+
jobject unused) {
132+
ProfiledThread *current = ProfiledThread::current();
133+
if (unlikely(current == nullptr)) {
134+
assert(false);
135+
return;
136+
}
137+
int tid = current->tid();
138+
if (unlikely(tid < 0)) {
139+
return;
140+
}
141+
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
142+
if (unlikely(!thread_filter->enabled())) {
143+
return;
144+
}
145+
146+
int slot_id = current->filterSlotId();
147+
if (unlikely(slot_id == -1)) {
148+
// Thread doesn't have a slot ID yet (e.g., main thread), so register it
149+
// Happens when we are not enabled before thread start
150+
slot_id = thread_filter->registerThread();
151+
current->setFilterSlotId(slot_id);
152+
}
153+
154+
if (unlikely(slot_id == -1)) {
155+
return; // Failed to register thread
156+
}
157+
thread_filter->add(tid, slot_id);
158+
}
159+
160+
extern "C" DLLEXPORT void JNICALL
161+
Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env,
162+
jobject unused) {
163+
ProfiledThread *current = ProfiledThread::current();
164+
if (unlikely(current == nullptr)) {
165+
assert(false);
166+
return;
167+
}
168+
int tid = current->tid();
169+
if (unlikely(tid < 0)) {
170+
return;
171+
}
172+
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
173+
if (unlikely(!thread_filter->enabled())) {
174+
return;
175+
}
176+
177+
int slot_id = current->filterSlotId();
178+
if (unlikely(slot_id == -1)) {
179+
// Thread doesn't have a slot ID yet - nothing to remove
180+
return;
181+
}
182+
thread_filter->remove(slot_id);
183+
}
184+
185+
// Backward compatibility for existing code
127186
extern "C" DLLEXPORT void JNICALL
128187
Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env,
129188
jobject unused,
130189
jboolean enable) {
131-
int tid = ProfiledThread::currentTid();
132-
if (tid < 0) {
190+
ProfiledThread *current = ProfiledThread::current();
191+
if (unlikely(current == nullptr)) {
192+
assert(false);
193+
return;
194+
}
195+
int tid = current->tid();
196+
if (unlikely(tid < 0)) {
133197
return;
134198
}
135199
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
200+
if (unlikely(!thread_filter->enabled())) {
201+
return;
202+
}
203+
204+
int slot_id = current->filterSlotId();
205+
if (unlikely(slot_id == -1)) {
206+
if (enable) {
207+
// Thread doesn't have a slot ID yet, so register it
208+
assert(thread_filter->enabled() && "ThreadFilter should be enabled when trying to register thread");
209+
slot_id = thread_filter->registerThread();
210+
current->setFilterSlotId(slot_id);
211+
} else {
212+
// Thread doesn't have a slot ID yet - nothing to remove
213+
return;
214+
}
215+
}
216+
217+
if (unlikely(slot_id == -1)) {
218+
return; // Failed to register thread
219+
}
220+
136221
if (enable) {
137-
thread_filter->add(tid);
222+
thread_filter->add(tid, slot_id);
138223
} else {
139-
thread_filter->remove(tid);
224+
thread_filter->remove(slot_id);
140225
}
141226
}
142227

@@ -408,27 +493,6 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env,
408493
return true;
409494
}
410495

411-
extern "C" DLLEXPORT jlong JNICALL
412-
Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env,
413-
jclass unused,
414-
jint tid) {
415-
u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid);
416-
return (jlong)bitmap;
417-
}
418-
419-
extern "C" DLLEXPORT jboolean JNICALL
420-
Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env,
421-
jclass unused,
422-
jint tid) {
423-
return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE;
424-
}
425-
426-
extern "C" DLLEXPORT jlong JNICALL
427-
Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env,
428-
jclass unused) {
429-
return (jlong)Profiler::instance()->threadFilter()->addressOfSize();
430-
}
431-
432496
// Static variable to track the current published context
433497
static otel_process_ctx_result* current_published_context = nullptr;
434498

ddprof-lib/src/main/cpp/profiler.cpp

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length,
104104

105105
void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
106106
ProfiledThread::initCurrentThread();
107-
108-
int tid = ProfiledThread::currentTid();
107+
ProfiledThread *current = ProfiledThread::current();
108+
int tid = current->tid();
109109
if (_thread_filter.enabled()) {
110-
_thread_filter.remove(tid);
110+
int slot_id = _thread_filter.registerThread();
111+
current->setFilterSlotId(slot_id);
112+
_thread_filter.remove(slot_id); // Remove from filtering initially
111113
}
112114
updateThreadName(jvmti, jni, thread, true);
113115

@@ -116,16 +118,33 @@ void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
116118
}
117119

118120
void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) {
119-
int tid = ProfiledThread::currentTid();
120-
if (_thread_filter.enabled()) {
121-
_thread_filter.remove(tid);
121+
ProfiledThread *current = ProfiledThread::current();
122+
int tid = -1;
123+
124+
if (current != nullptr) {
125+
// ProfiledThread is alive - do full cleanup and use efficient tid access
126+
int slot_id = current->filterSlotId();
127+
tid = current->tid();
128+
129+
if (_thread_filter.enabled()) {
130+
_thread_filter.unregisterThread(slot_id);
131+
current->setFilterSlotId(-1);
132+
}
133+
134+
ProfiledThread::release();
135+
} else {
136+
// ProfiledThread already cleaned up - try to get tid from JVMTI as fallback
137+
tid = VMThread::nativeThreadId(jni, thread);
138+
if (tid < 0) {
139+
// No ProfiledThread AND can't get tid from JVMTI - nothing we can do
140+
return;
141+
}
122142
}
123-
updateThreadName(jvmti, jni, thread, true);
124-
143+
144+
// These can run if we have a valid tid
145+
updateThreadName(jvmti, jni, thread, false); // false = not self
125146
_cpu_engine->unregisterThread(tid);
126-
// unregister here because JNI callers generally don't know about thread exits
127147
_wall_engine->unregisterThread(tid);
128-
ProfiledThread::release();
129148
}
130149

131150
int Profiler::registerThread(int tid) {
@@ -1152,6 +1171,16 @@ Error Profiler::start(Arguments &args, bool reset) {
11521171
}
11531172

11541173
_thread_filter.init(args._filter);
1174+
1175+
// Minor optim: Register the current thread (start thread won't be called)
1176+
if (_thread_filter.enabled()) {
1177+
ProfiledThread *current = ProfiledThread::current();
1178+
if (current != nullptr) {
1179+
int slot_id = _thread_filter.registerThread();
1180+
current->setFilterSlotId(slot_id);
1181+
_thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior)
1182+
}
1183+
}
11551184

11561185
_cpu_engine = selectCpuEngine(args);
11571186
_wall_engine = selectWallEngine(args);

ddprof-lib/src/main/cpp/thread.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,12 @@ class ProfiledThread : public ThreadLocalData {
4848
u32 _wall_epoch;
4949
u64 _call_trace_id;
5050
u32 _recording_epoch;
51+
int _filter_slot_id; // Slot ID for thread filtering
5152
UnwindFailures _unwind_failures;
5253

5354
ProfiledThread(int buffer_pos, int tid)
5455
: ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0), _buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0),
55-
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {};
56+
_wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {};
5657

5758
void releaseFromBuffer();
5859

@@ -125,6 +126,9 @@ class ProfiledThread : public ThreadLocalData {
125126
}
126127

127128
static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext);
129+
130+
int filterSlotId() { return _filter_slot_id; }
131+
void setFilterSlotId(int slotId) { _filter_slot_id = slotId; }
128132
};
129133

130134
#endif // _THREAD_H

0 commit comments

Comments
 (0)