当前位置: 代码迷 >> 综合 >> Intel Threading Building Blocks :基本算法参考及使用
  详细解决方案

Intel Threading Building Blocks :基本算法参考及使用

热度:47   发布时间:2024-01-09 03:56:36.0

基本算法(algorithms

Intel TBB提供的大多数并行算法支持泛型。但是这些受支持的类型必须实现必要的概念方法。并行算法可以嵌套,

例如,一个parallel_for的内部可以调用另一个parallel_for。目前版本的TBB4.0)提供的基本算法如下所示:

parallel_for

parallel_reduce

parallel_scan

parallel_do

管道(pipelineparallel_pipeline)

parallel_sort

parallel_invoke

parallel_for

l 摘要

parallel_for是在一个值域执行并行迭代操作的模板函数。

l 语法

template<typenameIndex, typename Func>

Funcparallel_for( Index first, Index_type last, const Func& f

                  [, task_group_context&group] );

 

template<typenameIndex, typename Func>

Funcparallel_for( Index first, Index_type last, 

                  Index step, const Func&f

                  [, task_group_context&group] );

 

template<typenameRange, typename Body> 

voidparallel_for( const Range& range, const Body& body, 

                  [, partitioner[,task_group_context& group]] );

l 头文件

#include “tbb/parallel_for.h”

l 描述

parallel_for(first, last,step, f)表示一个循环的并行执行:

      for(auto i= first; i<last; i+=step) f(i);

注意以下几点:

1、索引类型必须是整形

2、循环不能回环

3、步长(step)必须为正,如果省略了,隐指为1

4、并没有保证迭代操作以并行方式进行

5、较小的迭代等待更大的迭代可能会发生死锁

6、分割策略总是auto_partitioner

 

parallel_for(range, body, partitioner)提供了并行迭代的泛型形式。它表示在区域的每个值,并行执行

bodypartitioner选项指定了分割策略。Range类型必须符合Range概念模型。body必须符合下表的要求:

 

原型

语义

Body::Body(const Body&)

拷贝构造

Body::~Body()

析构

void Body::operator()(Range& range) const

range对象应用body对象

 

采用最后一个模板以及stl中的vector容器改写1-1

例:1-2

#include <iostream>
#include <vector>
#include <tbb/tbb.h>
#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>using namespace std;
using namespace tbb;typedef vector<int>::iterator IntVecIt;struct body
{void operator()(const blocked_range<IntVecIt>&r)const{for(auto i = r.begin(); i!=r.end(); i++)cout<<*i<<' ';}
};int main()
{vector<int> vec;for(int i=0; i<10; i++)vec.push_back(i);parallel_for(blocked_range< IntVecIt>(vec.begin(), vec.end()), body());return 0;
}

parallel_reduce

l 摘要

parallel_reduce模板在一个区域迭代,将由各个任务计算得到的部分结果合并,得到最终结果。

parallel_reduce对区域(range)类型的要求与parallel_for一样。body类型需要分割构造函数以及一个

join方法。body的分割构造函数拷贝运行循环体需要的只读数据,并分配并归操作中初始化并归变量

的标志元素。join方法会组合并归操作中各任务的结果。 

l 语法

template<typenameRange, typename Value, 

        typename Func, typename Reduction>

Value parallel_reduce(const Range& range, const Value& identity,

                    const Func& func,const Reduction& reduction,

                    [, partitioner[,task_group_context& group]] );

 

template<typenameRange, typename Body> 

void parallel_reduce(const Range& range, const Body& body

                     [, partitioner[,task_group_context& group]] );

l 头文件

#include “tbb/parallel_reduce.h”

l 描述

parallel_reduce模板有两种形式。函数形式是为方便与lambda表达式一起使用而设计。

第二种形式是为了最小化数据拷贝。

      下面的表格总结了第一种形式中的identity,func,reduction的类型要求要求:

原型

摘要

Value Identity

Func::operator()的左标识元素

Value Func::operator()(const Range& range, const Value& x)

累计从初始值x开始的子区域的结果

Value Reduction::operator()(const Value& x, const Value& y);

合并xy的结果

      第二种形式parallel_reducerangebody)对range中的每个值执行body的并行并归。

Range类型必须符合Range类型要求。body必须符合下表的要求:

原型

摘要

Body::Body(Body&, split)

分割构造函数。必须能跟operator()join()并发运行。

Body::~Body()

析构函数

void Body::operator()(const Range& )

累计子区域的结果

void Body::join(Body& rhs)

将结果结合。rhs中的结果将合并到this中。

parallel_reduce使用分割构造函数来为每个线程生成一个或多个body的拷贝。当它拷贝

body的时候,也许bodyoperator()或者join()正在并发运行。要确保这种并发运行

下的安全。典型应用中,这种安全要求不会消耗你太多的精力。

l example

下面的例子将一个容器内的数值累加。

1-3

#include <iostream>
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <vector> using namespace std;
using namespace tbb;int main()
{vector<int> vec;for(int i=0; i<100; i++)vec.push_back(i);int result = parallel_reduce(blocked_range<vector<int>::iterator>(vec.begin(), vec.end()),0,[](const blocked_range<vector<int>::iterator>& r, int init)->int{for(auto a = r.begin(); a!=r.end(); a++)init+=*a;return init;},[](int x, int y)->int{return x+y;});cout<<"result:"<<result<<endl;return 0;}

parallel_scan


l  摘要

并行计算前束(prefix)的函数模板。即输入一个数组,生成一个数组,其中每个元素的值

都是原数组中在此元素之前的元素的某个运算符的结果的累积。比如求和:

输入:[2, 8,  9, -4,  1, 3, -2,  7]

生成:[0, 2, 10, 19, 15, 16, 19, 17]

l 语法

template<typename Range, typename Body> 

void parallel_scan( const Range& range, Body& body );

 

template<typename Range, typename Body> 

void parallel_scan( const Range& range, Body& body, const

auto_partitioner& );

 

template<typename Range, typename Body> 

void parallel_scan( const Range& range, Body& body, const

simple_partitioner& );

l 头文件

#include “tbb/parallel_scan.h”

l 描述

数学里对于并行前束的定义如下:

?为左标识元素id?的关联运算符。在队列X0,X1,…Xn-1执行?并行前束得到队列Y0,Y1,Y2,…Yn-1:

y0= id? ?x0

yi=yi-1? xi

parallel_scan<Range,Body>以泛型形式实现并行前束。它的要求如下:

伪签名

语义

void Body::operator()(const Range& r, pre_scan tag)

累积归纳区域r

void Body::operator()(const Range& r, final_scan tag)

归纳区域r以及计算扫描结果

Body::Body(Body& b, split)

分割b以便thisb能被单独累积归纳。*this对象即本表下行的对象a

void Body::reverse_join(Body& a)

a的归纳结果合并到thisthis是先前从a的分割构造函数中创建的。*this对象即本表上一行中的对象b

void Body::assign(Body& b)

b的归纳结果赋给this

 

l example 

#include <tbb/parallel_scan.h>
#include <tbb/blocked_range.h>
#include <iostream>
using namespace tbb;
using namespace std; template<typename T>
class Body
{T _sum;T* const _y;const T* const _x;
public:Body(T y[], const T x[]):_sum(0), _x(x), _y(y){}T get_sum() const {return _sum;}template<typename Tag>void operator()(const blocked_range<int>& r, Tag){T temp = _sum;for(int i = r.begin(); i< r.end(); i++){temp+=_x[i];if(Tag::is_final_scan())_y[i] = temp;}_sum = temp;}Body(Body&b, split):_x(b._x), _y(b._y), _sum(0){}void reverse_join(Body& a){_sum+=a._sum;}void assign(Body& b){_sum = b._sum;}};int main()
{int x[10] = {0,1,2,3,4,5,6,7,8,9};int y[10];Body<int> body(y,x);parallel_scan(blocked_range<int>(0, 10), body);cout<<"sum:"<<body.get_sum()<<endl;return 0;
}

parallel_do


l 摘要

并行处理工作项的模板函数

l 语法

template<typename InputIterator, typename Body>

void parallel_do( InputIterator first, InputIteratorlast,

Body body[,task_group_context& group] );

l 头文件

#include "tbb/parallel_do.h"

l 描述

parallel_do(first, last,body)在对处于半开放区间[first, last)的元素应用函数对象body(不见得并行运行)。如果body重载的()函数的第二个参数(类型为parallel_do_feeder)不为空,那么可以增加另外的工作项。当对输入队列或者通过parallel_do_feeder::add方法添加的所有项x执行的bodyx)都返回后,函数结束。其中的parallel_do_feeder允许parallel_dobody添加额外的工作项,只有parallel_do才能创建或者销毁parallel_do_feeder对象。其他的代码对parallel_do_feeder唯一能做的事就是调用它的add方法。

对于Body类型的需求如下:

伪签名

语义

Body::operator()( cv-qualifiers T& item,

              Parallel_do_feeder<T>& feeder) const

或者

Body::operator()(cv-qualifiers T& item) const

处理item。模板parallel_do也许会为同一个this(不能是同一个item)并行调用operator()。如果有第二个参数,允许在函数中另行添加工作项。

T(const T&)

拷贝工作项

~T::T()

销毁工作项

 

      如果所有来自输入流的元素不能随机访问,那么parallel_do中的并行就不具备可扩展性。为达到可扩展性,可按

如下方式之一处理:

 

ü 使用随机迭代器指定输入流

ü 对诸如body经常增加不止一项任务的行为设计自己的算法

ü parallel_for来替换

 

为了提高速度,B::operator()的粒度至少要约10万个时钟周期。否则,parallel_do的内在开销就会影响有效工作。算法可以传递一个task_group_context对象,这样它的任务可以在此组内执行。默认情况下,算法在它自己的有界组中执行。

l example 

#include <tbb/parallel_do.h>
#include <iostream>
#include <vector>
using namespace std;
using namespace tbb; struct t_test
{string msg;int ref;void operator()()const{cout<<msg<<endl;}
};template <typename T>
struct body_test
{void operator()(T* t, parallel_do_feeder<T*>& feeder) const{(*t)();if(t->ref == 0){t->msg = "added msg";feeder.add(t);t->ref++;}}
};    int main()
{t_test *pt = new t_test;pt->ref = 0;pt->msg = "original msg";vector<t_test*> vec;vec.push_back(pt);parallel_do(vec.begin(), vec.end(), body_test<t_test>());delete pt;return 0;
}

pipleline


class pipeline

{

public:

pipeline();

~pipeline();

void add_filter( filter& f );

void run( size_t max_number_of_live_tokens

                    [,task_group_context& group] );

void clear();

};

 

可按以下步骤使用pipeline类:

1、filter继承类ff的构造函数传递给基类filter的构造函数一个参数,来指定它的模式

2、重载虚方法filter::operator()来实现过滤器对元素处理,并返回一个将被下一个过滤器处理的元素指针。如果流里没有其他的要处理的元素,返回空值。最后一个过滤器的返回值将被忽略。

3、生成pipeline类的实例

4、生成过滤器f的实例,并将它们按先后顺序加给pipeline。一个过滤器的实例一次只能加给一个pipeline。同一时间,一个过滤器禁止成为多个pipeline的成员。

5、调用pipeline::run方法。参数max_number_of_live_tokens指定了能并发运行的阶段数量上限。较高的值会以更多的内存消耗为代价来增加并发性。 

函数parallel_pipeline提供了一种强类型的面向lambda的方式来建立并运行管道。 


过滤器基类

filter 

class filter

{

public:

enum mode

{

parallel = implementation-defined,

serial_in_order = implementation-defined,

serial_out_of_order =implementation-defined

};

bool is_serial() const;

bool is_ordered() const;

virtual void* operator()( void* item ) = 0;

virtual void finalize( void* item ) {}

virtual ~filter();

protected:

filter( mode );

};

 

过滤器模式有三种模式:parallelserial_in_order,serial_out_of_order

? parallel过滤器能不按特定的顺序并行处理多个工作项

? serial_out_of_order过滤器不按特定的顺序每次处理一个工作项

? serial_in_order过滤器每次处理一个工作项。管道中的所有serial_in_order过滤器都按同样的顺序处理工作项。

由于parallel过滤器支持并行加速,所以推荐使用。如果必须使用serial过滤器,那么serial_out_of_order类型的过滤器是优先考虑的,因为他在处理顺序上的约束较少。

 

线程绑定过滤器

thread_bound_filter

classthread_bound_filter: public filter

{

protected:

thread_bound_filter(mode filter_mode);

public:

enum result_type

{

success,

item_not_available,

end_of_stream

};

result_type try_process_item();

result_type process_item();

}; 

管道中过滤器的抽象基类,线程必须显式为其提供服务。当一个过滤器必须由某个指定线程执行的时候会派上用场。服务于thread_bound_filter的线程不能是调用pipeline::run()的线程。 

example: 

#include<iostream>#include <tbb/pipeline.h>#include<tbb/compat/thread>#include<tbb/task_scheduler_init.h>using namespacestd;
using namespacetbb;
char input[] ="abcdefg\n";classinputfilter:public filter
{char *_ptr;
public:void *operator()(void *){if(*_ptr){cout<<"input:"<<*_ptr<<endl;return _ptr++;}else   return 0;}inputfilter():filter(serial_in_order),_ptr(input){}
};classoutputfilter: public thread_bound_filter
{
public:void *operator()(void *item){cout<<*(char*)item;return 0;}outputfilter():thread_bound_filter(serial_in_order){}
}; voidrun_pipeline(pipeline *p)
{p->run(8);
} int main()
{inputfilter inf;outputfilter ouf;pipeline p;p.add_filter(inf);p.add_filter(ouf);//由于主线程服务于继承自thread_bound_filter的outputfilter,所以pipeline要运行在另一个单独的线程thread t(run_pipeline, &p);while(ouf.process_item()!=thread_bound_filter::end_of_stream)continue;t.join();return 0;
}


 

 

  相关解决方案