from functools import reduce
Sequence Classification ✅
1 Review of Python’s reduce
#
# Aggregating sequences of numbers by summation
#
#
# Aggregating sequences of numbers by summation.
#
xs = [1, 2, 3, 4, 5]

def update(total: int, x: int) -> int:
    """Fold step: add the next element to the running total."""
    return total + x

reduce(update, xs, 0)
# => 15
#
# Count total characters of a sequence of words
#
= ["hello", "how", "are", "u", "doing?"]
xs
def update(total: int, word: str) -> int:
return total + len(word)
reduce(update, xs, 0)
18
#
# Aggregate a sequence of words to their average number of characters per word
#
= ["hello", "how", "are", "u", "doing?"]
xs
def update(stat: (int, int), word: str) -> (int, int):
= stat
count, total return (count + 1, total + len(word))
= reduce(update, xs, (0, 0))
(total_words, total_chars) / total_words total_chars
3.6
2 Processing unbounded sequences of vectors
Suppose we have a sequence of vectors:
\[ x = (x_1, x_2, \dots, x_L) \] where each \(x_i\in\mathbb{R}^m\). Thus, \(x\in(\mathbb{R}^m)^*\).
We want to describe an aggregation function \(\mathbf{agg}:(\mathbb{R}^m)^*\to\mathbb{R}^n\)
The challenge is that \(\mathbf{agg}\) must be able to handle sequences of any length \(L\).
Auxiliary functions
Consider a state space \(\mathbb{R}^k\) that will be used to aggregate the sequence to a hidden state.
This can be done by introducing:
- an initial state \(s_0\in\mathbb{R}^k\).
- an update function \(h:\mathbb{R}^k \times \mathbb{R}^m\to\mathbb{R}^k\).
Note the update function \(h\) takes two inputs: current state and current input. \[ s_{i+1} = h(s_i, x_i) \]
Using the update function \(h\), we can reduce \(x\) to a final state:
\(s^* = \mathrm{reduce}(h, x, s_0)\)
Introduce an output function \(f:\mathbb{R}^k\to\mathbb{R}^n\) which maps the final state to the aggregation output.
\(\mathbf{agg}(x) = f(s^*)\)
3 Revisiting Python reduce
In the case of:
= ["hello", "how", "are", "u", "doing?"]
xs
def update(stat: (int, int), word: str) -> (int, int):
= stat
count, total return (count + 1, total + len(word))
= reduce(update, xs, (0, 0))
(total_words, total_chars) / total_words total_chars
We have the following:
- State space: \[\mathbb{N}^2\]
- Initial state: \[(0, 0)\]
- Update function: \[h(s, w) = (s[0]+1, s[1]+\mathrm{len}(w))\]
- Output function: \[f(s) = \frac{s[1]}{s[0]}\]
4 Learning sequence aggregations
In sequence learning, we need to model the aggregation as:
- \(h\): update function
- \(f\): output function
Both are just normal vector functions, and thus can be learned using MLPs.
The equations that describe the sequence function are given as:
\[ s_{i+1} = h(x_i, s_i) \mathrm{\ for\ } i\in 0\dots L-1 \]
\[ y = f(x, s_L) \]
where:
- \(s_0\) is the initial state.
- \(x = (x_0, x_1, \dots, x_{L-1}) \in(\mathbb{R}^m)^*\) is the sequence input of length \(L\).
- \(y\in\mathbb{R}^n\) is the output vector.
5 Preprocessing of IMDB Reviews
#
# Preprocessing of IMDB reviews: tokenize, build a vocabulary, and pack
# the reviews into fixed-width tensors for mini-batch training.
#
from importlib import reload

import mytext
import torch
from torchtext.data import get_tokenizer
from torch.utils.data import TensorDataset, DataLoader

reload(mytext)

tokenizer = get_tokenizer('basic_english')
reviews, targets = mytext.imdb_reviews(tokenizer)
voc = mytext.build_vocab(reviews)

# One row per review, padded/indexed by the vocabulary.
# NOTE(review): assumes mytext.build_tensor pads to the longest review —
# confirm against mytext.
reviews_tensor = mytext.build_tensor(reviews, voc)
reviews_tensor.shape
# => torch.Size([25000, 2633])

# Truncate each review to its first 100 tokens.
inputs_tensor = reviews_tensor[:, :100]
inputs_tensor.shape
# => torch.Size([25000, 100])

targets_tensor = torch.tensor(targets, dtype=torch.long)
targets_tensor.shape
# => torch.Size([25000])

dataset = TensorDataset(inputs_tensor, targets_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
6 Recurrent Neural Network From Scratch
We model the update function as a fully connected layer.
from torch import nn
from torch.optim import Adam
import numpy as np
class MyRNN(nn.Module):
    """A minimal recurrent network built from scratch.

    The abstract aggregation's update function ``h`` and output
    function ``f`` are each modeled as a fully connected layer over
    the concatenation of the current input vector and current state.
    """

    def __init__(self, voc_size, input_dim, state_dim, output_dim):
        super().__init__()
        self.voc_size = voc_size      # vocabulary size for the embedding
        self.input_dim = input_dim    # embedding dimension (m)
        self.state_dim = state_dim    # hidden-state dimension (k)
        self.output_dim = output_dim  # output dimension (n)
        self.embedding = nn.Embedding(voc_size, input_dim)
        # h : R^(m+k) -> R^k, the state-update function.
        self.h = nn.Linear(input_dim + state_dim, state_dim)
        # f : R^(m+k) -> R^n, the output function; it sees the LAST
        # input vector together with the final state.
        self.f = nn.Linear(input_dim + state_dim, output_dim)

    def forward(self, tokens):
        """Aggregate a batch of token sequences.

        tokens: (batch, L) integer tensor of vocabulary indices.
        Returns ``(y, s)`` where ``y`` is (batch, output_dim) and ``s``
        is the final state (batch, state_dim).  Requires L >= 1, since
        the output layer reads the last input vector.
        """
        (batch_size, L) = tokens.shape
        x_seq = self.embedding(tokens)           # (batch, L, input_dim)
        s = self.init_state(batch_size)          # (batch, state_dim)
        for i in range(L):
            x = x_seq[:, i, :]                   # (batch, input_dim)
            combined = torch.cat([x, s], dim=1)  # (batch, input_dim + state_dim)
            s = self.h(combined)                 # (batch, state_dim)
        y = self.f(torch.cat((x, s), 1))         # (batch, output_dim)
        return y, s

    def init_state(self, batch_size):
        """Initial state s_0: a zero vector per batch element."""
        return torch.zeros(batch_size, self.state_dim)
Let’s try the module on a single batch.
# Try the module on a single batch.
(tokens, targets) = next(iter(dataloader))
print(tokens.shape)   # torch.Size([64, 100])
print(targets.shape)  # torch.Size([64])

rnn = MyRNN(
    voc_size=len(voc),
    input_dim=32,
    state_dim=64,
    output_dim=2,
)
(outputs, states) = rnn(tokens)
outputs.shape
# => torch.Size([64, 2])
7 Training
#
# Training: cross-entropy over the two sentiment classes.
#
epochs = 5
input_dim = 32
state_dim = 64
model = MyRNN(len(voc), input_dim, state_dim, 2)
loss = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters())

for epoch in range(epochs):
    losses = []
    for (x, target) in dataloader:
        y, _ = model(x)
        l = loss(y, target)
        l.backward()
        optimizer.step()
        # Clear gradients after the step so the next backward()
        # starts from zero.
        optimizer.zero_grad()
        losses.append(l.item())
    l = np.mean(losses)
    print("{}: loss={:.4f}".format(epoch, l))

# 0: loss=0.6980
# 1: loss=0.6911
# 2: loss=0.6856
# 3: loss=0.6875
# 4: loss=0.6696
#
# Evaluate: accuracy of the trained model on the training loader.
#
from torchmetrics import Accuracy  # NOTE(review): imported but unused below

with torch.no_grad():
    success = 0
    total = 0
    for x, target in dataloader:
        y, _ = model(x)
        pred = y.argmax(axis=1)          # predicted class per example
        success += (pred == target).sum()
        total += target.shape[0]
    print("Accuracy = {:.2f}".format(success / total))

# Accuracy = 0.60