Cracking Zhihu's CAPTCHA with TensorFlow: "Find the Upside-Down Chinese Characters"
Preface
A while back, on a whim, I decided to build something fun with TensorFlow. But what? I had seen someone crawl personal profiles from Zhihu, and I figured I could collect users' profile data and then predict things like gender and occupation from their avatars. So I searched for several Zhihu crawlers, but none of them worked: they all broke at the login step. It turned out Zhihu had replaced its old CAPTCHA with a new one: a row of roughly seven Chinese characters, an unknown number of which are printed upside down, and the user has to point out which ones are inverted. That's quite different from conventional CAPTCHA recognition, which makes it interesting!
So I needed a model to solve this. The problem itself is clear; the main open questions are:
- Data: how do I generate the training data?
- Architecture: segment the characters and run a per-character classifier, or have the network output all the inversion flags at once (assuming the CAPTCHA always has the same number of characters)?
Data Preparation
One day 沙神 shared a link in a group chat to some Python code for rendering Chinese characters, which was exactly what I needed; after a few quick tweaks it was done. The code (Python 2 — note unichr and the print statement) is below:
```python
# -*- coding: utf-8 -*-
from PIL import Image, ImageDraw, ImageFont
import random


class RandomChar():
    """Pick a random Chinese character."""

    @staticmethod
    def Unicode():
        val = random.randint(0x4E00, 0x9FBF)
        return unichr(val)

    @staticmethod
    def GB2312():
        # Assemble a random two-byte GB2312 code point from the common range
        head = random.randint(0xB0, 0xCF)
        body = random.randint(0xA, 0xF)
        tail = random.randint(0, 0xF)
        val = (head << 8) | (body << 4) | tail
        hex_str = "%x" % val
        return hex_str.decode('hex').decode('gb2312')


class ImageChar():
    def __init__(self, fontColor=(0, 0, 0),
                 size=(100, 40),
                 fontPath='/Library/Fonts/Arial Unicode.ttf',
                 bgColor=(255, 255, 255),
                 fontSize=20):
        self.size = size
        self.fontPath = fontPath
        self.bgColor = bgColor
        self.fontSize = fontSize
        self.fontColor = fontColor
        self.font = ImageFont.truetype(self.fontPath, self.fontSize)
        self.image = Image.new('RGB', size, bgColor)

    def drawText(self, pos, txt, fill):
        # Draw an upright character directly onto the canvas
        draw = ImageDraw.Draw(self.image)
        draw.text(pos, txt, font=self.font, fill=fill)
        del draw

    def drawTextV2(self, pos, txt, fill, angle=180):
        # Draw a flipped character: render onto a small tile, rotate 180°, paste back
        image = Image.new('RGB', (25, 25), (255, 255, 255))
        draw = ImageDraw.Draw(image)
        draw.text((0, -3), txt, font=self.font, fill=fill)
        w = image.rotate(angle, expand=1)
        self.image.paste(w, box=pos)
        del draw

    def randRGB(self):
        return (0, 0, 0)

    def randChinese(self, num, num_flip):
        # Render `num` random characters, flipping `num_flip` of them
        gap = 1
        start = 0
        num_flip_list = random.sample(range(num), num_flip)
        print 'num flip list:{0}'.format(num_flip_list)
        char_list = []
        for i in range(0, num):
            char = RandomChar().GB2312()
            char_list.append(char)
            x = start + self.fontSize * i + gap + gap * i
            if i in num_flip_list:
                self.drawTextV2((x, 6), char, self.randRGB())
            else:
                self.drawText((x, 0), char, self.randRGB())
        return char_list, num_flip_list

    def save(self, path):
        self.image.save(path)


err_num = 0
for i in range(10):
    try:
        ic = ImageChar(fontColor=(100, 211, 90), size=(280, 28), fontSize=25)
        num_flip = random.randint(3, 6)
        char_list, num_flip_list = ic.randChinese(10, num_flip)
        # File name: the characters + '_' + the flipped positions
        ic.save(''.join(char_list) + '_' + ''.join(str(i) for i in num_flip_list) + '.jpeg')
    except Exception:
        err_num += 1
        continue
```
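To build the training and test sets mentioned later in this post (roughly 1,000,000 train and 600,000 test images), one can simply wrap the loop above; a hedged sketch, with the directory layout and the uuid-based naming (described near the end of the post) assumed:

```python
import os
import random
import uuid

def gen_dataset(out_dir, total):
    # Wraps the generation loop above; file names use uuid + '_' + flipped
    # positions, matching the naming scheme described later in this post
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    err_num = 0
    for _ in range(total):
        try:
            ic = ImageChar(fontColor=(100, 211, 90), size=(280, 28), fontSize=25)
            num_flip = random.randint(3, 6)
            char_list, num_flip_list = ic.randChinese(10, num_flip)
            name = str(uuid.uuid1()) + '_' + ''.join(str(i) for i in num_flip_list) + '.jpeg'
            ic.save(os.path.join(out_dir, name))
        except Exception:
            err_num += 1  # some GB2312 code points fail to render; just skip them
    return err_num

# gen_dataset('../data/train_data/', 1000000)
# gen_dataset('../data/test_data/', 600000)
```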
To keep things simple, I didn't add noise, color jitter, or other distractors to the generated characters; a generated image looks roughly like this:

So the next step is to build a network in TensorFlow to recognize them.
Model
Network Design
Inspired by 項亮's end-to-end fixed-length OCR recognition: since my CAPTCHAs are all of length 10, the network just needs 10 outputs at the end, each a binary classifier:
Here 0 means upright and 1 means inverted, so we can build a network that maps one input to 10 binary classifications. The structure is very simple (feel free to experiment with the design here and try fancier tricks). Enough talk, show you the code:

```python
import tensorflow as tf
slim = tf.contrib.slim


def network():
    # Input: 28x280 grayscale image; label: 10 binary flags (0 = upright, 1 = flipped)
    images = tf.placeholder(dtype=tf.float32, shape=[None, 28, 280, 1], name='image_batch')
    labels = tf.placeholder(dtype=tf.int32, shape=[None, 10], name='label_batch')
    endpoints = {}
    # Three conv + average-pool stages, then one shared fully connected layer
    conv_1 = slim.conv2d(images, 32, [5, 5], 1, padding='SAME')
    avg_pool_1 = slim.avg_pool2d(conv_1, [2, 2], [1, 1], padding='SAME')
    conv_2 = slim.conv2d(avg_pool_1, 32, [5, 5], 1, padding='SAME')
    avg_pool_2 = slim.avg_pool2d(conv_2, [2, 2], [1, 1], padding='SAME')
    conv_3 = slim.conv2d(avg_pool_2, 32, [3, 3])
    avg_pool_3 = slim.avg_pool2d(conv_3, [2, 2], [1, 1])
    flatten = slim.flatten(avg_pool_3)
    fc1 = slim.fully_connected(flatten, 512, activation_fn=None)
    # Ten independent 2-way heads (out0 .. out9), one per character position
    outs = [slim.fully_connected(fc1, 2, activation_fn=None) for _ in range(10)]
    global_step = tf.Variable(initial_value=0)
    out_argmaxes = [tf.expand_dims(tf.argmax(out, 1), 1) for out in outs]
    out_score = tf.concat(outs, axis=1)
    out_final = tf.cast(tf.concat(out_argmaxes, axis=1), tf.int32)
    # One softmax cross-entropy loss per position; the objective is their sum
    loss_list = [
        tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
            logits=outs[i], labels=tf.one_hot(labels[:, i], depth=2)))
        for i in range(10)
    ]
    loss_sum = tf.reduce_sum(loss_list)
    train_op = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(
        loss_sum, global_step=global_step)
    # A sample counts as correct only if all 10 positions are right
    accuracy = tf.reduce_mean(tf.cast(
        tf.reduce_all(tf.equal(out_final, labels), axis=1), tf.float32))
    tf.summary.scalar('loss_sum', loss_sum)
    tf.summary.scalar('accuracy', accuracy)
    merged_summary_op = tf.summary.merge_all()
    endpoints['global_step'] = global_step
    endpoints['images'] = images
    endpoints['labels'] = labels
    endpoints['train_op'] = train_op
    endpoints['loss_sum'] = loss_sum
    endpoints['accuracy'] = accuracy
    endpoints['merged_summary_op'] = merged_summary_op
    endpoints['out_final'] = out_final
    endpoints['out_score'] = out_score
    return endpoints
```
Since the label is a 10-bit binary vector, the loss clearly has to be redesigned as well; here I simply add up the 10 per-position losses and use the sum as the objective function to optimize. Also note that a few APIs changed between TF 0.n and TF 1.0; this code targets TF 1.0, and porting it to 0.n mainly means adjusting tf.concat.
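For reference, the tf.concat change is just a swap of the argument order; a minimal sketch of the two signatures:

```python
# TF 1.0: the list of tensors comes first, the axis second
out_score = tf.concat([out0, out1], axis=1)

# TF 0.n: the concat dimension came first
# out_score = tf.concat(1, [out0, out1])
```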
Actually I think there is room for other formulations here: the label can be viewed as a distribution over the 10 positions, so you could measure the divergence between the prediction and the ground-truth distribution, e.g. with a KL-style distance. I just summed the losses, crude but effective. The rest of the code needs no special explanation; if you want to learn these APIs, just run the code, and if anything is still unclear, leave a comment under this post and I'll reply right away. One detail worth calling out is tf.reduce_all: it returns False as soon as any element is False, which exactly matches my requirement that all 10 positions must be identical for a CAPTCHA to count as recognized.
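As a quick toy illustration of those tf.reduce_all semantics (not part of the training code):

```python
import tensorflow as tf

preds = tf.constant([[0, 1, 1], [0, 1, 0]], dtype=tf.int32)
labels = tf.constant([[0, 1, 1], [0, 0, 0]], dtype=tf.int32)
# True only when every position in the row matches:
# row 0 counts as correct, row 1 does not
row_correct = tf.reduce_all(tf.equal(preds, labels), axis=1)
accuracy = tf.reduce_mean(tf.cast(row_correct, tf.float32))

with tf.Session() as sess:
    print(sess.run([row_correct, accuracy]))  # [array([ True, False]), 0.5]
```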
Model Training
```python
# Assumes FLAGS, logger, os and time are defined at module level in the full script
def train():
    train_feeder = DataIterator(data_dir=FLAGS.train_data_dir)
    test_feeder = DataIterator(data_dir=FLAGS.test_data_dir)
    with tf.Session() as sess:
        train_images, train_labels = train_feeder.input_pipeline(batch_size=FLAGS.batch_size, aug=True)
        test_images, test_labels = test_feeder.input_pipeline(batch_size=FLAGS.batch_size)
        endpoints = network()
        sess.run(tf.global_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        train_writer = tf.summary.FileWriter('./log' + '/train', sess.graph)
        test_writer = tf.summary.FileWriter('./log' + '/val')
        start_step = 0
        # Optionally resume from the latest checkpoint
        if FLAGS.restore:
            ckpt = tf.train.latest_checkpoint(FLAGS.checkpoint_dir)
            if ckpt:
                saver.restore(sess, ckpt)
                print "restore from the checkpoint {0}".format(ckpt)
                start_step += int(ckpt.split('-')[-1])
        logger.info(':::Training Start:::')
        try:
            while not coord.should_stop():
                start_time = time.time()
                train_images_batch, train_labels_batch = sess.run([train_images, train_labels])
                feed_dict = {endpoints['images']: train_images_batch,
                             endpoints['labels']: train_labels_batch}
                _, loss_val, train_summary, step = sess.run(
                    [endpoints['train_op'], endpoints['loss_sum'],
                     endpoints['merged_summary_op'], endpoints['global_step']],
                    feed_dict=feed_dict)
                train_writer.add_summary(train_summary, step)
                end_time = time.time()
                logger.info("[train] the step {0} takes {1} loss {2}".format(
                    step, end_time - start_time, loss_val))
                if step > FLAGS.max_steps:
                    break
                # Periodically evaluate on a batch from the test pipeline
                if step % FLAGS.eval_steps == 1:
                    logger.info('========Begin eval stage=========')
                    start_time = time.time()
                    test_images_batch, test_labels_batch = sess.run([test_images, test_labels])
                    logger.info('[test] gen test batch spend {0}'.format(time.time() - start_time))
                    feed_dict = {endpoints['images']: test_images_batch,
                                 endpoints['labels']: test_labels_batch}
                    accuracy_val, test_summary = sess.run(
                        [endpoints['accuracy'], endpoints['merged_summary_op']],
                        feed_dict=feed_dict)
                    end_time = time.time()
                    test_writer.add_summary(test_summary, step)
                    logger.info('[test] the step {0} accuracy {1} spend time {2}'.format(
                        step, accuracy_val, end_time - start_time))
                if step % FLAGS.save_steps == 1:
                    logger.info('Save the ckpt of {0}'.format(step))
                    saver.save(sess, os.path.join(FLAGS.checkpoint_dir, 'my-model'),
                               global_step=endpoints['global_step'])
        except tf.errors.OutOfRangeError:
            logger.info('==================Train Finished================')
            saver.save(sess, os.path.join(FLAGS.checkpoint_dir, 'my-model'),
                       global_step=endpoints['global_step'])
        finally:
            coord.request_stop()
            coord.join(threads)
```
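The DataIterator used above isn't listed in this post; as a rough idea of what it does, here is a minimal sketch built on TF 1.0 queue runners. The file-name parsing and pipeline details are my assumptions based on the naming scheme described later, not the original repo's implementation:

```python
import os
import tensorflow as tf


class DataIterator(object):
    """Hypothetical sketch: reads 'xxxx_69128.jpeg'-style files, where the
    digits after the last '_' are the flipped positions, and yields
    (image, 10-bit label) batches through TF 1.0 input queues."""

    def __init__(self, data_dir):
        self.image_names = [os.path.join(data_dir, f)
                            for f in os.listdir(data_dir) if f.endswith('.jpeg')]
        self.labels = []
        for name in self.image_names:
            flips = os.path.basename(name).rsplit('_', 1)[-1].split('.')[0]
            bits = [0] * 10
            for c in flips:
                bits[int(c)] = 1  # mark each flipped position
            self.labels.append(bits)

    def input_pipeline(self, batch_size, num_epochs=None, aug=False):
        images_tensor = tf.convert_to_tensor(self.image_names, dtype=tf.string)
        labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int32)
        # num_epochs=1 gives the single-pass behaviour validation() relies on;
        # it also requires tf.local_variables_initializer(), which validation() runs
        input_queue = tf.train.slice_input_producer(
            [images_tensor, labels_tensor], num_epochs=num_epochs)
        label = input_queue[1]
        image = tf.image.decode_jpeg(tf.read_file(input_queue[0]), channels=1)
        image = tf.image.convert_image_dtype(image, tf.float32)  # scale to [0, 1]
        image = tf.reshape(image, [28, 280, 1])
        # augmentation (noise, brightness, ...) would go here when aug=True
        image_batch, label_batch = tf.train.shuffle_batch(
            [image, label], batch_size=batch_size,
            capacity=5000, min_after_dequeue=1000)
        return image_batch, label_batch
```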
The training process creates two DataIterator objects, so the test dataset can be used for validation while training runs. The training loss and accuracy curves are shown below:

By around step 4000 or so, the batch accuracy on both train and val is already very high. My train and val datasets are probably both too simple, which is why the model reaches high performance so quickly. To check whether the model was nonsense, I wrote the usual validation routine to measure accuracy over the entire test dataset.
Model Validation
```python
def validation():
    # it should be fixed by using a placeholder with the epoch num in the train stage
    logger.info("=======Validation Begin=======")
    test_feeder = DataIterator(data_dir='../data/test_data/')
    predict_labels_list = []
    groundtruth = []
    with tf.Session() as sess:
        # num_epochs=1: run exactly one pass over the test set
        test_images, test_labels = test_feeder.input_pipeline(batch_size=FLAGS.batch_size,
                                                              num_epochs=1)
        endpoints = network()
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())  # needed for the num_epochs counter
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        saver = tf.train.Saver()
        ckpt = tf.train.latest_checkpoint(FLAGS.checkpoint_dir)
        if ckpt:
            saver.restore(sess, ckpt)
            logger.info('restore from the checkpoint {0}'.format(ckpt))
        logger.info('======Start Validation=======')
        try:
            i = 0
            acc_sum = 0.0
            while not coord.should_stop():
                i += 1
                start_time = time.time()
                test_images_batch, test_labels_batch = sess.run([test_images, test_labels])
                feed_dict = {endpoints['images']: test_images_batch,
                             endpoints['labels']: test_labels_batch}
                labels_batch, predict_labels_batch, acc = sess.run(
                    [endpoints['labels'], endpoints['out_final'], endpoints['accuracy']],
                    feed_dict=feed_dict)
                predict_labels_list += predict_labels_batch.tolist()
                groundtruth += labels_batch.tolist()
                acc_sum += acc
                logger.info('the batch {0} takes {1} seconds, accuracy {2}'.format(
                    i, time.time() - start_time, acc))
        except tf.errors.OutOfRangeError:
            logger.info('==================Validation Finished===================')
            logger.info('The final accuracy {0}'.format(acc_sum / i))
        finally:
            coord.request_stop()
            coord.join(threads)
    return {'predictions': predict_labels_list, 'gt_labels': groundtruth}
```
Because I was rather brute-force here, I generated 1,000,000 training images (some failed to render, leaving 800k+) and 600,000 test images (of which something like 400k+ were generated successfully), so one full pass over the test set takes quite a while. But after watching the output for ages, every batch looked like the result below: accuracy all 1s, which scared me:
Model Inference
A little worried that I'd simply written buggy code, I double-checked by writing an inference function:
```python
# Assumes numpy as np, PIL's Image, FLAGS and logger from the full script
def inference(image):
    logger.info('============inference==========')
    temp_image = Image.open(image).convert('L')  # grayscale, matching the 1-channel input
    # temp_image = temp_image.resize((FLAGS.image_height, FLAGS.image_size), Image.ANTIALIAS)
    temp_image = np.asarray(temp_image) / 255.0
    temp_image = temp_image.reshape([-1, 28, 280, 1])
    sess = tf.Session()
    logger.info('========start inference============')
    endpoints = network()
    saver = tf.train.Saver()
    ckpt = tf.train.latest_checkpoint(FLAGS.checkpoint_dir)
    if ckpt:
        saver.restore(sess, ckpt)
    feed_dict = {endpoints['images']: temp_image}
    predict_val, predict_index = sess.run(
        [endpoints['out_score'], endpoints['out_final']], feed_dict=feed_dict)
    sess.close()
    return predict_val, predict_index
```
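Calling it is straightforward; a usage sketch on the test image discussed in the next paragraph (the printed values are illustrative):

```python
predict_val, predict_index = inference(
    '../data/test_data/092e9ae8-ee91-11e6-91c1-525400551618_69128.jpeg')
print(predict_val)    # raw logits, shape (1, 20): two scores per position
print(predict_index)  # flip flags per position, e.g. [[0 1 1 0 0 0 1 0 1 1]]
```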
I grabbed a random image and ran inference on it; the result:
A quick explanation: the file name ../data/test_data/092e9ae8-ee91-11e6-91c1-525400551618_69128.jpeg is made up of a uuid + '_' + label. Originally I used the characters themselves in place of the uuid, so the file name carried both the character and the label information; that worked fine on my Mac, but on my Ubuntu cloud host, writing Chinese characters into file names failed, probably because of missing Chinese support, and I've been too busy to sort that out. The label 69128 lists the flipped positions, which converts to the 10-bit binary vector 0110001011; comparing that with my output, there seems to be no major problem.
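A small helper makes that file-name-to-label conversion concrete (a sketch written for this post, not from the original repo):

```python
def flips_to_bits(flip_str, length=10):
    """Convert flipped-position digits like '69128' into a 10-bit label."""
    bits = [0] * length
    for c in flip_str:
        bits[int(c)] = 1
    return bits

print(flips_to_bits('69128'))  # [0, 1, 1, 0, 0, 0, 1, 0, 1, 1] -> '0110001011'
```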
Summary
As usual, a summary: this time I designed a small single-input, multi-output network and picked a single loss to minimize as the objective. The network is very simple, but it genuinely works. Since the data and the character count are entirely my own DIY, the model of course can't be used against Zhihu directly; but with the right font and some distortion transforms, it should carry over to Zhihu's real production CAPTCHA fairly quickly, and I'm confident about that. All of the above is just meant to get the ball rolling. All the code is at tensorflow-101/zhihu_code; star it if you find it interesting, and fork it if you want to take on the real Zhihu CAPTCHA together. One note: you may need to find a suitable font for generation yourself; the font file was too large, so I didn't push it — go hunt down the font Zhihu uses if you want to play.