42 my_token_ready =
false;
53 friend class tbb::pipeline;
76 void grow( size_type minimum_size );
80 static const size_type initial_buffer_size = 4;
104 array(NULL), my_sem(NULL), array_size(0),
105 low_token(0), high_token(0),
106 is_ordered(is_ordered_), is_bound(is_bound_),
107 end_of_input_tls_allocated(false) {
108 grow(initial_buffer_size);
110 if(is_bound) create_sema(0);
121 if(end_of_input_tls_allocated) {
140 bool was_empty = !array[low_token&(array_size-1)].
is_valid;
148 token = high_token++;
150 if( token!=low_token || is_bound || force_put ) {
153 if( token-low_token>=array_size )
154 grow( token-low_token+1 );
156 array[token&(array_size-1)] = info_;
157 if(was_empty && is_bound) {
173 template<
typename StageTask>
179 if( !is_ordered || token==low_token ) {
181 task_info& item = array[++low_token & (array_size-1)];
188 spawner.spawn_stage_task(wakee);
191 #if __TBB_TASK_GROUP_CONTEXT 192 void clear(
filter* my_filter ) {
195 for( size_type i=0; i<array_size; ++i, ++t ){
196 task_info& temp = array[t&(array_size-1)];
210 task_info& item = array[low_token&(array_size-1)];
215 if (advance) low_token++;
234 while( new_size<minimum_size )
241 for(
size_type i=0; i<old_size; ++i, ++t )
242 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
251 friend class tbb::pipeline;
261 my_pipeline(pipeline),
262 my_filter(pipeline.filter_list),
270 my_pipeline(pipeline),
277 my_filter = my_pipeline.filter_list;
282 #if __TBB_TASK_GROUP_CONTEXT 286 __TBB_ASSERT(is_cancelled(),
"Trying to finalize the task that wasn't cancelled");
291 #endif // __TBB_TASK_GROUP_CONTEXT 292 void spawn_stage_task(
const task_info& info)
305 if( my_filter->is_serial() ) {
307 if(
my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
309 if( my_filter->is_ordered() ) {
310 my_token = my_pipeline.token_counter++;
313 if( my_pipeline.has_thread_bound_filters )
314 my_pipeline.token_counter++;
316 if( !my_filter->next_filter_in_pipeline ) {
318 goto process_another_stage;
321 if( --my_pipeline.input_tokens>0 )
322 spawn( *
new( allocate_additional_child_of(*
parent()) )
stage_task( my_pipeline ) );
325 my_pipeline.end_of_input =
true;
329 if( my_pipeline.end_of_input )
332 if( my_pipeline.has_thread_bound_filters )
333 my_pipeline.token_counter++;
336 if( --my_pipeline.input_tokens>0 )
337 spawn( *
new( allocate_additional_child_of(*
parent()) )
stage_task( my_pipeline ) );
339 if( !
my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
341 my_pipeline.end_of_input =
true;
343 if( my_pipeline.has_thread_bound_filters )
344 my_pipeline.token_counter--;
352 if( my_filter->is_serial() )
353 my_filter->my_input_buffer->note_done(
my_token, *
this);
355 my_filter = my_filter->next_filter_in_pipeline;
358 if( my_filter->is_serial() ) {
360 if( my_filter->my_input_buffer->put_token(*
this) ){
362 if( my_filter->is_bound() ) {
365 my_filter = my_filter->next_filter_in_pipeline;
366 }
while( my_filter && my_filter->is_bound() );
368 if( my_filter && my_filter->my_input_buffer->return_item(*
this, !my_filter->is_serial()))
369 goto process_another_stage;
377 size_t ntokens_avail = ++my_pipeline.input_tokens;
378 if(my_pipeline.filter_list->is_bound() ) {
379 if(ntokens_avail == 1) {
380 my_pipeline.filter_list->my_input_buffer->sema_V();
385 || my_pipeline.end_of_input ) {
388 ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
392 process_another_stage:
396 recycle_as_continuation();
405 if( !my_pipeline.end_of_input )
406 if( !my_pipeline.filter_list->is_bound() )
407 if( my_pipeline.input_tokens > 0 ) {
408 recycle_as_continuation();
410 return new( allocate_child() )
stage_task( my_pipeline );
412 if( do_segment_scanning ) {
416 filter* first_suitable_filter = current_filter;
417 while( current_filter ) {
420 if( !my_pipeline.end_of_input || current_filter->
has_more_work())
430 task* t =
new( allocate_child() )
stage_task( my_pipeline, current_filter, info );
436 __TBB_ASSERT( refcnt <=
int(my_pipeline.token_counter),
"token counting error" );
440 set_ref_count( refcnt );
443 recycle_as_continuation();
447 if( !current_filter ) {
448 if( !my_pipeline.end_of_input ) {
449 recycle_as_continuation();
452 current_filter = first_suitable_filter;
458 first_suitable_filter = first_suitable_filter->
next_segment;
459 current_filter = first_suitable_filter;
464 if( !my_pipeline.end_of_input ) {
465 recycle_as_continuation();
483 if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
484 do_segment_scanning =
true;
486 head_of_previous_segment = subfilter;
493 #if _MSC_VER && !defined(__INTEL_COMPILER) 496 #pragma warning (disable: 4127) 504 my_pipeline(_pipeline)
507 #if __TBB_TASK_GROUP_CONTEXT 508 if (my_pipeline.end_counter->is_cancelled())
509 my_pipeline.clear_filters();
511 my_pipeline.end_counter = NULL;
517 void pipeline::inject_token(
task& ) {
521 #if __TBB_TASK_GROUP_CONTEXT 522 void pipeline::clear_filters() {
531 pipeline::pipeline() :
536 has_thread_bound_filters(
false)
542 pipeline::~pipeline() {
546 void pipeline::clear() {
548 for(
filter* f = filter_list; f; f=next ) {
557 f->my_pipeline = NULL;
560 f->next_segment = NULL;
562 filter_list = filter_end = NULL;
565 void pipeline::add_filter(
filter& filter_ ) {
570 __TBB_ASSERT( !end_counter,
"invocation of add_filter on running pipeline" );
575 if ( filter_list == NULL)
576 filter_list = &filter_;
580 filter_end = &filter_;
583 filter_end =
reinterpret_cast<filter*
>(&filter_list);
585 *
reinterpret_cast<filter**
>(filter_end) = &filter_;
587 *
reinterpret_cast<filter**
>(filter_end) = NULL;
592 has_thread_bound_filters =
true;
616 void pipeline::remove_filter(
filter& filter_ ) {
619 __TBB_ASSERT( !end_counter,
"invocation of remove_filter on running pipeline" );
620 if (&filter_ == filter_list)
626 if (&filter_ == filter_end)
642 void pipeline::run(
size_t max_number_of_live_tokens
647 __TBB_ASSERT( max_number_of_live_tokens>0,
"pipeline::run must have at least one token" );
648 __TBB_ASSERT( !end_counter,
"pipeline already running?" );
651 end_of_input =
false;
653 if(has_thread_bound_filters) {
655 if(filter_list->is_bound()) {
656 filter_list->my_input_buffer->sema_V();
659 #if __TBB_TASK_GROUP_CONTEXT 667 if(has_thread_bound_filters) {
670 f->my_input_buffer->sema_V();
677 #if __TBB_TASK_GROUP_CONTEXT 678 void pipeline::run(
size_t max_number_of_live_tokens ) {
686 run(max_number_of_live_tokens, context);
689 #endif // __TBB_TASK_GROUP_CONTEXT 693 __TBB_ASSERT(my_input_buffer,
"has_more_work() called for filter with no input buffer");
700 my_pipeline->remove_filter(*
this);
712 my_pipeline->end_of_input =
true;
714 __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
715 my_input_buffer->set_my_tls_end_of_input();
720 return internal_process_item(
true);
724 return internal_process_item(
false);
728 __TBB_ASSERT(my_pipeline != NULL,
"It's not supposed that process_item is called for a filter that is not in a pipeline.");
732 if( my_pipeline->end_of_input && !has_more_work() )
733 return end_of_stream;
735 if( !prev_filter_in_pipeline ) {
736 if( my_pipeline->end_of_input )
737 return end_of_stream;
738 while( my_pipeline->input_tokens == 0 ) {
740 return item_not_available;
741 my_input_buffer->sema_P();
745 __TBB_ASSERT(my_pipeline->input_tokens > 0,
"Token failed in thread-bound filter");
746 my_pipeline->input_tokens--;
748 info.
my_token = my_pipeline->token_counter;
751 my_pipeline->token_counter++;
753 my_pipeline->end_of_input =
true;
754 return end_of_stream;
757 while( !my_input_buffer->has_item() ) {
759 return item_not_available;
761 my_input_buffer->sema_P();
762 if( my_pipeline->end_of_input && !has_more_work() ) {
763 return end_of_stream;
766 if( !my_input_buffer->return_item(info,
true) ) {
771 if( next_filter_in_pipeline ) {
772 if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,
true) ) {
773 __TBB_ASSERT(
false,
"Couldn't put token after thread-bound buffer");
776 size_t ntokens_avail = ++(my_pipeline->input_tokens);
777 if( my_pipeline->filter_list->is_bound() ) {
778 if( ntokens_avail == 1 ) {
779 my_pipeline->filter_list->my_input_buffer->sema_V();
virtual __TBB_EXPORTED_METHOD ~filter()
Destroy filter.
task * execute() __TBB_override
The virtual task execution method.
bool end_of_input_tls_allocated
semaphore * my_sem
For thread-bound filter, semaphore for waiting, NULL otherwise.
task_info * array
Array of deferred tasks that cannot yet start executing.
bool my_token_ready
False until my_token is set.
size_type array_size
Size of array.
#define __TBB_TASK_GROUP_CONTEXT
Token high_token
Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned.
void __TBB_EXPORTED_METHOD set_end_of_input()
bool return_item(task_info &info, bool advance)
return an item, invalidate the queued item, but only advance if the filter
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t new_size
void note_done(Token token, StageTask &spawner)
Note that processing of a token is finished.
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
void set_my_tls_end_of_input()
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
pipeline_cleaner(pipeline &_pipeline)
bool is_ordered() const
True if filter must receive stream in order.
#define __TBB_PIPELINE_VERSION(x)
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Represents acquisition of a mutex.
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info.
Base class for types that should not be copied or assigned.
bool has_more_work()
Has the filter not yet processed all the tokens it will ever see?
bool is_valid
True if my_object is valid.
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
stage_task(pipeline &pipeline)
Construct stage_task for first stage in a pipeline.
bool is_serial() const
True if filter is serial.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
pipeline_root_task(pipeline &pipeline)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
A stage in a pipeline served by a user thread.
bool has_item()
True if the current low_token is valid.
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Base class for user-defined tasks.
bool object_may_be_null()
True if an input filter can emit null.
~input_buffer()
Destroy the buffer.
void grow(size_type minimum_size)
Resize "array".
A lock that occupies a single byte.
stage_task(pipeline &pipeline, filter *filter_, const task_info &info)
Construct stage_task for a subsequent stage in a pipeline.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
Edsger Dijkstra's counting semaphore.
task * execute() __TBB_override
Should be overridden by derived classes.
bool is_bound
True for thread-bound filter, false otherwise.
spin_mutex array_mutex
Serializes updates.
A buffer of input items for a filter.
bool is_bound() const
True if filter is thread-bound.
Token low_token
Lowest token that can start executing.
Token my_token
Invalid unless a task went through an ordered stage.
bool put_token(task_info &info_, bool force_put=false)
Put a token into the buffer.
void create_sema(size_t initial_tokens)
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
auto first(Container &c) -> decltype(begin(c))
result_type internal_process_item(bool is_blocking)
Internal routine for item processing.
pipeline * my_pipeline
Pointer to the pipeline.
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
void push_back(task &task)
Push task onto back of list.
This structure is used to store task information in a input buffer.
static const unsigned char exact_exception_propagation
7th bit defines exception propagation mode expected by the application.
end_of_input_tls_t end_of_input_tls
result_type __TBB_EXPORTED_METHOD process_item()
Wait until a data item becomes available, and invoke operator() on that item.
Used to form groups of tasks.
bool is_ordered
True for ordered filter, false otherwise.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
result_type __TBB_EXPORTED_METHOD try_process_item()
If a data item is available, invoke operator() on that item.
#define ITT_NOTIFY(name, obj)
void poison_pointer(T *__TBB_atomic &)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void reset()
Set to initial state (no object, no token)
basic_tls< intptr_t > end_of_input_tls_t
For parallel filters that accept NULLs, thread-local flag for reaching end_of_input.
bool my_at_start
True if this task has not yet read the input.
bool my_tls_end_of_input()
static const unsigned char version_mask
input_buffer(bool is_ordered_, bool is_bound_)
Construct empty buffer.
void reset()
Roughly equivalent to the constructor of input stage task.