標籤:

多線程執行順序控制?

我又怎麼一個需求,需要開啟多線程計算,然後在線程內列印計算後的結果,線程內輸出可以不按照順序,但是線程間輸出希望按照一定順序,,實際場景類似100份試卷,開多個線程池計算,計算每份試卷每道題的得分,每道題的得分輸出可以不按照順序,但是每份試卷的輸出需要按照順序,也就是輸出試卷1再輸試卷2,,能給個思路嗎,謝謝


簡單的做法有兩種:

1. 處理所有試卷的全部試題,結果暫存起來,最後排序輸出。

2. 先處理試卷1,等它全部處理完再處理試卷2。

當然這兩種做法都是 suboptimal,第1種做法需要一定的額外存儲空間,對於第2種做法,會出現線程空閑,影響吞吐量。

如果知道每份試卷有多少道題(平均值,中位數等),每道題佔多少內存,計算每道題的得分的耗時、是否涉及 IO,線程池大小,機器有多大內存、幾個核等具體條件,才好分析更好的做法。


用go可以像下面這樣寫。計算會充分利用所有線程,後面的試卷不用等待前面的試卷就可以開始計算,不影響吞吐。列印結果時才等待前面的,按順序輸出,而且是一到可以輸出就輸出,不用最後才排序。

如果用java實現,done應該可以用條件變數實現。

package main

import (
"fmt"
"sync/atomic"
)

func main() {
papers := [][]int{
{1, 3, 5, 7, 9},
{2, 4, 6, 8, 10},
{1, 1, 2, 3, 5, 8},
{2, 3, 5, 7, 11, 13, 17},
}

var done []chan struct{}
for range papers {
done = append(done, make(chan struct{}))
}

// 用於計算的線程數
nThreads := 8
sem := make(chan struct{}, nThreads)

for iPaper, tests := range papers {
var nTests int64 = int64(len(tests))
var nChecked int64
iPaper := iPaper
for _, test := range tests {
test := test
sem &<- struct{}{} go func() { defer func() { &<-sem }() // 計算,簡化成直接返回一個數字 points := test // 輸出 go func() { // 等待前面的試卷完成 for i := 0; i &< iPaper; i++ { &<-done[i] } // 輸出 fmt.Printf("%d ", points) // 最後一題,則標記完成該試卷 if n := atomic.AddInt64(nChecked, 1); n == nTests { fmt.Printf(" ") close(done[iPaper]) } }() }() } } // 等待最後一份完成 &<-done[len(done)-1] }

幾次輸出

&> ./a
1 9 3 5 7
10 8 4 6 2
1 1 3 2 5 8
5 3 2 13 11 7 17

&> ./a
3 5 1 9 7
10 8 6 4 2
1 1 8 5 2 3
3 2 17 13 11 7 5

&> ./a
9 3 5 1 7
6 10 2 8 4
8 1 2 3 1 5
2 11 3 7 17 13 5

&> ./a
1 3 5 7 9
2 4 6 8 10
1 1 2 3 5 8
17 2 3 5 7 11 13

&> ./a
3 1 9 7 5
2 4 8 6 10
5 2 3 8 1 1
17 13 11 2 7 3 5

&> ./a
7 3 5 1 9
8 10 6 2 4
8 1 1 2 3 5
7 17 13 5 2 3 11

&> ./a
9 7 1 3 5
8 6 4 10 2
1 1 2 3 5 8
17 13 7 5 2 3 11

符合題目要求

=== update ===

另一個實現,動態創建線程可能java實現起來不經濟,下面是用固定線程的

原理是每次計算完,都盡量輸出,不能輸出就存起來

可以另起一個線程做檢查,計算線程把結果傳過去,這樣可以不用那個鎖

package main

import (
"encoding/binary"
"fmt"
"math/rand"
"sync"
"time"

crand "crypto/rand"
)

func main() {
papers := [][]int{
{1, 3, 5, 7, 9},
{2, 4, 6, 8, 10},
{1, 1, 2, 3, 5, 8},
{2, 3, 5, 7, 11, 13, 17},
}

waitGroup := new(sync.WaitGroup)
numTests := make([]int64, len(papers))
for i, tests := range papers {
numTests[i] = int64(len(tests))
waitGroup.Add(len(tests))
}

type Job struct {
Paper int
Index int
Input int
}
jobs := make(chan Job)

// 越大,允許緩存的結果越多。為1時,順序計算所有題目
sem := make(chan struct{}, 1)

go func() {
for iPaper, tests := range papers {
for i, test := range tests {
sem &<- struct{}{} jobs &<- Job{ Paper: iPaper, Index: i, Input: test, } } } }() type Result int currentPaper := 0 results := make([][]Result, len(papers)) lock := new(sync.Mutex) nThread := 8 for i := 0; i &< nThread; i++ { go func() { for job := range jobs { result := func() Result { // 隨機sleep模擬計算抖動 time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) return Result(job.Input) }() lock.Lock() if job.Paper == currentPaper { // 可以輸出 fmt.Printf("%d ", result) &<-sem numTests[job.Paper]-- if numTests[job.Paper] == 0 { // 當前試卷已全部輸出 fmt.Printf(" ") stepPaper: currentPaper++ // 下一張 if currentPaper &< len(papers) { for _, result := range results[currentPaper] { // 輸出保存的結果 fmt.Printf("%d ", result) &<-sem } numTests[currentPaper] -= int64(len(results[currentPaper])) results[currentPaper] = make([]Result, 0) if numTests[currentPaper] == 0 { // 如果該頁也全部輸出了,繼續下一頁 fmt.Printf(" ") goto stepPaper } } } } else { // 未能輸出,保存 results[job.Paper] = append(results[job.Paper], result) } lock.Unlock() waitGroup.Done() } }() } waitGroup.Wait() close(jobs) } func init() { var seed int64 binary.Read(crand.Reader, binary.LittleEndian, seed) rand.Seed(seed) }


謝邀,這個是比較典型的滑動窗口應用

根據你你內存大小,搞一個長度為N的數組(數組每個元素記錄試卷指針、成績和狀態),數量為M的線程池(M&<=N),第一次取N個試卷,讓線程池從數組pop卷子,判卷完成後,到數組更新成績和狀態,當判完的卷子ID連續時就滑動窗口,滑出去多少卷子,就新push多少個未判的卷子進數組。

這裡N越大,可能的並發性就越強,被block的可能性就越低。而可能的BadCase是:一個ID較小的卷子判卷時間很長,比這個卷子ID大的N-1張卷子都判完了,但是窗口沒法滑動的情況。


CountdownLatch


提供一個能實現並行處理,串列輸出的實現方案:

線程A(主線程/啟動線程):

1. 將待處理的試卷封裝成對象(每個試卷一個對象),壓入BlockingQueue A

2. 並啟動包含若干線程B的線程池

線程B(試卷控制線程):

1. 從queueA中獲取試卷對象,取出試卷對象中的所有試題(每個試題一個對象),壓入BlockingQueue B

2. 實例化一個線程安全的集合對象(如Vector或CopyOnWriteArrayList)D,用於保存試題判分的結果

3. 實例化一個count=該試卷中試題數量n的CountDownLatch

4. 啟動包含若干線程C的線程池

5. countDownLatch.await()

6. 輸出集合D中的所有判分結果,輸出結果的邏輯應在線程A上synchronized

7. 繼續1-6循環

線程C(判分線程):

1. 從queue B中獲取試題對象,判分

2. 將處理結果加入集合D

3. countDownLatch.countDown()

4. 繼續1-3循環

這樣,所有線程C都是並發在判分,可以隨時從每個線程B中hold的集合D中獲取到已經處理完的試題分數。輸出是串列的,只有當某個試卷中的所有試題都處理完成之後,才會一次性輸出這個試卷的所有試題分數。同時由於輸出分數的邏輯是在線程A上synchronized的,可以保證不同試卷的結果輸出不會混在一起。


把100個試卷轉換成100個futuretask,然後放入隊列或者list裡面,,並讓其運行,然後循環從list里獲取到futuretask,並get到結果,如果第一個試卷還沒完成,會阻塞,如果完成了,則會馬上返回結果,這樣list循環完成,也就是1-100的順序結果。


結果存儲在一個treemap內,鍵試卷編號,值得分。最後遍歷列印即可


再增加一個線程MGR、專門用來管理並列印那100個線程的執行結果。

那100個線程中的任何一個執行完畢時、將結果交給MGR、然後就可以結束了。

MGR將所有結果按需要的順序輸出,並等到所有100個線程執行完畢、且列印了結果後才結束。

MGR列印時、編號靠前的就列印、編號靠後的就先保存、等列印了前面的結果之後再列印。


僅針對該題目,不管執行順序控制,開多個線程計算總分,同時列印每道題的分數,總分數全局保存到數組中初始化為-1,總分數輸出,單開線程,順序輸出即可。


一個boss線程做任務下發和結果回收,多個worker線程(線程池)響應實際任務。

控制和業務分離,高效率,低耦合。


多線程計算每個試卷,存結果,單開一個線程輸出結果,按順序輸出,沒有某個序號的結果就block住。


100份試卷用自旋鎖控制競爭讀取,算好後寫入100個元素的數組,直接訪問下標用原子操作寫數據即可,之後取數組輸出直到最後一個連續完成成員為止。

我能不能打個廣告代寫作業,每次500?


推薦閱讀:

求詳解該InterlockedIncrement的實現?
word2vec多線程優化時不加鎖的做法合理么?
ASIO + HTTP 如何打造高性能伺服器?
為什麼`atomic::fetch_add()`可以 relaxed memory order?
make 多線程編譯會出錯么?

TAG:Java | 多線程 |