Multi-thread of ROS 2 with Callback Group - Part 1: Reentrant

Recommend to read previous article. All the publisher and subscription system is the same, and I will not explain them here.
In the previous article, I wrote about the single threading of callback processing in ROS2. Here, let me cover multi-threaded callback processing. The publisher node and the problem scenario is similar with the previous article article.
In ROS2, almost every operations are processed in callbacks (subscription, services, timers...): they are queued at corresponding events (e.g., subscription callback is queued when message arrived.) and called (executed) by executors (opens in a new tab).
This article is about how ROS 2 provides a utility when executing callbacks on
multiple-threads. First of all, we have MultiThreadedExecutor
where callbacks
are executed on multiple threads:
int main(int argc, char **argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<MultiThreadReentrantSubscriber>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
How are executor
threading (scheduling) the callbacks defined inside node
?
Callback groups (opens in a new tab)
are the key to understanding:
- Reentrant Callback Group
- Mutually Exclusive Callback Group
For example, we register callbacks into a reentrant callback group by:
MultiThreadReentrantSubscriber() : Node("subscriber_node") {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::Reentrant);
short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this,
std::placeholders::_1),
options);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this,
std::placeholders::_1),
options);
}
This article will investigate the Reentrant Callback Group, and the following article will explain Mutually Exclusive Callback Group.
The full source code of this implementation can be found here (opens in a new tab).
Concept
If we have create a reentrant callback group and assign callbacks to the group, executors will try to assign the incoming callbacks to available callbacks threads in a FIFO manner as the figure shows.

Can you guess what if there's no available thread? For example, callback processing takes a long time, compared to the speed of queueing. Reading to the end will answer this.
Experiment result
Having the publisher (opens in a new tab) sending topics as below:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684145121.677229561] [publisher_node]: Short message (seq=0)
[INFO] [1684145121.777211870] [publisher_node]: Short message (seq=1)
[INFO] [1684145121.877112879] [publisher_node]: Short message (seq=2)
...
[INFO] [1684145122.377120627] [publisher_node]: Short message (seq=7)
[INFO] [1684145122.477210436] [publisher_node]: Short message (seq=8)
[INFO] [1684145122.577112846] [publisher_node]: Short message (seq=9)
[INFO] [1684145122.577202746] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684145122.677122355] [publisher_node]: Short message (seq=10)
[INFO] [1684145122.777128565] [publisher_node]: Short message (seq=11)
...
[INFO] [1684145123.277118304] [publisher_node]: Short message (seq=16)
[INFO] [1684145123.377120107] [publisher_node]: Short message (seq=17)
[INFO] [1684145123.477122111] [publisher_node]: Short message (seq=18)
[INFO] [1684145123.577089714] [publisher_node]: Short message (seq=19)
[INFO] [1684145123.577184314] [publisher_node]: Long message with more characters (seq=1)
[INFO] [1684145123.677160617] [publisher_node]: Short message (seq=20)
[INFO] [1684145123.777177921] [publisher_node]: Short message (seq=21)
Let us assume that the subscriber takes 2s to process each callback, where it adds stars to the both ends of the message, and id of the processing thread id as:
std::string
MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) {
std::thread::id this_id = std::this_thread::get_id();
std::ostringstream oss;
oss << std::this_thread::get_id();
std::this_thread::sleep_for(std::chrono::seconds(2));
return "** " + raw_string + " **" + " ( by " + oss.str() + ") ";
};
void MultiThreadReentrantSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
auto processed_string = ProcessString(msg->data);
processed_short_string_ = processed_string;
RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
processed_long_string_ = processed_string;
}
Then, the result of the callback will be the below:
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
[INFO] [1684145123.678675118] [subscriber_node]: Setting processed: ** Short message (seq=0) ** ( by 140113577211648)
[INFO] [1684145123.778082521] [subscriber_node]: Setting processed: ** Short message (seq=1) ** ( by 140113859917568)
[INFO] [1684145123.877571624] [subscriber_node]: Setting processed: ** Short message (seq=2) ** ( by 140113778571008)
[INFO] [1684145123.977534428] [subscriber_node]: Setting processed: ** Short message (seq=3) ** ( by 140113761785600)
[INFO] [1684145124.078418731] [subscriber_node]: Setting processed: ** Short message (seq=4) ** ( by 140113770178304)
[INFO] [1684145124.177545434] [subscriber_node]: Setting processed: ** Short message (seq=5) ** ( by 140113753392896)
[INFO] [1684145124.277588538] [subscriber_node]: Setting processed: ** Short message (seq=6) ** ( by 140113745000192)
[INFO] [1684145124.377553041] [subscriber_node]: Setting processed: ** Short message (seq=7) ** ( by 140113728214784)
[INFO] [1684145124.477698345] [subscriber_node]: Setting processed: ** Short message (seq=8) ** ( by 140113736607488)
[INFO] [1684145124.577653748] [subscriber_node]: Setting processed: ** Short message (seq=9) ** ( by 140113635960576)
[INFO] [1684145124.577692848] [subscriber_node]: Setting processed: ** Long message with more characters (seq=0) ** ( by 140113644353280)
[INFO] [1684145124.677715452] [subscriber_node]: Setting processed: ** Short message (seq=10) ** ( by 140113619175168)
[INFO] [1684145124.777614455] [subscriber_node]: Setting processed: ** Short message (seq=11) ** ( by 140113627567872)
[INFO] [1684145124.877568058] [subscriber_node]: Setting processed: ** Short message (seq=12) ** ( by 140113610782464)
[INFO] [1684145124.977619262] [subscriber_node]: Setting processed: ** Short message (seq=13) ** ( by 140113602389760)
[INFO] [1684145125.077692265] [subscriber_node]: Setting processed: ** Short message (seq=14) ** ( by 140113585604352)
[INFO] [1684145125.177719769] [subscriber_node]: Setting processed: ** Short message (seq=15) ** ( by 140113593997056)
[INFO] [1684145125.277602572] [subscriber_node]: Setting processed: ** Short message (seq=16) ** ( by 140113876702976)
[INFO] [1684145125.377583375] [subscriber_node]: Setting processed: ** Short message (seq=17) ** ( by 140113976041280)
[INFO] [1684145125.477735379] [subscriber_node]: Setting processed: ** Short message (seq=18) ** ( by 140113868310272)
[INFO] [1684145125.679380286] [subscriber_node]: Setting processed: ** Short message (seq=19) ** ( by 140113577211648)
[INFO] [1684145125.778339289] [subscriber_node]: Setting processed: ** Long message with more characters (seq=1) ** ( by 140113859917568)
[INFO] [1684145125.877828592] [subscriber_node]: Setting processed: ** Short message (seq=20) ** ( by 140113778571008)
[INFO] [1684145125.977725796] [subscriber_node]: Setting processed: ** Short message (seq=21) ** ( by 140113761785600)
[INFO] [1684145126.078575999] [subscriber_node]: Setting processed: ** Short message (seq=22) ** ( by 140113770178304)
Analysis
For the brevity, I denoted the time format 1684145126.078575999
as 26.07
.

Reentrant group was using 20 threads and handled events in FIFO.
From the result, you will find that total 20 callback events (from seq = 0
to
seq=18
of short_topic
(including seq=0
of long_topic
)) are handled in
distinctive threads and seq=19
of short_topic
are assigned to the same
thread 140113577211648
with seq=0
. Ah! Callbacks of the reentrant callback
group are using 20 threads. At this time, there was no round robing as single
thread executor.
When no available thread, events in the queue are waiting..
It was almost the case that every time gaps of subscriber callback ending of
short_topic
was 0.1s
which is the exact publishing rate of short_topic
.
But why 0.2
stride between seq=18
and seq=19
of short_topic
?
[INFO] [1684145125.277602572] [subscriber_node]: Setting processed: ** Short message (seq=16) ** ( by 140113876702976)
[INFO] [1684145125.377583375] [subscriber_node]: Setting processed: ** Short message (seq=17) ** ( by 140113976041280)
[INFO] [1684145125.477735379] [subscriber_node]: Setting processed: ** Short message (seq=18) ** ( by 140113868310272)
[INFO] [1684145125.679380286] [subscriber_node]: Setting processed: ** Short message (seq=19) ** ( by 140113577211648)
seq=18
used the last available thread 140113868310272
, and seq=19
should
wait until the first thread 140113577211648
is freed from seq=0
callback
handling.