歡迎您光臨本站 註冊首頁

keras實現基於孿生網絡的圖片相似度計算方式

←手機掃碼閱讀     ljg58026 @ 2020-06-12 , reply:0

我就廢話不多說了,大家還是直接看代碼吧!

  import keras  from keras.layers import Input,Dense,Conv2D  from keras.layers import MaxPooling2D,Flatten,Convolution2D  from keras.models import Model  import os  import numpy as np  from PIL import Image  from keras.optimizers import SGD  from scipy import misc  root_path = os.getcwd()  train_names = ['bear','blackswan','bus','camel','car','cows','dance','dog','hike','hoc','kite','lucia','mallerd','pigs','soapbox','stro','surf','swing','train','walking']  test_names = ['boat','dance-jump','drift-turn','elephant','libby']     def load_data(seq_names,data_number,seq_len):   #生成圖片對    print('loading data.....')    frame_num = 51    train_data1 = []    train_data2 = []    train_lab = []    count = 0    while count < data_number:      count = count + 1      pos_neg = np.random.randint(0,2)      if pos_neg==0:        seed1 = np.random.randint(0,seq_len)        seed2 = np.random.randint(0,seq_len)        while seed1 == seed2:         seed1 = np.random.randint(0,seq_len)         seed2 = np.random.randint(0,seq_len)        frame1 = np.random.randint(1,frame_num)        frame2 = np.random.randint(1,frame_num)        path1 = os.path.join(root_path,'data','simility_data',seq_names[seed1],str(frame1)+'.jpg')        path2 = os.path.join(root_path, 'data', 'simility_data', seq_names[seed2], str(frame2) + '.jpg')        image1 = np.array(misc.imresize(Image.open(path1),[224,224]))        image2 = np.array(misc.imresize(Image.open(path2),[224,224]))        train_data1.append(image1)        train_data2.append(image2)        train_lab.append(np.array(0))      else:       seed = np.random.randint(0,seq_len)       frame1 = np.random.randint(1, frame_num)       frame2 = np.random.randint(1, frame_num)       path1 = os.path.join(root_path, 'data', 'simility_data', seq_names[seed], str(frame1) + '.jpg')       path2 = os.path.join(root_path, 'data', 'simility_data', seq_names[seed], str(frame2) + '.jpg')       image1 = np.array(misc.imresize(Image.open(path1),[224,224]))       image2 = np.array(misc.imresize(Image.open(path2),[224,224]))       train_data1.append(image1)       train_data2.append(image2)       train_lab.append(np.array(1))    return np.array(train_data1),np.array(train_data2),np.array(train_lab)     def vgg_16_base(input_tensor):    net = Conv2D(64(3,3),activation='relu',padding='same',input_shape=(224,224,3))(input_tensor)    net = Convolution2D(64,(3,3),activation='relu',padding='same')(net)    net = MaxPooling2D((2,2),strides=(2,2))(net)       net = Convolution2D(128,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(128,(3,3),activation='relu',padding='same')(net)    net= MaxPooling2D((2,2),strides=(2,2))(net)       net = Convolution2D(256,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(256,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(256,(3,3),activation='relu',padding='same')(net)    net = MaxPooling2D((2,2),strides=(2,2))(net)       net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = MaxPooling2D((2,2),strides=(2,2))(net)       net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = Convolution2D(512,(3,3),activation='relu',padding='same')(net)    net = MaxPooling2D((2,2),strides=(2,2))(net)    net = Flatten()(net)    return net     def siamese(vgg_path=None,siamese_path=None):    input_tensor = Input(shape=(224,224,3))    vgg_model = Model(input_tensor,vgg_16_base(input_tensor))    if vgg_path:      vgg_model.load_weights(vgg_path)    input_im1 = Input(shape=(224,224,3))    input_im2 = Input(shape=(224,224,3))    out_im1 = vgg_model(input_im1)    out_im2 = vgg_model(input_im2)    diff = keras.layers.substract([out_im1,out_im2])    out = Dense(500,activation='relu')(diff)    out = Dense(1,activation='sigmoid')(out)    model = Model([input_im1,input_im2],out)    if siamese_path:      model.load_weights(siamese_path)    return model     train = True  if train:    model = siamese(siamese_path='model/simility/vgg.h5')    sgd = SGD(lr=1e-6,momentum=0.9,decay=1e-6,nesterov=True)    model.compile(optimizer=sgd,loss='mse',metrics=['accuracy'])    tensorboard = keras.callbacks.TensorBoard(histogram_freq=5,log_dir='log/simility',write_grads=True,write_images=True)    ckpt = keras.callbacks.ModelCheckpoint(os.path.join(root_path,'model','simility','vgg.h5'),                      verbose=1,period=5)    train_data1,train_data2,train_lab = load_data(train_names,4000,20)    model.fit([train_data1,train_data2],train_lab,callbacks=[tensorboard,ckpt],batch_size=64,epochs=50)  else:    model = siamese(siamese_path='model/simility/vgg.h5')    test_im1,test_im2,test_labe = load_data(test_names,1000,5)    TP = 0    for i in range(1000):     im1 = np.expand_dims(test_im1[i],axis=0)     im2 = np.expand_dims(test_im2[i],axis=0)     lab = test_labe[i]     pre = model.predict([im1,im2])     if pre>0.9 and lab==1:      TP = TP + 1     if pre<0.9 and lab==0:      TP = TP + 1    print(float(TP)/1000)

輸入兩張圖片,標記1為相似,0為不相似。

損失函數用的是簡單的均方誤差,有待改成Siamese的對比損失。

總結:

1.隨機生成了幾組1000對的圖片,測試精度0.7左右,效果一般。

2.問題 1)數據加載沒有用生成器,還得繼續認真看看文檔 2)訓練時劃分驗證集的時候,訓練就會報錯,什麼輸入維度的問題,暫時沒找到原因 3)輸入的shape好像必須給出數字,本想用shape= input_tensor.get_shape(),能訓練,不能保存模型,會報(NOT JSON Serializable,Dimension(None))類型錯誤
 

補充知識: keras 問答匹配孿生網絡文本匹配 RNN 帶有數據

用途:

這篇博客解釋瞭如何搭建一個簡單的匹配網絡。並且使用了keras的lambda層。在建立網絡之前需要對數據進行預處理。處理過後,文本轉變為id字符序列。將一對question,answer分別編碼可以得到兩個向量,在匹配層中比較兩個向量,計算相似度。

網絡圖示:

數據準備:

數據基於網上的淘寶客服對話數據,我也會放在我的下載頁面中。原數據是對話,我篩選了其中label為1的對話。然後將對話拆解成QA對,q是用戶,a是客服。然後對於每個q,有一個a是匹配的,label為1.再選擇一個a,構成新的樣本,label為0.

超參數:

比較簡單,具體看代碼就可以了。

  # dialogue max pair q,a  max_pair = 30000  # top k frequent word ,k  MAX_FEATURES = 450  # fixed q,a length  MAX_SENTENCE_LENGTH = 30  embedding_size = 100  batch_size = 600  # learning rate  lr = 0.01  HIDDEN_LAYER_SIZE = n_hidden_units = 256 # neurons in hidden layer

 

細節:

導入一些庫

  # -*- coding: utf-8 -*-  from keras.layers.core import Activation, Dense, Dropout, SpatialDropout1D  from keras.layers.embeddings import Embedding  from keras.layers.recurrent import LSTM  from keras.preprocessing import sequence  from sklearn.model_selection import train_test_split  import collections  import matplotlib.pyplot as plt  import nltk  import numpy as np  import os  import pandas as pd  from alime_data import convert_dialogue_to_pair  from parameter import MAX_SENTENCE_LENGTH,MAX_FEATURES,embedding_size,max_pair,batch_size,HIDDEN_LAYER_SIZE  DATA_DIR = "../data"  NUM_EPOCHS = 2  # Read training data and generate vocabulary  maxlen = 0  num_recs = 0

 

數據準備,先統計詞頻,然後取出top N個常用詞,然後將句子轉換成 單詞id的序列。把句子中的有效id靠右邊放,將句子左邊補齊padding。然後分成訓練集和測試集

  word_freqs = collections.Counter()  training_data = convert_dialogue_to_pair(max_pair)  num_recs = len([1 for r in training_data.iterrows()])     #for line in ftrain:  for line in training_data.iterrows():    label ,sentence_q = line[1]['label'],line[1]['sentence_q']    label ,sentence_a = line[1]['label'],line[1]['sentence_a']    words = nltk.word_tokenize(sentence_q.lower())#.decode("ascii", "ignore")    if len(words) > maxlen:      maxlen = len(words)    for word in words:      word_freqs[word] += 1    words = nltk.word_tokenize(sentence_a.lower())#.decode("ascii", "ignore")    if len(words) > maxlen:      maxlen = len(words)    for word in words:      word_freqs[word] += 1    #num_recs += 1  ## Get some information about our corpus     # 1 is UNK, 0 is PAD  # We take MAX_FEATURES-1 featurs to accound for PAD  vocab_size = min(MAX_FEATURES, len(word_freqs)) + 2  word2index = {x[0]: i+2 for i, x in enumerate(word_freqs.most_common(MAX_FEATURES))}  word2index["PAD"] = 0  word2index["UNK"] = 1  index2word = {v:k for k, v in word2index.items()}  # convert sentences to sequences  X_q = np.empty((num_recs, ), dtype=list)  X_a = np.empty((num_recs, ), dtype=list)  y = np.zeros((num_recs, ))  i = 0  def chinese_split(x):    return x.split(' ')     for line in training_data.iterrows():    label ,sentence_q,sentence_a = line[1]['label'],line[1]['sentence_q'],line[1]['sentence_a']    #label, sentence = line.strip().split("	")    #print(label,sentence)    #words = nltk.word_tokenize(sentence_q.lower())    words = chinese_split(sentence_q)    seqs = []    for word in words:      if word in word2index.keys():        seqs.append(word2index[word])      else:        seqs.append(word2index["UNK"])    X_q[i] = seqs    #print('add_q')    #words = nltk.word_tokenize(sentence_a.lower())    words = chinese_split(sentence_a)    seqs = []    for word in words:      if word in word2index.keys():        seqs.append(word2index[word])      else:        seqs.append(word2index["UNK"])    X_a[i] = seqs    y[i] = int(label)    i += 1  # Pad the sequences (left padded with zeros)  X_a = sequence.pad_sequences(X_a, maxlen=MAX_SENTENCE_LENGTH)  X_q = sequence.pad_sequences(X_q, maxlen=MAX_SENTENCE_LENGTH)  X = []  for i in range(len(X_a)):    concat = [X_q[i],X_a[i]]    X.append(concat)     # Split input into training and test  Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size=0.2,                          random_state=42)  #print(Xtrain.shape, Xtest.shape, ytrain.shape, ytest.shape)  Xtrain_Q = [e[0] for e in Xtrain]  Xtrain_A = [e[1] for e in Xtrain]  Xtest_Q = [e[0] for e in Xtest]  Xtest_A = [e[1] for e in Xtest]

 

最後建立網絡。先定義兩個函數,一個是句子編碼器,另一個是lambda層,計算兩個向量的絕對差。將QA分別用encoder處理得到兩個向量,把兩個向量放入lambda層。最後有了2*hidden size的一層,將這一層接一個dense層,接activation,得到分類概率。

  from keras.layers.wrappers import Bidirectional  from keras.layers import Input,Lambda  from keras.models import Model     def encoder(inputs_seqs,rnn_hidden_size,dropout_rate):    x_embed = Embedding(vocab_size, embedding_size, input_length=MAX_SENTENCE_LENGTH)(inputs_seqs)    inputs_drop = SpatialDropout1D(0.2)(x_embed)    encoded_Q = Bidirectional(      LSTM(rnn_hidden_size, dropout=dropout_rate, recurrent_dropout=dropout_rate, name='RNN'))(inputs_drop)    return encoded_Q     def absolute_difference(vecs):    a,b =vecs    #d = a-b    return abs(a - b)     inputs_Q = Input(shape=(MAX_SENTENCE_LENGTH,), name="input")  # x_embed = Embedding(vocab_size, embedding_size, input_length=MAX_SENTENCE_LENGTH)(inputs_Q)  # inputs_drop = SpatialDropout1D(0.2)(x_embed)  # encoded_Q = Bidirectional(LSTM(HIDDEN_LAYER_SIZE, dropout=0.2, recurrent_dropout=0.2,name= 'RNN'))(inputs_drop)  inputs_A = Input(shape=(MAX_SENTENCE_LENGTH,), name="input_a")  # x_embed = Embedding(vocab_size, embedding_size, input_length=MAX_SENTENCE_LENGTH)(inputs_A)  # inputs_drop = SpatialDropout1D(0.2)(x_embed)  # encoded_A = Bidirectional(LSTM(HIDDEN_LAYER_SIZE, dropout=0.2, recurrent_dropout=0.2,name= 'RNN'))(inputs_drop)  encoded_Q = encoder(inputs_Q,HIDDEN_LAYER_SIZE,0.1)  encoded_A = encoder(inputs_A,HIDDEN_LAYER_SIZE,0.1)     # import tensorflow as tf  # difference = tf.subtract(encoded_Q, encoded_A)  # difference = tf.abs(difference)  similarity = Lambda(absolute_difference)([encoded_Q, encoded_A])  # x = concatenate([encoded_Q, encoded_A])  #  # matching_x = Dense(128)(x)  # matching_x = Activation("sigmoid")(matching_x)  polar = Dense(1)(similarity)  prop = Activation("sigmoid")(polar)  model = Model(inputs=[inputs_Q,inputs_A], outputs=prop)  model.compile(loss="binary_crossentropy", optimizer="adam",         metrics=["accuracy"])  training_history = model.fit([Xtrain_Q, Xtrain_A], ytrain, batch_size=batch_size,                 epochs=NUM_EPOCHS,                 validation_data=([Xtest_Q,Xtest_A], ytest))  # plot loss and accuracy  def plot(training_history):    plt.subplot(211)    plt.title("Accuracy")    plt.plot(training_history.history["acc"], color="g", label="Train")    plt.plot(training_history.history["val_acc"], color="b", label="Validation")    plt.legend(loc="best")       plt.subplot(212)    plt.title("Loss")    plt.plot(training_history.history["loss"], color="g", label="Train")    plt.plot(training_history.history["val_loss"], color="b", label="Validation")    plt.legend(loc="best")    plt.tight_layout()    plt.show()     # evaluate  score, acc = model.evaluate([Xtest_Q,Xtest_A], ytest, batch_size = batch_size)  print("Test score: %.3f, accuracy: %.3f" % (score, acc))     for i in range(25):    idx = np.random.randint(len(Xtest_Q))    #idx2 = np.random.randint(len(Xtest_A))    xtest_Q = Xtest_Q[idx].reshape(1,MAX_SENTENCE_LENGTH)    xtest_A = Xtest_A[idx].reshape(1,MAX_SENTENCE_LENGTH)    ylabel = ytest[idx]    ypred = model.predict([xtest_Q,xtest_A])[0][0]    sent_Q = " ".join([index2word[x] for x in xtest_Q[0].tolist() if x != 0])    sent_A = " ".join([index2word[x] for x in xtest_A[0].tolist() if x != 0])    print("%.0f	%d	%s	%s" % (ypred, ylabel, sent_Q,sent_A))

 

最後是處理數據的函數,寫在另一個文件裡。

  import nltk  from parameter import MAX_FEATURES,MAX_SENTENCE_LENGTH  import pandas as pd  from collections import Counter  def get_pair(number, dialogue):    pairs = []    for conversation in dialogue:      utterances = conversation[2:].strip(' ').split('	')      # print(utterances)      # break         for i, utterance in enumerate(utterances):        if i % 2 != 0: continue        pairs.append([utterances[i], utterances[i + 1]])        if len(pairs) >= number:          return pairs    return pairs        def convert_dialogue_to_pair(k):    dialogue = open('dialogue_alibaba2.txt', encoding='utf-8', mode='r')    dialogue = dialogue.readlines()    dialogue = [p for p in dialogue if p.startswith('1')]    print(len(dialogue))    pairs = get_pair(k, dialogue)    # break    # print(pairs)    data = []    for p in pairs:      data.append([p[0], p[1], 1])    for i, p in enumerate(pairs):      data.append([p[0], pairs[(i + 8) % len(pairs)][1], 0])    df = pd.DataFrame(data, columns=['sentence_q', 'sentence_a', 'label'])       print(len(data))    return df

  


[ljg58026 ] keras實現基於孿生網絡的圖片相似度計算方式已經有391次圍觀

http://coctec.com/docs/python/shhow-post-238287.html