用 C++ 寫線程池是怎樣一種體驗?
沒試過這麼牛逼的事,想試試!求經驗~求指導
有兩種線程池,不知道你問的是哪一種。
第一種是適合把多線程當作非同步使用的,比如Windows API或者C#里的那個。調用者直接扔一個functor過去就可以了,等到需要返回值的時候同步一下。
第二種是主線程需要多次spawn出很多子線程的情況。這經常需要詳細控制線程個數,並且主線程會等待子線程都完成之後才繼續。
話說,前面兩位正好答了這兩種,哈。
總的來說有了std::thread,不管你要哪一種,要構建一個work的thread pool,都不是件難事,甚至比mem pool還容易很多。但有些細節挺難纏,比如程序退出的時候何時殺線程(thread和condition是有析構順序的,亂殺會死鎖)。
std::vector&
線程池,最簡單的就是生產者消費者模型了。池裡的每條線程,都是消費者,他們消費並處理一個個的任務。生產者可以是demulitplier或者等在select/epoll/kqueue之類的系統api上的,然後他把收集到底任務發出去。
更複雜的,也不過就是,沒有單獨的生產者,相應的邏輯混合在消費者那邊等任務的代碼里。找點例子代碼,照著改改,沒多難。另外,Windows上的IOCP把等待、排隊、線程管理都包了,也挺方便的。目前的Java程序員,對於C++寫線程池還是很有興趣的,Java的JDK本身就提供了很優良的線程池ThreadPoolExecutor以及ScheduledThreadPoolExecutor,用Java也可以實現很多符合自己需求的線程池
一般來說實現一個線程池主要包括以下4個組成部分:
1)線程管理器:用於創建並管理線程池。事實上線程管理器都有可能包括4)甚至2)3)
2)工作線程:線程池中實際執行任務的線程。在初始化線程時會預先創建好固定數目的線程在池中,這些初始化的線程一般處於空閑狀態,一般不佔用CPU,佔用較小的內存空間。
3)任務介面:每個任務必須實現的介面,當線程池的任務隊列中有可執行任務時,被空閑的工作線程調去執行(線程的閑與忙是通過互斥量實現的,跟前面文章中的設置標誌位差不多),把任務抽象出來形成介面,可以做到線程池與具體的任務無關。
4)任務隊列:用來存放沒有處理的任務,提供一種緩衝機制,實現這種結構有好幾種方法,常用的是隊列,主要運用先進先出原理,另外一種是鏈表之類的數據結構,可以動態的為它分配內存空間,應用中比較靈活。
以Java的ThreadPoolExecutor為例,
線程管理器--------public class ThreadPoolExecutor extends AbstractExecutorService
工作線程---------- private final class Worker implements Runnable
private final HashSet&
任務介面------------ public interface Runnable
(提交的Callable對象也被封裝成Runnable對象最後)
任務隊列------------ private final BlockingQueue&
ThreadPoolExecutor這個線程管理器 就是包括了工作線程和任務隊列的,任務介面是java.lang.Runnable介面
Doug Lea
對ThreadPoolExecutor進行了重寫,寫的面目全非。。。。。,先分析1.6的吧
http://www.cnblogs.com/elvinni/p/4162726.html
上面是1.7的源碼分析
Linux/Unix下的POSIX規範的C++線程池
也不是我寫的,只是提供給大家鑒賞一下,多多學習吧
http://blog.csdn.net/zhoubl668/article/details/8927090
Thread.h
#ifndef __THREAD_H
#define __THREAD_H
#include & using namespace std; /** public: /** static pthread_mutex_t m_pthreadMutex; /** 線程同步鎖 */ protected: int Create(); /** 創建線程池中的線程 */ public: #endif void CTask::SetData(void * data) vector& pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; /** /** pthread_mutex_unlock(m_pthreadMutex); task-&>Run(); /** 執行任務 */ /** /** class CMyTask: public CTask inline int Run() int main() char szTmp[] = "this is the first thread running"; for(int i = 0; i &< 20; i++)
{
threadPool.AddTask(taskObj);
}
while(1)
{
printf("there are still %d tasks need to handle/n", threadPool.getTaskSize());
if (threadPool.getTaskSize() == 0)
{
if (threadPool.StopAll() == -1)
{
printf("Now I will exit from main/n");
exit(0);
}
}
sleep(2);
}
return 0;
}
CTask類就是3)任務介面 ,該類又run方法用於執行任務,實現此定義的類都是任務對象
#include &
#include &
* 執行任務的類,設置任務數據並執行
*/
class CTask
{
protected:
string m_strTaskName; /** 任務的名稱 */
void* m_ptrData; /** 要執行的任務的具體數據 */
public:
CTask(){}
CTask(string taskName)
{
m_strTaskName = taskName;
m_ptrData = NULL;
}
virtual int Run()= 0;
void SetData(void* data); /** 設置任務數據 */
virtual ~CTask(){}
};
* 線程池管理類的實現
*/
class CThreadPool
{
private:
static vector&
static bool shutdown; /** 線程退出標誌 */
int m_iThreadNum; /** 線程池中啟動的線程數 */
pthread_t *pthread_id; /** 線程文件描述符數組*/
static pthread_cond_t m_pthreadCond; /** 線程同步的條件變數 */
static void* ThreadFunc(void * threadData); /** 新線程的線程回調函數 */
static int MoveToIdle(pthread_t tid); /** 線程執行結束後,把自己放入到空閑線程中 */
static int MoveToBusy(pthread_t tid); /** 移入到忙碌線程中去 */
CThreadPool(int threadNum = 10);
int AddTask(CTask *task); /** 把任務添加到任務隊列中 */
int StopAll(); /** 使線程池中的線程退出 */
int getTaskSize(); /** 獲取當前任務隊列中的任務數 */
};
//Thread.cpp
#include "Thread.h"
#include &
{
m_ptrData = data;
}
bool CThreadPool::shutdown = false;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
* 線程池管理類構造函數
*/
CThreadPool::CThreadPool(int threadNum)
{
this-&>m_iThreadNum = threadNum;
cout &<&< "I will create " &<&< threadNum &<&< " threads" &<&< endl;
Create();
}
/**
* 線程回調函數
*/
void* CThreadPool::ThreadFunc(void* threadData)
{
pthread_t tid = pthread_self();
while (1)
{
pthread_mutex_lock(m_pthreadMutex);
while (m_vecTaskList.size() == 0 !shutdown)
{
pthread_cond_wait(m_pthreadCond, m_pthreadMutex);
}
if (shutdown)
{
pthread_mutex_unlock(m_pthreadMutex);
printf("thread %lu will exit/n", pthread_self());
pthread_exit(NULL);
}
printf("tid %lu run/n", tid);
vector&
* 取出一個任務並處理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
printf("tid:%lu idle/n", tid);
}
return (void*)0;
}
* 往任務隊列裡邊添加任務並發出線程同步信號
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(m_pthreadMutex);
this-&>m_vecTaskList.push_back(task);
pthread_mutex_unlock(m_pthreadMutex);
pthread_cond_signal(m_pthreadCond);
return 0;
}
* 創建線程
*/
int CThreadPool::Create()
{
pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum);
for(int i = 0; i &< m_iThreadNum; i++)
{
pthread_create(pthread_id[i], NULL, ThreadFunc, NULL);
}
return 0;
}
/**
* 停止所有線程
*/
int CThreadPool::StopAll()
{
/** 避免重複調用 */
if (shutdown)
{
return -1;
}
printf("Now I will end all threads!!/n");
/** 喚醒所有等待線程,線程池要銷毀了 */
shutdown = true;
pthread_cond_broadcast(m_pthreadCond);
/** 阻塞等待線程退出,否則就成殭屍了 */
for (int i = 0; i &< m_iThreadNum; i++)
{
pthread_join(pthread_id[i], NULL);
}
free(pthread_id);
pthread_id = NULL;
/** 銷毀條件變數和互斥體 */
pthread_mutex_destroy(m_pthreadMutex);
pthread_cond_destroy(m_pthreadCond);
return 0;
}
/**
* 獲取當前隊列中任務數
*/
int CThreadPool::getTaskSize()
{
return m_vecTaskList.size();
}
#include "Thread.h"
#include &
{
public:
CMyTask(){}
{
printf("%s/n", (char*)this-&>m_ptrData);
sleep(10);
return 0;
}
};
{
CMyTask taskObj;
taskObj.SetData((void*)szTmp);
CThreadPool threadPool(10); /**
* 執行任務的類,設置任務數據並執行
*/
class CTask
/**
* 線程池管理類的實現
*/
class CThreadPool
CThreadPool就是1)線程池管理類 負責創建及管理線程
static vector&
CThreadPool對象的成員變數就是4)任務隊列,存在於線程池管理類中
關於2)任務執行線程 就是Linux的p_thread了,由線程池管理類的create方法創建,任務執行線程具體從任務列表當中拿出任務去執行的過程1)pthread_mutex_lock(m_pthreadMutex); 鎖住互斥變數鎖2) 檢查任務List大小是否為空 並且檢查線程池是否關閉,若線程池未關閉,且任務列表為空,則在條件變數處waitwhile (m_vecTaskList.size() == 0 !shutdown)
{
pthread_cond_wait(m_pthreadCond, m_pthreadMutex);
}
3)處理關閉
if (shutdown)
{
pthread_mutex_unlock(m_pthreadMutex);
printf("thread %lu will exit/n", pthread_self());
pthread_exit(NULL);
}
4)取出一個任務並處理
printf("tid %lu run/n", tid);
vector&
/**
* 取出一個任務並處理之
*/
CTask* task = *iter;
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}
pthread_mutex_unlock(m_pthreadMutex);
task-&>Run(); /** 執行任務 */
printf("tid:%lu idle/n", tid);
不知道題主是不是已經自己動手實現了一個,恰巧最近我實現了一個linux下C++三個版本(C++98、C++03/C++0x、C++11)的線程池使用對比, 移步 lizhenghn123/zl_threadpool · GitHub
線程池的實現,請參照
多線程編程―線程池的實現 - 自強不息 厚德載物 - 51CTO技術博客
這有啥體驗?寫就是了,關鍵要明白要幹啥 值不值得或能不能做 不是任何場景都能用的 有些場景 線程池就是不如傳統的直接開銷線程
這麼久過去了,不知道題主的線程池搞的咋樣了?如果需要的話,我這裡有一個linux/C上的線程池實現可以參考,不過有點簡陋就是了。另外,線程池的實現,網上應該是一搜一大堆。非常精簡的Linux線程池實現(一)——使用互斥鎖和條件變數
說實話,沒什麼體驗,感覺就好像吃了頓盒飯一樣。普通的不能再普通了。
推薦閱讀:
※說話只說一半是一種怎樣的體驗?
※被驚艷到是一種什麼體驗?
※在莉莉絲遊戲公司工作是一種什麼樣的體驗?
※參加 Global Game Jam 2017(全球遊戲創作節) 是一種怎樣的體驗?
※特别有气质是一种怎样的体验?