17 #ifndef __TBB__flow_graph_join_impl_H 18 #define __TBB__flow_graph_join_impl_H 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 39 template<
typename KeyType>
43 virtual task * increment_key_count(current_key_type
const & ,
bool ) = 0;
50 template<
typename TupleType,
typename PortType >
52 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
55 template<
typename TupleType >
57 tbb::flow::get<N-1>( my_input ).consume();
61 template<
typename TupleType >
63 tbb::flow::get<N-1>( my_input ).
release();
66 template <
typename TupleType>
69 release_my_reservation(my_input);
72 template<
typename InputTuple,
typename OutputTuple >
73 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) )
return false;
76 release_my_reservation( my_input );
82 template<
typename InputTuple,
typename OutputTuple>
83 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) );
88 template<
typename InputTuple,
typename OutputTuple>
89 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90 return get_my_item(my_input, out);
93 template<
typename InputTuple>
96 tbb::flow::get<N-1>(my_input).reset_port();
99 template<
typename InputTuple>
101 reset_my_port(my_input);
104 template<
typename InputTuple,
typename KeyFuncTuple>
106 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107 tbb::flow::get<N-1>(my_key_funcs) = NULL;
111 template<
typename KeyFuncTuple>
113 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
119 template<
typename InputTuple>
122 tbb::flow::get<N-1>(my_input).reset_receiver(f);
125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 126 template<
typename InputTuple>
127 static inline void extract_inputs(InputTuple &my_input) {
129 tbb::flow::get<N-1>(my_input).extract_receiver();
137 template<
typename TupleType,
typename PortType >
139 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
142 template<
typename TupleType >
144 tbb::flow::get<0>( my_input ).consume();
147 template<
typename TupleType >
149 tbb::flow::get<0>( my_input ).
release();
152 template<
typename TupleType>
154 release_my_reservation(my_input);
157 template<
typename InputTuple,
typename OutputTuple >
158 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
162 template<
typename InputTuple,
typename OutputTuple>
163 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
167 template<
typename InputTuple,
typename OutputTuple>
168 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169 return get_my_item(my_input, out);
172 template<
typename InputTuple>
174 tbb::flow::get<0>(my_input).reset_port();
177 template<
typename InputTuple>
179 reset_my_port(my_input);
182 template<
typename InputTuple,
typename KeyFuncTuple>
184 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185 tbb::flow::get<0>(my_key_funcs) = NULL;
188 template<
typename KeyFuncTuple>
190 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
194 template<
typename InputTuple>
196 tbb::flow::get<0>(my_input).reset_receiver(f);
199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 200 template<
typename InputTuple>
201 static inline void extract_inputs(InputTuple &my_input) {
202 tbb::flow::get<0>(my_input).extract_receiver();
208 template<
typename T >
213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 214 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 221 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 234 predecessor_list_type *plist;
238 type(char(t)), my_arg(const_cast<T*>(&e)) {}
240 my_pred(const_cast<predecessor_type *>(&s)) {}
250 bool no_predecessors;
253 op_list = op_list->
next;
254 switch(current->
type) {
256 no_predecessors = my_predecessors.empty();
257 my_predecessors.add(*(current->
my_pred));
258 if ( no_predecessors ) {
259 (
void) my_join->decrement_port_count(
true);
264 my_predecessors.remove(*(current->
my_pred));
265 if(my_predecessors.empty()) my_join->increment_port_count();
272 else if ( my_predecessors.try_reserve( *(current->
my_arg) ) ) {
276 if ( my_predecessors.empty() ) {
277 my_join->increment_port_count();
284 my_predecessors.try_release( );
289 my_predecessors.try_consume( );
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 294 my_predecessors.internal_add_built_predecessor(*(current->
my_pred));
298 my_predecessors.internal_delete_built_predecessor(*(current->
my_pred));
302 current->cnt_val = my_predecessors.predecessor_count();
306 my_predecessors.copy_predecessors(*(current->plist));
315 template<
typename R,
typename B >
friend class run_and_put_task;
323 return my_join->graph_ref;
331 my_predecessors.set_owner(
this );
339 my_predecessors.set_owner(
this );
350 my_aggregator.
execute(&op_data);
357 my_aggregator.
execute(&op_data);
364 my_aggregator.
execute(&op_data);
371 my_aggregator.
execute(&op_data);
377 my_aggregator.
execute(&op_data);
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 381 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predecessors.built_predecessors(); }
382 void internal_add_built_predecessor(predecessor_type &src)
__TBB_override {
384 my_aggregator.
execute(&op_data);
387 void internal_delete_built_predecessor(predecessor_type &src)
__TBB_override {
389 my_aggregator.
execute(&op_data);
394 my_aggregator.
execute(&op_data);
395 return op_data.cnt_val;
401 my_aggregator.
execute(&op_data);
404 void extract_receiver() {
405 my_predecessors.built_predecessors().receiver_extract(*
this);
413 my_predecessors.reset();
415 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(),
"port edges not removed");
419 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 420 friend class get_graph_helper;
435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 436 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
437 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
442 enum op_type { get__item, res_port, try__put_task
443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 444 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
453 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 454 predecessor_type *pred;
456 predecessor_list_type *plist;
461 type(char(t)), my_val(e)
466 type(char(t)), my_arg(const_cast<T*>(p))
484 op_list = op_list->
next;
485 switch(current->
type) {
486 case try__put_task: {
488 was_empty = this->buffer_empty();
489 this->push_back(current->
my_val);
490 if (was_empty) rtask = my_join->decrement_port_count(
false);
498 if(!this->buffer_empty()) {
499 *(current->
my_arg) = this->front();
507 __TBB_ASSERT(this->my_item_valid(this->my_head),
"No item to reset");
508 this->destroy_front();
509 if(this->my_item_valid(this->my_head)) {
510 (
void)my_join->decrement_port_count(
true);
514 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 516 my_built_predecessors.add_edge(*(current->pred));
520 my_built_predecessors.delete_edge(*(current->pred));
524 current->cnt_val = my_built_predecessors.edge_count();
528 my_built_predecessors.copy_edges(*(current->plist));
538 template<
typename R,
typename B >
friend class run_and_put_task;
543 my_aggregator.
execute(&op_data);
550 return my_join->graph_ref;
574 my_aggregator.
execute(&op_data);
582 my_aggregator.
execute(&op_data);
586 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 587 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
589 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
592 my_aggregator.
execute(&op_data);
595 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
598 my_aggregator.
execute(&op_data);
603 my_aggregator.
execute(&op_data);
604 return op_data.cnt_val;
610 my_aggregator.
execute(&op_data);
613 void extract_receiver() {
615 my_built_predecessors.receiver_extract(*
this);
622 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 624 my_built_predecessors.clear();
629 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 630 friend class get_graph_helper;
634 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 635 edge_container<predecessor_type> my_built_predecessors;
649 template<
typename K >
657 template<
class TraitsType >
659 public receiver<typename TraitsType::T>,
660 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
661 typename TraitsType::KHash > {
672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 673 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
674 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
680 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 681 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
690 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 691 predecessor_type *pred;
693 predecessor_list_type *plist;
697 type(char(t)), my_val(e) {}
700 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
713 op_list = op_list->
next;
714 switch(current->
type) {
716 bool was_inserted = this->insert_with_key(current->
my_val);
723 if(!this->find_with_key(my_join->current_key, *(current->
my_arg))) {
724 __TBB_ASSERT(
false,
"Failed to find item corresponding to current_key.");
730 this->delete_with_key(my_join->current_key);
733 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 735 my_built_predecessors.add_edge(*(current->pred));
739 my_built_predecessors.delete_edge(*(current->pred));
743 current->cnt_val = my_built_predecessors.edge_count();
747 my_built_predecessors.copy_edges(*(current->plist));
756 template<
typename R,
typename B >
friend class run_and_put_task;
762 my_aggregator.
execute(&op_data);
764 rtask = my_join->increment_key_count((*(this->get_key_func()))(v),
false);
772 return my_join->graph_ref;
801 my_aggregator.
execute(&op_data);
805 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 806 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
808 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
811 my_aggregator.
execute(&op_data);
814 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
817 my_aggregator.
execute(&op_data);
822 my_aggregator.
execute(&op_data);
823 return op_data.cnt_val;
829 my_aggregator.
execute(&op_data);
837 my_aggregator.
execute(&op_data);
841 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 842 void extract_receiver() {
843 buffer_type::reset();
844 my_built_predecessors.receiver_extract(*
this);
849 buffer_type::reset();
850 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 852 my_built_predecessors.clear();
860 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 861 edge_container<predecessor_type> my_built_predecessors;
865 using namespace graph_policy_namespace;
867 template<
typename JP,
typename InputTuple,
typename OutputTuple>
871 template<
typename JP,
typename InputTuple,
typename OutputTuple>
874 template<
typename InputTuple,
typename OutputTuple>
883 ports_with_no_inputs = N;
888 ports_with_no_inputs = N;
892 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
895 ++ports_with_no_inputs;
900 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
902 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
904 if(!handle_task)
return rtask;
917 ports_with_no_inputs = N;
921 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 924 ports_with_no_inputs = N;
932 return !ports_with_no_inputs;
936 if(ports_with_no_inputs)
return false;
952 template<
typename InputTuple,
typename OutputTuple>
961 ports_with_no_items = N;
966 ports_with_no_items = N;
971 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
974 ports_with_no_items = N;
980 if(ports_with_no_items.fetch_and_decrement() == 1) {
982 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
984 if(!handle_task)
return rtask;
1002 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1011 return !ports_with_no_items;
1015 if(ports_with_no_items)
return false;
1033 template<
typename InputTuple,
typename OutputTuple,
typename K,
typename KHash>
1037 typename tbb::internal::strip<K>::type&,
1038 count_element<typename tbb::internal::strip<K>::type>,
1039 internal::type_to_key_function_body<
1040 count_element<typename tbb::internal::strip<K>::type>,
1041 typename tbb::internal::strip<K>::type& >,
1070 enum op_type { res_count, inc_count, may_succeed, try_make };
1082 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1084 enqueue_task(true) {}
1100 this->current_key = t;
1101 this->delete_with_key(this->current_key);
1103 this->push_back(l_out);
1105 rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
1117 __TBB_ASSERT(
false,
"should have had something to push");
1123 key_matching_FE_operation *current;
1126 op_list = op_list->next;
1127 switch(current->type) {
1130 this->destroy_front();
1135 count_element_type *
p = 0;
1136 unref_key_type &t = current->my_val;
1137 bool do_enqueue = current->enqueue_task;
1138 if(!(this->find_ref_with_key(t,p))) {
1139 count_element_type ev;
1142 this->insert_with_key(ev);
1143 if(!(this->find_ref_with_key(t,p))) {
1144 __TBB_ASSERT(
false,
"should find key after inserting it");
1148 task *rtask = fill_output_buffer(t,
true, do_enqueue);
1149 __TBB_ASSERT(!rtask || !do_enqueue,
"task should not be returned");
1150 current->bypass_t = rtask;
1159 if(this->buffer_empty()) {
1163 *(current->my_output) = this->front();
1173 template<
typename FunctionTuple>
1174 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1178 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1179 this->set_key_func(cfb);
1183 output_buffer_type() {
1188 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1189 this->set_key_func(cfb);
1193 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1196 key_matching_FE_operation op_data(res_count);
1197 my_aggregator.
execute(&op_data);
1204 key_matching_FE_operation op_data(t, handle_task, inc_count);
1205 my_aggregator.
execute(&op_data);
1206 return op_data.bypass_t;
1221 key_to_count_buffer_type::reset();
1222 output_buffer_type::reset();
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1229 key_to_count_buffer_type::reset();
1230 output_buffer_type::reset();
1237 key_matching_FE_operation op_data(may_succeed);
1238 my_aggregator.
execute(&op_data);
1245 key_matching_FE_operation op_data(&out,try_make);
1246 my_aggregator.
execute(&op_data);
1263 template<
typename JP,
typename InputTuple,
typename OutputTuple>
1265 public sender<OutputTuple> {
1267 using graph_node::my_graph;
1273 using input_ports_type::tuple_build_may_succeed;
1274 using input_ports_type::try_to_make_tuple;
1275 using input_ports_type::tuple_accepted;
1276 using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1278 typedef typename sender<output_type>::built_successors_type built_successors_type;
1279 typedef typename sender<output_type>::successor_list_type successor_list_type;
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1286 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1299 successor_list_type *slist;
1304 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1306 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1319 op_list = op_list->
next;
1320 switch(current->
type) {
1322 my_successors.register_successor(*(current->
my_succ));
1324 task *rtask =
new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1328 forwarder_busy =
true;
1334 my_successors.remove_successor(*(current->
my_succ));
1338 if(tuple_build_may_succeed()) {
1339 if(try_to_make_tuple(*(current->
my_arg))) {
1347 case do_fwrd_bypass: {
1348 bool build_succeeded;
1349 task *last_task = NULL;
1351 if(tuple_build_may_succeed()) {
1353 build_succeeded = try_to_make_tuple(out);
1354 if(build_succeeded) {
1355 task *new_task = my_successors.try_put_task(out);
1362 build_succeeded =
false;
1365 }
while(build_succeeded);
1369 forwarder_busy =
false;
1372 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1374 my_successors.internal_add_built_successor(*(current->
my_succ));
1378 my_successors.internal_delete_built_successor(*(current->
my_succ));
1382 current->cnt_val = my_successors.successor_count();
1386 my_successors.copy_successors(*(current->slist));
1395 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1396 my_successors.set_owner(
this);
1397 input_ports_type::set_my_node(
this);
1402 graph_node(other.graph_node::my_graph), input_ports_type(other),
1403 sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1404 my_successors.set_owner(
this);
1405 input_ports_type::set_my_node(
this);
1409 template<
typename FunctionTuple>
1410 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1411 my_successors.set_owner(
this);
1412 input_ports_type::set_my_node(
this);
1418 my_aggregator.
execute(&op_data);
1424 my_aggregator.
execute(&op_data);
1430 my_aggregator.
execute(&op_data);
1434 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1435 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1437 void internal_add_built_successor( successor_type &r)
__TBB_override {
1439 my_aggregator.
execute(&op_data);
1442 void internal_delete_built_successor( successor_type &r)
__TBB_override {
1444 my_aggregator.
execute(&op_data);
1449 my_aggregator.
execute(&op_data);
1450 return op_data.cnt_val;
1456 my_aggregator.
execute(&op_data);
1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1462 input_ports_type::extract();
1463 my_successors.built_successors().sender_extract(*
this);
1470 input_ports_type::reset(f);
1480 my_aggregator.
execute(&op_data);
1487 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1492 template<
int N,
typename OutputTuple,
typename K,
typename KHash>
1507 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1519 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1520 template <
typename K,
typename T>
1521 struct key_from_message_body {
1522 K operator()(
const T& t)
const {
1524 return key_from_message<K>(t);
1528 template <
typename K,
typename T>
1529 struct key_from_message_body<K&,T> {
1530 const K& operator()(
const T& t)
const {
1532 return key_from_message<const K&>(t);
1539 template<
typename OutputTuple,
typename K,
typename KHash>
1541 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1553 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1555 func_initializer_type(
1561 template<
typename Body0,
typename Body1>
1563 func_initializer_type(
1572 template<
typename OutputTuple,
typename K,
typename KHash>
1574 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1588 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1590 func_initializer_type(
1597 template<
typename Body0,
typename Body1,
typename Body2>
1599 func_initializer_type(
1609 template<
typename OutputTuple,
typename K,
typename KHash>
1611 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1627 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1629 func_initializer_type(
1637 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3>
1639 func_initializer_type(
1650 template<
typename OutputTuple,
typename K,
typename KHash>
1652 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1670 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1672 func_initializer_type(
1681 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4>
1682 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1683 func_initializer_type(
1695 #if __TBB_VARIADIC_MAX >= 6 1696 template<
typename OutputTuple,
typename K,
typename KHash>
1697 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1698 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1718 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1720 func_initializer_type(
1730 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
typename Body5>
1731 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1732 : base_type(g, func_initializer_type(
1746 #if __TBB_VARIADIC_MAX >= 7 1747 template<
typename OutputTuple,
typename K,
typename KHash>
1748 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1749 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1769 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p >
func_initializer_type;
1771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1773 func_initializer_type(
1784 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1785 typename Body5,
typename Body6>
1786 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1787 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1798 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1802 #if __TBB_VARIADIC_MAX >= 8 1803 template<
typename OutputTuple,
typename K,
typename KHash>
1804 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1805 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1827 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p >
func_initializer_type;
1829 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1831 func_initializer_type(
1843 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1844 typename Body5,
typename Body6,
typename Body7>
1845 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1846 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1858 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1862 #if __TBB_VARIADIC_MAX >= 9 1863 template<
typename OutputTuple,
typename K,
typename KHash>
1864 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1865 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1889 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p >
func_initializer_type;
1891 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1893 func_initializer_type(
1906 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1907 typename Body5,
typename Body6,
typename Body7,
typename Body8>
1908 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1909 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1922 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1926 #if __TBB_VARIADIC_MAX >= 10 1927 template<
typename OutputTuple,
typename K,
typename KHash>
1928 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1929 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1955 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p >
func_initializer_type;
1957 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1959 func_initializer_type(
1973 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1974 typename Body5,
typename Body6,
typename Body7,
typename Body8,
typename Body9>
1975 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1976 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1990 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1995 template<
size_t N,
typename JNT>
1997 return tbb::flow::get<N>(jn.input_ports());
2001 #endif // __TBB__flow_graph_join_impl_H
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
type_to_key_func_type * get_my_key_func()
item_buffer< output_type > output_buffer_type
bool get_item(input_type &v)
join_node_FE(const join_node_FE &other)
graph & graph_reference() const __TBB_override
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type
atomic< size_t > ports_with_no_inputs
join_node_FE(graph &g, FunctionTuple &TtoK_funcs)
static void reset_inputs(InputTuple &my_input, reset_flags f)
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
bool tuple_build_may_succeed()
matching_forwarding_base(graph &g)
tbb::flow::tuple_element< 2, OutputTuple >::type T2
void reset_receiver(reset_flags f) __TBB_override
void set_join_node_pointer(forwarding_base *join)
internal::type_to_key_function_body< T3, K > * f3_p
internal::type_to_key_function_body< T2, K > * f2_p
aggregator< handler_type, key_matching_port_operation > my_aggregator
virtual task * decrement_port_count(bool handle_task)=0
static void reset_inputs(InputTuple &my_input, reset_flags f)
static void consume_reservations(TupleType &my_input)
tbb::flow::tuple< f0_p, f1_p > func_initializer_type
join_node_base(const join_node_base &other)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
void handle_operations(queueing_port_operation *op_list)
void increment_port_count() __TBB_override
task * try_put_task(const T &v) __TBB_override
static void release_reservations(TupleType &my_input)
join_node_base_operation(const output_type &e, op_type t)
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
forwarding_base * my_join
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
join_node_base(graph &g, FunctionTuple f)
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
key_matching_FE_operation(const unref_key_type &e, bool q_task, op_type t)
queueing_port< T > class_type
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
join_node_base_operation(const successor_type &s, op_type t)
internal::type_to_key_function_body< T1, K > * f1_p
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
queueing_port(const queueing_port &)
copy constructor
static bool reserve(InputTuple &my_input, OutputTuple &out)
broadcast_cache< output_type, null_rw_mutex > my_successors
internal::type_to_key_function_body< T1, K > * f1_p
reserving_port_operation(const predecessor_type &s, op_type t)
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
queueing_port_operation(const T *p, op_type t)
join_node_base_operation(op_type t)
input_type & input_ports()
queueing_port_operation(const T &e, op_type t)
reserving_port< T > class_type
key_matching_FE_operation(op_type t)
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
tbb::internal::strip< key_type >::type noref_key_type
join_node_base< JP, InputTuple, OutputTuple > class_type
input_type & input_ports()
tbb::flow::tuple_element< 1, OutputTuple >::type T1
bool tuple_build_may_succeed()
key_matching< K, KHash > key_traits_type
key_matching_FE_operation(output_type *p, op_type t)
void reset_receiver(reset_flags f) __TBB_override
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
tbb::flow::tuple_element< 1, OutputTuple >::type T1
task * try_put_task(const T &) __TBB_override
key_matching_port_operation(const input_type &e, op_type t)
key_matching_port_operation(op_type t)
key_matching_port_operation(const input_type *p, op_type t)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
matching_forwarding_base< key_type > forwarding_base_type
tbb::flow::tuple_element< 2, OutputTuple >::type T2
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2)
bool try_to_make_tuple(output_type &out)
aggregator< handler_type, queueing_port_operation > my_aggregator
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
unfolded_join_node(const unfolded_join_node &other)
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
void __TBB_store_with_release(volatile T &location, V value)
bool is_graph_active(tbb::flow::interface10::graph &g)
unfolded_join_node(const unfolded_join_node &other)
join_node_FE(const join_node_FE &other)
#define __TBB_STATIC_ASSERT(condition, msg)
tbb::flow::tuple_element< 3, OutputTuple >::type T3
void consume()
Complete use of the port.
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
current_key_type current_key
key_matching_port(const key_matching_port &)
key_matching_port< traits > class_type
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
static void release_my_reservation(TupleType &my_input)
void release()
Release the port.
count_element< unref_key_type > count_element_type
forwarding_base(graph &g)
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
void handle_operations(join_node_base_operation *op_list)
tbb::flow::tuple_element< 3, OutputTuple >::type T3
static bool reserve(InputTuple &my_input, OutputTuple &out)
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
void set_my_key_func(type_to_key_func_type *f)
tbb::internal::strip< key_type >::type unref_key_type
graph & graph_reference() const __TBB_override
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
task * decrement_port_count(bool) __TBB_override
task * decrement_port_count(bool handle_task) __TBB_override
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
sender< output_type >::successor_type successor_type
Base class for types that should not be assigned.
static void consume_reservations(TupleType &my_input)
void const char const char int ITT_FORMAT __itt_group_sync p
reserving_port_operation(const T &e, op_type t)
tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type
queueing_port()
Constructor.
reserving_port(const reserving_port &)
static bool get_items(InputTuple &my_input, OutputTuple &out)
void set_join_node_pointer(forwarding_base *join)
void increment_port_count() __TBB_override
A task that calls a node's forward_task function.
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
void handle_operations(key_matching_FE_operation *op_list)
tbb::internal::strip< KeyType >::type current_key_type
void handle_operations(key_matching_port_operation *op_list)
A cache of successors that are put in a round-robin fashion.
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
static tbb::task *const SUCCESSFULLY_ENQUEUED
input_type & input_ports()
tbb::flow::tuple_element< 1, OutputTuple >::type T1
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
void reset_node(reset_flags f) __TBB_override
void reset(reset_flags f)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
count_element< K > table_item_type
reserving_port_operation(op_type t)
TraitsType::KHash hash_compare_type
unfolded_join_node(const unfolded_join_node &other)
internal::type_to_key_function_body< T4, K > * f4_p
internal::type_to_key_function_body< T3, K > * f3_p
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
bool try_to_make_tuple(output_type &out)
aggregator< handler_type, reserving_port_operation > my_aggregator
receiver< input_type >::predecessor_type predecessor_type
join_node_FE : implements input port policy
static void release_my_reservation(TupleType &my_input)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
unfolded_join_node(const unfolded_join_node &other)
static void reset_my_port(InputTuple &my_input)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
bool reserve(T &v)
Reserve an item from the port.
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
bool tuple_build_may_succeed()
void handle_operations(reserving_port_operation *op_list)
aggregator< handler_type, join_node_base_operation > my_aggregator
internal::type_to_key_function_body< T1, K > * f1_p
TraitsType::TtoK type_to_key_func_type
receiver< input_type >::predecessor_type predecessor_type
internal::type_to_key_function_body< T0, K > * f0_p
static void release_reservations(TupleType &my_input)
static bool get_items(InputTuple &my_input, OutputTuple &out)
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
void const char const char int ITT_FORMAT __itt_group_sync s
atomic< size_t > ports_with_no_items
void initialize_handler(handler_type h)
aggregator< handler_type, key_matching_FE_operation > my_aggregator
unfolded_join_node(graph &g, Body0 body0, Body1 body1)
virtual ~forwarding_base()
queueing_port_operation(op_type t)
reserving_port()
Constructor.
predecessor_type * my_pred
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
untyped_sender predecessor_type
The predecessor type for this node.
reservable_predecessor_cache< T, null_mutex > my_predecessors
static void set_join_node_pointer(TupleType &my_input, PortType *port)
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
aggregated_operation base class
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
task * decrement_port_count(bool handle_task) __TBB_override
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
void set_my_node(base_node_type *new_my_node)
bool try_to_make_tuple(output_type &out)
unfolded_join_node(const unfolded_join_node &other)
void increment_port_count() __TBB_override
matching_forwarding_base< key_type > * my_join
internal::type_to_key_function_body< T1, K > * f1_p
tbb::flow::tuple_element< 1, OutputTuple >::type T1
forwarding_base * my_join
void execute(operation_type *op)
key_to_count_functor< unref_key_type > key_to_count_func
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void reset_receiver(reset_flags f) __TBB_override
tbb::flow::tuple_element< 0, OutputTuple >::type T0
uintptr_t status
Zero value means "wait" status, all other values are "user" specified values and are defined into the...
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
internal::type_to_key_function_body< T0, K > * f0_p
virtual void increment_port_count()=0
static void reset_ports(InputTuple &my_input)
A cache of successors that are broadcast to.
internal::type_to_key_function_body< T2, K > * f2_p
internal::type_to_key_function_body< T0, K > * f0_p
unfolded_join_node(graph &g)
graph & graph_reference() const __TBB_override
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
field of type K being used for matching.
const K & operator()(const table_item_type &v)
join_node_base< JP, input_ports_type, output_type > base_type
void reset(reset_flags f)
internal::type_to_key_function_body< T0, K > * f0_p
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
tbb::flow::tuple_element< 0, OutputTuple >::type T0
tbb::flow::tuple_element< 2, OutputTuple >::type T2
receiver< input_type >::predecessor_type predecessor_type
tbb::flow::tuple_element< 4, OutputTuple >::type T4
static void set_join_node_pointer(TupleType &my_input, PortType *port)
void set_my_node(base_node_type *new_my_node)
static void reset_my_port(InputTuple &my_input)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
join_node_FE(const join_node_FE &other)
static void reset_ports(InputTuple &my_input)
internal::type_to_key_function_body< T2, K > * f2_p
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type
void reset(reset_flags f)
K key_from_message(const T &t)
void set_my_node(base_node_type *new_my_node)