Saturday, March 7, 2026

Welcome to the world of AI - Putting it all together. Building and training fully functional Decoder-Only transformer

In the first post, we learned about temperature, top_k and top_p. We then built a Decoder-Only Transformer using pure NumPy in the second post. The third post we took advantage of PyTorch.

In this final post, we put the raw code needed to run a full decoder only transformer, to generate baby names. Hope you enjoyed this series. As always, if you think there is something I should have done differently, do not hesitate to reach out. 

'''

## "Welcome to the world of AI" 
#### Putting it all together. Building and training fully functional Decoder-Only transformer .

Ok, in the previous two posts, we built a Decoder Only transformer using pure NumPy. We then use PyTorch to build a transformer. This was however done in Jupyter notebook. Let's write a real script that we can run on any text based dataset to generate similar text. 

I will stick with my baby names dataset to keep this simple

References:
https://docs.python.org/3/library/argparse.html

$ clear && python3 baby_name_gpt.py --filename names.txt --d_model=32 --n_heads=4 --n_layers=2 --epochs=10000 --temperature=1.3 --top_p=0.90

'''

#baby_name_gpt.py

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F

# Set the seed for reproducibility
torch.manual_seed(42)

CONTEXT_WINDOW_LENGTH = 16  # Max tokens the model can process at once

# Setup the argument parser
arg_parser = argparse.ArgumentParser(prog='gpt.py', description='A mini GPT', epilog='www.securitynik.com')

# Add arguments
arg_parser.add_argument('-f', '--filename', required=True, help='/path/to/some_file with text to learn from')
arg_parser.add_argument('-d', '--d_model', type=int,  help='Embedding dimension of the model')
arg_parser.add_argument('-n', '--n_heads', type=int, help='Number of heads')
arg_parser.add_argument('-l', '--n_layers', type=int, help='Number of layers')
arg_parser.add_argument('-e', '--epochs', type=int, help='Number of training ')
arg_parser.add_argument('-b', '--batch_size', type=int, help='Batch size')
arg_parser.add_argument('-t', '--temperature', type=float, help='temperature')
arg_parser.add_argument('-k', '--top_k', type=int, help='top_k')
arg_parser.add_argument('-p', '--top_p', type=float, help='top_p')

args = arg_parser.parse_args()

# Setup a function to read the data
def get_data(input_file=None):
    print(f'🚀 Getting data ...')
    try:
        with open(file=input_file, mode='r') as fp:
            data = fp.read()
            print(f'✅ Successfully read: {len(data)} bytes of data.')
            return data
    except Exception as e:
        print(f'Error encountered: {e}')


# Tokenize the data:
def tokenizer(data=None):
    chars = sorted(list(set(data)))
    print(f'Chars: {repr("".join(chars))}')

    vocab_size = len(chars)
    print(f'✅ Vocab size: {vocab_size} tokens')

    # Encode the chars to numbers
    stoi = { ch:idx for idx,ch in enumerate(chars)}

    # Decode
    itos = {idx:ch for ch,idx in stoi.items()}
    
    return stoi, itos, int(vocab_size)


# Perform the encoding of text
def encode_data(tokenizer=None, data=None):
    print(f'🚀 Encoding the data ...')
    return torch.tensor([ tokenizer.get(ch) for ch in data ], dtype=torch.long)


# Perform the decoding of numbers
def decode_tokens(tokenizer=None, data=None):
    print(f'🚀 Decoding the data ...')
    return ''.join([ tokenizer.get(i) for i in data ])


# Split the data into train and test sets
def train_test_split(tokens=None):
    print(f'🚀 Splitting into train and test sets ...')
    # Use 90% for training and 10 for test
    n = int(len(tokens) * 0.9)
    X_train = tokens[:n]
    X_test = tokens[n:]
    
    print(f'✅ X_train.shape: {X_train.shape} | X_test.shape: {X_test.shape} ...')

    return X_train, X_test


# Generate batches fo data
def generate_batch(X_train=None, X_test=None, split='train',  batch_size=32):

    X = X_train if split=='train' else X_test
    idx = torch.randint(low=0, high=len(X) - CONTEXT_WINDOW_LENGTH, size=(batch_size,))

    X_batch = torch.stack(tensors=[ X[i:i + CONTEXT_WINDOW_LENGTH] for i in idx], dim=0)

    y_batch = torch.stack(tensors=[ X[i+1:i + CONTEXT_WINDOW_LENGTH + 1] for i in idx], dim=0)
    
    return (X_batch, y_batch)


# Create the GPT Embeddings
class GPTEmbeddings(nn.Module):
    def __init__(self, vocab_size=0, d_model=32):
        super(GPTEmbeddings, self).__init__()

        #self.device = device

        # Token embeddings
        self.tok_embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

        # Positional embeddings
        self.pos_embeddings = nn.Embedding(num_embeddings=CONTEXT_WINDOW_LENGTH, embedding_dim=d_model)

    def forward(self, x):
        #x: (B, T)
        # print(f'==[DEBUG]== {x.size()}')
        B, T = x.size()

        # Setup positions
        positions = torch.arange(T)
        pos_emb = self.pos_embeddings(positions) # (B, T, D)
        tok_emb = self.tok_embeddings(x)    # (B, T, D)

        return pos_emb + tok_emb # (B, T, D)


# Setup the MultiHead attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model=32, n_heads=4):
        super(MultiHeadAttention, self).__init__()

        # Verify the embedding dimension size vs n_heads
        assert d_model % n_heads == 0, f'd_model: {d_model} is not divisible by n_heads: {n_heads}'

        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads

        # Fused QKV Projection matrix
        self.qkv_proj = nn.Linear(in_features=d_model, out_features=3*d_model, bias=False)

        # Output projection
        self.out_proj = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

    def forward(self, x):
        #x: (B, T, D)
        B, T, D = x.size()

        qkv = self.qkv_proj(x) # ( B, T, D*3)

        # Reshape to separate heads
        qkv = qkv.view(B, T, 3, self.n_heads, self.head_dim)
        qkv = qkv.permute(2,0,3,1,4) # (3, B, n_heads, T, head_dim)

        # Create the Q K V
        Q, K, V = qkv[0], qkv[1], qkv[2] 

        # Leverage Flash compatible attention
        attn_out = F.scaled_dot_product_attention(
            query=Q, key=K, value=V,
            attn_mask = None,
            dropout_p = 0.0,
            is_causal = True,
        ) # (B, n_heads, T, head_dim)

        # Fuse/merge the heads back together
        attn_out = attn_out.transpose(1, 2).contiguous()

        # Reshape for final output
        attn_out = attn_out.view(B, T, D)

        return self.out_proj(attn_out)



# Setup the FFN
class FFN(nn.Module):
    def __init__(self, d_model=32):
        super(FFN, self).__init__()

        # This /3 has to do with the choice of SwiGLU activation rather than ReLU or GELU and the need to control model representation capacity while maintaing the computation similar to GPT with 4*d_model
        hidden_dim = int(8 * d_model / 3)

        # Setup the parallel projections
        # This also has to do with SwiGLU
        self.ln1 = nn.Linear(in_features=d_model, out_features=hidden_dim, bias=False)
        self.ln2 = nn.Linear(in_features=d_model, out_features=hidden_dim, bias=False)

        # Setup the output projection
        self.ln3 = nn.Linear(in_features=hidden_dim, out_features=d_model, bias=False)
        
    def forward(self, x):
        # x (B, T, D)
        x = F.silu(self.ln1(x) * self.ln2(x))
        x = self.ln3(x)
        return x


# GPT Decoder Block
class DecoderBlock(nn.Module):
    def __init__(self, d_model=32, n_heads=4 ):
        super(DecoderBlock, self).__init__()

        # Setup the norm
        self.norm1 = nn.RMSNorm(normalized_shape=d_model)
        self.mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads)

        self.norm2 = nn.RMSNorm(normalized_shape=d_model)
        self.ffn = FFN(d_model=d_model)


    def forward(self, x):
        # In this case, we are using the pre-norm attention
        # Applying the add and norm before going into self-attention
        x = x + self.mha(self.norm1(x))

        # Apply the second add and norm before going into the FFN
        x = x + self.ffn(self.norm2(x))

        return x


# Setup the GPT
class GPT(nn.Module):
    def __init__(self, vocab_size=0, d_model=32, n_heads=4, n_layers=4):
        super(GPT, self).__init__()

        self.embeddings = GPTEmbeddings(vocab_size=vocab_size, d_model=d_model)

        self.blocks = nn.ModuleList(
            [  DecoderBlock(d_model=d_model, n_heads=n_heads) for _ in range(n_layers) ]
            )

        # Final layernorm before going into the language head
        self.norm = nn.RMSNorm(normalized_shape=d_model)    

        # LM Head
        self.lm_head = nn.Linear(in_features=d_model, out_features=vocab_size, bias=False)

        # Take advantage of weight tying
        self.lm_head.weight = self.embeddings.tok_embeddings.weight    

        # This is to scale the weights, if not the model starts with a very high loss
        self.apply(self._init_weights)


    # Define the weights
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

            if module.bias is not None:
                nn.init.zeros_(module.bias)
        
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0, std=0.02)


    def forward(self, x):
        x = self.embeddings(x)

        for block in self.blocks:
           x = block(x)

        # Final norm before going into the language head
        x = self.norm(x)

        # Get the logits
        logits = self.lm_head(x)

        return logits

    
    # Generate sample names
    def _generate(self, idx, max_new_tokens=10, temperature=1, new_line_token: torch.long = 0, top_k=None, top_p=None ):
        # idx: (B, T) starting token indices 
        if temperature <= 0:
            temperature = 0.1

        print(f'==[DEBUG]== Generating ... ')
        # Put the model in eval model
        self.eval()

        for _ in range(max_new_tokens):
            # First crop the context to context window length if needed
            idx_cond = idx[:, -CONTEXT_WINDOW_LENGTH: ]

            # Forward pass to get the logits
            logits = self(idx_cond) # (B, T, vocab_size)

            # Take the logits for the final time sep
            logits = logits[:, -1, :]   # (B, vocab_size)

            # Apply temperature
            logits = logits / temperature

            # Extract the top_k probabilities
            # set everything else to -inf
            if top_k is not None:
                v, _ = torch.topk(logits, top_k)
                logits[logits < v[:, [-1]]] = float('-inf')

            # Set top_p
            if top_p is not None:
                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                sorted_probs = F.softmax(sorted_logits, dim=-1)
                cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
                sorted_indices_to_remove[..., 0] = False

                indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove )
                logits[indices_to_remove] = float('-inf')

            
            # Convert the logits to probabilities
            probs = F.softmax(logits, dim=-1)

            # Based on the probabilities, sample the next token
            next_token = torch.multinomial(input=probs, num_samples=1, replacement=True) # (B, 1)
            
            # Append to the existing sequence
            idx = torch.cat((idx, next_token), dim=-1)

            # Stop if new line is generated
            #if (next_token == new_line_token).all():
            #    break
        
        return idx



# Configure the optimizer for weight decaying and parameter grouping
def configure_optimizer(model=None, weight_decay=0.1, learning_rate=3e-3, betas=(0.9, 0.95)):

    # Setup two sets to track decay
    decay_params = []
    no_decay_params = []

    # for module in model.modules():
    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue
        
        # Apply weight decay only to linear weights
        if name.endswith('weight') and 'norm' not in name and 'embedding' not in name:
            decay_params.append(param)
        else:
            no_decay_params.append(param)

    # Remove duplicates
    decay_ids = { id(p):p for p in decay_params }
    no_decay_ids = { id(p):p for p in no_decay_params }
    assert set(decay_ids).isdisjoint(set(no_decay_ids))
    
    # Setup our optimizer groups
    optim_groups = [
        { 'params' : decay_params, 'weight_decay' : weight_decay },
        # No decaying these parameters
        { 'params' : no_decay_params, 'weight_decay' : 0.0 }
    ]

    optimizer = torch.optim.AdamW(
        params = optim_groups,
        lr = learning_rate,
        betas = betas
    )
    return optimizer


# Setup the evaluation loop
# Disable gradient tracking
@torch.no_grad()
def estimate_loss(model, X_train=None, X_test=None, vocab_size=None, batch_size=32, eval_iters=50):
    # put the model in eval mode
    model.eval()

    losses = { 'train' : 0, 'test' : 0 }
    
    for split in ['train', 'test']:
        total_loss = 0.0

        for _ in range(eval_iters):
            xb, yb = generate_batch(X_train=X_train, X_test=X_test, batch_size=batch_size)

            logits = model(xb)

            loss = F.cross_entropy(
              input=logits.view(-1, vocab_size), target=yb.view(-1) 
              )
            
            # Track the loss
            total_loss += loss.item()
        
        losses[split] = total_loss / eval_iters 

    model.train()
    return losses



# Define the training loop
def train(model=None, optimizer=None, X_train=None, X_test=None, vocab_size=None, batch_size=64, epochs=10, eval_interval=10, grad_clip=1.0):
    print(f'✅ Beginning training ...')

    model.train()
    for epoch in range(epochs):

        # Evaluate the model periodically
        if epoch % eval_interval == 0:
            losses = estimate_loss(model=model, X_train=X_train, X_test=X_test, vocab_size=vocab_size, batch_size=batch_size)
                        
            print(f'Epoch: {epoch+1} | loss: {losses}')

        # Get Batch
        xb, yb = generate_batch(X_train=X_train, X_test=X_test, split='train')

        # Forward to get the logits
        logits = model(xb)

        # Calculate the loss
        loss = F.cross_entropy(
            input=logits.view(-1, vocab_size), target=yb.view(-1)
            )
        
        # Back propagate
        loss.backward()

        # Clip the gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

        # Update the parameters
        optimizer.step()

    # Return the model
    return model


def main():
    print(f'🚀 Launching {__file__}')

    # Read the arguments
    file_name = args.filename
    d_model = args.d_model if args.d_model else 32 
    n_heads = args.n_heads if args.n_heads else 4
    n_layers = args.n_layers if args.n_layers else 4
    epochs = args.epochs if args.epochs else 10
    batch_size = args.batch_size if args.batch_size else 64
    temperature = args.temperature if args.temperature else 0.1
    top_k = args.top_k if args.top_k else None
    top_p = args.top_p if args.top_p else None
    #print(f'==[DEBUG]== filename: {file_name} | d_model: {d_model} | n_heads: {n_heads}')

    data = get_data(file_name)
    
    stoi, itos, vocab_size = tokenizer(data=data)
    tokens_encoded = encode_data(tokenizer=stoi, data=data)
    X_train, X_test = train_test_split(tokens=tokens_encoded)
    
    # Setup the model
    model = GPT(vocab_size=vocab_size, d_model=d_model, n_heads=n_heads)

    # get the optimizer
    optimizer = configure_optimizer(model=model, weight_decay=0.1, learning_rate=3e-4)    

    model = train(model=model, optimizer=optimizer, X_train=X_train, X_test=X_test, vocab_size=vocab_size, batch_size=64, epochs=epochs)

    # Generate samples starting from the new line char
    new_line_token = stoi['\n']
    start_token = torch.tensor([[new_line_token]], dtype=torch.long)

    generated = model._generate(idx=start_token, new_line_token=new_line_token, max_new_tokens=50)

    name = ''.join([ itos[i.item()] for i in generated[0] ])
    print(f'{name}')


if __name__ == '__main__':
    main()

After training for 10,000 epochs, here is the result:

🚀 Launching /home/securitynik/stuff/baby_name_gpt.py
🚀 Getting data ...
✅ Successfully read: 228145 bytes of data.
Chars: '\nabcdefghijklmnopqrstuvwxyz'
✅ Vocab size: 27 tokens
🚀 Encoding the data ...
🚀 Splitting into train and test sets ...
✅ X_train.shape: torch.Size([205330]) | X_test.shape: torch.Size([22815]) ...
✅ Beginning training ...

Epoch: 1 | loss: {'train': 3.3060472202301026, 'test': 3.305421471595764}
Epoch: 11 | loss: {'train': 3.1819068813323974, 'test': 3.183610119819641}
...
Epoch: 9971 | loss: {'train': 1.8318881130218505, 'test': 1.8152394461631776}
Epoch: 9981 | loss: {'train': 1.8336570143699646, 'test': 1.8264712977409363}
Epoch: 9991 | loss: {'train': 1.8365000939369203, 'test': 1.8344433832168578}

==[DEBUG]== Generating ... 

mylan
rayona
skaynor
reem
rhil
reiann
sherom
reton

From my perspective, these all look like possible names. 

Well hey, hope you enjoyed this series. Do let me know what you think I could have done differently.

Posts in this series:
   - Git Notebook
   - Git Notebook: 
   - Git Notebook: 


Welcome to the world of AI - Learning about the Decoder-Only transformer - From scratch with PyTorch

In this third in this series post, we build on what we did in the previous post to now build GPT from scratch. We will leverage Andrej Karpathy Makemore series

Where as Andrej used Tiny Shakespeare, we will use the baby names dataset that he used in one of his earlier trainings

Import the libraries

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt

Preparing our hyperparameters for the model.
# Let us config a data class
class Config:
    d_model = 16    # The embedding dimensions
    n_heads = 4     # When we get to multi-head attention, we will need this
    d_head = 4      # We could calculate this manually by doing d_model // n_heads
    n_layers = 2    # We are going to stack two layers  
    batch_size = 1  # Batch size of 1
    n_epochs = 1000 # Number of epochs
    lr = 0.01      # Step size of Gradient Descent
    eval_iters = 10 # Evaluate the model every 10 epochs

# instantiate the config 
cfg = Config()

Getting our data:
# Let's get our data
with open(file='names.txt', mode='r') as fp:
    text = fp.read()

# Get a sample of the names
print(text[:32])
-----------
emma
olivia
ava
isabella
sophia

Let's build a function to create our vocab
This is overkill but hey, we should learn to write dry code as much as possible ;-)

# Let's build a function to create our vocab
# This is overkill but hey, we should learn to write dry code as much as possible ;-)
def build_vocab(text):
    '''
    text: The full text 
    return:
        chars: The chars in vocabulary
        stoi: maps/encodes characters to numbers
        itos: unmaps/decode numbers back to characters
    '''
    chars = sorted(list(set(text))) # get a list of unique characters in the input text
    stoi = { ch:i for i,ch in enumerate(chars, start=0)} 
    itos = { i:ch for ch,i in stoi.items()}
    return chars, stoi, itos


# Test the function
chars, stoi, itos = build_vocab(text)

print(f'[*] Here are the characters: {chars}')
print(f'[*] Here are the characters: {"".join(chars)}')
print(f'[*] Here is the stoi mapping/encoding: {stoi}')
print(f'[*] Here is the itos un-mapping/decoding: {itos}')

# Setup the vocab size 
vocab_size = len(chars)
print(f'Vocab size / unique tokens: {vocab_size}')
--------------
[*] Here are the characters: ['\n', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
[*] Here are the characters: 
abcdefghijklmnopqrstuvwxyz
[*] Here is the stoi mapping/encoding: {'\n': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12, 'm': 13, 'n': 14, 'o': 15, 'p': 16, 'q': 17, 'r': 18, 's': 19, 't': 20, 'u': 21, 'v': 22, 'w': 23, 'x': 24, 'y': 25, 'z': 26}
[*] Here is the itos un-mapping/decoding: {0: '\n', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e', 6: 'f', 7: 'g', 8: 'h', 9: 'i', 10: 'j', 11: 'k', 12: 'l', 13: 'm', 14: 'n', 15: 'o', 16: 'p', 17: 'q', 18: 'r', 19: 's', 20: 't', 21: 'u', 22: 'v', 23: 'w', 24: 'x', 25: 'y', 26: 'z'}
Vocab size / unique tokens: 27

Setup our encoder and decoder functions as we did in the previous post.
# With above in place, let us setup an encoder function
encode = lambda text, stoi: [ stoi.get(ch) for ch in text ]

# Test the encoder
encode(text='securitynik', stoi=stoi)
-------------
[19, 5, 3, 21, 18, 9, 20, 25, 14, 9, 11]

Similarly, the decoder that maps us back from numbers to texts.

# Similarly setup a decoder
# This maps us back from numbers to chars
decode = lambda indices, itos: ''.join([ itos.get(i) for i in indices ])

# Test the encoder
decode(encode(text='securitynik', stoi=stoi), itos=itos)

Setup the tokens from the full text. This is just us starting the process of converting the entire raw text of baby names into something the computer can use.

tokens = torch.tensor(encode(text=text, stoi=stoi), dtype=torch.long)

# This tensor of size: 228145 represents all the characters in text
# that makes up the different baby names
print(f'Here are the tokens: \n{tokens} | tokens dtype: {tokens.dtype} | shape: {tokens.shape} | Dims: {tokens.ndim}')

# If we print the first 3 chars, we se emm
# The last 3 chars are yzx
print(text[:3], text[-3:])
-----------
Here are the tokens: 
tensor([ 5, 13, 13,  ..., 25, 26, 24]) | tokens dtype: torch.int64 | shape: torch.Size([228145]) | Dims: 1
emm yzx


# Let us visualize above
def plot_token_indices(tokens, title='Token Indices over time'):
    '''
    tokens: np.array of shape (B, T)
    '''
    #assert tokens.shape[0] == 1, f'We are working with 1 full row'
    t = torch.arange(50)
    plt.figure(figsize=(15,6))
    plt.title(title)
    plt.bar(x=t, height=tokens[:t.max()+1])
    plt.xticks(ticks=range(0, len(t),1), labels=text[:len(t)], rotation=90)
    plt.yticks(ticks=range(0,len(chars),1))
    plt.ylabel('Token Index')
    plt.xlabel('Sequence')
    plt.grid(axis='y')
    plt.show()

# Test the function
plot_token_indices(tokens=tokens)


As with all machine learning we generally split our data into train and test sets or train, test and validation split. We will have train and test sets. We will use 90% of the data for training and 10 for testing. =============== 

n = int(len(text) * 0.9)

# This is our train data
X_train = tokens[:n]
print(f'Train data shape: **{X_train.shape}**')

# The remainder will be our test data
# This is how we will test the model's performance
X_test = tokens[n:]
print(f'Test data shape: **{X_test.shape}**')
---------------
Train data shape: **torch.Size([205330])**
Test data shape: **torch.Size([22815])**

Now that we have our tokens for training and testing, let us setup our context window. The context window is the maximum number of tokens the model can use to generate/predict the next token. In this case our model is character based. Therefore we want to predict the next character. We will sample random tokens up to length context_window_length. 

context_window_length = 8

Before adding the data, let us understand our objective. For the X_train, we want to go up to context length. For the y_train, we go context length + 1

# This is the input
print(X_train[:context_window_length])

# For the y_train, we want to go index + 1
# These are the targets
print(X_train[1:context_window_length + 1])
------------
tensor([ 5, 13, 13,  1,  0, 15, 12,  9])
tensor([13, 13,  1,  0, 15, 12,  9, 22])

What do we take away from the output? Note this is in context of the data above only, we want when the input is 6, the target as in the value to predict is 14. When the input is 6,14, the model should predict 14. When the input is 6,14,14 the model should predict 2. .... Until in this case, when we get to  6, 14, 14,  2,  1, 16, 13, 10, the model should predict 23

In these examples, the model is learning multiple combinations of the input as it predicts the targets. The model should be able to learn context from as little as one up to context length, to be able to predict context_window_length + 1 So rather than only given up to - in this case - 8 characters, we can give as little as one and get the model to predict what comes next. If for some reason you have more characters than context_window_length, then the model should truncate your data up to context_window_length.   

Let us now take what we learned above, to start preparing our data for the transformer. At this point, we have T (time dimension), we need to get the batch dimension also, so we can put multiple rows in at one time.

Let's use a batch size of 4 sample at a time. Just using 4 to keep our view cleaner and easier as we move through.
I thought about 8 but when you see (8,8) for (B, T) vs (4, 8), I think (4,8) is a little easier to understand.

batch_size = 4

# setup a small function to generate that batches
def generate_batch(X, batch_size=batch_size):
    '''
    X: input data (T)
    batch_size: int (B)

    Returns:
        (B, T)
    '''
    
    # Setup some random indices to sample from
    # This will be 0 to the number of items in X - context_window_length
    # context_window_length is currently 8
    # This will generate 8 random values
    idx = torch.randint(low=0, high=len(X) - context_window_length, size=(batch_size,))

    # Use those random values to get our X_batch
    # Once we have each of the batches
    # create a new dimension B and stack them vertically
    X_batch = torch.stack(tensors=[ X[i:i + context_window_length] for i in idx], dim=0)

    # With the X_batch in place, let's get the targets -> y_batch
    # We will reuse above with a small tweak
    y_batch = torch.stack(tensors=[ X[i+1:i + context_window_length + 1] for i in idx], dim=0)
    
    # Let's return or X_batch and y_batch
    return (X_batch, y_batch)

Let us now test the function

X_tmp, y_tmp = generate_batch(X=X_test)

print(f'Here is X_tmp has shape: {X_tmp.size()}: \n{X_tmp}')

# print the y_tmp
print(f'\nHere is y_tmp has shape: {y_tmp.size()}: \n{y_tmp}')
------------------
Here is X_tmp has shape: torch.Size([4, 8]): 
tensor([[15, 14,  0,  4,  1,  5,  4, 18],
        [ 0,  1, 12,  5, 11, 19,  5, 10],
        [ 1, 22,  9,  5, 18,  0, 25,  1],
        [21,  5,  0,  5, 18,  8,  1, 14]])

Here is y_tmp has shape: torch.Size([4, 8]): 
tensor([[14,  0,  4,  1,  5,  4, 18,  9],
        [ 1, 12,  5, 11, 19,  5, 10,  0],
        [22,  9,  5, 18,  0, 25,  1, 22],
        [ 5,  0,  5, 18,  8,  1, 14,  0]])


What do you take away from above? 
First we have 8 rows (B). 
This is our batch size of 8   
You see this shape/size in both the X_tmp and y_tmp

Let us take the first row in X_tmp and the correcting first row in y_tmp. This is the first batch of 8 tokens in the (1,T).  
Note my explanation below is in context of the output above. We 

When the model see 1 in X_tmp, we would like it to predict 4. When the model has input X_tmp of 1,4, we would like it to predict 16. Similarly, when the model sees 1,4,16, we would like it to predict 5. As you can see, this is much like what we discussed earlier. Difference being now that we have the batch of 8 items.  

With our data, let us start building our model from scratch.

Let us build a single head attention mechanism. We are not going to use this in the end but are building up, because it is a single head, we will use d_model as the head size. We actually did this in the previous post with NumPy. However, because I am using PyTorch, I wanted to walk through the same process.

class SingleHeadAttention(nn.Module):
    ''' Single attention head'''
    def __init__(self, ):
        super(SingleHeadAttention, self).__init__()

        # Setup our three projection matrices
        # The bias is usually disabled, so only W @ X not W @ X + b
        self.query = nn.Linear(in_features=cfg.d_model, out_features=cfg.d_model, bias=False)
        self.key = nn.Linear(in_features=cfg.d_model, out_features=cfg.d_model, bias=False)
        self.values = nn.Linear(in_features=cfg.d_model, out_features=cfg.d_model, bias=False)
    
        # Setup our triangular matrix for the mask
        self.register_buffer('tril', torch.tril(torch.ones(context_window_length, context_window_length)))
    
    def forward(self, x):
        # x (B, T, d_model)
        # Capture that shape information
        B, T, D = x.size()

        # project the x into the query, keys and values
        Q = self.query(x)   # (B, T, d_model)
        K = self.key(x)     # (B, T, d_model)
        V = self.values(x)  # (B, T, d_model)

        # calculate our attention scores
        # Q has shape (B, d_model, d_model) and K has shape ((B, d_model, d_model))
        attn_scores = Q @ K.transpose(-2, -1) # (B, T, T)

        # scale the scores 
        scaled_attn_scores = attn_scores / cfg.d_model**.5 # (B, T, T)

        # Add the mask
        masked_scores = scaled_attn_scores.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)

        # Get the weights via softmax
        attn_weights = F.softmax(masked_scores, dim=-1) # (B, T, T)
        
        # Get the seighted sum of the values
        attn_out = attn_weights @ V # (B, T, d_model)

        return attn_out

# Test the class
single_head_attention = SingleHeadAttention()

# Create one batch of dummy data to test our model
# We assume this is our input embeddings (token + position)
tmp_x = torch.rand((1, context_window_length, cfg.d_model))
out_single_head_attention = single_head_attention(tmp_x)
out_single_head_attention.shape
-------------
torch.Size([1, 8, 16])

With confirmation that above works, we could plug this into our model below. Note this will be replaced but I will leave the line commented out when we get to our multi-head attention.

That head_size parameter above is temporary. We will determine the head_size automatically, once we know the number of heads. Anyhow, this still works for now

The Transformer architecture also has a Feed Forward Network. Let's implement that.

# Setup the feed forward network
class FeedForward(nn.Module):
    '''The linear layer for the transformer decoder block '''
    def __init__(self, hidden_dim=cfg.d_model*4):
        super(FeedForward, self).__init__()

        # This operation is being performed on a per token basis
        # it is also being done independently
        self.net = nn.Sequential(
            nn.Linear(in_features=cfg.d_model, out_features=hidden_dim),
            nn.GELU(),
            nn.Linear(in_features=hidden_dim, out_features=cfg.d_model)
        )

    def forward(self, x):
        return self.net(x)  # (B, T, d_model)

# Test the function
ffn = FeedForward()
ffn(out_single_head_attention).shape
-------------
torch.Size([1, 8, 16])

With our FFN is working, let us move towards a multi-head attention.


class MultiHeadAttention(nn.Module):
    def __init__(self, n_heads, d_model):
        super(MultiHeadAttention, self).__init__()
        assert cfg.d_model % n_heads == 0, f'd_model: {cfg.d_model} is not divisible by number of heads: {n_heads}'

        # Get the head dimensions
        # For out demo, this gives us 4 heads
        self.n_heads = n_heads
        self.d_head = cfg.d_model // n_heads
        self.d_model = d_model

        # We use one One matrix for the QKV that we will then split
        # We have *3 because it is the q, k, v
        self.W_qkv_proj = nn.Linear(in_features=d_model, out_features=3*d_model, bias=False)

        # Setup the final linear layer to fuse the data after concatenating the head
        self.W_out_proj = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

        # Whereas in the single head we registered the buffer, we will instead use pytorch built in tools to get the mask


    def forward(self, x):
        # x: (B, T, d_model)
        # Capture those shapes
        B, T, D = x.size()

        # Do our first linear projection
        qkv = self.W_qkv_proj(x) # (B, T, 3*d_model)

        # Get our qkv
        qkv = qkv.view(B, T, 3, self.n_heads, self.d_head) # (B, T, 3, n_heads, d_head)

        # Reshape qkv, so we can extract each of the 3 matrices
        qkv = qkv.permute(2, 0, 3, 1, 4) # (3, B, n_heads, T, d_model)

        # Finally extract the Q, K, V
        # Each of these now have (B, n_heads, T, d_head)
        Q, K, V = qkv[0], qkv[1], qkv[2]

        # Rather than building the mask like we did previously,
        # Let's leverage Torch's efficient implementation of the scaled dot product attention. 
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.functional.scaled_dot_product_attention.html

        attn_output = F.scaled_dot_product_attention(
            query=Q, key=K, value=V, # Our Q, K, V
            attn_mask=None, # No explicit mask needed
            dropout_p=0.0,   # Disable dropout
            is_causal=True,  # Applies lower triangular causal mask
        )   # (B, n_heads, T, d_head)

        # Transpose the attn_output
        # I just use permute her to do something different
        # Let us also ensure we have a contiguous tensor in memory
        attn_output = attn_output.permute(0, 2, 1, 3).contiguous() # (B, T, n_heads, d_head)

        # Reshape now, so that we consolidate back to (B, T, d_model)
        attn_output = attn_output.view(B, T, self.d_model) #(B, T, d_model)

        # Wrap this up with the final project where we fuse the outputs
        out = self.W_out_proj(attn_output)
        
        return out

# Test the function
multihead_self_attention = MultiHeadAttention(n_heads=4, d_model=cfg.d_model)

# Looks like our multi-head attention mechanism is working as expected
multihead_self_attention(tmp_x).shape
-----------------
torch.Size([1, 8, 16])

Setup a Decoder block

class DecoderBlock(nn.Module):
    def __init__(self, d_model, n_heads):
        super(DecoderBlock, self).__init__()
        # Setup two layer norms
        self.ln1 = nn.LayerNorm(normalized_shape=d_model)
        self.ln2 = nn.LayerNorm(normalized_shape=d_model)

        # Multi-head attentions
        self.mha = MultiHeadAttention(n_heads=n_heads, d_model=d_model)

        # Feedforward
        self.ffn = FeedForward(hidden_dim=d_model*4)
    def forward(self, x):
        # Let's leverage residual connection here 
        # We perform layer normalization before passing the input
        # to self-attention
        # by adding the input to the output 

        x = x + self.mha(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x

# Test the function
decoder_block = DecoderBlock(d_model=cfg.d_model, n_heads=4)
decoder_block(tmp_x).shape
-------------
torch.Size([1, 8, 16])

Put it all together.

# implement a class
class BabyNamesModel(nn.Module):
    # Setup our constructor
    def __init__(self, d_model, n_heads):
        # we will inherit from the nn.Module class
        super(BabyNamesModel, self).__init__()

        # Let's setup our embeddings (lookup) table
        # We have 27 unique chars/tokens in our vocab
        # the embedding_dim is the width of our embedding vector
        self.token_embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

        # Setup the position embeddings
        # The transformer processes data in parallel
        # thus position/order information is lost
        # Positional embeddings are used to preserve the order
        # This gives every positions its own embedding vector
        self.pos_embeddings = nn.Embedding(num_embeddings=context_window_length, embedding_dim=d_model)

        # Here we use our single attention head
        # self.single_attention_head = SingleHeadAttention()

        # Once we have our multi-head attention, we can comment out the single_attention_head
        # and leverage multi_head
        #self.mha = MultiHeadAttention(n_heads=n_heads, d_model=d_model)

        # Let's add our FFN
        #self.ffn = FeedForward(hidden_dim=d_model * 4)

        # Setup the Decoder Block:
        # Test with one to start
        # self.decoder_block = DecoderBlock(d_model=d_model, n_heads=n_heads)

        # With the decoder block working stack them
        # Let us use blocks
        self.decoder_block = nn.Sequential(
            DecoderBlock(d_model=d_model, n_heads=n_heads),
            DecoderBlock(d_model=d_model, n_heads=n_heads),
            DecoderBlock(d_model=d_model, n_heads=n_heads),
            DecoderBlock(d_model=d_model, n_heads=n_heads),
            nn.LayerNorm(normalized_shape=d_model),
        )

        # Setup the language model head
        self.lm_head = nn.Linear(in_features=d_model, out_features=vocab_size)


    def forward(self, x):
        # x: (B, T)

        # Let's extract those dimensions
        B, T = x.size()

        # Apply the token embeddings 
        tok_embd = self.token_embeddings(x) # (B, T, d_model)

        # Apply the position embeddings
        pos_embd = torch.arange(T) # (T)
        pos_embd = self.pos_embeddings(pos_embd) # (T, d_model)

        # Add the token and positional embeddings to create our first residual
        # Our x here now holds both the token identities and their positions
        x = tok_embd + pos_embd # (B, T, d_model)

        # Apply the single attention head
        #x = self.single_attention_head(x) # (B, T, d_model)

        # Similarly, comment out above
        # Now that we have our Multihead attention
        #x = self.mha(x)

        # Apply the FFN
        #x = self.ffn(x)

        x = self.decoder_block(x)

        # Add the language model head
        logits = self.lm_head(x) # (B, T, vocab_size)

        return logits

# Test the class
model = BabyNamesModel(n_heads=4, d_model=cfg.d_model)

# We test on our X_tmp for now.
# Later we will use our train data properly
model(x=X_tmp).shape
------------------
torch.Size([4, 8, 27])

Setup an optimizer.

optimizer = torch.optim.AdamW(params=model.parameters(), lr=cfg.lr)
optimizer

# Setup our loss function
loss_fn = nn.CrossEntropyLoss(reduction='mean')
loss_fn
-------------
CrossEntropyLoss()


Setup a quick training loop.

print('Training ...')

# Setup the training loop
for epoch in range(cfg.n_epochs):
    X, y = generate_batch(X_train)
    # print(X)
    # print(y)

    # Zero out the gradients
    optimizer.zero_grad(set_to_none=True)
    
    # Get the predictions for the batch
    y_pred = model(X)   # (B, T, vocab_size)
    
    # Need to reshape y_pred to (B*T, vocab_size) 
    # be able to use crossentropy loss 
    y_pred = y_pred.view(-1, vocab_size)

    # We also need to reshape y which is currently (B, T) to (B*T)

    # Now calculate the loss
    loss = loss_fn(input=y_pred, target=y.view(-1))
    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        print(f'[*] Epoch: {epoch + 1} | Loss: {loss.item()}')

    #if epoch == 10:
    #    break
----------------
print('Training ...')

# Setup the training loop
for epoch in range(cfg.n_epochs):
    X, y = generate_batch(X_train)
    # print(X)
    # print(y)

    # Zero out the gradients
    optimizer.zero_grad(set_to_none=True)
    
    # Get the predictions for the batch
    y_pred = model(X)   # (B, T, vocab_size)
    
    # Need to reshape y_pred to (B*T, vocab_size) 
    # be able to use crossentropy loss 
    y_pred = y_pred.view(-1, vocab_size)

    # We also need to reshape y which is currently (B, T) to (B*T)

    # Now calculate the loss
    loss = loss_fn(input=y_pred, target=y.view(-1))
    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        print(f'[*] Epoch: {epoch + 1} | Loss: {loss.item()}')

    #if epoch == 10:
    #    break

Let us do a quick generation

# Let's generate some names
def generate_baby_names(batch_size=4):
    for _ in range(batch_size):
        # is our current batch, our current context
        X, _ = generate_batch(X=X_train, batch_size=16) # (B, T)

        # We are ensuring that the input is never greater than the context_window_length
        # If we go beyond context_window_length
        # The position embedding table will run out of scope 
        # as we only have positions for up to context_window_length
        idx_cond = X[:, -context_window_length:] # (B, T)
        
        # Get the logits from the model
        logits = model(idx_cond)    # (B, T, d_model)

        # Focus on the last time step
        logits = logits[:, -1, :] # (B, vocab_size)

        # Get the probabilities of the next token
        probs = F.softmax(logits, dim=-1) # (B, vocab_size)

        # Sample from the model
        idx_next = torch.multinomial(input=probs, num_samples=1, replacement=False) 

        # Concatenate the 
        idx = torch.cat((X, idx_next), dim=1)

    return idx

# Test the function
tmp_idx = generate_baby_names(batch_size=10).tolist()
tmp_idx
--------------
[[2, 18, 9, 25, 1, 0, 2, 18, 25],
 [14, 0, 19, 21, 8, 1, 14, 0, 12],
 [0, 1, 4, 25, 12, 25, 14, 14, 1],
 [6, 18, 1, 14, 11, 5, 5, 0, 5],
 [1, 19, 8, 13, 5, 18, 5, 0, 26],
 [5, 0, 8, 15, 12, 12, 25, 14, 0],
 [18, 5, 5, 0, 12, 1, 11, 5, 22],
 [18, 9, 1, 14, 1, 0, 10, 1, 8],
 [12, 21, 26, 9, 1, 14, 1, 0, 13],
 [0, 4, 1, 18, 9, 5, 12, 12, 0],
 [18, 1, 2, 5, 12, 12, 5, 0, 8],
 [0, 18, 15, 19, 1, 12, 9, 14, 20],
 [9, 14, 5, 0, 9, 19, 1, 2, 1],
 [12, 12, 1, 18, 25, 0, 13, 1, 12],
 [1, 18, 0, 3, 1, 13, 5, 12, 12],
 [1, 25, 14, 5, 0, 2, 12, 5, 12]]

Let's now generate some names

# Generate some names from above
print(''.join([itos[j] for i in tmp_idx for j in i]))
------------
saia
savisa
lawsion
rionana
nyasiablegend
creson
burl
dmoni
dlh
kendahdyson
tysdyden
zeloen
deeja
am
jaxyna
jalal
jaernan
jabkeslynn
oelie
zofl

Well that's it for this post. See you in the final post where we wrap this all up.

Posts in this series:
   - Git Notebook
   - Git Notebook: 
   - Git Notebook: 






Welcome to the world of AI - Learning about the Decoder-Only Transformer - From scratch with NumPy

In this post, we build a **Decoder-Only Transformer** from scratch, using **only numpy**.    

I wanted to put this together to see if I can find an easier way to build this very popular architecture, while at the same time, seeing if it helps someone else.  

As you go through, if you find I missed anything or have some suggestions for improvement, please do not hesitate to drop me a line.   

As we go through, we build a decoder-only transformer that can generate baby names.

The original paper for transformer **Attention is all you need**: https://arxiv.org/pdf/1706.03762   

For this problem, we will use character level tokenization. 

Text for training: https:/raw.githubusercontent.com/karpathy/makemore/refs/heads/master/names.txt

Start by importing our libraries.

# We will keep it simple as stated above using numpy
# We will also use matplotlib for visualization
import numpy as np
import matplotlib.pyplot as plt

Preparing our data for the model 

We setup a configuration class that holds our hyperparameters

# Let us config a data class
class Config:
    d_model = 16    # The embedding dimensions
    n_heads = 4     # When we get to multi-head attention, we will need this
    d_head = 4      # We could calculate this manually by doing d_model // n_heads
    n_layers = 2    # We are going to stack two layers, that is two decoder blocks. 
    batch_size = 1  # Batch size of 1. For simplicity and easier visualization

    text = 'Welcome to the world of AI' # The test our untrained model should generate

# instantiate the config 
cfg = Config()
cfg
-----------
<__main__.Config at 0x77ecd644c050>

Let's build a function to create our vocab. This is overkill but hey, we should learn to write dry code as much as possible 😀

def build_vocab(text):
    '''
    text: The full text 
    return:
        chars: The chars in vocabulary
        stoi: maps/encodes characters to numbers
        itos: unmaps/decode numbers back to characters
    '''
    chars = sorted(list(set(text))) # get a list of unique characters in the input text
    
    # Convert the text to numbers
    stoi = { ch:i for i,ch in enumerate(chars, start=1)} 
    
    # Go back from numbers to text
    itos = { i:ch for ch,i in stoi.items()}
    return chars, stoi, itos


# Test the function
chars, stoi, itos = build_vocab(cfg.text)

print(f'[*] Here are the characters: {chars}')
print(f'[*] Here are the characters: {"".join(chars)}')
print(f'[*] Here is the stoi mapping/encoding: {stoi}')
print(f'[*] Here is the itos un-mapping/decoding: {itos}')

# Setup the vocab size 
vocab_size = len(chars)
print(f'Vocab size / unique tokens: {vocab_size}')
-----------

[*] Here are the characters: [' ', 'A', 'I', 'W', 'c', 'd', 'e', 'f', 'h', 'l', 'm', 'o', 'r', 't', 'w']
[*] Here are the characters:  AIWcdefhlmortw
[*] Here is the stoi mapping/encoding: {' ': 1, 'A': 2, 'I': 3, 'W': 4, 'c': 5, 'd': 6, 'e': 7, 'f': 8, 'h': 9, 'l': 10, 'm': 11, 'o': 12, 'r': 13, 't': 14, 'w': 15}
[*] Here is the itos un-mapping/decoding: {1: ' ', 2: 'A', 3: 'I', 4: 'W', 5: 'c', 6: 'd', 7: 'e', 8: 'f', 9: 'h', 10: 'l', 11: 'm', 12: 'o', 13: 'r', 14: 't', 15: 'w'}
Vocab size / unique tokens: 15

Let us take a different view of this mapping by using pandas.

# Import pandas as pd
import pandas as pd
df = pd.DataFrame(stoi.items(), columns=['char', 'num'])
df.style.hide(axis='index')

We do the same thing for the number to strings

df = pd.DataFrame(itos.items(), columns=['num', 'char'])
df.style.hide(axis='index')




With above in place, we now have a clear understanding, of one way to map text to numbers and back from numbers to text. 

Let's build on this to setup an encoder function. This function is what will be called on future text, using the vocabulary we defined above. Remember, our vocab is the unique characters we have within the string "Welcome to the world of AI".

encode = lambda text, stoi: [ stoi.get(ch) for ch in text ]

# Test the encoder
encode(text='Welcome', stoi=stoi)

---------------
[4, 7, 10, 5, 12, 11, 7]

As we said earlier, if we encode from text to numbers, we have to be able to revert that process. While the computer needs numbers to train on, we cannot provide back those numbers to humans. We need to give humans something that is understandable. Hence the need for the decoder to revert the mapping.

# This maps us back from numbers to chars
decode = lambda indices, itos: ''.join([ itos.get(i) for i in indices ])

# Test the encoder
decode(encode(text='Welcome', stoi=stoi), itos=itos)
------------
'Welcome'

Now that we know the encoder and decoder works, let us get all our tokens from the text "Welcome to the world of AI" . At the same time, we make a 1-dimension NumPy. We also add a new (batch) dimension also, moving the input form a list to a 2-dimension NumPy array.

tokens = np.array(encode(text=cfg.text, stoi=stoi), dtype=np.int32)[None, :]
print(f'Here are the tokens: \n{tokens} | tokens dtype: {tokens.dtype} | shape: {tokens.shape} | Dims: {tokens.ndim}')

# Extract the batch and time dimensions and put them into separate variables
B, T = tokens.shape # (batch, timestep)
-------------
Here are the tokens: 
[[ 4  7 10  5 12 11  7  1 14 12  1 14  9  7  1 15 12 13 10  6  1 12  8  1
   2  3]] | tokens dtype: int32 | shape: (1, 26) | Dims: 2

We are making progress, let us setup our X from the tokens. We are using a batch size of 1 for simplicity.  
We use batch size of one as it is easy for us to visualize as we go along.   
I like visuals and you should too ;-) 

# This also means we will feed the entire sequence into the model
X = tokens[:, :-1] # (We are predicting the next token)
Y = tokens[:, 1:] # the 1 is the next token

# Peek into the data
print(f'Here is the X: {X}')
print(f'Here is the Y: {Y}')

-------------
Here is the X: [[ 4  7 10  5 12 11  7  1 14 12  1 14  9  7  1 15 12 13 10  6  1 12  8  1
   2]]
Here is the Y: [[ 7 10  5 12 11  7  1 14 12  1 14  9  7  1 15 12 13 10  6  1 12  8  1  2
   3]]

What do we take away from above? When the model sees 4, we would like it to predict 7. When it sees the sequence of 4, 7, we would like it to predict 10. When it sees, 4, 7, 10, we would like it to predict 5. That pattern continues ...

Let's prepare to visualize our tokens. Setup a function for this even though we don't need to.

# Let us visualize above
def plot_token_indices(tokens, title='Token Indices over time'):
    '''
    tokens: np.array of shape (B, T)
    '''
    assert tokens.shape[0] == 1, f'We are working with 1 full row'
    t = np.arange(tokens.shape[1])
    plt.figure(figsize=(15,4))
    plt.title(title)
    plt.bar(x=t, height=tokens[0])
    plt.xticks(ticks=range(0, len(cfg.text),1), labels=cfg.text)
    plt.yticks(ticks=range(0,15,1))
    plt.ylabel('Token Index')
    plt.xlabel('Sequence')
    plt.grid(axis='y')
    plt.show()


# Test the function
plot_token_indices(tokens=tokens)


Above shows our sequence and the index positions for each token. For example, we see that w has a value of 4, e has a value of 6, space has a value of 0, etc.

With this in place, let's work on our core numerical primitives. 

Stable Softmax / Cross-entropy from logits / LayerNorm / Dropout / GELU   

First up 
Softmax
Softmax is a core activation function used in machine learning tasks. It is used to convert the outputs - usually the raw logits - into a probability distribution. We setup our Softmax via a function. We also consider numerical stability as we build this out.

# Setup a numerically stable implementation of softmax
def softmax_stable(logits, axis=-1):
    '''
    Numerically stale softmax implementation
    logits: np.array(..., D) D Is vocab size
    '''

    # First up find the max value in the logits
    max_logits = np.max(logits, axis=axis, keepdims=True)

    # Shift the logits by the max
    shifted = logits - max_logits
    exp_shifted = np.exp(shifted)
    probs = exp_shifted / np.sum(exp_shifted, axis=axis, keepdims=True)
    return probs

# Suppress scientific notation
np.set_printoptions(suppress=True)

# Test the function
-----------------
array([0.00078972, 0.11720525, 0.01586201, 0.86603615, 0.00010688])

Cool we seem to have a stable Softmax. Lets plot Softmax and also see the impact temperature can have on the probabilities. We learned a lot about temperature, top_p and top_k in the first post in this series: Welcome to the world of AI  - Understanding temperature, top_p and top_k

# Create a 100 evenly spaced points between -5 and +5
x = np.linspace(-5, 5, 100)
for temp in [0.5, 1, 2.9, 0.1, 3]:
    probs = softmax_stable(x/temp)
    plt.plot(x, probs, label=f'Temp-{temp}')

plt.legend()
plt.title('Softmax sensitivity to temperature');



What we see above, is that a lower temperature results in sharper probabilities. Larger temperature, results in flatter probability distributions. As mentioned, we learned alot about temperature, top_p and top_k in the first post in this series: **Welcome to the world of AI  - Understanding temperature, top_p and top_k**

We need to be very careful here as even though we went through the process to make this numerically stable, we still have a situation where if these values are too "large" this Softmax output can - or should I say will - converge to a one-hot vector.

As you see below, once the values are "large" Softmax converges to a one-hot vector. Here is an example of that situation:

softmax_stable(np.array([-20., 30, 100, 50, -4]))
----------------
array([0., 0., 1., 0., 0.])

Well Softmax converging to a one-hot vector is not the only problem we have here.  The other problem is if we take the naive Softmax. We can already see large values causes overflow. Hence we see the *inf* below

a = np.array([-20., 30, 1000, 50, -4])
np.exp(a)
-----------------
/tmp/ipykernel_157535/1527753011.py:2: RuntimeWarning: overflow encountered in exp
  np.exp(a)
array([2.06115362e-09, 1.06864746e+13,            inf, 5.18470553e+21,
       1.83156389e-02])
 
When we try to compute the Softmax using the naive method. We see that we have additional overflows and nan values.Hence the reason why we need to ensure we are using the stable method.

# Overflow and nans
np.exp(a) / np.sum(np.exp(a), axis=-1, keepdims=True)
---------------
/tmp/ipykernel_157535/844943855.py:2: RuntimeWarning: overflow encountered in exp
  np.exp(a) / np.sum(np.exp(a), axis=-1, keepdims=True)
/tmp/ipykernel_157535/844943855.py:2: RuntimeWarning: invalid value encountered in divide
  np.exp(a) / np.sum(np.exp(a), axis=-1, keepdims=True)
array([ 0.,  0., nan,  0.,  0.])

Let us now jump to the 
Cross-entropy loss   
If you are doing anything with classification in neural networks, you are more than likely using cross-entropy loss. If you are doing binary classification, you are more than likely using Binary Cross-entropy. For a multi-class problem, you may be using Categorical Cross-entropy or maybe Sparse Categorical Cross-entropy. All different flavours of Cross-entropy loss.

Let us build a Cross entropy function.
# Cross entropy loss
def cross_entropy_loss(logits, targets):
    '''
    logits: (B, T, vocab_size)
    targets: (B, T)
    Returns scalar loss. Single value
    '''
    B, T, V = logits.shape
    probs = softmax_stable(logits=logits, axis=-1)

    # Now let us get the log probability at those index positions
    log_probs = np.log(probs[np.arange(B)[:, None], np.arange(T)[None, :], targets  ])
    loss = -np.mean(log_probs)

    return loss

# The function
targets = np.array([0,1,1,0,1])
logits = np.array([-2., 3, 1, 5, -4])

cross_entropy_loss(logits=logits.reshape(1, 1, -1), targets=targets)
-------------
np.float64(4.143828630781675)

The result of Cross-entropy is a single (scalar) value that tells us how well the model is learning. The closer this loss is to 0, the higher the model accuracy. So the objective is to minimize the loss.

LayerNorm    
https://arxiv.org/pdf/1607.06450     

While this might seem as only being used for normalization, LayerNorm is also used to condition the residual update scale.    

Normalization also helps with speeding up the training process. Layer normalization is done on a per record - single training example - case. This normalization method uses the same technique at training time and test time. 

# With the loss calculated, let us setup LayerNorm
class LayerNorm:
    def __init__(self, d_model, eps=1e-5):
        self.d_model = d_model
        self.eps = eps

        # The scale and bias will be learned
        self.gamma = np.ones((d_model,), dtype=np.float32)
        self.beta = np.zeros((d_model,), dtype=np.float32)

    def __call__(self, x):
        '''
        x: (B, T, d_model)
        '''
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)

        # Perform standardization
        x_hat = (x - mean) / np.sqrt(var + self.eps)

        # Do the scaling and shifting
        out = self.gamma * x_hat + self.beta
        
        return out


Let us now visualize this.

# Set the seed for repeatability
np.random.seed(10)
B, T, D = 1, vocab_size, cfg.d_model
x = np.random.randn(B, T, D).astype(np.float32) * 3.0 + 5.0 # Just shift and scale a bit
ln = LayerNorm(d_model=D)
y = ln(x)

# Flatten x
x_flat = x.reshape(-1, D)
y_flat = y.reshape(-1, D)

plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.title(f'Pre-LayerNormalization: \nmean:{x_flat.flatten().mean():.4f} \nstd:{x_flat.flatten().std():.4f}')
plt.hist(x=x_flat.flatten(), bins=50)
plt.vlines(x=x_flat.flatten().mean(), ymin=0, ymax=20, label='mean', color='r')
plt.vlines(x=x_flat.flatten().mean() + x_flat.flatten().std() * 1, ymin=0, ymax=20, label='+1 std', color='k')
plt.vlines(x=x_flat.flatten().mean() + x_flat.flatten().std() * -1, ymin=0, ymax=20, label='-1 std', color='k')


plt.legend()

plt.subplot(1,2,2)
plt.title(f'Post-LayerNormalization: \nmean:{y_flat.flatten().mean():.4f} \nstd:{y_flat.flatten().std():.4f} ')
plt.hist(x=y_flat.flatten(), bins=50)
plt.tight_layout()
plt.vlines(x=y_flat.flatten().mean(), ymin=0, ymax=20, label='mean', color='r')
plt.vlines(x=y_flat.flatten().mean() + (1 * y_flat.flatten().std()), ymin=0, ymax=20, label='+ 1 std', color='k')
plt.vlines(x=y_flat.flatten().mean() - (1 * y_flat.flatten().std()), ymin=0, ymax=20, label='-1 std', color='k')

plt.legend()
plt.show()


Without LayerNorm, we have a mean of 5.1 on the left and a standard deviation of 2.9. On the right we have a mean of 0 and a standard deviation of 1. This is what we typically want when training our models.

With LayerNorm in place and its visualization, let's see what Dropout is about

Dropout   

Dropout is a regularization strategy that is used to address overfitting. Dropout - disable - neurons during training of the neural network.

Overfitting is a term you will hear alot about in machine learning. It is where the model has learned not only the patterns in the data but potentially the noise also. Thus while the model may train and have an accuracy of 100% and a loss of 0, during inference time, the model is quite inconsistent. That is to say the model will have high variance and low bias. 

Dropout is one mechanism used to address overfitting. It is what is called a regularization strategy.

# Setup our dropout class
class Dropout:
    def __init__(self, p=0.1):
        self.p = p
        self.training = True

    def __call__(self, x):
        if not self.training or self.p == 0:
            return x
        mask = ( np.random.rand(*x.shape) > self.p).astype(x.dtype)

        # Implement invert dropout: scale by 1/(1-p) at train time only
            
        return mask * x / (1.0 - self.p)

Test the Dropout
B, T, D = (1, 5, 4)
x = np.ones((B, T, D), dtype=np.float32)
print(x)

# Setup dropout
do = Dropout(p=0.5)

# Set training to True
do.training = True

print(f'0.5 dropout:\n{do(x)}')

# Disable dropout
do.training = False
do(x)
------------------
[[[1. 1. 1. 1.]
  [1. 1. 1. 1.]
  [1. 1. 1. 1.]
  [1. 1. 1. 1.]
  [1. 1. 1. 1.]]]
0.5 dropout:
[[[2. 0. 2. 0.]
  [2. 2. 0. 0.]
  [2. 0. 0. 0.]
  [0. 2. 0. 2.]
  [2. 2. 2. 2.]]]

Now that we have an understanding of dropout, let's go ahead and wrap this up with th Gaussian Error Linear Unit (GELU) activation function 

GELU  - Gaussian Error Linear Unit
GELU is considered to be a high performance activation function. Activation functions are what introduces the non-linearity in neural networks. It weights inputs by their values. GELU also includes property from dropout and ReLU. 

# Define GELU
def gelu(x):
    '''
    This is the approximate version using Tanh
    x: np.array
    '''
    return 0.5 * x * (
        1.0 + np.tanh(
            np.sqrt(2.0 / np.pi) * (x + 0.044715 * (x**3) )
        )
    )

# Test the functio
x = np.linspace(-4, 4, 400)

# Implement ReLU so we can compare
y_relu = np.maximum(0, x)
y_gelu = gelu(x)

Let's now visualize the effect GELU has on our data.

# plot GELU
plt.figure(figsize=(8, 4))
plt.subplot(121)
plt.plot(x, y_relu, label='ReLU')
plt.legend()

plt.subplot(122)
plt.plot(x, y_gelu, label='GELU')
plt.legend()
plt.show()



We can see above that while ReLU puts everything below 0 to exactly 0, this is not the case with GELU. With GELU, small negative values are possible while large negative values are clipped at 0. 

We have most of the tools we need so far to move ahead with building our model. Let's move on to Token Embeddings and Learned Positional Encodings.

the positional embeddings will have shape (max_seq_len, d_model).
Each position (time step) will have a trainable vector.  

In our case, our token embeddings will be (vocab_size, d_model)

Our initial residual stream will be 
residual = token_embed + pos_embed  


Token Embeddings and Learned Positional Encodings 

# Setup an embedding class
class Embeddings:
    def __init__(self, vocab_size, d_model, max_len):
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_len = max_len

        # Our token embeddings will be: (vocab_size, d_model)
        # We will also use this for weight tying strategy later when setting up our Language Model (LM) Head
        self.W_tok = (np.random.randn(vocab_size+1, d_model) / np.sqrt(d_model) ).astype(np.float32)

        # Learned positional embeddings: (max_len, d_model)
        self.W_pos = (np.random.randn(max_len, d_model) / np.sqrt(d_model) ).astype(np.float32)


    def __call__(self, x):
        '''
        x: (B, T) our integer token indices 
        Returns: residual stream (B, T, d_model)
        '''
        B, T = x.shape
        assert T <= max_len, f'Sequence length: {T} is greater than max len: {self.max_len} '
        
        # Setup the token embeddings
        tok_emb = self.W_tok[x] # (B, T, d_model)

        # Setup the positional embeddings
        pos_emb = self.W_pos[None, :T, :] # (1, T, d_model) - This is for broadcasting

        residual = tok_emb + pos_emb

        return residual, tok_emb, pos_emb

# Just something to start with
max_len = 64

# Set a manual seed so our results are the same
np.random.seed(10)
emb = Embeddings(vocab_size=vocab_size, d_model=cfg.d_model, max_len=max_len)

# Time to build the initial residual stream from x
residual, tok_emb, pos_emb = emb(X)

# All shapes or now (1, T-1, d_model)
residual.shape, tok_emb.shape, pos_emb.shape
--------------
((1, 25, 16), (1, 25, 16), (1, 25, 16))

Cool, we setup our residual, we got our token and positional embeddings.

# Visualize the untrained positional embeddings
def plot_positional_embeddings_heatmap(W_pos, num_positions=16):
    num_positions = min(num_positions, W_pos.shape[0])
    plt.figure(dpi=150)
    plt.title(f'Learned positional embeddings: First: {num_positions}')
    plt.imshow(W_pos[:num_positions], aspect='auto', cmap='coolwarm')
    plt.colorbar()
    plt.xlabel('d_model')
    plt.ylabel('Position')
    plt.yticks(ticks=range(0, len(cfg.text),1), labels=cfg.text)
    plt.xticks(ticks=range(0, cfg.d_model, 1))
    plt.show()

plot_positional_embeddings_heatmap(emb.W_pos, num_positions=32)

At this point, we have no structure above as no learning has been done as yet.   
- Each row is a position.    
- Each column is one of our 16 embedding dimensions.   
- Notice that these are not smooth.   
- We also see roughly same variance across
- It also looks like no two positions look identical.

Think about this as our first view as the positions embedding into the tokens   

def plot_token_vs_pos_norms(tok_emb, pos_emb):
    '''
    tok_emb, pos_emb: (B, T, d_model)
    '''
    assert tok_emb.shape == pos_emb.shape
    B, T, D = tok_emb.shape

    tok_norms = np.linalg.norm(tok_emb, axis=-1)[0] # (T,)
    pos_norms = np.linalg.norm(pos_emb, axis=-1)[0] # (T,)

    plt.figure(figsize=(8,3))
    t = np.arange(T)
    plt.plot(t, tok_norms, label=f'Token embedding norms - mean: {tok_norms.mean():.4f}')
    plt.plot(t, pos_norms, label=f'Positional embedding norms - mean: {pos_norms.mean():.4f}')

    plt.xlabel('Position {t}')
    plt.ylabel('L2 norm')


    plt.legend()
    plt.show()


# Test the function
plot_token_vs_pos_norms(tok_emb, pos_emb)



Our data has d_model = 16 dimensions at this time. We cannot visualize this, so let's leverage PCA to bring this data down.    
We see the average mean norm is about the same. This means they are about the same scale    
If the positional norms are too small, the model may struggle to learn     
At the same time, we don't want the positional embeddings to be too large. We do not wish to overwhelm the token identity    
What we want is a balanced representation. This looks somewhat balanced when we look at the mean    

We could leverage sklearn's PCA but let's build our own just for the fun of it.

# Setup 
def pca_2d(x):
    '''
    x: (n_rows, d_dimensions)
    Returns: (N, 2)
    '''
    x_mean = x.mean(axis=0, keepdims=True)
    x_centered = x - x_mean
    cov = x_centered.T @ x_centered / (x_centered.shape[0] - 1)
    eigvals, eigvecs = np.linalg.eigh(cov)
    idx = np.argsort(eigvals)[::-1]
    eigvecs = eigvecs[:, idx[:2]]  # (D, 2)

    return x_centered @ eigvecs # (N, 2)

# Test the function
pca_2d(tok_emb.reshape(-1, 16))[:5]
----------------
array([[ 0.255776  ,  0.23570058],
       [-0.9714201 ,  0.5525704 ],
       [-0.19787998,  0.3391831 ],
       [-0.63930357,  0.12790056],
       [-0.07901763, -0.9264408 ]], dtype=float32)

Visualization time ...
# Let's visualize this now
def plot_pca_token_vs_token_plus_pos(tok_emb, pos_emb):
    '''
    Compare geometry of token embeddings vs token + pos
    '''
    B, T, D = tok_emb.shape

    # Reshape the embeddings for PCA
    # We have three dimensions but only need 2
    tok_flat = tok_emb.reshape(B*T, D)
    pos_flat = pos_emb.reshape(B*T, D)
    tok_pos_flat = (tok_emb + pos_emb).reshape(B*T, D)

    # Leverage PCA
    tok_pca = pca_2d(tok_flat)
    pos_pca = pca_2d(pos_flat)
    tok_pos_pca = pca_2d(tok_pos_flat)

    plt.figure(figsize=(12,4))
    plt.subplot(131)
    plt.title('Token embeddings PCA')
    plt.scatter(tok_pca[:, 0], tok_pca[:, 1], c=np.arange(T).repeat(B), cmap='viridis')

    for idx, ch in enumerate(chars):
            plt.text(tok_pca[idx, 0], tok_pca[idx, 1], s=ch, fontsize=15)

    plt.subplot(132)
    plt.title('POS embeddings PCA')
    plt.scatter(pos_pca[:, 0], pos_pca[:, 1], c=np.arange(T).repeat(B), cmap='viridis')

    for idx, ch in enumerate(chars):
            plt.text(pos_pca[idx, 0], pos_pca[idx, 1], s=ch, fontsize=15)

    plt.subplot(133)
    plt.title('Token + position embeddings PCA')
    plt.scatter(tok_pos_pca[:, 0], tok_pos_pca[:, 1], c=np.arange(T).repeat(B), cmap='viridis')

    for idx, ch in enumerate(chars):
            plt.text(tok_pos_pca[idx, 0], tok_pos_pca[idx, 1], s=ch, fontsize=15)

    plt.tight_layout()
    plt.show()

plot_pca_token_vs_token_plus_pos(tok_emb, pos_emb)



What should we take away from these images above. Here are a few things:
1. Transformer encodes some structure, even before we interact with attention or the feed forward network.
2. We want to know how adding the position embeddings change the geometry of the token embeddings

Let us move on to a masked single head attention. We will do the mask single head before moving to multi-head attention.

Single Head Masked self-attention mechanism   
We have our residual (token_embeddings + positional_embeddings) with shape (1,25, 16)
At this point we have (B, T, d_model) in the end this will be (B, T, d_head). Remember we will only have one head to start, so d_head will equal to d_model.

# Define a he single head attention
def single_head_attention(x, W_q, W_k, W_v):
    '''
    x: (B, T, d_model)
    W_q: (d_model, d_model)
    W_k: (d_model, d_model)
    W_v: (d_model, d_model)

    Returns:
        attn_out: (B, T, d_model)
        attn_weights: (B, T, T)
        scores_raw: (B, T, T)
        scores_masked: (B, T, T)
    '''

    # Get the shape
    B, T, D = x.shape

    # perform the projections to Q, K, V
    Q = x @ W_q # (B, T, d_model)
    K = x @ W_k # (B, T, d_model)
    V = x @ W_v # (B, T, d_model)

    # With the projections in place, 
    # let get scaled dot-product attention scores
    scores_raw = (Q @ K.transpose(0,2,1)) / np.sqrt(cfg.d_model) # (B, T, T)

    # Setup the causal mask
    mask = np.triu(np.ones((T, T), dtype=bool), k=1)
    scores_masked = scores_raw.copy()
    scores_masked[:, mask] = -1e9   # (B, T, T)

    # Softmax
    attn_weights = softmax_stable(scores_masked, axis=-1) # (B, T, T)

    # Get the weighted values
    attn_out = attn_weights @ V # (B, T, d_model)

    return attn_out, attn_weights, scores_raw, scores_masked

# disable scientific notation
np.set_printoptions(suppress=True)

# Setup the weight matricies 
# We scale the initial weights here by 0.02, just to make them a bit smaller to help the training
# We are basically scaling the standard deviation here so it is closer to 0 with ~0.02 std
W_q = np.random.randn(cfg.d_model, cfg.d_model).astype(np.float32) * 0.02
W_k = np.random.randn(cfg.d_model, cfg.d_model).astype(np.float32) * 0.02
W_v = np.random.randn(cfg.d_model, cfg.d_model).astype(np.float32) * 0.02

# test the function
attn_out, attn_weights, scores_raw, scores_masked = single_head_attention(residual, W_q , W_k, W_v)

# Confirm the shapes
print(f'Residua shape: {residual.shape} -> (B, T, d_model)')
print(f'Attn out shape: {attn_out.shape} -> (B, T, d_model)')
print(f'Attn weights shape: {attn_weights.shape} -> (B, T, T)')
print(f'Scores raw shape: {scores_raw.shape} -> (B, T, T) ')
print(f'Scores masked shape: {scores_masked.shape} -> (B, T, T)')

print(f'W_q mean: {W_q.mean():.4f} | W_q std: {W_q.std():.4f}')
-------------
Residua shape: (1, 25, 16) -> (B, T, d_model)
Attn out shape: (1, 25, 16) -> (B, T, d_model)
Attn weights shape: (1, 25, 25) -> (B, T, T)
Scores raw shape: (1, 25, 25) -> (B, T, T) 
Scores masked shape: (1, 25, 25) -> (B, T, T)
W_q mean: -0.0002 | W_q std: 0.0189


plt.figure(figsize=(15,4))

plt.subplot(141)
plt.imshow(scores_raw[0], aspect='auto', cmap='viridis')
plt.title('Scores pre-masking')
plt.xlabel('Key Position')
plt.ylabel('Query position')

plt.subplot(142)
plt.imshow(scores_masked[0], aspect='auto')
plt.title('Scores post-masking')
plt.xlabel('Key Position')
#plt.ylabel('Query position')

plt.subplot(143)
plt.imshow(attn_weights[0], aspect='auto', cmap='viridis')
plt.title('Attention Weights')
plt.xlabel('Key Position')
#plt.ylabel('Query position')

plt.subplot(144)
plt.imshow(attn_out[0], aspect='auto', cmap='viridis')
plt.title('Attention Output')
plt.xlabel('Key Position')
#plt.ylabel('Query position')

plt.colorbar()
plt.tight_layout()
plt.show()


**Pre-masking**
For the pre-masking, the query is the row and the column is the key   
We still do not have any structure as yet in the pre-masking plot   
right now, each token is most likely similar to itself   
This represents the unrestricted attention landscape   
We can conclude this is how the model would attend if there was no autoregressive behaviour   
This is the raw nature of the residual stream    

**Post-masking** 
What do we take away from the post-masking.   
Keep in mind, this is the same matrix as the pre-masking. Only difference now is the upper triangle has been replaced with -inf   
The mask prevents the model from looking to the future   
Every token can only attend to itself and the tokens preceding it. 
This is the core idea behind autoregressive generation

**Attention weights**  
This now says where each token looks   
Position 0 can only attend to itself  
Earlier positions distribute the attention across earlier tokens  
Think about this as a routing mechanism where the attention flows across the sequences  
This is the model communicating with itself

**Attention output**
Finally, we have the attention output  

# Plot the per attention weights
attn_weights.shape

# Get the shape data
B, T, d_model = attn_weights.shape

# Get the bar plot
plt.figure(figsize=(15,10))
for i in range(28):
    plt.subplot(7, 4,i+1)
    plt.bar(np.arange(T), attn_weights[0, i])
    plt.title(f'attn distribution for pos: {i}:{cfg.text[i]}')
    plt.xlabel('key position')
    plt.ylabel('attn weights')
    plt.xticks(ticks=range(0,25,1))
    if i == attn_weights.shape[1] - 1:
        break

plt.tight_layout()
#plt.bar(np.arange(T), attn_weights[0, 10])

Visualize ...

What do you take away from above.
The one bar in the first plot, means that the model can only attend to the first token. Basically itself.    
For position 5 for example, the model can only attend to positions 0-4. These values sum to 1 for the probabilities  
Some positions may strongly prefer one earlier token   
Overall, we can look at attention as a probability distribution. From a local perspective the model is attending to nearby tokens. From the global perspective the model attends broadly. It is self-focused when the model attends mostly to itself.

Plot the update norm to the residual. Visualize once again.

# We have a larger residual norm than the update norm
# This is what we want 
upd_norms = np.linalg.norm(attn_out[0], axis=-1)
res_norms = np.linalg.norm(residual[0], axis=-1)

plt.plot(np.arange(T), upd_norms, label=f'Attention update norm mean: {upd_norms.mean():.4f}')
plt.plot(np.arange(T), res_norms, label=f'residual norm mean {res_norms.mean():.4f}')
plt.xlabel('Positions T')
plt.ylabel('L2 Norm')
plt.title("Attention update vs residual norm (single head)")

plt.legend()
plt.show()


Think of attention as an additive update not a replacement. 
The update is usually small relative to the residual stream 


Now that we understand how a single attention head works, let us move on to multi-head attention.

Multi-head attention  
We build our own multi-head attention mechanism.

# Let us do this via a class
class MultiHeadSelfAttention:
    def __init__(self, d_model, n_heads, dropout_p=0.0):
        # Let us ensure that the d_model is divisible by n_heads
        assert d_model % n_heads == 0, f'd_model: {d_model} not divisible by n_heads: {n_heads}'

        self.d_model = d_model

        # Each head shares the same input 
        # but will see different subspaces hence difference perspectives
        self.n_heads = n_heads
        self.d_head = d_model // n_heads

        # Using one QKV projection: (d_model, 3*d_model)
        # This approach is also more efficient
        self.W_qkv = (np.random.randn(d_model, 3 * d_model) * 0.02).astype(np.float32)

        # Also setup our input projection
        # We need this to fuse the heads back together
        # Fuse the information from the different heads together
        self.W_o = (np.random.randn(d_model, d_model) * 0.002).astype(np.float32)

        # Setup dropout
        self.dropout = Dropout(p=dropout_p)

    def __call__(self, x):
        '''
        x> (B, T, d_model)
        returns:
        out: (B, T, d_model)
        attn_weights: (B, n_heads, T, T)
        '''
        # Capture the shape information
        B, T, D = x.shape

        # do our first linear projection to QKV
        qkv = x @ self.W_qkv # (B, T, 3*d_model)

        # 3 is included below for the each of the QKV
        qkv = qkv.reshape(B, T, 3, self.n_heads, self.d_head) # (B, T, 3, n_heads, d_head)

        # Transpose the dimensions
        qkv = np.transpose(qkv, axes=(2, 0, 3, 1, 4)) # (3, B, n_heads, T, d_head)

        # Extract the Q, K, V
        # Each of these now have a shape of (B, n_heads, T, d_head )
        Q, K, V = qkv[0], qkv[1], qkv[2]

        # Scaled dot-product attention per head
        # shape (n_heads, T, T)
        scores = (Q @ K.transpose(0, 1, 3, 2)) / np.sqrt(self.d_head)

        # Setup the causal mask
        mask = np.triu(np.ones((T, T), dtype=bool), k=1) # (T,T)
        scores_masked = scores.copy()
        scores_masked[:, :, mask] = -1e9

        # Apply softmax 
        attn_weights = softmax_stable(scores_masked, axis=-1) # (B, n_heads, T, T)

        # Get the weighted sum of values
        attn_out = attn_weights @ V # (B, n_heads, T, d_head)

        # Let us put these heads back together
        attn_out = attn_out.transpose(0, 2, 1, 3).reshape(B, T, self.d_model)   # (B, T, d_model)

        # Final output projection from the attention mechanism
        out = attn_out @ self.W_o # (B, T, d_model)

        # Let's add a dropout if needed
        out = self.dropout(out)

        return out, attn_weights, scores, scores_masked

Testing the class 

# Test the class
mha = MultiHeadSelfAttention(d_model=cfg.d_model, n_heads=cfg.n_heads)

mha_out, mha_attn_weights, mha_scores_raw, mha_scores_masked = mha(x=residual)

# confirming the shapes we saw above
mha_out.shape, mha_attn_weights.shape, mha_scores_raw.shape, mha_scores_masked.shape
---------------

((1, 25, 16), (1, 4, 25, 25), (1, 4, 25, 25), (1, 4, 25, 25))

With the output from the multi-head self-attention, let us visualize some of the items we retrieved

plt.figure(figsize=(15,4))
plt.suptitle('Plots of masked heads')
for i in range(cfg.n_heads):
    plt.subplot(1,4,i+1)
    plt.imshow(mha_attn_weights[0, i], cmap='viridis')
    plt.title(f'head: {i}')
    plt.xlabel('Key Position')
    if i == 0:
        plt.ylabel('Query')

plt.tight_layout()


Remember this model has not been trained as yet, hence as we move from the first row (query) down to the last row, the probabilities are more diffused. This is why the colours move from bright above to seemingly the same as you go further down.

Plot the output variance
We see the residual var is larger than the MHA out variance.

# Plot the output variance
# We see the residual var is larger than the MHA out variance

residual_var = residual[0].var(axis=-1)
mha_out_var = mha_out[0].var(axis=-1)

B, T, D = residual.shape
t = np.arange(T)

plt.plot(t, residual_var, label=f'residual (input) variance. Mean: {residual_var.mean():.4f}')
plt.plot(t, mha_out_var, label=f'MHA output variance. Mean: {mha_out_var.mean():.4f}')
plt.xlabel('Positions ')
plt.ylabel('Variance across d_model')
plt.title('Variance of Residual vs MHA Out')
plt.legend()
plt.show()

We have the multi-head self-attention mechanism in place. Let's move on to the feed forward network (FFN).  

Feed Forward  
The FFN is where the computation occurs
This FFN is position wise. There is no mixing across the time dimension.

class FeedForward:
    def __init__(self, d_model, ffn_expansion=4, dropout_p=0.0):
        '''
        ffn_expansion=4: 4 * d_model
        '''
        self.d_model = d_model
        self.d_hidden = ffn_expansion * d_model

        # Our first linear projection
        self.W1 = (np.random.randn(d_model, self.d_hidden)*0.02).astype(np.float32)
        self.b1 = np.zeros((self.d_hidden,), dtype=np.float32)

        # Our second linear projection
        self.W2 = (np.random.randn(self.d_hidden, self.d_model)*0.02).astype(np.float32)
        self.b2 = np.zeros((self.d_model,), dtype=np.float32)

        # Setup dropout
        self.dropout = Dropout(p=dropout_p)

    def __call__(self, x):
        '''
        x: (B, T, d_model)
        returns: (B, T, d_model)
        '''

        # Apply the first linear layer
        h = x @ self.W1 + self.b1 # (B, T, d_hidden)

        # Apply the activation function
        h = gelu(h) 

        # Final linear projection and get the output of the ffn
        out = h @ self.W2 + self.b2 # (B, T, d_model)

        # Apply dropout if available
        out = self.dropout(out)

        return out


# Test the function
ffn = FeedForward(d_model=cfg.d_model)

# Realistically, we should test this on the output of the MHA
# ffn(mha_out).shape, 

# Let's test it on our residual, the original input
ffn(residual).shape

# Visualization of the effects of the FFN on the input
ffn_pre_activation = residual@ ffn.W1 + ffn.b1
ffn_post_activation = gelu(ffn_pre_activation)

plt.figure(figsize=(10,4))
plt.subplot(121)
plt.title('FFN pre-GELU activations')
plt.hist(ffn_pre_activation.flatten())

plt.subplot(122)
plt.title('FFN post-GELU activations')
plt.hist(ffn_post_activation.flatten())

plt.tight_layout()
plt.show()



Above, we see the impact the activation function has on the input.

Let's take a scatter plot 

x_flat = residual.reshape(-1, cfg.d_model)
y_flat = ffn(residual).reshape(-1, cfg.d_model)

plt.figure(figsize=(15,15))
for i in range(16):
    plt.subplot(4,4,i+1)
    plt.scatter(x_flat[:, i ], y_flat[:, i])
    plt.title(f'ffn input vs out for dim: {i}')
    plt.xlabel(f'input at dim: {i}')
    plt.ylabel(f'output at dim: {i}')
    
plt.tight_layout()



One take away from here is how the FFN reshapes the input vectors

# Remember when we called GELU some neurons will become 0
# Let's calculate how many of those neurons are 0s
sparsity = np.mean(ffn_post_activation > 0, axis=(0,1))

plt.title(f'FFN neuron activation sparsity')
plt.plot(sparsity)
plt.xlabel('Hidden Neuron Index')
plt.ylabel('Fraction Active');

# Get the norms
residual_norms = np.linalg.norm(residual[0], axis=-1)
out_norm = np.linalg.norm(mha_out[0], axis=-1)

plt.plot(t, out_norm, label=f'MHA out norm mean: {out_norm.mean():.4f}')
plt.plot(t, residual_norms, label=f'Residual norm: {residual_norms.mean():.4f}')
plt.title(f'Residual norms vs MHA out norm')
plt.xlabel('Position')
plt.ylabel('L2 norm')

plt.legend()

With all of this in place let's go ahead and put it all together. 
We will use our pre LayerNorm
residual connection
MHA + FFN combined

Basically, let us put together a decoder block
Decoder Block

Like we have done before, let us build a Class

class DecoderBlock:
    def __init__(self, d_model, n_heads, ffn_expansion=4, attn_dropout=0.0, ffn_dropout=0.0):
        # Setup the layer norms
        self.ln1 = LayerNorm(d_model=d_model)
        self.ln2 = LayerNorm(d_model=d_model)

        # Setup MHA
        self.mha = MultiHeadSelfAttention(d_model=d_model, n_heads=cfg.n_heads, dropout_p=attn_dropout)

        # Setup the FFN
        self.ffn = FeedForward(d_model=d_model, ffn_expansion=ffn_expansion, dropout_p=ffn_dropout)

    def __call__(self, x):
        '''
        residual: (B, T, d_model)
        returns:
            residual_out: (B, T, d_model)
            cache: dict of intermediates for visualizations
        '''
        cache = {}

        # MHA Block
        x_norm1 = self.ln1(x)
        mha_out, attn_weights, scores_raw, scores_masked = self.mha(x)

        # Get the residual after the MHA
        residual_mha = x + mha_out # Residual updates

        # Cache some results
        cache['x_norm1'] = x_norm1
        cache['mha_out'] = mha_out
        cache['attn_weights'] = attn_weights
        cache['scores_masked'] = scores_masked
        cache['residual_after_mha'] = residual_mha

        # FFN Block
        x_norm2 = self.ln2(residual_mha)
        ffn_out = self.ffn(x_norm2)
        residual_mha_ffn_out = residual_mha + ffn_out

        cache['x_norm2'] = x_norm2
        cache['ffn_out'] = ffn_out
        cache['residual_mha_ffn_out'] = residual_mha_ffn_out

        return residual_mha_ffn_out, cache

# Test the class
decoder_block = DecoderBlock(d_model=cfg.d_model, n_heads=cfg.n_heads)
decoder_out, decoder_cache = decoder_block(x=residual)
decoder_out.shape, decoder_cache.keys()
---------------
((1, 25, 16),
 dict_keys(['x_norm1', 'mha_out', 'attn_weights', 'scores_masked', 'residual_after_mha', 'x_norm2', 'ffn_out', 'residual_mha_ffn_out']))


Now that we have put together one decoder block, let's put together a stack.

Stack of Decoders
class DecoderStack:
    def __init__(self, n_layers, d_model, n_heads, ffn_expansion=4, attn_dropout_p=0.0, ffn_dropout_p=0.0):
        self.n_layers = n_layers
        self.blocks = [ 
            DecoderBlock(d_model=cfg.d_model, n_heads=cfg.n_heads) for _ in range(n_layers)]

    def __call__(self, x):
        '''
        x: residual (B, T, d_model)
        returns: 
            residual: (B, T, d_model)
            all_caches: list[dict] per layer
        '''
        all_caches = []

        for layer_idx, block in enumerate(self.blocks):
            x, cache = block(x)
            cache['layer_idx']  = layer_idx
            all_caches.append(cache)
        return x, all_caches

Test the stack

decoder_stack = DecoderStack(n_layers=cfg.n_layers, d_model=cfg.d_model, n_heads=cfg.n_heads)

residual_final, caches = decoder_stack(residual)
residual_final.shape, [ i.keys() for i in caches ]
--------------
((1, 25, 16),
 [dict_keys(['x_norm1', 'mha_out', 'attn_weights', 'scores_masked', 'residual_after_mha', 'x_norm2', 'ffn_out', 'residual_mha_ffn_out', 'layer_idx']),
  dict_keys(['x_norm1', 'mha_out', 'attn_weights', 'scores_masked', 'residual_after_mha', 'x_norm2', 'ffn_out', 'residual_mha_ffn_out', 'layer_idx'])])

# Let's visualize what we just built
layer_indices = []
norms_before = []
norms_after = []

x = residual

for cache in caches:
    layer_idx = cache['layer_idx']
    res_after = cache['residual_mha_ffn_out']

    norms_before.append(np.linalg.norm(residual[0], axis=-1).mean())
    norms_after.append(np.linalg.norm(res_after[0], axis=-1).mean())
    layer_indices.append(layer_idx)
    x = res_after

plt.plot(layer_indices, norms_before, label='Residual norm (before layer)')
plt.plot(layer_indices, norms_after, label='Residual Norm afterr')
plt.xlabel('Layer')
plt.ylabel('Mean L2 norm over positions')
plt.title("Residual norm evolution across layers")
plt.legend();


print(f'Norms before: {norms_before}')
print(f'Norms after: {norms_after}')
----------------
Norms before: [np.float32(1.4254605), np.float32(1.4254605)]
Norms after: [np.float64(1.426371919459707), np.float64(1.4268161008242064)]


We can see above the residual stream barely changes across layers 
Residual connections add small updates 

norms_before# Let's visualize what we just built
layers = []
mha_updates = []
ffn_updates = []
x = residual

for cache in caches:
    layer_idx = cache['layer_idx']
    res_before = x
    res_after_mha = cache['residual_after_mha']
    res_after = cache['residual_mha_ffn_out']

    mha_update = res_after_mha - res_before
    ffn_update = res_after - res_after_mha

    mha_updates.append(np.linalg.norm(mha_update[0], axis=-1).mean())
    ffn_updates.append(np.linalg.norm(ffn_update[0], axis=-1).mean())
    layers.append(layer_idx)
    x = res_after

plt.plot(layers, mha_updates, label='MHA update norm')
plt.plot(layers, ffn_updates, label='FFN update norm')
plt.xlabel('Layer')
plt.ylabel('Mean L2 norm over positions')
plt.title("Update magnitude per layer")
plt.legend();


Let's prepare to wrap this up by visualizing the residual stream.

# Create a heatmap of the residual stream
B, T, D = residual.shape

activations = [residual[0]]
x = residual.copy()

for cache in caches:
    x =cache['residual_mha_ffn_out']
    activations.append(x[0])

activations = np.stack(activations, axis=0)
Lp1 = activations.shape[0]

plt.figure(figsize=(15,15))
plt.imshow(activations.reshape(Lp1, T * D ), aspect='auto', cmap='coolwarm')
plt.xlabel('Position x d_model')
plt.ylabel('Layer (0 - input)')
plt.title("Residual stream evolution across layers")
plt.colorbar();


Above, we are visualizing the entire residual stream for all layers 
We have a model at 16 dimensions. We have 0-25 positions 16*26. Hence the reason for the 400 points above.
Think about this as seeing how the model's internal representation evolves as it goes deeper. 
We see similar patters across rows

Let us move ahead with setting up the language head, Softmax and loss ..

LM Head   
We will use the weight tying approach.

class LMHead:
    def __init__(self, W_tok):
        '''
        W_tok: (vocab_size, d_model)
        We reuse tok embeddings as output weights (weight tying)
        '''
        self.W_out = W_tok # (vocab_size, d_model)

    def __call__(self, x):
        '''
        residual: (B, T, d_model)
        returns: logits: (B, T, vocab_size)        
        '''
        B, T, D = x.shape
        V, D2 = self.W_out.shape

        #(B, T, D) @ (D, vocab_size) -> (B, T, vocab_size)
        logits = x @ self.W_out.T

        return logits

# Test the function
lmh = LMHead(W_tok=emb.W_tok)

logits = lmh(residual_final)
logits.shape
---------
(1, 25, 16)

With the logits in place, let us now grab the probabilities.
out_preds = np.argmax(logits, axis=-1)[0]

# Here is our prediction for our untrained model
''.join([ itos[i] for i in out_preds])

Well at this point, we could calculate the loss and backpropagate, etc. However, the objective was to build a transformer, not train a transformer. I think we have achieved this objective. 

To train a transformer, we will take an easier route in the final part of this series: "Building and training fully functional Decoder-Only transformer."

# Put it all together
class DecoderOnlyTransformer:
    def __init__(self, d_model, n_heads, n_layers, dropout_p, W_tok):
        self.decoder_stack = DecoderStack(n_layers=n_layers, d_model=d_model, n_heads=n_heads)
        self.lm_head = LMHead(W_tok=W_tok)
        
    def __call__(self, x):
        x, _ = self.decoder_stack(x)
        x = self.lm_head(x)
        return x

# Setup the full Decoder only transformer
transformer = DecoderOnlyTransformer(d_model=cfg.d_model, n_heads=cfg.n_heads, n_layers=cfg.n_layers, dropout_p=0.0, W_tok=emb.W_tok)

# Get the logits
logits = transformer(residual)
logits.shape

With the logits, we can get the probabilities again if we wish. Maybe you wanted to plot the probability distribution or something.

out_preds = np.argmax(logits, axis=-1)[0]

And finally, we generate from our untrained model.

# Here is our generation for our untrained model
''.join([ itos[i] for i in out_preds])
-----------
'Welco e fo ohe world of A'

Well that's it for this second post in this series. If you find something I could have or should have done differently, let me know.   

Let us do this with Torch now and leverage Andrej Karpathy's makemore series to generate new baby names.