多線程執行順序控制?
我又怎麼一個需求,需要開啟多線程計算,然後在線程內列印計算後的結果,線程內輸出可以不按照順序,但是線程間輸出希望按照一定順序,,實際場景類似100份試卷,開多個線程池計算,計算每份試卷每道題的得分,每道題的得分輸出可以不按照順序,但是每份試卷的輸出需要按照順序,也就是輸出試卷1再輸試卷2,,能給個思路嗎,謝謝
簡單的做法有兩種:
1. 處理所有試卷的全部試題,結果暫存起來,最後排序輸出。2. 先處理試卷1,等它全部處理完再處理試卷2。當然這兩種做法都是 suboptimal,第1種做法需要一定的額外存儲空間,對於第2種做法,會出現線程空閑,影響吞吐量。如果知道每份試卷有多少道題(平均值,中位數等),每道題佔多少內存,計算每道題的得分的耗時、是否涉及 IO,線程池大小,機器有多大內存、幾個核等具體條件,才好分析更好的做法。
用go可以像下面這樣寫。計算會充分利用所有線程,後面的試卷不用等待前面的試卷就可以開始計算,不影響吞吐。列印結果時才等待前面的,按順序輸出,而且是一到可以輸出就輸出,不用最後才排序。
如果用java實現,done應該可以用條件變數實現。package main
import (
"fmt"
"sync/atomic"
)
func main() {
papers := [][]int{
{1, 3, 5, 7, 9},
{2, 4, 6, 8, 10},
{1, 1, 2, 3, 5, 8},
{2, 3, 5, 7, 11, 13, 17},
}
var done []chan struct{}
for range papers {
done = append(done, make(chan struct{}))
}
// 用於計算的線程數
nThreads := 8
sem := make(chan struct{}, nThreads)
for iPaper, tests := range papers {
var nTests int64 = int64(len(tests))
var nChecked int64
iPaper := iPaper
for _, test := range tests {
test := test
sem &<- struct{}{}
go func() {
defer func() {
&<-sem
}()
// 計算,簡化成直接返回一個數字
points := test
// 輸出
go func() {
// 等待前面的試卷完成
for i := 0; i &< iPaper; i++ {
&<-done[i]
}
// 輸出
fmt.Printf("%d ", points)
// 最後一題,則標記完成該試卷
if n := atomic.AddInt64(nChecked, 1); n == nTests {
fmt.Printf("
")
close(done[iPaper])
}
}()
}()
}
}
// 等待最後一份完成
&<-done[len(done)-1]
}
&> ./a
1 9 3 5 7
10 8 4 6 2
1 1 3 2 5 8
5 3 2 13 11 7 17
&> ./a
3 5 1 9 7
10 8 6 4 2
1 1 8 5 2 3
3 2 17 13 11 7 5
&> ./a
9 3 5 1 7
6 10 2 8 4
8 1 2 3 1 5
2 11 3 7 17 13 5
&> ./a
1 3 5 7 9
2 4 6 8 10
1 1 2 3 5 8
17 2 3 5 7 11 13
&> ./a
3 1 9 7 5
2 4 8 6 10
5 2 3 8 1 1
17 13 11 2 7 3 5
&> ./a
7 3 5 1 9
8 10 6 2 4
8 1 1 2 3 5
7 17 13 5 2 3 11
&> ./a
9 7 1 3 5
8 6 4 10 2
1 1 2 3 5 8
17 13 7 5 2 3 11
符合題目要求
=== update ===另一個實現,動態創建線程可能java實現起來不經濟,下面是用固定線程的原理是每次計算完,都盡量輸出,不能輸出就存起來可以另起一個線程做檢查,計算線程把結果傳過去,這樣可以不用那個鎖package main
import (
"encoding/binary"
"fmt"
"math/rand"
"sync"
"time"
crand "crypto/rand"
)
func main() {
papers := [][]int{
{1, 3, 5, 7, 9},
{2, 4, 6, 8, 10},
{1, 1, 2, 3, 5, 8},
{2, 3, 5, 7, 11, 13, 17},
}
waitGroup := new(sync.WaitGroup)
numTests := make([]int64, len(papers))
for i, tests := range papers {
numTests[i] = int64(len(tests))
waitGroup.Add(len(tests))
}
type Job struct {
Paper int
Index int
Input int
}
jobs := make(chan Job)
// 越大,允許緩存的結果越多。為1時,順序計算所有題目
sem := make(chan struct{}, 1)
go func() {
for iPaper, tests := range papers {
for i, test := range tests {
sem &<- struct{}{}
jobs &<- Job{
Paper: iPaper,
Index: i,
Input: test,
}
}
}
}()
type Result int
currentPaper := 0
results := make([][]Result, len(papers))
lock := new(sync.Mutex)
nThread := 8
for i := 0; i &< nThread; i++ {
go func() {
for job := range jobs {
result := func() Result {
// 隨機sleep模擬計算抖動
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
return Result(job.Input)
}()
lock.Lock()
if job.Paper == currentPaper { // 可以輸出
fmt.Printf("%d ", result)
&<-sem
numTests[job.Paper]--
if numTests[job.Paper] == 0 { // 當前試卷已全部輸出
fmt.Printf("
")
stepPaper:
currentPaper++ // 下一張
if currentPaper &< len(papers) {
for _, result := range results[currentPaper] { // 輸出保存的結果
fmt.Printf("%d ", result)
&<-sem
}
numTests[currentPaper] -= int64(len(results[currentPaper]))
results[currentPaper] = make([]Result, 0)
if numTests[currentPaper] == 0 { // 如果該頁也全部輸出了,繼續下一頁
fmt.Printf("
")
goto stepPaper
}
}
}
} else { // 未能輸出,保存
results[job.Paper] = append(results[job.Paper], result)
}
lock.Unlock()
waitGroup.Done()
}
}()
}
waitGroup.Wait()
close(jobs)
}
func init() {
var seed int64
binary.Read(crand.Reader, binary.LittleEndian, seed)
rand.Seed(seed)
}
謝邀,這個是比較典型的滑動窗口應用
根據你你內存大小,搞一個長度為N的數組(數組每個元素記錄試卷指針、成績和狀態),數量為M的線程池(M&<=N),第一次取N個試卷,讓線程池從數組pop卷子,判卷完成後,到數組更新成績和狀態,當判完的卷子ID連續時就滑動窗口,滑出去多少卷子,就新push多少個未判的卷子進數組。這裡N越大,可能的並發性就越強,被block的可能性就越低。而可能的BadCase是:一個ID較小的卷子判卷時間很長,比這個卷子ID大的N-1張卷子都判完了,但是窗口沒法滑動的情況。CountdownLatch
提供一個能實現並行處理,串列輸出的實現方案:
線程A(主線程/啟動線程):
1. 將待處理的試卷封裝成對象(每個試卷一個對象),壓入BlockingQueue A2. 並啟動包含若干線程B的線程池線程B(試卷控制線程):1. 從queueA中獲取試卷對象,取出試卷對象中的所有試題(每個試題一個對象),壓入BlockingQueue B
2. 實例化一個線程安全的集合對象(如Vector或CopyOnWriteArrayList)D,用於保存試題判分的結果3. 實例化一個count=該試卷中試題數量n的CountDownLatch4. 啟動包含若干線程C的線程池5. countDownLatch.await()6. 輸出集合D中的所有判分結果,輸出結果的邏輯應在線程A上synchronized7. 繼續1-6循環線程C(判分線程):1. 從queue B中獲取試題對象,判分2. 將處理結果加入集合D3. countDownLatch.countDown()
4. 繼續1-3循環這樣,所有線程C都是並發在判分,可以隨時從每個線程B中hold的集合D中獲取到已經處理完的試題分數。輸出是串列的,只有當某個試卷中的所有試題都處理完成之後,才會一次性輸出這個試卷的所有試題分數。同時由於輸出分數的邏輯是在線程A上synchronized的,可以保證不同試卷的結果輸出不會混在一起。把100個試卷轉換成100個futuretask,然後放入隊列或者list裡面,,並讓其運行,然後循環從list里獲取到futuretask,並get到結果,如果第一個試卷還沒完成,會阻塞,如果完成了,則會馬上返回結果,這樣list循環完成,也就是1-100的順序結果。
結果存儲在一個treemap內,鍵試卷編號,值得分。最後遍歷列印即可
再增加一個線程MGR、專門用來管理並列印那100個線程的執行結果。那100個線程中的任何一個執行完畢時、將結果交給MGR、然後就可以結束了。MGR將所有結果按需要的順序輸出,並等到所有100個線程執行完畢、且列印了結果後才結束。MGR列印時、編號靠前的就列印、編號靠後的就先保存、等列印了前面的結果之後再列印。
僅針對該題目,不管執行順序控制,開多個線程計算總分,同時列印每道題的分數,總分數全局保存到數組中初始化為-1,總分數輸出,單開線程,順序輸出即可。
一個boss線程做任務下發和結果回收,多個worker線程(線程池)響應實際任務。控制和業務分離,高效率,低耦合。
多線程計算每個試卷,存結果,單開一個線程輸出結果,按順序輸出,沒有某個序號的結果就block住。
100份試卷用自旋鎖控制競爭讀取,算好後寫入100個元素的數組,直接訪問下標用原子操作寫數據即可,之後取數組輸出直到最後一個連續完成成員為止。我能不能打個廣告代寫作業,每次500?
推薦閱讀:
※求詳解該InterlockedIncrement的實現?
※word2vec多線程優化時不加鎖的做法合理么?
※ASIO + HTTP 如何打造高性能伺服器?
※為什麼`atomic::fetch_add()`可以 relaxed memory order?
※make 多線程編譯會出錯么?