The IMDB (Internet Movie Database) is an online movie database. Its review dataset contains 50,000 movie reviews, 25,000 for training and 25,000 for testing, and every review is labeled as either positive or negative.

This article uses a BERT model in PyTorch to classify the IMDB reviews.
BERT (PyTorch)

Loading the packages

!pip install transformers

# Basic
import time
import os
import pandas as pd
from transformers import BertTokenizer

# PyTorch
import torch
from torch.utils.data import Dataset, random_split
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# IMDB
from keras.datasets import imdb
Because the IMDB reviews are in English, the pre-trained model we load is the case-insensitive bert-base-uncased. We then sample 10 random entries to take a look at the BERT tokenizer's vocabulary.
# Get the pre-trained tokenizer bundled with BERT
PRETRAINED_MODEL_NAME = "bert-base-uncased"  # English pre-trained model (uncased)
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)
vocab = tokenizer.vocab
print("dict size", len(vocab))

# Randomly inspect a few entries of the BERT tokenizer's vocabulary
import random
random_tokens = random.sample(list(vocab), 10)
random_ids = [vocab[t] for t in random_tokens]

print("{0:20}{1:15}".format("token", "index"))
print("-" * 25)
for t, id in zip(random_tokens, random_ids):  # print a few tokens
    print("{0:15}{1:10}".format(t, id))
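Besides sampling vocabulary entries, it can also be helpful to see how the tokenizer splits an actual sentence into WordPiece tokens. A minimal sketch (the sample sentence below is made up for illustration):

# How the tokenizer handles a full sentence (sample sentence is made up)
sample_sentence = "this movie was surprisingly heartwarming"
sample_tokens = tokenizer.tokenize(sample_sentence)
sample_ids = tokenizer.convert_tokens_to_ids(sample_tokens)
print(sample_tokens)  # out-of-vocabulary words are split into '##' sub-word pieces
print(sample_ids)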
Preparing the raw text data

Read in the IMDB dataset.
# Keep only the 10,000 most frequent words in the training data; discard rare words
(train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=10000)
As in the LSTM preprocessing, first convert the index sequences back into text.
# Download the IMDB dictionary: word_index -> word:index
word_index = imdb.get_word_index()
# Swap keys and values: reverse_word_index -> index:word
reverse_word_index = {value: key for key, value in word_index.items()}

# Decode a review (offset the index by 3 because indices 0, 1 and 2 are reserved
# for "padding", "start of sequence" and "unknown"); unknown words are shown as ?
def read_IMDB_text(train_data):
    text = ' '.join([reverse_word_index.get(i - 3, '?') for i in train_data])
    return text
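As a quick sanity check (not part of the original code), the decoder can be applied to the first training review:

# Decode the first training review and show its label (1 = positive, 0 = negative)
print(read_IMDB_text(train_data[0])[:80])
print("label:", train_labels[0])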
Put the review text and the sentiment label into DataFrames.
# Build the train/test DataFrames
df_train = pd.DataFrame({'TRAIN_text_to_sequence': train_data, "TRAIN_label": train_labels})
df_test = pd.DataFrame({'TEST_text_to_sequence': test_data, "TEST_label": test_labels})

df_train['TRAIN_text'] = df_train['TRAIN_text_to_sequence'].apply(read_IMDB_text)
df_test['TEST_text'] = df_test['TEST_text_to_sequence'].apply(read_IMDB_text)

df_train = df_train[["TRAIN_text", "TRAIN_label"]]
df_test = df_test[["TEST_text", "TEST_label"]]

display(df_train.head())
display(df_test.head())
Converting the raw text into a BERT-compatible input format

Implement a Dataset that reads the training and test sets, converts each example's text into BERT's input format, and returns 3 tensors:

- tokens_tensor: the index sequence of the combined sentence, including [CLS] and [SEP]
- segments_tensor: marks the boundary between the two sentences
- label_tensor: the classification label converted into an index tensor
# Build the Dataset
class IMDB_Dataset(Dataset):
    def __init__(self, mode, tokenizer):
        assert mode in ["train", "test"]
        self.mode = mode
        self.df = eval(f"df_{mode}")  # df_train or df_test
        self.len = len(self.df)
        self.maxlen = 300             # limit the review length (depends on your memory)
        self.tokenizer = tokenizer    # the BERT tokenizer passed in

    # Return one training/test example
    def __getitem__(self, idx):
        origin_text = self.df.iloc[idx][0]   # raw text
        origin_label = self.df.iloc[idx][1]  # raw label
        if self.mode == "test":
            text = self.df.iloc[idx][0]
            label_tensor = None
        else:
            text = self.df.iloc[idx][0]
            label_tensor = torch.tensor(origin_label)

        # Build the BERT tokens of the first sentence and append the separator [SEP]
        word_pieces = ["[CLS]"]
        tokens_a = self.tokenizer.tokenize(text)
        word_pieces += tokens_a[:self.maxlen] + ["[SEP]"]
        len_a = len(word_pieces)

        # Convert the whole token sequence into an index sequence
        ids = self.tokenizer.convert_tokens_to_ids(word_pieces)
        tokens_tensor = torch.tensor(ids)

        # Tokens of the first sentence (including [SEP]) get segment id 0;
        # a second sentence, if present, would get 1
        segments_tensor = torch.tensor([0] * len_a, dtype=torch.long)

        return (tokens_tensor, segments_tensor, label_tensor, origin_text, origin_label)

    def __len__(self):
        return self.len
Instantiate the training and test datasets from the IMDB_Dataset class, converting them into BERT's input format.
# initialize Datasets
trainset = IMDB_Dataset("train", tokenizer=tokenizer)
testset = IMDB_Dataset("test", tokenizer=tokenizer)
The first example of the training set returns 3 tensors plus the original text and label: tokens_tensor, segments_tensor, label_tensor, origin_text, origin_label:
(tensor([ 101, 1029, 2023, 2143, 2001, 2074, 8235, 9179, 3295, 17363, 2466, 3257, 3071, 1005, 1055, 2428, 10897, 1996, 2112, 2027, …]),
 tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, …]),
 tensor(1),
 "? this film was just brilliant casting location scenery story direction everyone's …",
 1)
Pick a sample id to compare the data before and after conversion.
# Pick an arbitrary sample
sample_idx = 2

# Use the Dataset we just built to get the converted id tensors
tokens_tensor, segments_tensor, label_tensor, origin_text, origin_label = trainset[sample_idx]

# Convert tokens_tensor back into tokens
tokens = tokenizer.convert_ids_to_tokens(tokens_tensor.tolist())

print(f"""[Original text]
Sentence: {origin_text}
Label   : {origin_label}

--------------------

[Tensors returned by the Dataset]
tokens_tensor  : {tokens_tensor[0:20]}
segments_tensor: {segments_tensor[0:20]}
label_tensor   : {label_tensor}
""")
[Original text]
Sentence: ? this has to be one of the worst films of the 1990s when…
Label   : 0

[Tensors returned by the Dataset]
tokens_tensor  : tensor([ 101, 1029, 2023, 2038, 2000, 2022, 2028, 1997, 1996, 5409, …])
segments_tensor: tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, …])
label_tensor   : 0
Building a DataLoader that reads small mini-batches

The input samples of this function is a list in which every element is one example returned by the IMDB_Dataset defined above, and each example contains 3 tensors:

- tokens_tensor
- segments_tensor
- label_tensor

The function zero-pads the first two tensors and produces masks_tensors.
def create_mini_batch(samples):
    tokens_tensors = [s[0] for s in samples]
    segments_tensors = [s[1] for s in samples]

    # The training set has labels
    if samples[0][2] is not None:
        label_ids = torch.stack([s[2] for s in samples])
    else:
        label_ids = None

    # Zero-pad to the longest sequence in this batch
    tokens_tensors = pad_sequence(tokens_tensors, batch_first=True)
    segments_tensors = pad_sequence(segments_tensors, batch_first=True)

    # Attention masks: set the positions of tokens_tensors that are not
    # zero padding to 1 so that BERT only attends to those tokens
    masks_tensors = torch.zeros(tokens_tensors.shape, dtype=torch.long)
    masks_tensors = masks_tensors.masked_fill(tokens_tensors != 0, 1)

    return tokens_tensors, segments_tensors, masks_tensors, label_ids
Instantiate a DataLoader that returns batch_size training samples at a time, using collate_fn to merge the list of samples into a single mini-batch.
BATCH_SIZE = 64
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, collate_fn=create_mini_batch, shuffle=True)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, collate_fn=create_mini_batch, shuffle=False)

data = next(iter(trainloader))
tokens_tensors, segments_tensors, masks_tensors, label_ids = data

print(f"""
tokens_tensors.shape = {tokens_tensors.shape}
{tokens_tensors}
------------------------
segments_tensors.shape = {segments_tensors.shape}
{segments_tensors}
------------------------
masks_tensors.shape = {masks_tensors.shape}
{masks_tensors}
------------------------
label_ids.shape = {label_ids.shape}
{label_ids}
""")
Building the downstream task model by adding layers on top of BERT

Load a BERT model that can do sequence classification.
from transformers import BertForSequenceClassification

NUM_LABELS = 2
model = BertForSequenceClassification.from_pretrained(PRETRAINED_MODEL_NAME, num_labels=NUM_LABELS)
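To see what was added on top of the pre-trained encoder, the model's top-level modules and its trainable parameters can be inspected (an optional step, not part of the original code):

# BertForSequenceClassification = the pre-trained "bert" encoder + dropout +
# a randomly initialized "classifier" linear layer that outputs NUM_LABELS logits
print([name for name, _ in model.named_children()])
print("trainable parameters:",
      sum(p.numel() for p in model.parameters() if p.requires_grad))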
Define a function that, for a given DataLoader, returns the model's predictions and, optionally, the classification accuracy.
def get_predictions(model, dataloader, compute_acc=False):
    predictions = None
    correct = 0
    total = 0

    with torch.no_grad():
        # Iterate over the whole dataset
        for data in dataloader:
            # Move all tensors to the GPU if the model is on it
            if next(model.parameters()).is_cuda:
                data = [t.to("cuda:0") for t in data if t is not None]

            # The first 3 tensors are tokens, segments and masks;
            # it is strongly recommended to pass them to `model` by keyword
            tokens_tensors, segments_tensors, masks_tensors = data[:3]
            outputs = model(input_ids=tokens_tensors,
                            token_type_ids=segments_tensors,
                            attention_mask=masks_tensors)
            logits = outputs[0]
            _, pred = torch.max(logits.data, 1)

            # Compute the classification accuracy on the training set
            if compute_acc:
                labels = data[3]
                total += labels.size(0)
                correct += (pred == labels).sum().item()

            # Record the predictions of the current batch
            if predictions is None:
                predictions = pred
            else:
                predictions = torch.cat((predictions, pred))

    if compute_acc:
        acc = correct / total
        return predictions, acc
    return predictions
# Run the model on the GPU and get the classification accuracy on the training set
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("device:", device)
model = model.to(device)
_, acc = get_predictions(model, trainloader, compute_acc=True)
print("classification acc:", acc)
device: cuda:0
classification acc: 0.49984

Before fine-tuning, the accuracy is at chance level (about 50%), because the classification head added on top of the pre-trained BERT is still randomly initialized.
Fine-tuning the downstream task model

%%time

# Training mode
model.train()

# Use the Adam optimizer to update all parameters of the classification model
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

EPOCHS = 6
for epoch in range(EPOCHS):
    running_loss = 0.0
    for data in trainloader:
        tokens_tensors, segments_tensors, \
        masks_tensors, labels = [t.to(device) for t in data]

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(input_ids=tokens_tensors,
                        token_type_ids=segments_tensors,
                        attention_mask=masks_tensors,
                        labels=labels)
        loss = outputs[0]

        # Backward pass
        loss.backward()
        optimizer.step()

        # Record the loss of the current batch
        running_loss += loss.item()

    # Compute the classification accuracy
    _, acc = get_predictions(model, trainloader, compute_acc=True)

    print('[epoch %d] loss: %.3f, acc: %.3f' %
          (epoch + 1, running_loss, acc))
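Optionally (this step is not shown in the original post), the fine-tuned weights can be saved so that training does not have to be repeated; the directory name below is arbitrary:

# Save the fine-tuned model and its tokenizer for later reuse
model.save_pretrained("imdb_bert_finetuned")
tokenizer.save_pretrained("imdb_bert_finetuned")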
Running inference on the test data

# Build the test set. A different batch_size than during training can be used here, depending on GPU memory
testset = IMDB_Dataset("test", tokenizer=tokenizer)
testloader = DataLoader(testset, batch_size=256, collate_fn=create_mini_batch)

# Predict the test set with the classification model
predictions = get_predictions(model, testloader)
predictions
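Because the test-set Dataset returns label_tensor as None, get_predictions only yields the predicted classes here. As a rough sketch (not part of the original code), they can be compared against the labels kept in df_test to estimate test accuracy, since this testloader does not shuffle:

# Compare the predicted classes with the true labels stored in df_test
true_labels = df_test["TEST_label"].values
pred_labels = predictions.cpu().numpy()
print("test acc:", (pred_labels == true_labels).mean())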
Complete code

IMDB-BERT
References