本文は Andrej Karpathy の 4 時間で GPT-2 を再現する内容に基づいており、個人的に非常に良い動画だと感じました。これは LLM の進化の歴史の終結篇であり、この記事はその文字バージョンの補足です。前の内容については https://blog.nagi.fun のホームページを参照してください。このブロガーは非常に詳細に書いています。
本シリーズは全体で 3 つの部分に分かれており、それぞれ主体部分の実装、加速実装、分散トレーニングです。
GPT-2 nn.Module の実装#
Config 設定#
@dataclass
class GPTConfig():
block_size: int=1024 # シーケンス長の制限(コンテキストウィンドウの長さ)
vocab_size: int=50527 # 語彙のサイズ
n_layer: int=12 # Transformerの層数
n_head: int=12 # 注意ヘッドの数
n_embd: int=768 # 埋め込み次元(各トークンのベクトルの長さ)
@dataclass
デコレーターはGPTConfig
という名前の設定クラスを定義します。
(デコレーターがわからない場合は CSDN や知乎を参照してください)
なぜ dataclass を使用するのか:
•通常のクラスでは手動で__init__
メソッドを書く必要がありますが、デコレーターを使うと非常に簡単です。
•明示的な宣言をサポートし、直接pirnt(GPTConfig(n_head=16))
でパラメータを直接印刷できます。
BackBone#
class GPT(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.transformer = nn.ModuleDict(dict(
# 単語トークンの埋め込み
wte = nn.Embedding(config.vocab_size, config.n_embd),
# 単語位置の埋め込み
wpe = nn.Embedding(config.block_size, config.n_embd),
# 主体ブロック
h = nn.ModuleList([Block(config) for _ in range(config.n_layers)])
# 単語トークンの埋め込み、
ln_f = nn.LayerNorm(config.n_embd),
))
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
class Block(nn.Module):
def __init__(self, config):
super().__init__()
self.ln_1 = nn.LayerNorm(config.n_embd)
self.ln_2 = nn.LayerNorm(config.n_embd)
self.attn = CasualSelfAttention(config)
self.mlp = mlp(config)
def forward(self, x):
x = x + self.attn(self.ln_1(x))
x = x + self.mlp(self.ln_2(x))
return x
self.transformer
:transformer アーキテクチャの本体
nn.ModuleDict
:nn.Module 内の辞書、nn.ModuleDict(ln_f = nn.LayerNorm(config.n_embd) ,)
は{ln_f: nn.LayerNorm(config.n_embd)}
と理解できます。
wte
:単語から埋め込みへの線形層 [語彙の長さ、埋め込みの次元]、単語を特徴ベクトルに変換します。
wpe
:単語位置から埋め込みへの線形層 [シーケンスの長さ、埋め込みの次元]、位置情報を特徴ベクトルに変換します。
h
:transformer のエンコーダー本体、各 Block は 1 つの attention と 1 つの mlp で構成されています。
ln_f
:LayerNorm、Pre-Norm 後に得られた大きな分散を正規化します。後で説明があります。
lm_head
:最終層の出力で、単語の特徴ベクトルを具体的な単語に変換します。
Block
:Transformer は複数の同じ Block で構成されています。
Tips⚠️:ここでは GPT2 の LN 層が attention と mlp の前にあることに気づくでしょう。これは上の図の原文の LN 層と残差(先に残差、次に正規化)を接続するのとは異なります。
Karpathyの説明は次のとおりです:元のモデルは先に残差接続を行い、その後LN正規化処理を行います。これは接続された残差も正規化されることを示していますが、これは良くありません。純粋な残差の方が良いです。なぜなら、逆伝播の際に勾配が戻るとき、加算はその勾配を均等に2つの分岐に分配するため、勾配が残差経路を通じて直接入力に流れることを意味します。最適化の観点から見て、クリーンな残差は望ましいです。正直、彼の説明は理解できなかったので、関連する内容をネットで検索しました。そして、GPT2のやり方はpre-normと呼ばれ、Attention is all you needのやり方はpost-normと呼ばれることがわかりました。
スー神はこの 2 つの違いについて非常に適切に説明しています。残差接続は であり、もし の分散が で、 の分散が であれば、残差接続後の分散は になります。つまり、残差は分散を増幅します。私たちはこの分散を縮小する方法を考えなければなりません。素朴な方法は正規化を追加することです。つまり、 ですが、この方法は前向き伝播の分散を安定させますが、実際には残差の恒等分岐を著しく弱めてしまいます。そのため、残差の「訓練しやすさ」という利点を失ってしまいます。通常、ウォームアップを行い、十分に小さな学習率を設定しないと収束しません。また、transformer の構造には 2 つの特徴があります: ウォームアップ段階のハイパーパラメータに敏感で、最適化プロセスの収束速度が遅い(この部分は筆者も理由がわかりません)。つまり、post-norm の状況では収束がさらに難しくなり、トレーニングコストもある程度上昇します。
次に、残差の恒等分岐が弱まったことを説明します(実際には Karpathy が言う「クリーンな残差」)。初期状態で と の分散が 1 であると仮定すると、 の分散は 2 になります。そして、Normalization 操作は分散を再び 1 に戻す役割を果たします。これは、初期段階の Post Norm が次のように表されることを示しています。
再帰的に続けると、
本来残差の意味は、前の入力層に「グリーンルート」を提供し、勾配がより直接に戻ることを可能にすることですが、Post Normではこの「グリーンルート」が著しく弱められ、前の方に近づくほど重みが小さくなります。これにより、複数の残差接続の後、前の残差が末尾の勾配変化を感知できなくなり、残差は「名存実亡」となり、したがって訓練が難しくなります。論文は「ON LAYER NORMALIZATION IN THE TRANSFORMER ARCHITECTURE」を参照してください。
修正された Pre-Norm の形式は次のようになります。
展開して繰り返すと:
各残差経路は平等に重み付けされ、残差の効果は Post Norm よりも明らかになります。したがって、最適化が容易になります。もちろん、最終的な の分散は非常に大きくなるため、予測層の前に にも正規化を追加する必要があります。これがln_f
です。
Karpathy は Attention がトークン間の通信の場所であり、プーリング関数であり、加重和関数であり、 ruduce operation
であると述べています。
MLP は各トークンで発生し、トークン間で情報が収集または交換されることはなく、 map operation
です。
MLP#
class mlp(nn.Module):
def __init__(self, config):
super().__init__()
self.c_fc = nn.Linear(config.n_embd, config.n_embd*4)
self.Gelu = nn.GELU(approximate='tanh')
self.c_proj = nn.Linear(config.n_embd*4, config.n_embd)
def forward(self, x):
x = self.c_fc(x)
x = self.Gelu(x)
x = self.c_proj(x)
return x
非常にシンプルな MLP 線形マッピングで、 [n_embd, 4 * n_embd]
から [4 * n_embd, n_embd]
へ、間に非線形層 GELU 活性化関数を追加します。GELU の関数グラフは Relu に非常に似ていますが、尾部で導関数が存在し、つまり x が 0 未満のときに導関数が常に 0 であるという Relu の問題を解決します。この滑らかさがより良い効果を生み出します。
Karpathy はここで tanh の近似値を使用する理由について、これは歴史的な問題であり、tensorflow の時代に正確な GELU が非常に遅かったため、tanh を使用して GELU を近似する関数が開発されたと述べています。
Attention#
class CasualSelfAttention(nn.Module):
def __init__(self, config):
super().__init__()
assert config.n_embd % config.n_head == 0
self.c_attn = nn.Linear(config.n_embd, config.n_embd*3)
self.c_proj = nn.Linear(config.n_embd, config.n_embd)
self.n_embd = config.n_embd
self.n_head = config.n_head
# 後続の重みモジュールでその役割について説明します
self.c_proj.NANOGPT_SCALE_INIT = 1
self.register_buffer("bias", torch.tril((torch.ones(config.block_size, config.block_size)).view(1, 1, config.block_size, config.block_size)))
def forward(self, x):
B, T, C = x.size()
qkv = self.c_attn(x)
# qkvを取得
q, k, v = qkv.split(self.n_embd, dim=2)
# query, key, valueはすべて[B, n_head, T, n_embd//n_head]に分割されます
query = q.view(B, T, self.n_head, C//self.n_head).transpose(1, 2)
key = k.view(B, T, self.n_head, C//self.n_head).transpose(1, 2)
value = v.view(B, T, self.n_head, C//self.n_head).transpose(1, 2)
# QK^T/d
att = query @ key.transpose(-1, -2) * (1.0/math.sqrt(key.size(-1)))
mask_att = att.masked_fill(self.bias[:,:,:T,:T]==0, float('-inf'))
wei = F.softmax(mask_att, dim=-1)
out = wei @ value
out = out.transpose(1,2).contiguous().view(B, T, C)
out = self.c_proj(out)
return out
self.c_attn
: の組み合わせで、入力 を入力の に変換します。
self.c_proj
: を計算した後の 1 層の線形層です。
self.n_embd
:各トークンの特徴ベクトル空間です。
self.n_head
:マルチヘッド注意メカニズムのヘッドの数です。
self.bias
:ここでの bias はマスクの意味であり、上三角行列で、前のトークンが後のトークンの情報を学習するのを防ぎます。具体的な原理は次のとおりです:入力 に対して:
-inf
は後続のsoftmax
プロセスで 0 に近い値に変わり、分類に影響を与えません。
contiguous()
:transpose は物理的な順序を変更せず、形式的な順序のみを変更します。この関数を使用することで物理的な順序を修正できます。例えば、配列 に対して、 transpose(1,2)
の後は になりますが、これらの配列の物理的なストレージは です。したがって、transpose された配列に対して view 操作を行うとエラーが発生します。
HuggingFace からのダウンロード#
# class GPT内で
@classmethod
def from_pretrained(cls, model_type):
"""Huggingfaceから事前学習済みのGPT-2モデルの重みを読み込みます"""
# 4種類のモデル
assert model_type in {'gpt2','gpt2-medium','gpt2-large','gpt2-xl'}
# 読み込んでいるモデルの種類を表示
print("Loading weights from pretrained gpt:%s"%model_type)
# 各GPTに対応するハイパーパラメータは異なります
config_args ={
'gpt2' : dict(n_layer=12,n_head=12,n_embd=768), # 124M params
'gpt2-medium' : dict(n_layer=24,n_head=16,n_embd=1024), #350M params
'gpt2-large' : dict(n_layer=36,n_head=20,n_embd=1280), #774M params
'gpt2-xl' : dict(n_layer=48,n_head=25,n_embd=1600), #1558M params
}[model_type]
# 語彙のサイズは常に50527です
config_args['vocab_size'] = 50257
# 単一のブロックのサイズは常に1024です
config_args['block_size'] = 1024
# モデルにハイパーパラメータをインポート
config = GPTConfig(**config_args)
model = GPT(config)
# sdはモデルパラメータの辞書です
sd = model.state_dict()
sd_keys = sd.keys()
sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')] # このマスクを除外
# HFから重みをダウンロードし、sd_hfはダウンロードされたモデルパラメータの辞書です
model_hf = GPT2LMHeadModel.from_pretrained(model_type, cache_dir="/home/shong_Tan/project/gpt_2/model_weight", local_files_only=True)
sd_hf = model_hf.state_dict()
sd_keys_hf = sd_hf.keys()
# HFの重みのマスクバイアスを除外
sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.masked_bias')]
# HFの重みのバイアスを除外
sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.bias')]
transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
# sdとhf_sdのパラメータ名が同じ数であることを確認
assert len(sd_keys_hf) == len(sd_keys), f"mismatched keys: {len(sd_keys_hf)} != {len(sd_keys)}"
# sdとhf_sdのtransformerブロックの重み名が同じであることを確認
for k in sd_keys_hf:
if any(k.endswith(w) for w in transposed):
assert sd_hf[k].shape[::-1] == sd[k].shape
with torch.no_grad():
sd[k].copy_(sd_hf[k].t())
else:
assert sd_hf[k].shape == sd[k].shape, f"mismatched keys: {sd_hf[k].shape} != {sd[k].shape}"
with torch.no_grad():
sd[k].copy_(sd_hf[k])
return model
コードのコメントを見れば大丈夫です。
Tips⚠️:HF からダウンロードされたlm_head.weight
と transformer.wte.weight
の形状は同じで、どちらも です。これは入力埋め込みと出力 logits であり、これらは一致している必要があります。これは、トークンが特徴ベクトルに埋め込まれたときの意味が、相互作用の後に出力されるときに元のトークンに戻ることができることを反映します。同時に、 であり、大量のメモリを節約できます。
Forward#
# class GPT内で
def forward(self, idx, target):
# 入力時の次元は[batch, トークンの長さ]
B, T = idx.size()
# トークンの長さはコンテキストを超えてはいけません
assert T <= self.config.block_size, f"入力コンテキストの長さ制限を超えています {T-self.config.block_size} token"
# pos [0,1,2,..,T-1]、デバイスに配置することを忘れずに
pos = torch.arange(0, T, dtype=torch.long, device=idx.device)
# 位置埋め込み
pos = self.transformer.wpe(pos) #(T, n_embd)
# トークン埋め込み
tok = self.transformer.wte(idx) #(B, T, n_embd)
# (T, n_embd)の次元で数値を加算
x = tok + pos
# transformerブロックを通過
for block in self.transformer.h:
x = block(x)
# 最終層の正規化
x = self.transformer.ln_f(x)
# 線形層の出力
logits = self.lm_head(x) #(B, T, vocab_size)
loss = None
# targetがある場合は、つまりラベルがある場合は訓練を行い、損失関数を計算します。そうでなければ推論だけで済みます。
if target is not None:
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), target.view(-1))
return logits, loss
# 小試牛刀
num_return_sequences = 5
max_length = 30
model = GPT.from_pretrained('gpt2')
# evalは評価時にドロップアウト層を無効にし、バッチノーマル化にも異なる反応を示し、パラメータを固定します。
model.eval()
# モデルをGPUに移動
model.to('cuda')
# 以下はトークン化で、openaiのtiktokenライブラリを使用します。原理を知りたい場合は、記事の冒頭のブロガーのブログを参照することをお勧めします。
import tiktoken
enc = tiktoken.get_encoding('gpt2')
tokens = enc.encode("Hello, I'm a language model, ")
tokens = torch.tensor(tokens, dtype=torch.long) # [8, ]
tokens = tokens.unsqueeze(0).repeat(num_return_sequences, 1) # [5,8]
x = tokens.to('cuda')
while x.size(1) < max_length:
with torch.no_grad():
# モデルに入力して結果を得る
logits, loss = model(x) # x: [B, T] logits:[B,T,C]
# 最後のトークンの予測を取得
logits = logits[:, -1, :] # [B, 1, C]
# 最後の次元Cに対してsoftmaxを取る
probs = F.softmax(logits, dim=-1) # [B, 1, C]
# 最後の次元Cからtopkの確率と対応するインデックスを選択
topk_probs, topk_indices = torch.topk(probs, 50, dim=-1)
# topkの確率からランダムに1つの確率を選択
ix = torch.multinomial(topk_probs, 1)
# 選択された確率に対応するインデックスを見つける
xcol = torch.gather(topk_indices, -1, ix)
# 得られた出力トークンをxに追加し、再び入力として使用します [B, T+1]
x = torch.cat((x, xcol), dim=1)
トークン化の形式: "Hello, I'm a language model," を [15496, 11, 314, 1101, 257, 3303, 2746, 11, 220] に変換します。
以下のウェブサイトを参考にして自分で試すことができます。
https://tiktokenizer.vercel.app/
初期化#
データセット#
device = 'cpu'
if torch.cuda.is_available():
device = 'cuda'
# これはmacのMチップシリーズです
elif hasattr(torch.backends, "eps") and torch.backends.mps.is_available():
device = 'mps'
print("Using device: ", device)
import tiktoken
enc = tiktoken.get_encoding('gpt2')
with open('input.txt', 'r') as f:
data = f.read()
text = data[:1000]
tokens = enc.encode(text)
B, T = 4, 32
buf = torch.tensor(tokens[:B*T+1])
buf.to(device)
# 本質的には前n個の単語からn+1個の単語を推測します
x = buf[:-1].view(B, T)
y = buf[1:].view(B,T)
model.GPT(GPTConfig())
model.to(device)
logits, loss = model(x)
print(loss.item())
ここでloss
の値は約 11 になります。なぜならだからです。
単一バッチのトレーニングコード
# AdamWオプティマイザーを使用します。AdamとSGDの違いは自分で調べてください。
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
for i in range(50):
# オプティマイザーの勾配をゼロにします
optimizer.zero_grad()
# logitsと損失を取得
logits, loss = model(x, y)
# 逆伝播で導出を求めます
loss.backward()
# 導出を使用して元のパラメータを更新します
optimizer.step()
Adam オプティマイザーは SGD よりも早く収束します。
データローダー関数
class DataLoaderLite():
def __init__(self, B, T):
self.B = B
self.T = T
# input.txt全体を読み込みます
with open('input.txt', 'r') as f:
data = f.read()
enc = tiktoken.get_encoding('gpt2')
tokens = enc.encode(data)
self.tokens = torch.tensor(tokens, dtype=torch.long)
print(f"load {len(self.tokens)} tokens")
print(f"1 epoch = {len(self.tokens)//(B*T)} batched")
# 現在のバッチの位置を定義します
self.current_position = 0
def next_batch(self):
B, T = self.B, self.T
buf = self.tokens[self.current_position: self.current_position+B*T+1]
x = buf[:-1].view(B, T)
y = buf[1:].view(B, T)
# 各バッチにはB*T個の要素対があります
self.current_position += B*T
# バッチがトークンを使い果たした場合、再びtokens[0]に戻ります
if self.current_position+B*T+1 > len(self.tokens):
self.current_position = 0
return x, y
トレーニングコードの修正
train_loader = DataLoaderLite(4, 32 )
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
for i in range(50):
optimizer.zero_grad()
x, y = train_loader.next_batch()
x, y = x.to(device), y.to(device)
logits, loss = model(x, y)
loss.backward()
optimizer.step()
print(f"step: {i}, loss: {loss.item()}")
重み#
def _init_weights(self, module):
if isinstance(module, nn.Linear):
std = 0.02
if hasattr(module, ' '):
std = std * (2*self.config.n_layer**-0.5)
torch.nn.init.normal_(module.weight, mean=0, std=std)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0, std=0.02)
std = std * (2*self.config.n_layer)**-0.5
:ここでの分散は残差流の寄与を考慮したもので、各残差接続は入力 input に等しい寄与をもたらすことを示しています。したがって、因子が必要です。 ここでは Pre-Norm の残差接続による分散の過大評価を制御しています。ここでの 2 は、各層内で Attention と MLP がそれぞれ 1 回ずつ残差を使用するためです。
std
:std の値の出所も根拠があります。GPT2 の文書によれば、最適なのは のあたりです。