Saturday, March 7, 2026

Welcome to the world of AI - Putting it all together. Building and training a fully functional Decoder-Only transformer

In the first post, we learned about temperature, top_k and top_p. We then built a Decoder-Only Transformer using pure NumPy in the second post. In the third post, we took advantage of PyTorch.

In this final post, we put together the raw code needed to run a full decoder-only transformer that generates baby names. Hope you enjoyed this series. As always, if you think there is something I should have done differently, do not hesitate to reach out.

'''

## "Welcome to the world of AI" 
#### Putting it all together. Building and training a fully functional Decoder-Only transformer.

Ok, in the previous two posts, we built a Decoder-Only transformer using pure NumPy and then used PyTorch to build a transformer. This was, however, done in a Jupyter notebook. Let's write a real script that we can run on any text-based dataset to generate similar text.

I will stick with my baby names dataset to keep this simple

References:
https://docs.python.org/3/library/argparse.html

$ clear && python3 baby_name_gpt.py --filename names.txt --d_model=32 --n_heads=4 --n_layers=2 --epochs=10000 --temperature=1.3 --top_p=0.90

'''

#baby_name_gpt.py

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F

# Set the seed for reproducibility
torch.manual_seed(42)

CONTEXT_WINDOW_LENGTH = 16  # Max tokens the model can process at once

# Setup the argument parser.
# `prog` is the name printed in the usage/help text, so it is set to the
# script's real file name (baby_name_gpt.py) rather than a generic one.
arg_parser = argparse.ArgumentParser(prog='baby_name_gpt.py', description='A mini GPT', epilog='www.securitynik.com')

# Add arguments.
# Only --filename is mandatory. Every other option is left as None when it is
# omitted; main() substitutes its own fallback value in that case.
arg_parser.add_argument('-f', '--filename', required=True, help='/path/to/some_file with text to learn from')
arg_parser.add_argument('-d', '--d_model', type=int,  help='Embedding dimension of the model')
arg_parser.add_argument('-n', '--n_heads', type=int, help='Number of heads')
arg_parser.add_argument('-l', '--n_layers', type=int, help='Number of layers')
arg_parser.add_argument('-e', '--epochs', type=int, help='Number of training epochs (one random batch per epoch)')
arg_parser.add_argument('-b', '--batch_size', type=int, help='Batch size')
arg_parser.add_argument('-t', '--temperature', type=float, help='temperature')
arg_parser.add_argument('-k', '--top_k', type=int, help='top_k')
arg_parser.add_argument('-p', '--top_p', type=float, help='top_p')

# NOTE: parsing happens at module import time, so importing this file from
# another module will consume (and validate) that process's sys.argv.
args = arg_parser.parse_args()

# Setup a function to read the data
def get_data(input_file=None):
    """Read the whole training corpus from ``input_file`` and return it as text.

    Args:
        input_file: Path of the text file to read.

    Returns:
        The complete contents of the file as one ``str``.

    Raises:
        OSError: If the file cannot be opened or read (missing file,
            permissions, ...).
        UnicodeError: If the file is not valid UTF-8.
    """
    print(f'🚀 Getting data ...')
    try:
        # Pin the encoding so the derived vocabulary does not depend on the
        # platform's locale default.
        with open(file=input_file, mode='r', encoding='utf-8') as fp:
            data = fp.read()
    except (OSError, UnicodeError) as e:
        # Report the problem, then re-raise: silently returning None here only
        # postpones the failure to an unrelated TypeError further downstream.
        print(f'Error encountered: {e}')
        raise

    print(f'✅ Successfully read: {len(data)} bytes of data.')
    return data


# Tokenize the data:
def tokenizer(data=None):
    """Build a character-level vocabulary from ``data``.

    Args:
        data: The text whose distinct characters form the vocabulary.

    Returns:
        A ``(stoi, itos, vocab_size)`` tuple: ``stoi`` maps each character to
        its index in the sorted vocabulary, ``itos`` is the inverse mapping,
        and ``vocab_size`` is the number of distinct characters.
    """
    # Sorting makes the id assignment deterministic for a given corpus
    vocabulary = sorted(set(data))
    print(f'Chars: {repr("".join(vocabulary))}')

    vocab_size = len(vocabulary)
    print(f'✅ Vocab size: {vocab_size} tokens')

    # Fill the encoder (char -> id) and decoder (id -> char) in one pass
    stoi = {}
    itos = {}
    for position, symbol in enumerate(vocabulary):
        stoi[symbol] = position
        itos[position] = symbol

    return stoi, itos, int(vocab_size)


# Perform the encoding of text
def encode_data(tokenizer=None, data=None):
    """Convert text into a 1-D ``torch.long`` tensor of token ids.

    Args:
        tokenizer: Mapping from character to integer id (looked up via ``.get``).
        data: Iterable of characters to encode.

    Returns:
        A tensor holding one id per character of ``data``, in order.
    """
    print(f'🚀 Encoding the data ...')
    token_ids = list(map(tokenizer.get, data))
    return torch.tensor(token_ids, dtype=torch.long)


# Perform the decoding of numbers
def decode_tokens(tokenizer=None, data=None):
    """Convert a sequence of token ids back into text.

    Args:
        tokenizer: Mapping from integer id to character (looked up via ``.get``).
        data: Iterable of ids to decode.

    Returns:
        The decoded characters joined into one string.
    """
    print(f'🚀 Decoding the data ...')
    pieces = map(tokenizer.get, data)
    return ''.join(pieces)


# Split the data into train and test sets
def train_test_split(tokens=None):
    """Split the encoded corpus into a leading train part and a trailing test part.

    Args:
        tokens: Sequence of token ids (must expose ``.shape`` for the log line).

    Returns:
        ``(X_train, X_test)``: the first 90% of ``tokens`` and the remainder.
    """
    print(f'🚀 Splitting into train and test sets ...')

    # Boundary index: everything before it trains, everything after it tests
    cut = int(len(tokens) * 0.9)
    X_train, X_test = tokens[:cut], tokens[cut:]

    print(f'✅ X_train.shape: {X_train.shape} | X_test.shape: {X_test.shape} ...')

    return X_train, X_test


# Generate batches of data
def generate_batch(X_train=None, X_test=None, split='train',  batch_size=32):
    """Sample a random batch of input/target windows for next-token prediction.

    Args:
        X_train: 1-D tensor of training token ids.
        X_test: 1-D tensor of test token ids.
        split: ``'train'`` draws from ``X_train``; any other value draws from
            ``X_test``.
        batch_size: Number of windows to draw.

    Returns:
        ``(X_batch, y_batch)``, each of shape
        ``(batch_size, CONTEXT_WINDOW_LENGTH)``; ``y_batch`` is ``X_batch``
        shifted one token to the right.
    """
    source = X_train if split == 'train' else X_test

    # The exclusive upper bound leaves room for the target window, which ends
    # one token later than the input window.
    starts = torch.randint(low=0, high=len(source) - CONTEXT_WINDOW_LENGTH, size=(batch_size,))

    inputs = []
    targets = []
    for start in starts:
        stop = start + CONTEXT_WINDOW_LENGTH
        inputs.append(source[start:stop])
        targets.append(source[start + 1:stop + 1])

    X_batch = torch.stack(tensors=inputs, dim=0)
    y_batch = torch.stack(tensors=targets, dim=0)

    return (X_batch, y_batch)


# Create the GPT Embeddings
class GPTEmbeddings(nn.Module):
    """Token plus learned positional embeddings for the decoder.

    Args:
        vocab_size: Number of distinct tokens (rows of the token table).
        d_model: Embedding dimension.

    The positional table has ``CONTEXT_WINDOW_LENGTH`` rows, so ``forward``
    only supports sequences up to that length.
    """

    def __init__(self, vocab_size=0, d_model=32):
        super(GPTEmbeddings, self).__init__()

        # Token embeddings
        self.tok_embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

        # Positional embeddings
        self.pos_embeddings = nn.Embedding(num_embeddings=CONTEXT_WINDOW_LENGTH, embedding_dim=d_model)

    def forward(self, x):
        """Embed token ids ``x`` of shape (B, T) into vectors of shape (B, T, D)."""
        _, T = x.size()

        # Build the position ids on the same device as the input so the lookup
        # also works when the model and data live on an accelerator.
        positions = torch.arange(T, device=x.device)
        pos_emb = self.pos_embeddings(positions)  # (T, D)
        tok_emb = self.tok_embeddings(x)          # (B, T, D)

        # (T, D) broadcasts over the batch dimension -> (B, T, D)
        return pos_emb + tok_emb


# Setup the MultiHead attention
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with a fused Q/K/V projection.

    Args:
        d_model: Model (embedding) dimension; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
    """

    def __init__(self, d_model=32, n_heads=4):
        super().__init__()

        # Every head receives an equally sized slice of the model dimension
        assert d_model % n_heads == 0, f'd_model: {d_model} is not divisible by n_heads: {n_heads}'

        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads

        # A single matmul yields queries, keys and values together
        self.qkv_proj = nn.Linear(in_features=d_model, out_features=3*d_model, bias=False)

        # Mixes the concatenated heads back into the model dimension
        self.out_proj = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (B, T, D); returns (B, T, D)."""
        batch, seq_len, width = x.size()

        # (B, T, 3*D) -> (B, T, 3, n_heads, head_dim) -> (3, B, n_heads, T, head_dim)
        fused = self.qkv_proj(x)
        per_head = fused.view(batch, seq_len, 3, self.n_heads, self.head_dim)
        per_head = per_head.permute(2, 0, 3, 1, 4)
        queries, keys, values = per_head[0], per_head[1], per_head[2]

        # Fused attention kernel; is_causal=True hides future positions
        context = F.scaled_dot_product_attention(
            query=queries, key=keys, value=values,
            attn_mask=None,
            dropout_p=0.0,
            is_causal=True,
        )  # (B, n_heads, T, head_dim)

        # (B, n_heads, T, head_dim) -> (B, T, n_heads, head_dim) -> (B, T, D)
        merged = context.transpose(1, 2).contiguous().view(batch, seq_len, width)

        return self.out_proj(merged)



# Setup the FFN
class FFN(nn.Module):
    """Position-wise feed-forward network using a SwiGLU gate.

    Computes ``ln3(silu(ln1(x)) * ln2(x))``: ``ln1`` is the gate projection,
    ``ln2`` the value projection and ``ln3`` the output projection. Despite
    the ``ln`` prefix these are plain bias-free linear layers; the attribute
    names are kept so existing parameter names do not change.

    Args:
        d_model: Input and output feature dimension.
    """

    def __init__(self, d_model=32):
        super(FFN, self).__init__()

        # SwiGLU uses two input projections instead of one, so the hidden width
        # is scaled by 2/3 (8/3 * d_model rather than 4 * d_model) to keep the
        # parameter count comparable to a classic GPT feed-forward layer.
        hidden_dim = int(8 * d_model / 3)

        # Parallel gate / value projections
        self.ln1 = nn.Linear(in_features=d_model, out_features=hidden_dim, bias=False)
        self.ln2 = nn.Linear(in_features=d_model, out_features=hidden_dim, bias=False)

        # Output projection back to the model dimension
        self.ln3 = nn.Linear(in_features=hidden_dim, out_features=d_model, bias=False)

    def forward(self, x):
        """Apply the gated feed-forward transform to ``x`` of shape (B, T, D)."""
        # The activation is applied to the gate branch only and then multiplied
        # element-wise with the value branch. Activating the product instead
        # (silu(a * b)) is not a gated linear unit.
        gated = F.silu(self.ln1(x)) * self.ln2(x)
        return self.ln3(gated)


# GPT Decoder Block
class DecoderBlock(nn.Module):
    """One pre-norm decoder layer: attention and feed-forward, each with a residual.

    Args:
        d_model: Model (embedding) dimension.
        n_heads: Number of attention heads.
    """

    def __init__(self, d_model=32, n_heads=4 ):
        super().__init__()

        # Attention sub-layer and the norm applied to its input
        self.norm1 = nn.RMSNorm(normalized_shape=d_model)
        self.mha = MultiHeadAttention(d_model=d_model, n_heads=n_heads)

        # Feed-forward sub-layer and the norm applied to its input
        self.norm2 = nn.RMSNorm(normalized_shape=d_model)
        self.ffn = FFN(d_model=d_model)

    def forward(self, x):
        """Run both sub-layers on ``x``; the output has the same shape as ``x``."""
        # Pre-norm: normalise, attend, then add the result onto the residual stream
        attn_update = self.mha(self.norm1(x))
        hidden = x + attn_update

        # Same pattern for the feed-forward network
        ffn_update = self.ffn(self.norm2(hidden))
        return hidden + ffn_update


# Setup the GPT
class GPT(nn.Module):
    """Decoder-only transformer language model.

    Pipeline: token + positional embeddings -> ``n_layers`` decoder blocks ->
    final RMSNorm -> bias-free linear LM head producing vocabulary logits for
    every position. The LM head shares its weight with the token embedding.

    Args:
        vocab_size: Number of distinct tokens.
        d_model: Model (embedding) dimension.
        n_heads: Attention heads per decoder block.
        n_layers: Number of stacked decoder blocks.
    """

    def __init__(self, vocab_size=0, d_model=32, n_heads=4, n_layers=4):
        super(GPT, self).__init__()

        self.embeddings = GPTEmbeddings(vocab_size=vocab_size, d_model=d_model)

        self.blocks = nn.ModuleList(
            [DecoderBlock(d_model=d_model, n_heads=n_heads) for _ in range(n_layers)]
        )

        # Final norm before going into the language head
        self.norm = nn.RMSNorm(normalized_shape=d_model)

        # LM Head
        self.lm_head = nn.Linear(in_features=d_model, out_features=vocab_size, bias=False)

        # Weight tying: the output projection reuses the token embedding matrix
        self.lm_head.weight = self.embeddings.tok_embeddings.weight

        # Re-initialise with a small std; otherwise the model starts with a
        # very high loss
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

            if module.bias is not None:
                nn.init.zeros_(module.bias)

        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0, std=0.02)

    def forward(self, x):
        """Map token ids ``x`` of shape (B, T) to logits of shape (B, T, vocab_size)."""
        x = self.embeddings(x)

        for block in self.blocks:
            x = block(x)

        # Final norm before going into the language head
        x = self.norm(x)

        # Get the logits
        logits = self.lm_head(x)

        return logits

    # Sampling never needs gradients; disabling them avoids building an
    # autograd graph for every generated token.
    @torch.no_grad()
    def _generate(self, idx, max_new_tokens=10, temperature=1, new_line_token: int = 0, top_k=None, top_p=None):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Args:
            idx: (B, T) tensor of starting token ids.
            max_new_tokens: Number of tokens to append.
            temperature: Logit divisor; values <= 0 are replaced by 0.1.
            new_line_token: Id of the newline token. Accepted for API
                compatibility but currently unused, because early stopping on
                newline is disabled so that several names can be produced in
                one call.
            top_k: If set, keep only the ``top_k`` highest logits (capped at
                the vocabulary size).
            top_p: If set, keep the smallest set of tokens whose cumulative
                probability exceeds ``top_p`` (nucleus sampling).

        Returns:
            (B, T + max_new_tokens) tensor: ``idx`` with the samples appended.

        Note:
            Leaves the model in eval mode.
        """
        if temperature <= 0:
            temperature = 0.1

        print(f'==[DEBUG]== Generating ... ')
        # Put the model in eval mode
        self.eval()

        for _ in range(max_new_tokens):
            # Crop the context to the window the positional table supports
            idx_cond = idx[:, -CONTEXT_WINDOW_LENGTH:]

            # Forward pass to get the logits
            logits = self(idx_cond)  # (B, T, vocab_size)

            # Only the final time step predicts the next token
            logits = logits[:, -1, :]  # (B, vocab_size)

            # Apply temperature
            logits = logits / temperature

            # top_k: everything below the k-th largest logit becomes -inf
            if top_k is not None:
                # torch.topk raises if k exceeds the vocabulary size
                k = min(top_k, logits.size(-1))
                v, _ = torch.topk(logits, k)
                logits[logits < v[:, [-1]]] = float('-inf')

            # top_p (nucleus) filtering
            if top_p is not None:
                sorted_logits, sorted_indices = torch.sort(logits, descending=True)
                sorted_probs = F.softmax(sorted_logits, dim=-1)
                cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

                sorted_indices_to_remove = cumulative_probs > top_p
                # Shift right so the token that crosses the threshold is kept,
                # and always keep the single most likely token.
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
                sorted_indices_to_remove[..., 0] = False

                # Map the removal mask from sorted order back to vocabulary order
                indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
                logits[indices_to_remove] = float('-inf')

            # Convert the logits to probabilities
            probs = F.softmax(logits, dim=-1)

            # Based on the probabilities, sample the next token
            next_token = torch.multinomial(input=probs, num_samples=1, replacement=True)  # (B, 1)

            # Append to the existing sequence
            idx = torch.cat((idx, next_token), dim=-1)

        return idx



# Configure the optimizer for weight decaying and parameter grouping
def configure_optimizer(model=None, weight_decay=0.1, learning_rate=3e-3, betas=(0.9, 0.95)):
    """Create an AdamW optimizer with separate decay / no-decay parameter groups.

    A trainable parameter is decayed only when its name ends in ``weight`` and
    contains neither ``norm`` nor ``embedding``; every other trainable
    parameter goes into a group with ``weight_decay=0.0``.

    Args:
        model: Module whose ``named_parameters()`` are optimised.
        weight_decay: Decay applied to the first group.
        learning_rate: Learning rate for both groups.
        betas: AdamW beta coefficients.

    Returns:
        A ``torch.optim.AdamW`` with the decay group first and the no-decay
        group second.
    """

    def _should_decay(param_name):
        # Name-based rule: weights only, excluding norm and embedding tensors
        if not param_name.endswith('weight'):
            return False
        return not ('norm' in param_name or 'embedding' in param_name)

    # Frozen parameters are left out of the optimizer entirely
    trainable = [(name, param) for name, param in model.named_parameters() if param.requires_grad]

    decay_params = [param for name, param in trainable if _should_decay(name)]
    no_decay_params = [param for name, param in trainable if not _should_decay(name)]

    # Sanity check: no tensor may appear in both groups
    decay_ids = {id(p) for p in decay_params}
    no_decay_ids = {id(p) for p in no_decay_params}
    assert decay_ids.isdisjoint(no_decay_ids)

    optim_groups = [
        {'params': decay_params, 'weight_decay': weight_decay},
        {'params': no_decay_params, 'weight_decay': 0.0},
    ]

    return torch.optim.AdamW(params=optim_groups, lr=learning_rate, betas=betas)


# Setup the evaluation loop
# Disable gradient tracking
@torch.no_grad()
def estimate_loss(model, X_train=None, X_test=None, vocab_size=None, batch_size=32, eval_iters=50):
    """Estimate the mean cross-entropy loss on the train and test splits.

    For each split, ``eval_iters`` random batches are drawn with
    ``generate_batch`` and their losses averaged. Runs without gradient
    tracking.

    Args:
        model: Model mapping a (B, T) id tensor to (B, T, vocab_size) logits.
        X_train: Training token ids.
        X_test: Test token ids.
        vocab_size: Size of the logits' last dimension.
        batch_size: Windows per evaluation batch.
        eval_iters: Number of batches averaged per split.

    Returns:
        ``{'train': float, 'test': float}`` with the averaged losses.

    Note:
        The model is switched to eval mode for the measurement and left in
        train mode on return.
    """
    # Put the model in eval mode
    model.eval()

    losses = {'train': 0, 'test': 0}

    for split in ['train', 'test']:
        total_loss = 0.0

        for _ in range(eval_iters):
            # `split` must be forwarded: generate_batch defaults to 'train', so
            # omitting it would report the training loss under both keys.
            xb, yb = generate_batch(X_train=X_train, X_test=X_test, split=split, batch_size=batch_size)

            logits = model(xb)

            # Flatten (B, T, V) -> (B*T, V) and (B, T) -> (B*T,)
            loss = F.cross_entropy(
                input=logits.view(-1, vocab_size), target=yb.view(-1)
            )

            # Track the loss
            total_loss += loss.item()

        losses[split] = total_loss / eval_iters

    model.train()
    return losses



# Define the training loop
def train(model=None, optimizer=None, X_train=None, X_test=None, vocab_size=None, batch_size=64, epochs=10, eval_interval=10, grad_clip=1.0):
    """Train ``model`` for ``epochs`` optimisation steps.

    Each "epoch" here is a single step on one randomly sampled training batch.
    Every ``eval_interval`` steps the train/test losses are estimated and
    printed.

    Args:
        model: Model mapping a (B, T) id tensor to (B, T, vocab_size) logits.
        optimizer: Optimizer over the model's parameters.
        X_train: Training token ids.
        X_test: Test token ids.
        vocab_size: Size of the logits' last dimension.
        batch_size: Windows per training and evaluation batch.
        epochs: Number of optimisation steps.
        eval_interval: Steps between loss reports.
        grad_clip: Maximum global gradient norm.

    Returns:
        The trained model (the same object that was passed in).
    """
    print(f'✅ Beginning training ...')

    model.train()
    for epoch in range(epochs):

        # Evaluate the model periodically
        if epoch % eval_interval == 0:
            losses = estimate_loss(model=model, X_train=X_train, X_test=X_test, vocab_size=vocab_size, batch_size=batch_size)

            print(f'Epoch: {epoch+1} | loss: {losses}')

        # Get a batch; forward batch_size so the caller's setting is honoured
        # instead of generate_batch's own default.
        xb, yb = generate_batch(X_train=X_train, X_test=X_test, split='train', batch_size=batch_size)

        # Forward to get the logits
        logits = model(xb)

        # Calculate the loss
        loss = F.cross_entropy(
            input=logits.view(-1, vocab_size), target=yb.view(-1)
        )

        # Clear the previous step's gradients; without this, backward() keeps
        # adding onto them and every update uses the sum of all past gradients.
        optimizer.zero_grad(set_to_none=True)

        # Back propagate
        loss.backward()

        # Clip the gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

        # Update the parameters
        optimizer.step()

    # Return the model
    return model


def main():
    """Script entry point: read the corpus, train the model, print samples.

    Reads its configuration from the module-level ``args`` namespace. Options
    that were omitted (or given as 0) fall back to the defaults below.

    Raises:
        SystemExit: If no data could be read, or the corpus contains no
            newline character to seed generation with.
    """
    print(f'🚀 Launching {__file__}')

    # Read the arguments, falling back to defaults for anything not supplied
    file_name = args.filename
    d_model = args.d_model or 32
    n_heads = args.n_heads or 4
    n_layers = args.n_layers or 4
    epochs = args.epochs or 10
    batch_size = args.batch_size or 64
    temperature = args.temperature or 0.1
    top_k = args.top_k or None
    top_p = args.top_p or None

    data = get_data(file_name)
    if not data:
        raise SystemExit(f'No data could be read from {file_name!r}')

    stoi, itos, vocab_size = tokenizer(data=data)

    # Generation is seeded with the newline token, so the corpus must contain one
    if '\n' not in stoi:
        raise SystemExit(f'{file_name!r} contains no newline character; expected one entry per line')

    # top_k cannot exceed the number of tokens there are to choose from
    if top_k is not None:
        top_k = min(top_k, vocab_size)

    tokens_encoded = encode_data(tokenizer=stoi, data=data)
    X_train, X_test = train_test_split(tokens=tokens_encoded)

    # Setup the model, honouring every architecture option from the CLI
    model = GPT(vocab_size=vocab_size, d_model=d_model, n_heads=n_heads, n_layers=n_layers)

    # Get the optimizer
    optimizer = configure_optimizer(model=model, weight_decay=0.1, learning_rate=3e-4)

    model = train(model=model, optimizer=optimizer, X_train=X_train, X_test=X_test, vocab_size=vocab_size, batch_size=batch_size, epochs=epochs)

    # Generate samples starting from the new line char
    new_line_token = stoi['\n']
    start_token = torch.tensor([[new_line_token]], dtype=torch.long)

    # Pass the sampling options through so the CLI flags actually take effect
    generated = model._generate(
        idx=start_token,
        new_line_token=new_line_token,
        max_new_tokens=50,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )

    name = ''.join([itos[i.item()] for i in generated[0]])
    print(f'{name}')


if __name__ == '__main__':
    main()

After training for 10,000 epochs, here is the result:

🚀 Launching /home/securitynik/stuff/baby_name_gpt.py
🚀 Getting data ...
✅ Successfully read: 228145 bytes of data.
Chars: '\nabcdefghijklmnopqrstuvwxyz'
✅ Vocab size: 27 tokens
🚀 Encoding the data ...
🚀 Splitting into train and test sets ...
✅ X_train.shape: torch.Size([205330]) | X_test.shape: torch.Size([22815]) ...
✅ Beginning training ...

Epoch: 1 | loss: {'train': 3.3060472202301026, 'test': 3.305421471595764}
Epoch: 11 | loss: {'train': 3.1819068813323974, 'test': 3.183610119819641}
...
Epoch: 9971 | loss: {'train': 1.8318881130218505, 'test': 1.8152394461631776}
Epoch: 9981 | loss: {'train': 1.8336570143699646, 'test': 1.8264712977409363}
Epoch: 9991 | loss: {'train': 1.8365000939369203, 'test': 1.8344433832168578}

==[DEBUG]== Generating ... 

mylan
rayona
skaynor
reem
rhil
reiann
sherom
reton

From my perspective, these all look like possible names. 

Well hey, hope you enjoyed this series. Do let me know what you think I could have done differently.

Posts in this series:
   - Git Notebook
   - Git Notebook: 
   - Git Notebook: 


No comments:

Post a Comment