用 C++ 寫線程池是怎樣一種體驗?

沒試過這麼牛逼的事,想試試!求經驗~求指導


有兩種線程池,不知道你問的是哪一種。

第一種是適合把多線程當作非同步使用的,比如Windows API或者C#里的那個。調用者直接扔一個functor過去就可以了,等到需要返回值的時候同步一下。

第二種是主線程需要多次spawn出很多子線程的情況。這經常需要詳細控制線程個數,並且主線程會等待子線程都完成之後才繼續。

話說,前面兩位正好答了這兩種,哈。

總的來說有了std::thread,不管你要哪一種,要構建一個work的thread pool,都不是件難事,甚至比mem pool還容易很多。但有些細節挺難纏,比如程序退出的時候何時殺線程(thread和condition是有析構順序的,亂殺會死鎖)。


std::vector& thread_pool;

就是這種體驗


線程池,最簡單的就是生產者消費者模型了。池裡的每條線程,都是消費者,他們消費並處理一個個的任務。生產者可以是demulitplier或者等在select/epoll/kqueue之類的系統api上的,然後他把收集到底任務發出去。

更複雜的,也不過就是,沒有單獨的生產者,相應的邏輯混合在消費者那邊等任務的代碼里。

找點例子代碼,照著改改,沒多難。

另外,Windows上的IOCP把等待、排隊、線程管理都包了,也挺方便的。


目前的Java程序員,對於C++寫線程池還是很有興趣的,Java的JDK本身就提供了很優良的線程池ThreadPoolExecutor以及ScheduledThreadPoolExecutor,用Java也可以實現很多符合自己需求的線程池

一般來說實現一個線程池主要包括以下4個組成部分:

1)線程管理器:用於創建並管理線程池。事實上線程管理器都有可能包括4)甚至2)3)

2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處於空閑狀態,一般不佔用CPU,佔用較小的內存空間。

3)任務介面:每個任務必須實現的介面,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實現的,跟前面文章中的設置標誌位差不多),把任務抽象出來形成介面,可以做到線程池與具體的任務無關。

4)任務隊列:用來存放沒有處理的任務,提供一種緩衝機制,實現這種結構有好幾種方法,常用的是隊列,主要運用先進先出原理,另外一種是鏈表之類的數據結構,可以動態的為它分配內存空間,應用中比較靈活。

以Java的ThreadPoolExecutor為例,
線程管理器--------public class ThreadPoolExecutor extends AbstractExecutorService
工作線程---------- private final class Worker implements Runnable

private final HashSet& workers = new HashSet&();

任務介面------------ public interface Runnable
(提交的Callable對象也被封裝成Runnable對象最後)
任務隊列------------ private final BlockingQueue& workQueue;

ThreadPoolExecutor這個線程管理器 就是包括了工作線程和任務隊列的,任務介面是java.lang.Runnable介面

後面補上對ThreadPoolExecutor中Work線程分析的部分,ps:公司用的是1.6 家裡裝的是1.7 發現

Doug Lea

對ThreadPoolExecutor進行了重寫,寫的面目全非。。。。。,先分析1.6的吧

http://www.cnblogs.com/elvinni/p/4162726.html

上面是1.7的源碼分析

一般線程池的寫法都可以分為上面的四個部分。

下面介紹一個基於

Linux/Unix下的POSIX規範的C++線程池

也不是我寫的,只是提供給大家鑒賞一下,多多學習吧

http://blog.csdn.net/zhoubl668/article/details/8927090

Thread.h

#ifndef __THREAD_H
#define __THREAD_H

#include &
#include &
#include &

using namespace std;

/**
* 執行任務的類,設置任務數據並執行
*/
class CTask
{
protected:
string m_strTaskName; /** 任務的名稱 */
void* m_ptrData; /** 要執行的任務的具體數據 */
public:
CTask(){}
CTask(string taskName)
{
m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); /** 設置任務數據 */

public:
virtual ~CTask(){}
};

/**
* 線程池管理類的實現
*/
class CThreadPool
{
private:
static vector& m_vecTaskList; /** 任務列表 */
static bool shutdown; /** 線程退出標誌 */
int m_iThreadNum; /** 線程池中啟動的線程數 */
pthread_t *pthread_id; /** 線程文件描述符數組*/

static pthread_mutex_t m_pthreadMutex; /** 線程同步鎖 */
static pthread_cond_t m_pthreadCond; /** 線程同步的條件變數 */

protected:
static void* ThreadFunc(void * threadData); /** 新線程的線程回調函數 */
static int MoveToIdle(pthread_t tid); /** 線程執行結束後,把自己放入到空閑線程中 */
static int MoveToBusy(pthread_t tid); /** 移入到忙碌線程中去 */

int Create(); /** 創建線程池中的線程 */

public:
CThreadPool(int threadNum = 10);
int AddTask(CTask *task); /** 把任務添加到任務隊列中 */
int StopAll(); /** 使線程池中的線程退出 */
int getTaskSize(); /** 獲取當前任務隊列中的任務數 */
};

#endif

//Thread.cpp
#include "Thread.h"
#include &

void CTask::SetData(void * data)
{
m_ptrData = data;
}

vector& CThreadPool::m_vecTaskList; //任務列表
bool CThreadPool::shutdown = false;

pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

/**
* 線程池管理類構造函數
*/
CThreadPool::CThreadPool(int threadNum)
{
this-&>m_iThreadNum = threadNum;
cout &<&< "I will create " &<&< threadNum &<&< " threads" &<&< endl; Create(); } /** * 線程回調函數 */ void* CThreadPool::ThreadFunc(void* threadData) { pthread_t tid = pthread_self(); while (1) { pthread_mutex_lock(m_pthreadMutex); while (m_vecTaskList.size() == 0 !shutdown) { pthread_cond_wait(m_pthreadCond, m_pthreadMutex); } if (shutdown) { pthread_mutex_unlock(m_pthreadMutex); printf("thread %lu will exit/n", pthread_self()); pthread_exit(NULL); } printf("tid %lu run/n", tid); vector&::iterator iter = m_vecTaskList.begin();

/**
* 取出一個任務並處理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}

pthread_mutex_unlock(m_pthreadMutex);

task-&>Run(); /** 執行任務 */
printf("tid:%lu idle/n", tid);
}
return (void*)0;
}

/**
* 往任務隊列裡邊添加任務並發出線程同步信號
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(m_pthreadMutex);
this-&>m_vecTaskList.push_back(task);
pthread_mutex_unlock(m_pthreadMutex);
pthread_cond_signal(m_pthreadCond);
return 0;
}

/**
* 創建線程
*/
int CThreadPool::Create()
{
pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);
for(int i = 0; i &< m_iThreadNum; i++) { pthread_create(pthread_id[i], NULL, ThreadFunc, NULL); } return 0; } /** * 停止所有線程 */ int CThreadPool::StopAll() { /** 避免重複調用 */ if (shutdown) { return -1; } printf("Now I will end all threads!!/n"); /** 喚醒所有等待線程,線程池要銷毀了 */ shutdown = true; pthread_cond_broadcast(m_pthreadCond); /** 阻塞等待線程退出,否則就成殭屍了 */ for (int i = 0; i &< m_iThreadNum; i++) { pthread_join(pthread_id[i], NULL); } free(pthread_id); pthread_id = NULL; /** 銷毀條件變數和互斥體 */ pthread_mutex_destroy(m_pthreadMutex); pthread_cond_destroy(m_pthreadCond); return 0; } /** * 獲取當前隊列中任務數 */ int CThreadPool::getTaskSize() { return m_vecTaskList.size(); }

#include "Thread.h"
#include &

class CMyTask: public CTask
{
public:
CMyTask(){}

inline int Run()
{
printf("%s/n", (char*)this-&>m_ptrData);
sleep(10);
return 0;
}
};

int main()
{
CMyTask taskObj;

char szTmp[] = "this is the first thread running";
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10);

for(int i = 0; i &< 20; i++) { threadPool.AddTask(taskObj); } while(1) { printf("there are still %d tasks need to handle/n", threadPool.getTaskSize()); if (threadPool.getTaskSize() == 0) { if (threadPool.StopAll() == -1) { printf("Now I will exit from main/n"); exit(0); } } sleep(2); } return 0; }

/**
* 執行任務的類,設置任務數據並執行
*/
class CTask

CTask類就是3)任務介面 ,該類又run方法用於執行任務,實現此定義的類都是任務對象

/**
* 線程池管理類的實現
*/
class CThreadPool

CThreadPool就是1)線程池管理類 負責創建及管理線程

static vector& m_vecTaskList; /** 任務列表 */

CThreadPool對象的成員變數就是4)任務隊列,存在於線程池管理類中

關於2)任務執行線程 就是Linux的p_thread了,由線程池管理類的create方法創建,任務執行線程具體從任務列表當中拿出任務去執行的過程

1)pthread_mutex_lock(m_pthreadMutex); 鎖住互斥變數鎖

2) 檢查任務List大小是否為空 並且檢查線程池是否關閉,若線程池未關閉,且任務列表為空,則在條件變數處wait

while (m_vecTaskList.size() == 0 !shutdown)
{
pthread_cond_wait(m_pthreadCond, m_pthreadMutex);
}

3)處理關閉

if (shutdown)
{
pthread_mutex_unlock(m_pthreadMutex);
printf("thread %lu will exit/n", pthread_self());
pthread_exit(NULL);
}

4)取出一個任務並處理

printf("tid %lu run/n", tid);
vector&::iterator iter = m_vecTaskList.begin();

/**
* 取出一個任務並處理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}

5)unlock互斥鎖

pthread_mutex_unlock(m_pthreadMutex);

6)注意:是取任務的過程加鎖,執行任務的過程不用加鎖

task-&>Run(); /** 執行任務 */
printf("tid:%lu idle/n", tid);


不知道題主是不是已經自己動手實現了一個,恰巧最近我實現了一個linux下C++三個版本(C++98、C++03/C++0x、C++11)的線程池使用對比,

移步 lizhenghn123/zl_threadpool · GitHub


線程池的實現,請參照

多線程編程―線程池的實現 - 自強不息 厚德載物 - 51CTO技術博客


這有啥體驗?寫就是了,關鍵要明白要幹啥 值不值得或能不能做 不是任何場景都能用的 有些場景 線程池就是不如傳統的直接開銷線程


這麼久過去了,不知道題主的線程池搞的咋樣了?如果需要的話,我這裡有一個linux/C上的線程池實現可以參考,不過有點簡陋就是了。另外,線程池的實現,網上應該是一搜一大堆。

非常精簡的Linux線程池實現(一)——使用互斥鎖和條件變數


說實話,沒什麼體驗,感覺就好像吃了頓盒飯一樣。普通的不能再普通了。


推薦閱讀:

說話只說一半是一種怎樣的體驗?
被驚艷到是一種什麼體驗?
在莉莉絲遊戲公司工作是一種什麼樣的體驗?
參加 Global Game Jam 2017(全球遊戲創作節) 是一種怎樣的體驗?
特别有气质是一种怎样的体验?

TAG:C | 多線程編程 | 線程池 | X是種怎樣的體驗 |