標籤:

Kotlin雜談(四) - Coroutines(二): Channel

圖id: 63363216 (pixiv)

今天來說channel, channel在Kotlin就是非同步交接資料的管道, 在你需要非同步進行IO時, 你把工作的結果丟進channel, 然後在其他線程取得channel中的資料, 這種做法是很像消費者與生產者模式的, 而實際上在官方文檔與API實作中, channel也被用於實作消費者/生產者隊列

channel只有兩種形態: 有否緩衝, 而特點如下

  1. FIFO (即隊列Queue)
  2. 消費者與生產者配對 (在無緩衝channel, 消費/生產不匹配時, 多餘的一方會被掛起(suspend)直到匹配方出現)
  3. channel.close()具原子特性(atomic)

拿代碼說明一下channel吧, 拿回昨天yield的代碼加上官網的例子改一下

fun fibProducer() = produce(CommonPool) { //建立生產者n var terms = Pair(0L, 1L)n while (true) {n send(terms.first) //把資源送到channeln terms = Pair(terms.second, terms.first + terms.second)n }n}nnfun fibConsumer(id: Int, channel: ReceiveChannel<Long>) = launch(CommonPool) { //建立消費者n channel.consumeEach {n println("#$id consumed $it")n }n}nnfun main(args: Array<String>) = runBlocking<Unit> {n val producer = fibProducer()n repeat(5) { fibConsumer(it, producer) } //開5條消費者協程n delay(1000) //等待消費者與生產者匹配並傳送資源n producer.cancel() //將剩下資源消除並停止生產n}n

然後結果是

#0 consumed 0

#0 consumed 1

#1 consumed 1

#0 consumed 8 <-?????

#2 consumed 2

#3 consumed 3

#4 consumed 5

#1 consumed 13

#0 consumed 21

#2 consumed 34

#3 consumed 55

#4 consumed 89

好吧我想靜靜....因為按官網說明在多線程競爭下仍能確保FIFO的....

嗯,既然這組的行為有點奇怪,那就說管道(pipelines)吧(拖,畢竟是experimental嘛(キラ~

不同的生產者隊列能互相組合,搭建生產管道,上代碼說明

fun fibProducer() = produce(CommonPool) { //建立生產者n var terms = Pair(0L, 1L)n while (true) {n send(terms.first) //把資源送到channeln terms = Pair(terms.second, terms.first + terms.second)n }n}nn//假設我們要找出數列中的素數...nfun fibPrimeProducer(fibQueue: ReceiveChannel<Long>) = produce(CommonPool) {n //假設函數isPrime()能判斷傳進去的正整數是否素數n fibQueue.consumeEach { if(isPrime(it)) send(it) }n}nnfun main(args: Array<String>) = runBlocking {n val fibQueue = fibProducer()n (1..10).forEach {n val fibPrime = fibPrimeProducer(fibQueue).receive()n println(fibPrime)n }n fibQueue.close() //不需要管道時別忘了要關閉...cancel也能關閉channeln}n

這樣組合起來我們就能找出費波那契數列中的素數了, 容許channel的組合以建立出管道能令程序員重用簡單的channel組合出各種管道應付需求

一般的channel預設是沒有緩衝的(buffer capacity=0), 這樣在實際應用時會有資源被漏掉的風險, 因為掛起後channel被關閉或者產生其他變化時, 掛起的請求是不知道的, 直到匹配後想從channel提取資源時就會boom - 拋出Exception

所以就用buffer防止? naive, buffer只是防止生產者/消費者被掛起, 因為即使協程多麽輕量, 掛起生產者/消費者還是需要付出資源的, 先以buffer把資源請求緩存(cache)起來就能減少消耗了

那怎麽建立有緩衝的channel呢?上源碼

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {n public companion object Factory {n public const val UNLIMITED = Int.MAX_VALUEn public const val CONFLATED = -1nn /**n * Creates a channel with the specified buffer capacity (or without a buffer by default).n *n * The resulting channel type depends on the specified [capacity] parameter:n * * when `capacity` is 0 (default) -- creates [RendezvousChannel] without a buffer; -> 沒有緩衝n * * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size; -> 無限緩衝n * * when `capacity` is [CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends; -> 只保留最新的資源n * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`; -> 有限緩衝n * * otherwise -- throws [IllegalArgumentException].n */n public operator fun <E> invoke(capacity: Int = 0): Channel<E> =n when (capacity) {n 0 -> RendezvousChannel()n UNLIMITED -> LinkedListChannel()n CONFLATED -> ConflatedChannel()n else -> ArrayChannel(capacity)n }n }n}nnfun main(args: Array<String>) {n val channel = Channel<String>(Channel.UNLIMITED) //就這麽簡單n channel.send("Fuck")n channel.receive()n}n

然後是該怎麽檢查channel是可用的?文檔提供了數個方法

  1. yield函數 (這個我還沒搞懂怎樣檢查...)
  2. isClosedForSend/ isClosedForReceive/ isActive屬性
  3. (僅限消費者使用) receiveOrNull函數

在寫這一篇文章時我發現一個bug...啊不,是feature

在Kotlin中有RAII存在, 使用法如下

@Throws(IOException::class) fun foo() {n Socket().use { socket -> socket.getInputStream() } //use中的代碼執行完成後會自動關閉n}nn//來看一下use的源碼n@InlineOnlynpublic inline fun <T : Closeable?, R> T.use(block: (T) -> R): R {...}n//channel並沒有實作Closeable介面,無法用use完成RAIIn

嘛...反正還有try..finally,大不了自己寫一個實作了Closeable介面的channel不就行了(拖

channel就這麽多, 下回回頭寫coroutines的基本介紹


推薦閱讀:

TAG:Kotlin | 协程 |