## Morphological inflection with T5 models using CHILDES datasets

- You can either train a T5 model from scratch or fine-tune an existing T5 model for the morphological inflection task. Try to beat the baseline accuracy listed under each language.

- Choose whichever language you like. There are three datasets, one each for English, German, and Arabic.

- The datasets are taken from the SIGMORPHON shared tasks (https://github.com/sigmorphon/2022InflectionST/tree/main/part2). The German and English training sets are sampled, weighted by frequency, from child-directed speech corpora available from UniMorph, with frequencies taken from the CHILDES database [MacWhinney 2000](https://childes.talkbank.org/access/). As a result, the smallest training sets contain only the highest-frequency words. Arabic is sampled in the same way, but the words and their frequencies are taken from the [Penn Arabic Treebank](https://www.marefa.org/images/e/e8/The_penn_arabic_treebank_Building_a_large-scale_an_%281%29.pdf).

- Write a summary that covers:
  - A description of the data
  - The problem (e.g., the English past-tense debate)
  - The changes you made to the model
  - The architecture of the model
  - A hypothesis for why those changes might have improved the accuracy

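As a rough illustration of what "beating the baseline" means, the sketch below computes exact-match accuracy and compares it with the baselines quoted in this notebook. It assumes that accuracy means exact string match, which is what the evaluation code in this notebook measures. The names `BASELINES`, `exact_match_accuracy`, and `beats_baseline`, as well as the example predictions, are hypothetical.

```python
from typing import List

# Baseline accuracies quoted in this notebook, expressed as fractions
BASELINES = {"eng": 0.65, "deu": 0.55, "ara": 0.43}


def exact_match_accuracy(predictions: List[str], references: List[str]) -> float:
    """Return the fraction of predictions that equal their reference exactly."""
    if len(predictions) != len(references):
        raise ValueError("predictions and references must have the same length")
    if not references:
        return 0.0
    correct = sum(p == r for p, r in zip(predictions, references))
    return correct / len(references)


def beats_baseline(accuracy: float, language: str) -> bool:
    """Check whether an accuracy is strictly above the quoted baseline."""
    return accuracy > BASELINES[language]


# Hypothetical predictions, for illustration only
preds = ["walked", "goed", "ran"]
golds = ["walked", "went", "ran"]
acc = exact_match_accuracy(preds, golds)
print(f"accuracy={acc:.2%}, beats English baseline: {beats_baseline(acc, 'eng')}")
```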

### English:

English Past-Tense Debate (e.g., [Marcus et al. 1992](https://www.jstor.org/stable/1166115?seq=1#metadata_info_tab_contents))

Training data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/eng_1000.train

Development data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/eng.dev

Test data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/eng.gold

Baseline accuracy: 65%

### German:

German noun plurals, a well-studied challenge case which may have a minority-default pattern ([Clahsen et al. 1992](https://www.sciencedirect.com/science/article/pii/001002779290018D), [Marcus et al. 1995](https://d1wqtxts1xzle7.cloudfront.net/30270110/Marcus_Pinker_et_al_1995_German_Inflection_Cognitive_Psychology-with-cover-page-v2.pdf?Expires=1646265841&Signature=PRNt6JeRUZYQ0KBtfJMzRH3cQPySiWtycYIZqkYPBoxn2-Y3k6zgLMpUHKLE3RFPMajxCT0ReU-~CuADL66-hk7zI9eT6pcoj-jBOTr5Yt4NbjEoHs~o4-AXB6J1sdKcKLqMLH3x6h41Dtnp-tgviym3GV42e6usK0yQyMM9O0KiEY~nWulXAqVFTeY~CL8~0PBYEHXRywsTm6ZOMI7kTZzefyg1ZLGlrGtHcZyHMV4KO0ibT7SddhQgiiuHh6j4jIlCwdxiovf~MPqu5lpJqxDdlOoJS8AktpmsCTipAw4Q2~frNXr1rJ2GM2WBUABjugH0JbBhhvB4TpLzPZ6qrA__&Key-Pair-Id=APKAJLOHF5GGSLRBV4ZA))

Training data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu_600.train

Development data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu.dev

Test data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu.gold

Baseline accuracy: 55%

### Arabic:

Arabic noun plurals, with several competing affixal and templatic patterns ([Ravid & Farah 1999](https://journals.sagepub.com/doi/pdf/10.1177/014272379901905603?casa_token=RHoIAWxOousAAAAA:NpjamGN3dzbA43WuEpZzKbBApqyYol5jI9vqJ3C7NKGigY5nSmm5ZA18sciRfWFESETqXL21chgi), [Dawdy-Hesterberg & Pierrehumbert (2014)](https://www.tandfonline.com/doi/pdf/10.1080/23273798.2014.899377)).

Training data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/ara_900.train

Development data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/ara.dev

Test data: https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/ara.gold

Baseline accuracy: 43%

## Training a T5 model from scratch

In [None]:
!pip install -U accelerate
!pip install -U transformers
!pip install -U evaluate
!pip install Levenshtein

In [None]:
# Paste the training, development, and test data URLs for the language you chose

train_data_path = "https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu_600.train" # fill me
dev_data_path = "https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu.dev" # fill me
test_data_path = "https://raw.githubusercontent.com/sigmorphon/2022InflectionST/refs/heads/main/part2/deu.gold" # fill me
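
The dataset links listed above follow one naming pattern, so the three paths can also be derived from a language code. This is only a convenience sketch: `BASE_URL`, `TRAIN_SIZES`, and `dataset_urls` are hypothetical names, and the training-set sizes are simply the ones used in the links in this notebook.

```python
from typing import Dict

BASE_URL = (
    "https://raw.githubusercontent.com/sigmorphon/2022InflectionST/"
    "refs/heads/main/part2"
)

# Training-set sizes that appear in the links listed in this notebook
TRAIN_SIZES = {"eng": 1000, "deu": 600, "ara": 900}


def dataset_urls(language: str) -> Dict[str, str]:
    """Hypothetical helper: build the train/dev/test URLs for one language code."""
    if language not in TRAIN_SIZES:
        raise ValueError(f"unknown language code: {language!r}")
    size = TRAIN_SIZES[language]
    return {
        "train": f"{BASE_URL}/{language}_{size}.train",
        "dev": f"{BASE_URL}/{language}.dev",
        "test": f"{BASE_URL}/{language}.gold",
    }


urls = dataset_urls("deu")
print(urls["train"])
```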

In [None]:
import requests
from typing import List, Dict
from requests.exceptions import RequestException
from IPython.display import display, HTML
import time

def download_data(url: str, max_retries: int = 3, timeout: int = 10) -> List[str]:
    """
    Download and process data from a given URL.

    Args:
        url: The URL to download data from
        max_retries: Maximum number of retry attempts
        timeout: Timeout in seconds for the request

    Returns:
        List of strings, each representing a line from the downloaded data
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            data = response.text.strip().split('\n')
            print(f"✓ Successfully downloaded {len(data)} lines from {url.split('/')[-1]}")
            return data

        except RequestException as e:
            if attempt == max_retries - 1:
                display(HTML(f"<b style='color: red'>Error downloading {url.split('/')[-1]}: {str(e)}</b>"))
                raise
            print(f"⚠️ Attempt {attempt + 1} failed, retrying...")
            time.sleep(2 ** attempt)  # Exponential backoff

# Download and store data
data = {}
try:
    data['train'] = download_data(train_data_path)
    data['dev'] = download_data(dev_data_path)
except Exception as e:
    display(HTML(f"<b style='color: red'>Failed to load datasets: {str(e)}</b>"))
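
The retry loop in `download_data` waits `2 ** attempt` seconds between attempts. The sketch below isolates that exponential-backoff pattern so it can be tried without network access. `retry_with_backoff` and `flaky` are hypothetical names, and the injectable `sleep` argument is an assumption added only to keep the demo instant.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, doubling the wait after each failure; re-raise on the last attempt."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise RuntimeError("unreachable")


# Fake operation that fails twice and then succeeds
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated failure")
    return "ok"


waits: List[float] = []
result = retry_with_backoff(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, waits)
```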

In [None]:
# Display dataset statistics with detailed formatting
def display_data_summary(data: dict) -> None:
    """
    Display summary statistics and sample data in a formatted way.

    Args:
        data: Dictionary containing 'train' and 'dev' datasets
    """
    # Print dataset sizes with formatted output
    print(f"\n📊 Dataset Statistics:")
    print(f"  • Training samples: {len(data['train']):,}")
    print(f"  • Development samples: {len(data['dev']):,}")

    sample = data['train'][0]
    print(f"\n📝 Sample Data Format:", sample)

# Display the summary
display_data_summary(data)


In [None]:
import re
import regex
from typing import List, Tuple, Set, Dict

def parse_tag(tag: str) -> str:
    """
    Parse a tag string by removing unwanted characters and formatting each tag.

    Args:
        tag: Input tag string containing tags separated by delimiters

    Returns:
        Formatted string with each tag wrapped in angle brackets
    """
    tag = re.sub(r"\)|\(|,|;", ' ', tag).split()
    return ''.join(['<{}>'.format(t) for t in tag])


def preprocess_data(raw_data: List[str]) -> List[Tuple[str, str]]:
    """
    Preprocess raw data by parsing each line into formatted input-target pairs.

    Args:
        raw_data: List of raw data lines (lemma, target, tag separated by tabs)

    Returns:
        List of tuples containing (formatted_input, target)
    """
    preprocessed_data = []
    for line in raw_data:
        try:
            lemma, target, tag = line.split('\t')
            formatted_input = f"{lemma} {parse_tag(tag)}"
            preprocessed_data.append((formatted_input, target))
        except ValueError as e:
            print(f"⚠️ Skipping malformed line: {line} (Error: {e})")

    return preprocessed_data

def extract_vocab(data: List[Tuple[str, str]]) -> Tuple[Set[str], List[str], Dict[str, int]]:
    """
    Extract vocabulary, tags, and character mappings from preprocessed data.

    Args:
        data: List of preprocessed (input, target) pairs

    Returns:
        Tuple containing:
        - Set of unique characters
        - List of unique tags
        - Dictionary mapping characters to indices
    """
    # Extract characters from lemmas and targets
    chars = set(''.join([d[0].split()[0] + d[1] for d in data]))

    # Create character to index mapping
    char2id = {char: idx for idx, char in enumerate(sorted(chars))}

    # Extract unique tags
    tags = list(set(regex.findall(r"<[A-Za-z0-9]+>", ' '.join(d[0] for d in data))))

    return chars, tags, char2id

# Process the data
data['train'] = preprocess_data(data['train'])
data['dev'] = preprocess_data(data['dev'])

# Display sample of preprocessed data
print(f"\n📝 Preprocessed sample:")
print(f"  {data['train'][0]}")

# Extract vocabulary and mappings
chars, tags, char2id = extract_vocab(data['train'])
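
To make the preprocessing concrete, the sketch below applies the same tag parsing to two made-up lines in the `lemma<TAB>form<TAB>tag` layout that `preprocess_data` expects. The sample lines and their tag strings are hypothetical, not copied from the datasets, and `parse_line` is a hypothetical single-line variant of `preprocess_data`.

```python
import re
from typing import Tuple


def parse_tag(tag: str) -> str:
    """Split a tag bundle on ( ) , ; and wrap each feature in angle brackets."""
    parts = re.sub(r"[(),;]", " ", tag).split()
    return "".join(f"<{p}>" for p in parts)


def parse_line(line: str) -> Tuple[str, str]:
    """Turn one tab-separated line into a (model_input, target) pair."""
    lemma, target, tag = line.split("\t")
    return f"{lemma} {parse_tag(tag)}", target


# Hypothetical lines, for illustration only
sample_lines = ["walk\twalked\tV;PST", "Hund\tHunde\tN;NOM(PL)"]
for line in sample_lines:
    print(parse_line(line))
```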


In [None]:
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple, Dict, List, Union
from transformers import PreTrainedTokenizer
from transformers.tokenization_utils import AddedToken
import json
import warnings

@dataclass
class TokenizerConfig:
    """Configuration for CustomTokenizer"""
    bos_token: str = "<s>"
    eos_token: str = "</s>"
    unk_token: str = "<unk>"
    pad_token: str = "<pad>"
    max_len: int = 512

class CustomTokenizer(PreTrainedTokenizer):
    """
    Custom tokenizer for character-level tokenization with special handling for morphosyntactic features.

    Processes input and output word forms character by character while treating morphosyntactic
    features as special atomic tokens.
    """

    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        vocab: Dict[str, int],
        additional_special_tokens: Optional[List[str]] = None,
        max_len: int = 512,
        **kwargs
    ) -> None:
        """
        Initialize the tokenizer.

        Args:
            vocab: Dictionary mapping characters to their IDs
            additional_special_tokens: List of special tokens (e.g., morphological tags)
            max_len: Maximum sequence length
        """
        config = TokenizerConfig()

        # Initialize token mappings
        self.__token_ids = vocab.copy()
        self.__id_tokens = {v: k for k, v in vocab.items()}

        # Process special tokens
        special_tokens = {
            'pad_token': self._create_added_token(config.pad_token),
            'bos_token': self._create_added_token(config.bos_token),
            'eos_token': self._create_added_token(config.eos_token),
            'unk_token': self._create_added_token(config.unk_token)
        }

        # Initialize special token decoder
        self._added_tokens_decoder = {
            i: token for i, token in enumerate(special_tokens.values())
        }
        self.offset = len(self._added_tokens_decoder)

        super().__init__(
            **special_tokens,
            additional_special_tokens=additional_special_tokens,
            max_len=max_len,
            **kwargs
        )

    @staticmethod
    def _create_added_token(token: Union[str, AddedToken]) -> AddedToken:
        """Create an AddedToken with consistent settings."""
        return AddedToken(token, lstrip=False, rstrip=False) if isinstance(token, str) else token

    @property
    def vocab_size(self) -> int:
        """Get the size of the vocabulary."""
        return len(self.__token_ids)

    def get_vocab(self) -> Dict[str, int]:
        """Get the full vocabulary including special tokens."""
        vocab = {
            self.convert_ids_to_tokens(i): i
            for i in range(self.vocab_size + self.offset)
        }
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _add_eos(self, token_ids: List[int]) -> List[int]:
        """Add end-of-sequence token to the token list."""
        return token_ids + [self.eos_token_id]

    def create_token_type_ids_from_sequences(
        self,
        token_ids_0: List[int],
        token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """Create token type IDs for single or paired sequences."""
        total_length = len(token_ids_0) + 1  # +1 for EOS
        if token_ids_1:
            total_length += len(token_ids_1) + 1
        return [0] * total_length

    def build_inputs_with_special_tokens(
        self,
        token_ids_0: List[int],
        token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """Build model inputs by adding special tokens."""
        token_ids_0 = self._add_eos(token_ids_0)
        if token_ids_1 is None:
            return token_ids_0
        return token_ids_0 + self._add_eos(token_ids_1)

    def _tokenize(self, text: str, **kwargs) -> List[str]:
        """Tokenize text into characters."""
        return list(text)

    def _convert_token_to_id(self, token: str) -> int:
        """Convert a token to its ID."""
        return (self.__token_ids.get(token, self.unk_token_id - self.offset)
                + self.offset)

    def _convert_id_to_token(self, index: int) -> str:
        """Convert an ID back to its token."""
        adjusted_index = index - self.offset
        return self.__id_tokens.get(adjusted_index, self.unk_token)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Convert tokens back to string."""
        return "".join(tokens)

    def save_vocabulary(self,
                       save_directory: str,
                       filename_prefix: Optional[str] = None) -> Tuple[str]:
        """Save the vocabulary to a file."""
        prefix = filename_prefix or ""
        vocab_path = Path(save_directory) / f"{prefix}vocab.json"

        with vocab_path.open('w', encoding='utf-8') as f:
            json.dump(self.__token_ids, f, ensure_ascii=False, indent=2)

        return (str(vocab_path),)

# Initialize tokenizer with vocabulary and tags
tokenizer = CustomTokenizer(char2id, additional_special_tokens=tags, max_len=100)

# Display example tokenization
sample_idx = 54
sample_text = data['train'][sample_idx][0]
tokens = tokenizer.tokenize(sample_text)
print(f"Tokenization Example:")
print(f"Input text: {sample_text}")
print(f"Tokens: {tokens}")
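
The idea behind `CustomTokenizer` (single characters as tokens, tags as atomic tokens, character IDs shifted past the four special tokens) can be sketched without the Transformers base class. This is a simplified stand-in, not the class above: `char_tokenize`, `encode`, and the ID layout for tags are assumptions made for illustration.

```python
import re
from typing import Dict, List

# Same order as the special tokens in the tokenizer above: ids 0..3
SPECIAL_TOKENS = ["<pad>", "<s>", "</s>", "<unk>"]
OFFSET = len(SPECIAL_TOKENS)


def char_tokenize(text: str) -> List[str]:
    """Split text into single characters while keeping <TAG> spans whole."""
    return re.findall(r"<[^<>]+>|.", text, flags=re.DOTALL)


def encode(tokens: List[str], char2id: Dict[str, int], tag2id: Dict[str, int]) -> List[int]:
    """Map tokens to ids: tags via tag2id, characters shifted by OFFSET, else <unk>."""
    unk_id = SPECIAL_TOKENS.index("<unk>")
    ids = []
    for tok in tokens:
        if tok in tag2id:
            ids.append(tag2id[tok])
        elif tok in char2id:
            ids.append(char2id[tok] + OFFSET)
        else:
            ids.append(unk_id)
    return ids


# Hypothetical vocabulary built from one lemma/form pair
chars = sorted(set("Hund" + "Hunde"))
char2id = {c: i for i, c in enumerate(chars)}
# Assumption: tag ids are placed after the character ids
tag2id = {t: OFFSET + len(char2id) + i for i, t in enumerate(["<N>", "<PL>"])}

tokens = char_tokenize("Hund <N><PL>")
ids = encode(tokens, char2id, tag2id)
print(tokens)
print(ids)
```

In this sketch the space between the lemma and the tags is not part of the character vocabulary, so it maps to `<unk>`.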


In [None]:
from transformers import T5Config, T5ForConditionalGeneration
from typing import Optional

def create_t5_model(
    tokenizer,
    d_ff: int = 1024,
    d_model: int = 256,
    num_layers: int = 4,
    num_heads: int = 4,
    dropout_rate: float = 0.2,
    max_new_tokens: int = 32,
    d_kv: Optional[int] = None
) -> T5ForConditionalGeneration:
    """
    Creates and configures a T5 model for morphological generation.

    Args:
        tokenizer: Custom tokenizer for the model
        d_ff: Dimension of the feed-forward layer
        d_model: Dimension of the model embeddings
        num_layers: Number of encoder and decoder layers
        num_heads: Number of attention heads
        dropout_rate: Dropout rate for regularization
        max_new_tokens: Maximum number of tokens to generate
        d_kv: Dimension of key and value vectors (defaults to d_model // num_heads)

    Returns:
        Configured T5ForConditionalGeneration model
    """
    # Calculate default d_kv if not provided
    if d_kv is None:
        d_kv = d_model // num_heads

    # Configure model architecture
    config = T5Config(
        d_ff=d_ff,
        d_model=d_model,
        d_kv=d_kv,
        num_layers=num_layers,
        num_decoder_layers=num_layers,  # Match encoder layers
        num_heads=num_heads,
        dropout_rate=dropout_rate,
        vocab_size=len(tokenizer),
        is_encoder_decoder=True,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        bos_token_id=tokenizer.bos_token_id
    )

    # Initialize model
    model = T5ForConditionalGeneration(config)

    # Configure generation parameters
    model.config.decoder_start_token_id = tokenizer.bos_token_id
    model.generation_config.decoder_start_token_id = tokenizer.bos_token_id
    model.generation_config.max_new_tokens = max_new_tokens
    model.generation_config.eos_token_id = tokenizer.eos_token_id
    model.generation_config.no_repeat_ngram_size = 2
    model.generation_config.length_penalty = 1.0
    model.generation_config.num_beams = 10

    return model

# Initialize the model with default parameters
model = create_t5_model(tokenizer)
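
One generation setting above interacts with character-level tokens: `no_repeat_ngram_size = 2` prevents any two-token sequence from appearing twice in an output, and here every token is a single character. The sketch below checks which words contain a repeated character bigram and therefore could not be generated in full under that constraint. The function names and the word list are hypothetical, and the check ignores the end-of-sequence token.

```python
from typing import List, Sequence


def has_repeated_ngram(tokens: Sequence[str], n: int) -> bool:
    """Return True if any n-gram of tokens occurs more than once."""
    seen = set()
    for i in range(len(tokens) - n + 1):
        gram = tuple(tokens[i:i + n])
        if gram in seen:
            return True
        seen.add(gram)
    return False


def unreachable_fraction(targets: List[str], n: int = 2) -> float:
    """Fraction of target words that contain a repeated character n-gram."""
    if not targets:
        return 0.0
    blocked = sum(has_repeated_ngram(list(t), n) for t in targets)
    return blocked / len(targets)


# Illustrative words, not taken from the datasets
words = ["banana", "walked", "nennen"]
for w in words:
    print(w, has_repeated_ngram(list(w), 2))
print(f"unreachable: {unreachable_fraction(words):.2f}")
```

Running the same check over the target forms in `data['train']` gives a quick estimate of how much accuracy this setting could cost.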


In [None]:
from torch.utils.data import Dataset
from typing import List, Dict, Tuple, Union
import numpy as np
import torch

class CustomDataset(Dataset):
    """
    Custom dataset for morphological generation task.
    Handles tokenization of input lemmas and target forms.
    """

    def __init__(
        self,
        data: List[Tuple[str, str]],
        tokenizer,
        max_length: int = 128
    ) -> None:
        """
        Initialize dataset.

        Args:
            data: List of (input, target) string pairs
            tokenizer: Tokenizer for encoding text
            max_length: Maximum sequence length
        """
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        """Return dataset size."""
        return len(self.data)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Get tokenized item from dataset.

        Args:
            idx: Index of item to retrieve

        Returns:
            Dictionary with input_ids and labels tensors
        """
        input_text, target_text = self.data[idx]

        # Tokenize input and target
        model_inputs = self.tokenizer(
            input_text,
            padding='longest',
            truncation=True,
            max_length=self.max_length,
            add_special_tokens=True,
            return_tensors='pt'
        )

        labels = self.tokenizer(
            target_text,
            padding='longest',
            truncation=True,
            max_length=self.max_length,
            add_special_tokens=True,
            return_tensors='pt'
        )

        return {
            "input_ids": model_inputs["input_ids"].squeeze(0),
            "attention_mask": model_inputs["attention_mask"].squeeze(0),
            "labels": labels["input_ids"].squeeze(0)
        }

def postprocess_data(
    token_ids: Union[np.ndarray, torch.Tensor],
    tokenizer
) -> List[str]:
    """
    Post-process token IDs into readable text.

    Args:
        token_ids: Array/Tensor of token IDs
        tokenizer: Tokenizer for decoding

    Returns:
        List of decoded strings with special tokens removed
    """
    # Convert torch tensor to numpy if needed
    if isinstance(token_ids, torch.Tensor):
        token_ids = token_ids.cpu().numpy()

    # Replace padding indices
    token_ids = np.where(token_ids != -100, token_ids, tokenizer.pad_token_id)

    # Decode tokens to strings
    return tokenizer.batch_decode(token_ids, skip_special_tokens=True)
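
`postprocess_data` swaps `-100` for the padding ID before decoding because `-100` is the label value that PyTorch's cross-entropy loss ignores by default, and it is not a valid token ID. The sketch below shows the same replacement on plain lists. `replace_ignore_index`, `strip_special`, and the example IDs are hypothetical.

```python
from typing import List, Set

IGNORE_INDEX = -100  # label value skipped by the loss; not a real token id


def replace_ignore_index(batch: List[List[int]], pad_token_id: int) -> List[List[int]]:
    """Replace every IGNORE_INDEX entry with the padding id so rows can be decoded."""
    return [[pad_token_id if t == IGNORE_INDEX else t for t in row] for row in batch]


def strip_special(row: List[int], special_ids: Set[int]) -> List[int]:
    """Drop special-token ids, roughly what skip_special_tokens=True does on decode."""
    return [t for t in row if t not in special_ids]


# Hypothetical label rows: content ids, EOS (2), then ignored positions
labels = [[4, 8, 7, 5, 2, -100, -100], [4, 8, 2, -100, -100, -100, -100]]
cleaned = replace_ignore_index(labels, pad_token_id=0)
content = [strip_special(row, {0, 1, 2, 3}) for row in cleaned]
print(cleaned)
print(content)
```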


In [None]:
"""
Evaluation metrics for morphological generation task.
Includes exact match scoring and detailed error analysis.
"""

import evaluate
import numpy as np
from typing import Dict, List, Tuple, Any
import random
from dataclasses import dataclass
import Levenshtein
from collections import defaultdict

@dataclass
class PredictionExample:
    """Store prediction examples for analysis"""
    input: str
    prediction: str
    target: str
    is_correct: bool
    edit_distance: int

class MetricsComputer:
    """Compute and analyze model predictions"""

    def __init__(self, tokenizer, sample_size: int = 15):
        """
        Initialize metrics computer.

        Args:
            tokenizer: Tokenizer for decoding predictions
            sample_size: Number of random examples to sample for analysis
        """
        self.metric = evaluate.load("exact_match")
        self.tokenizer = tokenizer
        self.sample_size = sample_size
        self.error_stats = defaultdict(int)

    def compute_edit_distance(self, pred: str, target: str) -> int:
        """Compute Levenshtein distance between prediction and target"""
        return Levenshtein.distance(pred, target)

    def analyze_predictions(
        self,
        decoded_preds: List[str],
        decoded_labels: List[str],
        inputs: List[str] = None
    ) -> List[PredictionExample]:
        """
        Analyze a sample of predictions.

        Args:
            decoded_preds: List of model predictions
            decoded_labels: List of target labels
            inputs: Optional list of input texts

        Returns:
            List of PredictionExample objects
        """
        sample_indices = random.sample(range(len(decoded_preds)),
                                     min(self.sample_size, len(decoded_preds)))

        examples = []
        for idx in sample_indices:
            pred = decoded_preds[idx]
            target = decoded_labels[idx]
            input_text = inputs[idx] if inputs else ""

            example = PredictionExample(
                input=input_text,
                prediction=pred,
                target=target,
                is_correct=pred == target,
                edit_distance=self.compute_edit_distance(pred, target)
            )
            examples.append(example)

            # Track error types
            if not example.is_correct:
                self.error_stats['total_errors'] += 1
                if len(pred) != len(target):
                    self.error_stats['length_mismatch'] += 1
                if pred == target[::-1]:
                    self.error_stats['reversed'] += 1

        return examples

    def compute_metrics(self, eval_preds: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
        """
        Compute evaluation metrics for predictions.

        Args:
            eval_preds: Tuple of (predictions, labels) arrays

        Returns:
            Dictionary of metric scores
        """
        preds, labels = eval_preds
        decoded_preds = postprocess_data(preds, self.tokenizer)
        decoded_labels = postprocess_data(labels, self.tokenizer)

        # Compute exact match score
        exact_match = self.metric.compute(
            predictions=decoded_preds,
            references=decoded_labels
        )

        # Analyze sample predictions
        examples = self.analyze_predictions(decoded_preds, decoded_labels)

        # Report the score under the key "exact_match" so that it matches
        # metric_for_best_model and the "test_exact_match" lookup after evaluation
        metrics = {
            "exact_match": exact_match["exact_match"],
        }

        # Print analysis examples
        self._print_analysis(examples)

        return metrics

    def _print_analysis(self, examples: List[PredictionExample]) -> None:
        """Print detailed analysis of prediction examples"""
        print("\n📊 Prediction Analysis:")
        print(f"Analyzing {len(examples)} random examples...")
        print("\nDetailed Examples:")

        for i, ex in enumerate(examples, 1):
            status = "✓" if ex.is_correct else "✗"
            print(f"\n{i}. {status} Input: {ex.input}")
            print(f"   Prediction: {ex.prediction}")
            print(f"   Target: {ex.target}")
            print(f"   Edit Distance: {ex.edit_distance}")

        print("\nError Statistics:")
        for error_type, count in self.error_stats.items():
            print(f"  • {error_type}: {count}")

# Initialize metrics computer
metrics_computer = MetricsComputer(tokenizer)

# Use for evaluation
compute_metrics = metrics_computer.compute_metrics
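
The error analysis above relies on the `Levenshtein` package for edit distance. For readers who want to see what that number measures, here is a pure-Python sketch of the standard dynamic-programming algorithm. The function name `levenshtein` and the example words are illustrative, and this is not the package's own implementation.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions, and substitutions."""
    if len(a) < len(b):
        a, b = b, a  # keep the inner row as short as possible
    previous = list(range(len(b) + 1))  # distances from the empty prefix of a
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            current.append(
                min(
                    previous[j] + 1,         # delete ca
                    current[j - 1] + 1,      # insert cb
                    previous[j - 1] + cost,  # substitute (or match)
                )
            )
        previous = current
    return previous[-1]


# Illustrative prediction/target pairs
print(levenshtein("Hunde", "Hunden"))
print(levenshtein("goed", "went"))
```

An edit distance of 1 usually points to a near miss such as a wrong suffix, while larger values suggest the model picked a different inflection pattern altogether.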


In [None]:
import warnings
from transformers import utils

# Monkey patch the deprecated warning in transformers utils
utils.deprecation_warning = lambda *args, **kwargs: None

from transformers import DataCollatorForSeq2Seq, Seq2SeqTrainingArguments, Seq2SeqTrainer
from pathlib import Path

# Wrap the preprocessed splits in Dataset objects
dataset = {
    'train': CustomDataset(data['train'], tokenizer, max_length=128),
    'dev': CustomDataset(data['dev'], tokenizer, max_length=128)
}

# Initialize data collator for sequence-to-sequence task
data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer,
    model=model,
    padding=True,
    label_pad_token_id=-100  # padded label positions are ignored by the loss
)

# Define training arguments with optimized parameters
training_args = Seq2SeqTrainingArguments(
    output_dir=Path('model_checkpoints'),
    max_steps=2000,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=5e-5,
    lr_scheduler_type='inverse_sqrt',
    warmup_steps=2000,
    adam_beta1=0.9,
    adam_beta2=0.98,
    adam_epsilon=1e-6,
    label_smoothing_factor=0.1,

    # Evaluation settings
    eval_strategy="steps",  # called evaluation_strategy in older transformers releases
    eval_steps=200,
    eval_delay=200,
    metric_for_best_model='exact_match',

    # Checkpointing: save on the same schedule as evaluation so that
    # load_best_model_at_end has checkpoints to choose from within max_steps
    save_strategy="steps",
    save_steps=200,
    save_total_limit=3,

    # Generation settings
    gradient_accumulation_steps=4,
    predict_with_generate=True,
    generation_num_beams=5,

    fp16=True,
    gradient_checkpointing=True,

    # Logging
    logging_strategy="steps",
    logging_steps=100,
    logging_first_step=True,

    # Other settings
    overwrite_output_dir=True,
    load_best_model_at_end=True,
    report_to="none",
    seed=42,
)

# Initialize trainer
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset['train'],
    eval_dataset=dataset['dev'],
    processing_class=tokenizer,
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()
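
Two numbers in the training arguments are worth working out by hand: the effective batch size and the learning rate that the schedule actually reaches. The sketch below reflects my understanding of the `inverse_sqrt` scheduler with its default timescale (linear warmup, then decay proportional to one over the square root of the step); treat it as an approximation and check the Transformers documentation for the exact formula. `inverse_sqrt_factor` is a hypothetical name.

```python
import math


def inverse_sqrt_factor(step: int, warmup_steps: int) -> float:
    """Learning-rate multiplier: linear warmup, then 1/sqrt decay (assumes warmup_steps >= 1)."""
    if warmup_steps < 1:
        raise ValueError("this sketch assumes at least one warmup step")
    if step < warmup_steps:
        return step / warmup_steps
    return 1.0 / math.sqrt(step / warmup_steps)


# Values taken from the training arguments above
peak_lr = 5e-5
warmup_steps = 2000
max_steps = 2000
effective_batch = 32 * 4  # per_device_train_batch_size * gradient_accumulation_steps

print(f"effective batch size (single device): {effective_batch}")
for step in (0, 500, 1000, max_steps):
    lr = peak_lr * inverse_sqrt_factor(step, warmup_steps)
    print(f"step {step:>4}: lr = {lr:.2e}")
```

Because `warmup_steps` equals `max_steps` here, the run ends just as the learning rate reaches its peak, so the decay phase is never used.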


In [None]:
from typing import Dict, List
import time
from tqdm import tqdm

def evaluate_model(
    trainer,
    test_path: str,
    download_data,
    preprocess_data,
    tokenizer,
    batch_size: int = 32
) -> Dict[str, float]:
    """
    Evaluate model performance on test data with detailed metrics.

    Args:
        trainer: Seq2SeqTrainer instance
        test_path: URL to test data
        batch_size: Batch size for evaluation
    """
    try:
        # Load and preprocess test data
        print("Loading test data...")
        test_data = download_data(test_path)
        processed_data = preprocess_data(test_data)
        test_dataset = CustomDataset(processed_data, tokenizer)

        # Run evaluation
        print("Running evaluation...")
        start_time = time.time()
        result = trainer.evaluate(
            test_dataset,
            max_length=128,
            num_beams=5,
            metric_key_prefix="test"
        )
        eval_time = time.time() - start_time

        # Add additional metrics
        result.update({
            "test_samples": len(test_dataset),
            "eval_time_seconds": round(eval_time, 2),
            "eval_samples_per_second": round(len(test_dataset) / eval_time, 2)
        })

        # Print detailed results
        print("\n📈 Evaluation Results:")
        for metric, value in result.items():
            print(f"  • {metric}: {value:.4f}")

        return result

    except Exception as e:
        print(f"❌ Evaluation failed: {str(e)}")
        raise

# Run evaluation
evaluation_results = evaluate_model(
    trainer=trainer,
    test_path=test_data_path,
    download_data=download_data,
    preprocess_data=preprocess_data,
    tokenizer=tokenizer
)

# Store test accuracy
test_accuracy = evaluation_results['test_exact_match']
print(test_accuracy)

## Fine-tuning a T5 model

In [None]:
from transformers import (
    AutoTokenizer,          # For automatic tokenizer loading based on model name
    DataCollatorForSeq2Seq, # Handles batching and padding for sequence-to-sequence tasks
    AutoModelForSeq2SeqLM,  # For automatic loading of sequence-to-sequence models
    Seq2SeqTrainingArguments, # Contains training configuration
    Seq2SeqTrainer,         # Handles the training loop for sequence-to-sequence models
    pipeline               # Provides easy-to-use interfaces for various NLP tasks
)


In [None]:
import requests
from typing import List, Dict
from requests.exceptions import RequestException
from IPython.display import display, HTML
import time

def download_data(url: str, max_retries: int = 3, timeout: int = 10) -> List[str]:
    """
    Download and process data from a given URL.

    Args:
        url: The URL to download data from
        max_retries: Maximum number of retry attempts
        timeout: Timeout in seconds for the request

    Returns:
        List of strings, each representing a line from the downloaded data
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            data = response.text.strip().split('\n')
            print(f"✓ Successfully downloaded {len(data)} lines from {url.split('/')[-1]}")
            return data

        except RequestException as e:
            if attempt == max_retries - 1:
                display(HTML(f"<b style='color: red'>Error downloading {url.split('/')[-1]}: {str(e)}</b>"))
                raise
            print(f"⚠️ Attempt {attempt + 1} failed, retrying...")
            time.sleep(2 ** attempt)  # Exponential backoff

# Download and store data
data = {}
try:
    data['train'] = download_data(train_data_path)
    data['dev'] = download_data(dev_data_path)
except Exception as e:
    display(HTML(f"<b style='color: red'>Failed to load datasets: {str(e)}</b>"))

In [None]:
# Display dataset statistics with detailed formatting
def display_data_summary(data: dict) -> None:
    """
    Display summary statistics and sample data in a formatted way.

    Args:
        data: Dictionary containing 'train' and 'dev' datasets
    """
    # Print dataset sizes with formatted output
    print(f"\n📊 Dataset Statistics:")
    print(f"  • Training samples: {len(data['train']):,}")
    print(f"  • Development samples: {len(data['dev']):,}")

    sample = data['train'][0]
    print(f"\n📝 Sample Data Format:", sample)

# Display the summary
display_data_summary(data)


In [None]:
import re
import regex
from typing import List, Tuple, Set, Dict

def parse_tag(tag: str) -> str:
    """
    Parse a tag string by removing unwanted characters and formatting each tag.

    Args:
        tag: Input tag string containing tags separated by delimiters

    Returns:
        Formatted string with each tag wrapped in angle brackets
    """
    tag = re.sub(r"\)|\(|,|;", ' ', tag).split()
    return ''.join(['<{}>'.format(t) for t in tag])


def preprocess_data(raw_data: List[str]) -> List[Tuple[str, str]]:
    """
    Preprocess raw data by parsing each line into formatted input-target pairs.

    Args:
        raw_data: List of raw data lines (lemma, target, tag separated by tabs)

    Returns:
        List of tuples containing (formatted_input, target)
    """
    preprocessed_data = []
    for line in raw_data:
        try:
            lemma, target, tag = line.split('\t')
            formatted_input = f"{lemma} {parse_tag(tag)}"
            preprocessed_data.append((formatted_input, target))
        except ValueError as e:
            print(f"⚠️ Skipping malformed line: {line} (Error: {e})")

    return preprocessed_data

def extract_vocab(data: List[Tuple[str, str]]) -> Tuple[Set[str], List[str], Dict[str, int]]:
    """
    Extract vocabulary, tags, and character mappings from preprocessed data.

    Args:
        data: List of preprocessed (input, target) pairs

    Returns:
        Tuple containing:
        - Set of unique characters
        - List of unique tags
        - Dictionary mapping characters to indices
    """
    # Extract characters from lemmas and targets
    chars = set(''.join([d[0].split()[0] + d[1] for d in data]))

    # Create character to index mapping
    char2id = {char: idx for idx, char in enumerate(sorted(chars))}

    # Extract unique tags
    tags = list(set(regex.findall(r"<[A-Za-z0-9]+>", ' '.join(d[0] for d in data))))

    return chars, tags, char2id

# Process the data
data['train'] = preprocess_data(data['train'])
data['dev'] = preprocess_data(data['dev'])

# Display sample of preprocessed data
print(f"\n📝 Preprocessed sample:")
print(f"  {data['train'][0]}")

# Extract vocabulary and mappings
chars, tags, char2id = extract_vocab(data['train'])
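
To make the input format concrete, the sketch below re-implements the tag formatting on two hand-written lines. The example lines are assumptions about the file layout (lemma, inflected form, tag bundle, separated by tabs) and are not copied from the dataset. `format_tags` and `format_line` are hypothetical standalone stand-ins for `parse_tag` and the loop body of `preprocess_data`.

```python
import re
from typing import Tuple


def format_tags(tag: str) -> str:
    """Wrap each feature of a tag bundle in angle brackets (mirrors parse_tag)."""
    features = re.sub(r"[(),;]", " ", tag).split()
    return "".join(f"<{feature}>" for feature in features)


def format_line(line: str) -> Tuple[str, str]:
    """Turn 'lemma<TAB>form<TAB>tags' into a (model_input, target) pair."""
    lemma, target, tag = line.rstrip("\n").split("\t")
    return f"{lemma} {format_tags(tag)}", target


# Hypothetical example lines, not taken from the dataset files.
print(format_line("walk\twalked\tV;PST"))     # ('walk <V><PST>', 'walked')
print(format_line("Hund\tHunde\tN;NOM(PL)"))  # ('Hund <N><NOM><PL>', 'Hunde')
```

Each feature becomes its own bracketed token, so the tokenizer can later treat it as a single symbol rather than a sequence of characters.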


In [None]:
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple, Dict, List, Union
from transformers import PreTrainedTokenizer
from transformers.tokenization_utils import AddedToken
import json
import warnings

@dataclass
class TokenizerConfig:
    """Configuration for CustomTokenizer"""
    bos_token: str = "<s>"
    eos_token: str = "</s>"
    unk_token: str = "<unk>"
    pad_token: str = "<pad>"
    max_len: int = 512

class CustomTokenizer(PreTrainedTokenizer):
    """
    Custom tokenizer for character-level tokenization with special handling for morphosyntactic features.

    Processes input and output word forms character by character while treating morphosyntactic
    features as special atomic tokens.
    """

    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        vocab: Dict[str, int],
        additional_special_tokens: Optional[List[str]] = None,
        max_len: int = 512,
        **kwargs
    ) -> None:
        """
        Initialize the tokenizer.

        Args:
            vocab: Dictionary mapping characters to their IDs
            additional_special_tokens: List of special tokens (e.g., morphological tags)
            max_len: Maximum sequence length
        """
        config = TokenizerConfig()

        # Initialize token mappings
        self.__token_ids = vocab.copy()
        self.__id_tokens = {v: k for k, v in vocab.items()}

        # Process special tokens
        special_tokens = {
            'pad_token': self._create_added_token(config.pad_token),
            'bos_token': self._create_added_token(config.bos_token),
            'eos_token': self._create_added_token(config.eos_token),
            'unk_token': self._create_added_token(config.unk_token)
        }

        # Initialize special token decoder
        self._added_tokens_decoder = {
            i: token for i, token in enumerate(special_tokens.values())
        }
        self.offset = len(self._added_tokens_decoder)

        super().__init__(
            **special_tokens,
            additional_special_tokens=additional_special_tokens,
            max_len=max_len,
            **kwargs
        )

    @staticmethod
    def _create_added_token(token: Union[str, AddedToken]) -> AddedToken:
        """Create an AddedToken with consistent settings."""
        return AddedToken(token, lstrip=False, rstrip=False) if isinstance(token, str) else token

    @property
    def vocab_size(self) -> int:
        """Get the size of the vocabulary."""
        return len(self.__token_ids)

    def get_vocab(self) -> Dict[str, int]:
        """Get the full vocabulary including special tokens."""
        vocab = {
            self.convert_ids_to_tokens(i): i
            for i in range(self.vocab_size + self.offset)
        }
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _add_eos(self, token_ids: List[int]) -> List[int]:
        """Add end-of-sequence token to the token list."""
        return token_ids + [self.eos_token_id]

    def create_token_type_ids_from_sequences(
        self,
        token_ids_0: List[int],
        token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """Create token type IDs for single or paired sequences."""
        total_length = len(token_ids_0) + 1  # +1 for EOS
        if token_ids_1:
            total_length += len(token_ids_1) + 1
        return [0] * total_length

    def build_inputs_with_special_tokens(
        self,
        token_ids_0: List[int],
        token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """Build model inputs by adding special tokens."""
        token_ids_0 = self._add_eos(token_ids_0)
        if token_ids_1 is None:
            return token_ids_0
        return token_ids_0 + self._add_eos(token_ids_1)

    def _tokenize(self, text: str, **kwargs) -> List[str]:
        """Tokenize text into characters."""
        return list(text)

    def _convert_token_to_id(self, token: str) -> int:
        """Convert a token to its ID."""
        return (self.__token_ids.get(token, self.unk_token_id - self.offset)
                + self.offset)

    def _convert_id_to_token(self, index: int) -> str:
        """Convert an ID back to its token."""
        adjusted_index = index - self.offset
        return self.__id_tokens.get(adjusted_index, self.unk_token)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Convert tokens back to string."""
        return "".join(tokens)

    def save_vocabulary(self,
                       save_directory: str,
                       filename_prefix: Optional[str] = None) -> Tuple[str]:
        """Save the vocabulary to a file."""
        prefix = filename_prefix or ""
        vocab_path = Path(save_directory) / f"{prefix}vocab.json"

        with vocab_path.open('w', encoding='utf-8') as f:
            json.dump(self.__token_ids, f, ensure_ascii=False, indent=2)

        return (str(vocab_path),)

# Initialize tokenizer with vocabulary and tags
tokenizer = CustomTokenizer(char2id, additional_special_tokens=tags, max_len=100)

# Display example tokenization
sample_idx = 54
sample_text = data['train'][sample_idx][0]
tokens = tokenizer.tokenize(sample_text)
print("Tokenization Example:")
print(f"Input text: {sample_text}")
print(f"Tokens: {tokens}")
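
The idea behind the tokenizer (characters as tokens, tags as atomic symbols, special tokens occupying the first IDs) can be sketched without `transformers`. This is a simplified illustration, not the behavior of `CustomTokenizer` itself: `split_tokens`, `build_vocab`, and `encode` are hypothetical helpers, and the IDs that the real tokenizer assigns to tags may differ.

```python
import re
from typing import Dict, List

SPECIALS = ["<pad>", "<s>", "</s>", "<unk>"]  # assumed to occupy IDs 0..3
TAG_PATTERN = r"<[A-Za-z0-9]+>"


def split_tokens(text: str) -> List[str]:
    """Keep <TAG> spans whole and split everything else into characters."""
    tokens: List[str] = []
    for piece in re.split(f"({TAG_PATTERN})", text):
        if re.fullmatch(TAG_PATTERN, piece):
            tokens.append(piece)
        else:
            tokens.extend(piece)  # an empty piece adds nothing
    return tokens


def build_vocab(chars: List[str], tags: List[str]) -> Dict[str, int]:
    """Assign IDs: special tokens first, then characters, then tags."""
    ordered = SPECIALS + sorted(chars) + sorted(tags)
    return {token: idx for idx, token in enumerate(ordered)}


def encode(text: str, vocab: Dict[str, int]) -> List[int]:
    """Map tokens to IDs, fall back to <unk>, and append </s>."""
    ids = [vocab.get(token, vocab["<unk>"]) for token in split_tokens(text)]
    return ids + [vocab["</s>"]]


vocab = build_vocab(list("adeklw"), ["<V>", "<PST>"])
print(split_tokens("walk <V><PST>"))  # ['w', 'a', 'l', 'k', ' ', '<V>', '<PST>']
print(encode("walk <V><PST>", vocab))  # [9, 4, 8, 7, 3, 11, 10, 2]
```

In this sketch the space between lemma and tags maps to `<unk>` (ID 3), because the character inventory, like the one built by `extract_vocab`, contains only characters from lemmas and targets.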


In [None]:
from torch.utils.data import Dataset
from typing import List, Dict, Tuple, Union
import numpy as np
import torch

class CustomDataset(Dataset):
    """
    Custom dataset for morphological generation task.
    Handles tokenization of input lemmas and target forms.
    """

    def __init__(
        self,
        data: List[Tuple[str, str]],
        tokenizer,
        max_length: int = 128
    ) -> None:
        """
        Initialize dataset.

        Args:
            data: List of (input, target) string pairs
            tokenizer: Tokenizer for encoding text
            max_length: Maximum sequence length
        """
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        """Return dataset size."""
        return len(self.data)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Get tokenized item from dataset.

        Args:
            idx: Index of item to retrieve

        Returns:
            Dictionary with input_ids and labels tensors
        """
        input_text, target_text = self.data[idx]

        # Tokenize input and target
        model_inputs = self.tokenizer(
            input_text,
            padding='longest',
            truncation=True,
            max_length=self.max_length,
            add_special_tokens=True,
            return_tensors='pt'
        )

        labels = self.tokenizer(
            target_text,
            padding='longest',
            truncation=True,
            max_length=self.max_length,
            add_special_tokens=True,
            return_tensors='pt'
        )

        return {
            "input_ids": model_inputs["input_ids"].squeeze(0),
            "attention_mask": model_inputs["attention_mask"].squeeze(0),
            "labels": labels["input_ids"].squeeze(0)
        }

def postprocess_data(
    token_ids: Union[np.ndarray, torch.Tensor],
    tokenizer
) -> List[str]:
    """
    Post-process token IDs into readable text.

    Args:
        token_ids: Array/Tensor of token IDs
        tokenizer: Tokenizer for decoding

    Returns:
        List of decoded strings with special tokens removed
    """
    # Convert torch tensor to numpy if needed
    if isinstance(token_ids, torch.Tensor):
        token_ids = token_ids.cpu().numpy()

    # Replace padding indices
    token_ids = np.where(token_ids != -100, token_ids, tokenizer.pad_token_id)

    # Decode tokens to strings
    return tokenizer.batch_decode(token_ids, skip_special_tokens=True)
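
The replacement of `-100` in `postprocess_data` exists because `DataCollatorForSeq2Seq` pads label sequences with `-100` by default, so that the loss ignores those positions. That value is not a valid token ID and has to be swapped back before decoding. A minimal pure-Python sketch of this round trip, with hypothetical helper names and made-up token IDs:

```python
from typing import List

IGNORE_INDEX = -100  # default label padding value of DataCollatorForSeq2Seq


def pad_labels(labels: List[List[int]], fill: int = IGNORE_INDEX) -> List[List[int]]:
    """Right-pad every label sequence to the longest one in the batch."""
    width = max(len(seq) for seq in labels)
    return [seq + [fill] * (width - len(seq)) for seq in labels]


def restore_pad(labels: List[List[int]], pad_id: int) -> List[List[int]]:
    """Swap the ignore index back to a real token ID so the batch can be decoded."""
    return [[pad_id if tok == IGNORE_INDEX else tok for tok in seq] for seq in labels]


batch = pad_labels([[9, 4, 2], [7, 2]])
print(batch)                  # [[9, 4, 2], [7, 2, -100]]
print(restore_pad(batch, 0))  # [[9, 4, 2], [7, 2, 0]]
```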


In [None]:
"""
Evaluation metrics for morphological generation task.
Includes exact match scoring and detailed error analysis.
"""

import evaluate
import numpy as np
from typing import Dict, List, Tuple, Any
import random
from dataclasses import dataclass
import Levenshtein
from collections import defaultdict

@dataclass
class PredictionExample:
    """Store prediction examples for analysis"""
    input: str
    prediction: str
    target: str
    is_correct: bool
    edit_distance: int

class MetricsComputer:
    """Compute and analyze model predictions"""

    def __init__(self, tokenizer, sample_size: int = 15):
        """
        Initialize metrics computer.

        Args:
            tokenizer: Tokenizer for decoding predictions
            sample_size: Number of random examples to sample for analysis
        """
        self.metric = evaluate.load("exact_match")
        self.tokenizer = tokenizer
        self.sample_size = sample_size
        self.error_stats = defaultdict(int)

    def compute_edit_distance(self, pred: str, target: str) -> int:
        """Compute Levenshtein distance between prediction and target"""
        return Levenshtein.distance(pred, target)

    def analyze_predictions(
        self,
        decoded_preds: List[str],
        decoded_labels: List[str],
        inputs: List[str] = None
    ) -> List[PredictionExample]:
        """
        Analyze a sample of predictions.

        Args:
            decoded_preds: List of model predictions
            decoded_labels: List of target labels
            inputs: Optional list of input texts

        Returns:
            List of PredictionExample objects
        """
        sample_indices = random.sample(range(len(decoded_preds)),
                                     min(self.sample_size, len(decoded_preds)))

        examples = []
        for idx in sample_indices:
            pred = decoded_preds[idx]
            target = decoded_labels[idx]
            input_text = inputs[idx] if inputs else ""

            example = PredictionExample(
                input=input_text,
                prediction=pred,
                target=target,
                is_correct=pred == target,
                edit_distance=self.compute_edit_distance(pred, target)
            )
            examples.append(example)

            # Track error types
            if not example.is_correct:
                self.error_stats['total_errors'] += 1
                if len(pred) != len(target):
                    self.error_stats['length_mismatch'] += 1
                if pred == target[::-1]:
                    self.error_stats['reversed'] += 1

        return examples

    def compute_metrics(self, eval_preds: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
        """
        Compute evaluation metrics for predictions.

        Args:
            eval_preds: Tuple of (predictions, labels) arrays

        Returns:
            Dictionary of metric scores
        """
        preds, labels = eval_preds
        decoded_preds = postprocess_data(preds, self.tokenizer)
        decoded_labels = postprocess_data(labels, self.tokenizer)

        # Compute exact match score
        exact_match = self.metric.compute(
            predictions=decoded_preds,
            references=decoded_labels
        )

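        # Analyze sample predictions; reset the counters first so that
        # errors from earlier evaluation passes do not accumulate
        self.error_stats = defaultdict(int)
        examples = self.analyze_predictions(decoded_preds, decoded_labels)

        # Compute additional metrics over the sampled examples only
        edit_distances = [ex.edit_distance for ex in examples]

        metrics = {
            "exact_match": exact_match["exact_match"],
            "avg_edit_distance": float(np.mean(edit_distances)) if edit_distances else 0.0,
            "max_edit_distance": max(edit_distances, default=0),
            "error_rate": self.error_stats['total_errors'] / max(len(examples), 1)
        }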

        # Print analysis examples
        self._print_analysis(examples)

        return metrics

    def _print_analysis(self, examples: List[PredictionExample]) -> None:
        """Print detailed analysis of prediction examples"""
        print("\n📊 Prediction Analysis:")
        print(f"Analyzing {len(examples)} random examples...")
        print("\nDetailed Examples:")

        for i, ex in enumerate(examples, 1):
            status = "✓" if ex.is_correct else "✗"
            print(f"\n{i}. {status} Input: {ex.input}")
            print(f"   Prediction: {ex.prediction}")
            print(f"   Target: {ex.target}")
            print(f"   Edit Distance: {ex.edit_distance}")

        print("\nError Statistics:")
        for error_type, count in self.error_stats.items():
            print(f"  • {error_type}: {count}")

# Initialize metrics computer
metrics_computer = MetricsComputer(tokenizer)

# Use for evaluation
compute_metrics = metrics_computer.compute_metrics
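
The two quantities reported above, exact match and edit distance, can be illustrated without the `evaluate` and `Levenshtein` packages. The following is a minimal pure-Python sketch. The function names and the example predictions are made up for illustration, and the real metric implementations may differ in details such as normalization.

```python
from typing import List


def levenshtein(a: str, b: str) -> int:
    """Edit distance with unit-cost insertions, deletions and substitutions."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution
        previous = current
    return previous[-1]


def exact_match(preds: List[str], targets: List[str]) -> float:
    """Fraction of predictions that are identical to their targets."""
    if not targets:
        return 0.0
    hits = sum(pred == target for pred, target in zip(preds, targets))
    return hits / len(targets)


# Hypothetical model outputs, including an over-regularized past tense.
preds = ["walked", "goed", "sang"]
targets = ["walked", "went", "sang"]
print(exact_match(preds, targets))
print([levenshtein(p, t) for p, t in zip(preds, targets)])  # [0, 4, 0]
```

Exact match treats "goed" as simply wrong, while the edit distance additionally shows how far the prediction is from the target, which helps when inspecting irregular forms.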


In [None]:
dataset = {
    'train': CustomDataset(data['train'], tokenizer, max_length=128),
    'dev': CustomDataset(data['dev'], tokenizer, max_length=128)
}

In [None]:
# Load the T5-small model and move it to GPU (device 0)
model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")  # Load pre-trained T5 model
model.to(0)  # Move model to first GPU (device ID 0)

In [None]:
data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer,  # The tokenizer used to process text
    model=model           # The T5 model being used
)

In [None]:
# Configure training arguments for fine-tuning the T5 model
training_args = Seq2SeqTrainingArguments(
    # Directory to save model checkpoints and logs
    output_dir="/content/drive/My Drive/morphological_inflection_t5",

    # Evaluate model after each epoch
    evaluation_strategy="epoch",

    # Learning rate for optimization
    learning_rate=2e-5,

    # Batch sizes for training and evaluation
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,

    # Weight decay coefficient (penalizes large weights during training)
    weight_decay=0.01,

    # Keep only the last 3 checkpoints
    save_total_limit=3,

    # Number of training epochs
    num_train_epochs=300,

    # Enable text generation during evaluation
    predict_with_generate=True,

    # Enable mixed precision training (faster training, less memory)
    fp16=True,

    # Disable external reporting
    report_to="none"
)
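
To get a feeling for what these settings imply, the number of optimizer steps can be estimated with a short calculation. The sketch assumes 1000 training examples (inferred from the file name `eng_1000.train`), a single device, and no gradient accumulation. `total_optimizer_steps` is a hypothetical helper, not part of `transformers`.

```python
import math


def total_optimizer_steps(num_examples: int, batch_size: int, epochs: int) -> int:
    """Steps for a single device without gradient accumulation."""
    steps_per_epoch = math.ceil(num_examples / batch_size)  # last batch may be smaller
    return steps_per_epoch * epochs


# 1000 examples is an assumption based on the training file name.
print(math.ceil(1000 / 16))                  # 63 steps per epoch
print(total_optimizer_steps(1000, 16, 300))  # 18900 steps in total
```

With evaluation after every epoch, this configuration also runs 300 generation passes over the development set, which can dominate the total runtime.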

In [None]:
# Initialize the Sequence-to-Sequence Trainer
trainer = Seq2SeqTrainer(
    model=model,                                    # Pre-trained T5 model
    args=training_args,                          # Training configuration
    train_dataset=dataset['train'],
    eval_dataset=dataset['dev'],
    tokenizer=tokenizer,                         # Tokenizer for text processing
    data_collator=data_collator,                 # Handles batch preparation
    compute_metrics=compute_metrics,             # Evaluation metric function
)

In [None]:
trainer.train()

In [None]:
from typing import Dict, List
import time
from tqdm import tqdm

def evaluate_model(
    trainer,
    test_path: str,
    download_data,
    preprocess_data,
    tokenizer,
    batch_size: int = 32
) -> Dict[str, float]:
    """
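    Evaluate the model on the test set and report metrics and timing.

    Args:
        trainer: Seq2SeqTrainer instance
        test_path: URL to test data
        download_data: Callable that fetches the raw lines from test_path
        preprocess_data: Callable that converts raw lines into (input, target) pairs
        tokenizer: Tokenizer used to build the test dataset
        batch_size: Currently unused; the trainer's per_device_eval_batch_size applies

    Returns:
        Dictionary of test metrics plus sample count and timing information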
    """
    try:
        # Load and preprocess test data
        print("Loading test data...")
        test_data = download_data(test_path)
        processed_data = preprocess_data(test_data)
        test_dataset = CustomDataset(processed_data, tokenizer)

        # Run evaluation
        print("Running evaluation...")
        start_time = time.time()
        result = trainer.evaluate(
            test_dataset,
            max_length=128,
            num_beams=5,
            metric_key_prefix="test"
        )
        eval_time = time.time() - start_time

        # Add additional metrics
        result.update({
            "test_samples": len(test_dataset),
            "eval_time_seconds": round(eval_time, 2),
            "eval_samples_per_second": round(len(test_dataset) / eval_time, 2)
        })

        # Print detailed results
        print("\n📈 Evaluation Results:")
        for metric, value in result.items():
            print(f"  • {metric}: {value:.4f}")

        return result

    except Exception as e:
        print(f"❌ Evaluation failed: {str(e)}")
        raise

# Run evaluation
evaluation_results = evaluate_model(
    trainer=trainer,
    test_path=test_data_path,
    download_data=download_data,
    preprocess_data=preprocess_data,
    tokenizer=tokenizer
)

# Store test accuracy
test_accuracy = evaluation_results['test_exact_match']
print(test_accuracy)
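
The test score then has to be compared with the baseline accuracy of the chosen language (65% for English). A small sketch of that comparison follows. It assumes the exact-match score is reported as a fraction between 0 and 1, which may depend on the metric version. `beats_baseline` is a hypothetical helper, and the scores in the usage lines are invented.

```python
def beats_baseline(
    score: float,
    baseline_percent: float = 65.0,
    score_is_fraction: bool = True,
) -> bool:
    """Return True if the score is strictly above the baseline accuracy."""
    # Assumption: a fractional score (0..1) is converted to percent first.
    score_percent = score * 100 if score_is_fraction else score
    return score_percent > baseline_percent


# Invented scores for illustration only.
print(beats_baseline(0.71))                           # True
print(beats_baseline(0.60))                           # False
print(beats_baseline(71.0, score_is_fraction=False))  # True
```

If the metric already returns a percentage, pass `score_is_fraction=False` so that the score is not scaled twice.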