怎麼處理 Spark structured streaming 慢速變化數據 join 的問題?
請教一個問題,代碼如下。邏輯很簡單,從 MySQL 的一個表裡面提取 metadata 然後和 structured streaming 的實時數據做 join,得到想要的結果。這個程序本身運行良好。
問題是這個mysql的表的數據有時候會增加或者變更,有沒有辦法讓 structured streaming 隔一段時間(比如一天)去自動重新讀一次 MySQL?
Spark 版本為 2.2.1。
// Read metadata from MySQL, the metadata is to be joined with Kafka real time message
// 從 MySQL 讀的數據會緩慢變化,比如隔了幾天新增一些數據
val df_meta = spark.read
.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "v_entity_ap_rel")
.load()val df_cell = spark.read
.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "tmac_org_cat")
.load()
.filter($"Category" === 1)
.select("mac3", "Category", "Brand")df_meta.cache()
df_cell.cache()// Read Kafka and join with df_meta to get expect result
// Kafka 裡面是 Mac 地址和採集的相關數據,每行 Kafka 的消息會根上面兩個 cache
// 的 DataFrame join 到一起形成最終需要的結果,問題就是 struct streaming
// 一直在運行,而讀 MySQL 是一次性的,除非重新啟動 Spark
// 程序,如何能夠隔一段時間 reload MySQL 裡面的數據?
val df = spark.readStream.
.format("kafka")
.option("kafka.bootstrap.servers", "namenode:9092")
.option("fetch.message.max.bytes", "50000000")
.option("kafka.max.partition.fetch.bytes", "50000000")
.option("subscribe", "rawdb.raw_data")
.option("failOnDataLoss", true)
.option("startingOffsets", "latest")
.load()
.select($"value".as[Array[Byte]])
.map(avroDeserialize(_))
.as[ApRawData]
.selectExpr("apMacAddress APMAC", "rssi RRSSI", "sourceMacAddress CLIENTMAC", "time STIME", "updatingTime")
.filter($"stime".lt(current_timestamp()))
.filter($"stime".gt(from_unixtime(unix_timestamp(current_timestamp()).minus(5 * 60))))
.as("d")
.join(df_cell.as("c"), substring($"d.CLIENTMAC", 1, 6) === $"c.mac3")
.as("a")
.join(df_meta.as("b"), $"a.apmac" === $"b.apmac")val query = df
.selectExpr(
"ENTITYID",
"CLIENTMAC",
"STIME",
"CASE WHEN a.rrssi &>= b.rssi THEN 1 WHEN a.rrssi &< b.nearbyrssi THEN 3 ELSE 2 END FLAG", "substring(stime, 1, 10) DATE", "substring(stime, 12, 2) HOUR") .repartition(col("DATE"), col("HOUR")) .writeStream .format("csv") .option("header",false) .partitionBy("DATE", "HOUR") .option("checkpointLocation", "/user/root/t_cf_table_chpt") .trigger(ProcessingTime("5 minutes")) .start("T_CF_TABLE") .awaitTermination()
好像不 cache 從 MySQL 讀出來的 DataFrame 就可以了?這樣每次查詢都會重讀一次 MySQL 表。如果不做 cache 對性能的負面影響不可接受,可以每隔一段時間 unpersist 一次 MySQL 的 DataFrame 再重新 cache。這樣下一次查詢時就會重讀 MySQL 表的內容。
請問avroDeserialize
是怎麼定義的
推薦閱讀:
TAG:大數據處理 | Spark | 事件流處理 | StructuredStreaming |