Skip to content

Commit 5578f83

Browse files
author
Erwin Lejeune
authored
Merge pull request #10 from cmrobotics/add-topic-health
Topic Health Checks
2 parents 0026a71 + c2212d1 commit 5578f83

File tree

1 file changed

+110
-2
lines changed

1 file changed

+110
-2
lines changed

include/cmr_tests_utils/basic_subscriber_node_test.hpp

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,55 @@
77
namespace cmr_tests_utils
88
{
99

10+
typedef struct TopicInfo
11+
{
12+
std::string topic_name;
13+
double frequency;
14+
bool is_alive;
15+
} TopicInfo;
16+
1017
template<class MessageT>
1118
class BasicSubscriberNodeTest: public rclcpp::Node {
1219

1320
public:
1421

1522
BasicSubscriberNodeTest(std::string node_name, std::string topic_name,
16-
rclcpp::QoS qos = rclcpp::SystemDefaultsQoS()): rclcpp::Node(node_name)
23+
rclcpp::QoS qos = rclcpp::SystemDefaultsQoS(),
24+
double liveness_timeout_sec = 2.0): rclcpp::Node(node_name)
1725
{
1826
topic_sub_ = this->create_subscription<MessageT> (
1927
topic_name, qos,
2028
std::bind (&BasicSubscriberNodeTest::topic_callback, this, std::placeholders::_1));
29+
liveness_timeout_sec_ = liveness_timeout_sec;
30+
this->topic_name = topic_name;
31+
32+
topic_info_ = std::make_shared<TopicInfo>();
33+
topic_info_->topic_name = topic_name;
34+
35+
timestamp_check_thread_ = std::thread(&BasicSubscriberNodeTest::update_liveness_, this);
36+
}
37+
38+
~BasicSubscriberNodeTest()
39+
{
40+
is_program_running_.store(false);
41+
if (timestamp_check_thread_.joinable()) timestamp_check_thread_.join();
42+
}
43+
44+
std::shared_ptr<TopicInfo> get_topic_info() const
45+
{
46+
std::lock_guard<std::mutex> lock(topic_info_mutex_);
47+
return topic_info_;
48+
}
49+
50+
bool get_liveness() const
51+
{
52+
return is_alive_.load();
53+
}
54+
55+
double get_frequency() const
56+
{
57+
std::lock_guard<std::mutex> lock(frequency_mutex_);
58+
return frequency_;
2159
}
2260

2361
bool has_data_been_received() const
@@ -38,17 +76,87 @@ class BasicSubscriberNodeTest: public rclcpp::Node {
3876
return *received_msg_;
3977
}
4078

79+
std::string topic_name;
80+
4181
private:
4282

4383
void topic_callback(const std::shared_ptr<MessageT> msg)
44-
{
84+
{
85+
is_alive_.store(true);
86+
87+
{
88+
std::lock_guard<std::mutex> lock(topic_info_mutex_);
89+
topic_info_->is_alive = true;
90+
}
91+
92+
if (!last_message_timestamp_)
93+
{
94+
RCLCPP_INFO (get_logger(), "Discovered [%s] topic...", topic_name.c_str());
95+
last_message_timestamp_ = std::make_shared<std::chrono::time_point<std::chrono::system_clock>>();
96+
*last_message_timestamp_ = std::chrono::system_clock::now();
97+
} else {
98+
auto now = std::chrono::system_clock::now();
99+
std::chrono::duration<double> elapsed_seconds = now - *last_message_timestamp_;
100+
double elapsec_seconds_double = elapsed_seconds.count();
101+
{
102+
std::lock_guard<std::mutex> lock(frequency_mutex_);
103+
frequency_ = 1 / elapsec_seconds_double;
104+
}
105+
106+
{
107+
std::lock_guard<std::mutex> lock(topic_info_mutex_);
108+
topic_info_->frequency = 1 / elapsec_seconds_double;
109+
}
110+
*last_message_timestamp_ = now;
111+
}
112+
45113
std::lock_guard<std::mutex> lock(msg_mutex_);
46114
received_msg_ = msg;
47115
}
48116

117+
void update_liveness_()
118+
{
119+
std::chrono::duration<double> elapsed_seconds;
120+
is_program_running_.store(true);
121+
while (is_program_running_.load())
122+
{
123+
{
124+
std::lock_guard<std::mutex> lock(timestamp_mutex_);
125+
if (!last_message_timestamp_) continue;
126+
127+
auto now = std::chrono::system_clock::now();
128+
elapsed_seconds = now - *last_message_timestamp_;
129+
}
130+
131+
if (elapsed_seconds.count() > liveness_timeout_sec_)
132+
{
133+
is_alive_.store(false);
134+
{
135+
std::lock_guard<std::mutex> lock(topic_info_mutex_);
136+
topic_info_->is_alive = false;
137+
topic_info_->frequency = 0.0;
138+
}
139+
}
140+
std::this_thread::sleep_for(std::chrono::seconds(2));
141+
}
142+
}
143+
49144
typename rclcpp::Subscription<MessageT>::SharedPtr topic_sub_;
145+
50146
std::shared_ptr<MessageT> received_msg_;
147+
std::atomic<bool> is_alive_;
148+
std::shared_ptr<std::chrono::time_point<std::chrono::system_clock>> last_message_timestamp_;
149+
double frequency_;
150+
double liveness_timeout_sec_;
151+
std::shared_ptr<TopicInfo> topic_info_;
152+
153+
std::thread timestamp_check_thread_;
154+
155+
std::atomic<bool> is_program_running_;
51156
mutable std::mutex msg_mutex_;
157+
mutable std::mutex frequency_mutex_;
158+
mutable std::mutex topic_info_mutex_;
159+
mutable std::mutex timestamp_mutex_;
52160
};
53161

54162
}

0 commit comments

Comments
 (0)