// Template parameter list with two type parameters, T1 and T2. The declaration
// it introduces (and its opening brace) is not visible in this excerpt.
51 template <
typename T1,
typename T2>
// Stamped samples: .first is an unsigned long stamp, .second is the payload.
54 using T1Stamped = std::pair<unsigned long, T1>;
55 using T2Stamped = std::pair<unsigned long, T2>;
// Mutex acquired by the code further down both when cb_ is written and at the
// start of the timestamp-pairing fragment.
58 std::mutex publish_mutex_;
// Pending stamped samples of each type. New entries are appended at the back
// and consumed/discarded from the front by the code further down.
59 std::deque<T1Stamped> queueT1;
60 std::deque<T2Stamped> queueT2;
// Callback signature: (T1 sample, T2 sample, T1 stamp, T2 stamp) -- the same
// argument order that is used where the callbacks are invoked.
62 using CallbackFunction = std::function<void(T1, T2,
unsigned long,
unsigned long)>;
// Registered callbacks keyed by an int handle.
64 std::map<int, CallbackFunction> cb_;
// Next key to hand out; starts at 0 and is post-incremented on each insertion.
65 int callback_counter = 0;
// Fragment of a routine that registers `callback` (its signature is outside
// this excerpt). publish_mutex_ is held until the lock object goes out of scope.
71 std::unique_lock<std::mutex> publish_lock (publish_mutex_);
// Store the callback under the current counter value (overwrites any entry
// already stored under that key, as std::map::operator[] does).
72 cb_[callback_counter] = callback;
// Return the key just used, then advance the counter so successive
// registrations get consecutive keys.
73 return callback_counter++;
// Acquires publish_mutex_ for the enclosing scope. The enclosing routine and
// the statements it protects are not visible in this excerpt.
79 std::unique_lock<std::mutex> publish_lock (publish_mutex_);
// add0: takes a T1 sample `t` and its stamp `time`.
// The return type, braces, and any locking or follow-up call around the
// push_back are outside this excerpt.
84 add0 (
const T1& t,
unsigned long time)
// Append a (stamp, sample) pair to the back of the T1 queue; `t` is copied.
87 queueT1.push_back (T1Stamped (time, t));
// add1: takes a T2 sample `t` and its stamp `time`.
// The return type, braces, and any locking or follow-up call around the
// push_back are outside this excerpt.
93 add1 (
const T2& t,
unsigned long time)
// Append a (stamp, sample) pair to the back of the T2 queue; `t` is copied.
96 queueT2.push_back (T2Stamped (time, t));
// Fragment of the routine that hands the front pair of samples to the
// registered callbacks (its signature and braces are outside this excerpt).
// Both mutexes are held until the lock objects go out of scope. mutex1_ and
// mutex2_ are declared outside this excerpt; presumably they guard queueT1 and
// queueT2 respectively -- confirm against the declarations.
106 std::unique_lock<std::mutex> lock1 (mutex1_);
107 std::unique_lock<std::mutex> lock2 (mutex2_);
// Iterate over every entry of cb_ (a std::map, so in ascending key order).
109 for (
const auto& cb: cb_)
// Invoke the stored callable with: T1 sample, T2 sample, T1 stamp, T2 stamp,
// all read from the front elements of the two queues.
// NOTE(review): front() requires both queues to be non-empty; presumably the
// caller guarantees this -- confirm. Any guard around this call is not visible.
113 cb.second.operator()(queueT1.front ().second, queueT2.front ().second, queueT1.front ().first, queueT2.front ().first);
// Remove the front element from each queue.
117 queueT1.pop_front ();
118 queueT2.pop_front ();
// Fragment of the timestamp-pairing routine. Its signature, braces, the
// statements guarded by the empty() checks, any explicit lock()/unlock()
// calls, the header of the second branch, and the code that sets or consumes
// do_publish are all outside this excerpt.
// Hold publish_mutex_ until the lock object goes out of scope.
125 std::unique_lock<std::mutex> publish_lock (publish_mutex_);
// Under mutex1_: test whether the T1 queue is empty, then copy its front
// element into the local t1.
127 std::unique_lock<std::mutex> lock1 (mutex1_);
128 if (queueT1.empty ())
130 T1Stamped t1 = queueT1.front ();
// Under mutex2_: same for the T2 queue, copying its front element into t2.
133 std::unique_lock<std::mutex> lock2 (mutex2_);
134 if (queueT2.empty ())
136 T2Stamped t2 = queueT2.front ();
// Flag initialised to false.
139 bool do_publish =
false;
// Case: the front T1 stamp is not greater than the front T2 stamp.
141 if (t1.first <= t2.first)
// Discard front T1 entries while the second entry's stamp is still <= t2's
// stamp; the size() > 1 test keeps at least one element in the queue.
144 while (queueT1.size () > 1 && queueT1[1].first <= t2.first)
145 queueT1.pop_front ();
// At least two T1 entries remain: given the loop above, queueT1[1] has a
// stamp greater than t2's.
147 if (queueT1.size () > 1)
// 2 * t2 > a + b is equivalent to t2 > (a + b) / 2: when t2's stamp lies past
// the midpoint of the two front T1 stamps, drop the first so the second
// becomes the front.
// NOTE(review): the shift and the sum are unsigned long arithmetic and wrap
// around for very large stamp values.
149 if ( (t2.first << 1) > (queueT1[0].first + queueT1[1].first) )
150 queueT1.pop_front ();
// Mirror image of the logic above with the roles of the two queues swapped
// (presumably the else-branch of the comparison above -- its header is not
// visible here).
159 while (queueT2.size () > 1 && (queueT2[1].first <= t1.first) )
160 queueT2.pop_front ();
162 if (queueT2.size () > 1)
// Same midpoint test, comparing t1's stamp against the two front T2 stamps.
164 if ( (t1.first << 1) > queueT2[0].first + queueT2[1].first )
165 queueT2.pop_front ();