Multi Thread of ROS 2 with Callback group - Part 2: Mutually exclusive

In the previous article, we discussed what is Reentrant group and the thread scheduling behind the scene. Today, we will look into the other callback group: Mutually Exclusive callback group. I assume that you have a preliminary understanding of the callback groups.
Key takeaways
1. Callbacks in a same MutuallyExclusive
group are NOT exactly same with SingleThreadedExecutor
.
In the official documentation (opens in a new tab), it is said that:
Mutually Exclusive Callback Group prevents its callbacks from being executed in parallel - essentially making it as if the callbacks in the group were executed by a SingleThreadedExecutor.
Then, some might expect that the callbacks in a same MutuallyExclusive
group
are working exactly the same as SingleThreadedExecutor
processes the
callbacks. For a delayed processing of the callbacks, this is not true.
In my previous article, callbacks in a
SingleThreadedExecutor
are processed in round robin (a.k.a., not FIFO) if the
callbacks take more time than the speed of event happening. But
MutuallyExclusive
group was not found to show this round-robin behavior.
2. Of course, callbacks in different Mutually Exclusive groups are not thread-safe.
If you are expecting from rclcpp
that this will keep thread-safety between
different MutuallyExclusive
groups, then do not. You should use mutex, and
define critical sections.
Testbed: publisher and subscriber nodes
The overall scenario is very similar to the previous article. You can find all the codes here (opens in a new tab)
Publisher node
This
node (opens in a new tab)
publishes short_topic
at 10Hz and long_topic
at 1Hz. The example run
output is:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684487967.263839506] [publisher_node]: Short message (seq=0)
[INFO] [1684487967.363765263] [publisher_node]: Short message (seq=1)
...
[INFO] [1684487968.063749259] [publisher_node]: Short message (seq=8)
[INFO] [1684487968.163744316] [publisher_node]: Short message (seq=9)
[INFO] [1684487968.163889616] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684487968.263754273] [publisher_node]: Short message (seq=10)
Subscriber node
This node (opens in a new tab) has three callbacks:
-
(Setters) two subscription callbacks for
short_topic
andlong_topic
messages. This callbacks sets the incoming messages into member variablesprocessed_short_string_
andprocessed_long_string_
ofMultiThreadMutuallyExclusiveSubscriber
node.processed_*_string_
is created by adding starts to the both ends of received messages. For exepriment, we assume that coping a character takes 0.1s. Thus, processing callbacks requires more than their publish rates. (0.1s and 1s.)multi_thread_mutually_exclusive_subscriber_node.cc... void MultiThreadMutuallyExclusiveSubscriber::ProcessString( const std::string &raw_string, std::string &output) { output = "** "; for (char c : raw_string) { output.push_back(c); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } output += " **"; }; ... void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback( const std_msgs::msg::String::SharedPtr msg) { ... ProcessString(msg->data, processed_long_string_); RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_long_string_.c_str()); ... }
-
(Getter) a timer callback which prints the member variables.
void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() { ... RCLCPP_INFO(get_logger(), "Getting processed strings: \n [long] %s \n [short] %s", processed_long_string_.c_str(), processed_short_string_.c_str()); ... }
Example prints are:
[INFO] [1684486266.634665189] [subscriber_node]: Getting processed strings: [long] ** Long message with more characters (seq=31) ** [short] ** Short message (seq=384) **
Testing
1. Putting callbacks together in a same MutuallyExclusive
group
For this test, let us assume that TimerCallback()
is not used, having only two
subscription callbacks.
class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
public:
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
: Node("subscriber_node",
node_options.allow_undeclared_parameters(true)) {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
this, std::placeholders::_1),
options);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
this, std::placeholders::_1),
options);
...
We register the node into MultiThreadedExecutor
.
int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::NodeOptions option;
auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
Results
Publisher node output the below:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684497083.896133935] [publisher_node]: Short message (seq=0)
[INFO] [1684497083.996157929] [publisher_node]: Short message (seq=1)
[INFO] [1684497084.096092124] [publisher_node]: Short message (seq=2)
...
[INFO] [1684497084.596053297] [publisher_node]: Short message (seq=7)
[INFO] [1684497084.696097192] [publisher_node]: Short message (seq=8)
[INFO] [1684497084.796039187] [publisher_node]: Short message (seq=9)
[INFO] [1684497084.796129187] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684497084.896058682] [publisher_node]: Short message (seq=10)
[INFO] [1684497084.996097976] [publisher_node]: Short message (seq=11)
[INFO] [1684497085.096098071] [publisher_node]: Short message (seq=12)
[INFO] [1684497085.196101466] [publisher_node]: Short message (seq=13)
...
[INFO] [1684497085.796172334] [publisher_node]: Short message (seq=19)
[INFO] [1684497085.796503034] [publisher_node]: Long message with more characters (seq=1)
[INFO] [1684497085.896102029] [publisher_node]: Short message (seq=20)
[INFO] [1684497085.996114123] [publisher_node]: Short message (seq=21)
...
[INFO] [1684497086.596048091] [publisher_node]: Short message (seq=27)
[INFO] [1684497086.696198386] [publisher_node]: Short message (seq=28)
...
[INFO] [1684497087.196084460] [publisher_node]: Short message (seq=33)
[INFO] [1684497087.296135654] [publisher_node]: Short message (seq=34)
...
This is subscription result:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684497086.000256023] [subscriber_node]: Setting processed: ** Short message (seq=0) **
[INFO] [1684497088.205206606] [subscriber_node]: Setting processed: ** Short message (seq=12) **
[INFO] [1684497090.409344367] [subscriber_node]: Setting processed: ** Short message (seq=34) **
[INFO] [1684497092.613088724] [subscriber_node]: Setting processed: ** Short message (seq=56) **
[INFO] [1684497094.816876581] [subscriber_node]: Setting processed: ** Short message (seq=78) **
[INFO] [1684497097.121857358] [subscriber_node]: Setting processed: ** Short message (seq=100) **
[INFO] [1684497099.426175711] [subscriber_node]: Setting processed: ** Short message (seq=123) **
[INFO] [1684497101.730305265] [subscriber_node]: Setting processed: ** Short message (seq=146) **
[INFO] [1684497104.034430018] [subscriber_node]: Setting processed: ** Short message (seq=169) **
Is is very clear: callback for long_topic
was hardly handled. This is very
different from the round-robin results of SingleThreadedExecutor
(please
ignore timestamps):
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester single_thread_subscriber_node
[INFO] [1684494333.628047147] [subscriber_node]: Setting processed: ** Short message (seq=0) **
[INFO] [1684494335.628859766] [subscriber_node]: Setting processed: ** Short message (seq=11) **
[INFO] [1684494337.629064684] [subscriber_node]: Setting processed: ** Long message with more characters (seq=0) **
[INFO] [1684494339.629355003] [subscriber_node]: Setting processed: ** Short message (seq=51) **
[INFO] [1684494341.629804743] [subscriber_node]: Setting processed: ** Long message with more characters (seq=1) **
[INFO] [1684494343.630044844] [subscriber_node]: Setting processed: ** Short message (seq=91) **
[INFO] [1684494345.630372444] [subscriber_node]: Setting processed: ** Long message with more characters (seq=2) **
[INFO] [1684494347.630739145] [subscriber_node]: Setting processed: ** Short message (seq=131) **
Apparently, the first result suggests that MultiThreadedExecutor
for
MutuallyExclusive
callback groups processes only short_topic
as its callback
has more chances to lock the mutex than long_topic
. It is working as if
SingleThreadedExecutor
only for short_topic
:
short topic seq=0
was published and subscribed at 83s.short topic seq=0
was finished for processing at 86s.short topic seq=12
was received at 85 and the first in the queue at 86 (queue size = 10).
This skipping behavior was reported in the
issues (opens in a new tab), where a contributor
Barry-Xu-2018
said (opens in a new tab)
MultiThreadedExecutor
can
clear or (skip) (opens in a new tab)
some waiting entities due to mutex related issues.
2. Putting callbacks in different MutuallyExclusive
groups
Now, we consider the timer callback to check thread safety (setters from subscribers vs getter from timer callback), and assign subscribers to different groups as below:
class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
public:
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
: Node("subscriber_node",
node_options.allow_undeclared_parameters(true)) {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
this, std::placeholders::_1),
options);
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
this, std::placeholders::_1),
options);
timer_ = create_wall_timer(
1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
this));
Not assigning any callback_group
to create_wall_timer
equals to the default
MutuallyExclusive
callback group which is another callback group.
If you just run multi_thread_reentrant_subscriber_node
as: (opens in a new tab)
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
Initially it will not use mutexes. Thus, what we have is:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684499585.741976759] [subscriber_node]: Getting processed strings:
[long]
[short]
[INFO] [1684499586.741949677] [subscriber_node]: Getting processed strings:
[long]
[short]
...
[INFO] [1684499589.926415389] [subscriber_node]: Setting processed: ** Short message (seq=0) **
[INFO] [1684499592.830046581] [subscriber_node]: Setting processed: ** Long message with more characters (seq=0) **
[INFO] [1684499592.830167081] [subscriber_node]: Getting processed strings:
[INFO] [1684499596.937670591] [subscriber_node]: Setting processed: ** Long message with more characters (seq=1) **
[INFO] [1684499596.937803491] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characters (seq=1) **
[short] ** Short message (seq=12) **
[INFO] [1684499599.141354135] [subscriber_node]: Setting processed: ** Short message (seq=63) **
[INFO] [1684499601.044588832] [subscriber_node]: Setting processed: ** Long message with more characters (seq=2) **
...
[INFO] [1684499727.951774404] [subscriber_node]: Getting processed strings:
[long]
[short]
[INFO] [1684499728.951765511] [subscriber_node]: Getting processed strings:
[long] ** L
[short] ** Short mess
[INFO] [1684499729.951769118] [subscriber_node]: Getting processed strings:
[long] ** Long messag
[short] ** Short message (seq=0
[INFO] [1684499730.100030734] [subscriber_node]: Setting processed: ** Short message (seq=0) **
[INFO] [1684499730.951765526] [subscriber_node]: Getting processed strings:
[long] ** Long message with mor
[short] ** Short mes
[INFO] [1684499731.951739133] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characte
[short] ** Short message (seq=
[INFO] [1684499732.302576970] [subscriber_node]: Setting processed: ** Short message (seq=12) **
In conclusion, thread safety is not guaranteed. We now use two mutexes to protect thread safety as below:
void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
std::shared_ptr<std::unique_lock<std::mutex>> lock;
if (use_mutex)
mutex_list.short_topic_mutex.lock();
ProcessString(msg->data, processed_short_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_short_string_.c_str());
if (use_mutex)
mutex_list.short_topic_mutex.unlock();
}
This code is intended for a thread safety experiment, and it is a bad design.
We should minimize the critical sections and not include ProcessString(...);
in the mutex zone.
subscriber_node
will use mutex by param set
:
ros2 param set /subscriber_node use_mutex true
The result? Thread safe!:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684500853.761733245] [subscriber_node]: Getting processed strings:
[long]
[short]
[INFO] [1684500854.761738832] [subscriber_node]: Getting processed strings:
[long]
[short]
[short]
[INFO] [1684500858.252912414] [subscriber_node]: will use mutex
[INFO] [1684500858.761723597] [subscriber_node]: Getting processed strings:
[long]
[short]
[INFO] [1684500859.761786863] [subscriber_node]: Getting processed strings:
[long]
[short]
[INFO] [1684500860.761909929] [subscriber_node]: Getting processed strings:
[INFO] [1684500870.203406757] [subscriber_node]: Setting processed: ** Long message with more characters (seq=1) **
[INFO] [1684500870.203533957] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characters (seq=1) **
[short] ** Short message (seq=12) **
[INFO] [1684500872.407167598] [subscriber_node]: Setting processed: ** Short message (seq=63) **
[INFO] [1684500874.310072249] [subscriber_node]: Setting processed: ** Long message with more characters (seq=2) **
[INFO] [1684500874.310223049] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characters (seq=2) **
[short] ** Short message (seq=63) **
[INFO] [1684500876.613996191] [subscriber_node]: Setting processed: ** Short message (seq=104) **
[INFO] [1684500878.417606122] [subscriber_node]: Setting processed: ** Long message with more characters (seq=3) **
[INFO] [1684500878.417995222] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characters (seq=3) **
[short] ** Short message (seq=104) **
[INFO] [1684500880.722269197] [subscriber_node]: Setting processed: ** Short message (seq=146) **
[INFO] [1684500882.524751487] [subscriber_node]: Setting processed: ** Long message with more characters (seq=7) **
[INFO] [1684500882.524907086] [subscriber_node]: Getting processed strings:
[long] ** Long message with more characters (seq=7) **