Kotlin雜談(四) - Coroutines(二): Channel
今天來說channel, channel在Kotlin就是非同步交接資料的管道, 在你需要非同步進行IO時, 你把工作的結果丟進channel, 然後在其他線程取得channel中的資料, 這種做法是很像消費者與生產者模式的, 而實際上在官方文檔與API實作中, channel也被用於實作消費者/生產者隊列
channel只有兩種形態: 有否緩衝, 而特點如下
- FIFO (即隊列Queue)
- 消費者與生產者配對 (在無緩衝channel, 消費/生產不匹配時, 多餘的一方會被掛起(suspend)直到匹配方出現)
- channel.close()具原子特性(atomic)
拿代碼說明一下channel吧, 拿回昨天yield的代碼加上官網的例子改一下
fun fibProducer() = produce(CommonPool) { //建立生產者n var terms = Pair(0L, 1L)n while (true) {n send(terms.first) //把資源送到channeln terms = Pair(terms.second, terms.first + terms.second)n }n}nnfun fibConsumer(id: Int, channel: ReceiveChannel<Long>) = launch(CommonPool) { //建立消費者n channel.consumeEach {n println("#$id consumed $it")n }n}nnfun main(args: Array<String>) = runBlocking<Unit> {n val producer = fibProducer()n repeat(5) { fibConsumer(it, producer) } //開5條消費者協程n delay(1000) //等待消費者與生產者匹配並傳送資源n producer.cancel() //將剩下資源消除並停止生產n}n
然後結果是
#0 consumed 0
#0 consumed 1#1 consumed 1#0 consumed 8 <-?????
#2 consumed 2#3 consumed 3#4 consumed 5#1 consumed 13#0 consumed 21
#2 consumed 34#3 consumed 55#4 consumed 89
好吧我想靜靜....因為按官網說明在多線程競爭下仍能確保FIFO的....
嗯,既然這組的行為有點奇怪,那就說管道(pipelines)吧(拖,畢竟是experimental嘛(キラ~
不同的生產者隊列能互相組合,搭建生產管道,上代碼說明fun fibProducer() = produce(CommonPool) { //建立生產者n var terms = Pair(0L, 1L)n while (true) {n send(terms.first) //把資源送到channeln terms = Pair(terms.second, terms.first + terms.second)n }n}nn//假設我們要找出數列中的素數...nfun fibPrimeProducer(fibQueue: ReceiveChannel<Long>) = produce(CommonPool) {n //假設函數isPrime()能判斷傳進去的正整數是否素數n fibQueue.consumeEach { if(isPrime(it)) send(it) }n}nnfun main(args: Array<String>) = runBlocking {n val fibQueue = fibProducer()n (1..10).forEach {n val fibPrime = fibPrimeProducer(fibQueue).receive()n println(fibPrime)n }n fibQueue.close() //不需要管道時別忘了要關閉...cancel也能關閉channeln}n
這樣組合起來我們就能找出費波那契數列中的素數了, 容許channel的組合以建立出管道能令程序員重用簡單的channel組合出各種管道應付需求
一般的channel預設是沒有緩衝的(buffer capacity=0), 這樣在實際應用時會有資源被漏掉的風險, 因為掛起後channel被關閉或者產生其他變化時, 掛起的請求是不知道的, 直到匹配後想從channel提取資源時就會boom - 拋出Exception
所以就用buffer防止? naive, buffer只是防止生產者/消費者被掛起, 因為即使協程多麽輕量, 掛起生產者/消費者還是需要付出資源的, 先以buffer把資源請求緩存(cache)起來就能減少消耗了
那怎麽建立有緩衝的channel呢?上源碼
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {n public companion object Factory {n public const val UNLIMITED = Int.MAX_VALUEn public const val CONFLATED = -1nn /**n * Creates a channel with the specified buffer capacity (or without a buffer by default).n *n * The resulting channel type depends on the specified [capacity] parameter:n * * when `capacity` is 0 (default) -- creates [RendezvousChannel] without a buffer; -> 沒有緩衝n * * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size; -> 無限緩衝n * * when `capacity` is [CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends; -> 只保留最新的資源n * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`; -> 有限緩衝n * * otherwise -- throws [IllegalArgumentException].n */n public operator fun <E> invoke(capacity: Int = 0): Channel<E> =n when (capacity) {n 0 -> RendezvousChannel()n UNLIMITED -> LinkedListChannel()n CONFLATED -> ConflatedChannel()n else -> ArrayChannel(capacity)n }n }n}nnfun main(args: Array<String>) {n val channel = Channel<String>(Channel.UNLIMITED) //就這麽簡單n channel.send("Fuck")n channel.receive()n}n
然後是該怎麽檢查channel是可用的?文檔提供了數個方法
- yield函數 (這個我還沒搞懂怎樣檢查...)
- isClosedForSend/ isClosedForReceive/ isActive屬性
- (僅限消費者使用) receiveOrNull函數
在寫這一篇文章時我發現一個bug...啊不,是feature
在Kotlin中有RAII存在, 使用法如下@Throws(IOException::class) fun foo() {n Socket().use { socket -> socket.getInputStream() } //use中的代碼執行完成後會自動關閉n}nn//來看一下use的源碼n@InlineOnlynpublic inline fun <T : Closeable?, R> T.use(block: (T) -> R): R {...}n//channel並沒有實作Closeable介面,無法用use完成RAIIn
嘛...反正還有try..finally,大不了自己寫一個實作了Closeable介面的channel不就行了(拖
channel就這麽多, 下回回頭寫coroutines的基本介紹
推薦閱讀: