Source code for tfts.models.transformer

"""
`Attention Is All You Need
<https://arxiv.org/abs/1706.03762>`_
"""

import logging
from typing import Dict, List, Optional, Tuple

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, LayerNormalization, MultiHeadAttention

from tfts.layers.attention_layer import Attention, SelfAttention
from tfts.layers.dense_layer import FeedForwardNetwork
from tfts.layers.embed_layer import DataEmbedding
from tfts.layers.mask_layer import CausalMask

from .base import BaseConfig, BaseModel

logger = logging.getLogger(__name__)


class TransformerConfig(BaseConfig):
    """Hyper-parameter container for the time-series :class:`Transformer`.

    Each constructor argument is stored unchanged on the instance under the
    same name. The single derived value is ``num_decoder_layers``, which falls
    back to ``num_layers`` when ``None`` is passed explicitly.
    """

    model_type: str = "transformer"

    def __init__(
        self,
        hidden_size: int = 256,
        num_layers: int = 2,
        num_decoder_layers: int = 4,
        num_attention_heads: int = 4,
        num_kv_heads: int = 4,
        ffn_intermediate_size: int = 256,
        hidden_act: str = "gelu",
        hidden_dropout_prob: float = 0.0,
        attention_probs_dropout_prob: float = 0.0,
        scheduled_sampling: float = 1,
        max_position_embeddings: int = 512,
        initializer_range: float = 0.02,
        positional_type: str = "positional encoding",
        use_cache: bool = True,
        classifier_dropout: Optional[float] = None,
        layer_norm_eps: float = 1e-12,
        pad_token_id: int = 0,
        **kwargs: Dict[str, object]
    ) -> None:
        """Store the Transformer hyper-parameters on the config instance.

        Args:
            hidden_size: Width of the hidden representation.
            num_layers: Number of encoder layers.
            num_decoder_layers: Number of decoder layers; ``None`` means
                "same as ``num_layers``".
            num_attention_heads: Number of attention heads.
            num_kv_heads: Number of key/value heads.
            ffn_intermediate_size: Width of the feed-forward inner layer.
            hidden_act: Name of the hidden activation function.
            hidden_dropout_prob: Dropout probability for hidden layers.
            attention_probs_dropout_prob: Dropout probability applied to the
                attention probabilities.
            scheduled_sampling: Controls teacher forcing versus feeding back
                the last prediction.
            max_position_embeddings: Maximum input sequence length.
            initializer_range: Standard deviation for weight initialisation.
            positional_type: Kind of positional embedding to use.
            use_cache: Whether to use a cache during inference.
            classifier_dropout: Dropout rate for classifier layers.
            layer_norm_eps: Epsilon used by layer normalisation.
            pad_token_id: ID of the padding token.
            **kwargs: Accepted for call-site flexibility.
                NOTE(review): this constructor neither stores these values nor
                forwards them to ``BaseConfig.__init__`` — confirm whether
                that is intended.
        """
        super().__init__()

        # Assignment order is kept stable so the attribute order seen through
        # ``__dict__`` does not change.
        self.hidden_size: int = hidden_size
        self.num_layers: int = num_layers

        # An explicit ``None`` mirrors the encoder depth.
        if num_decoder_layers is None:
            num_decoder_layers = self.num_layers
        self.num_decoder_layers: int = num_decoder_layers

        self.num_attention_heads: int = num_attention_heads
        self.num_kv_heads: int = num_kv_heads
        self.ffn_intermediate_size: int = ffn_intermediate_size
        self.hidden_act: str = hidden_act
        self.hidden_dropout_prob: float = hidden_dropout_prob
        self.attention_probs_dropout_prob: float = attention_probs_dropout_prob
        self.scheduled_sampling: float = scheduled_sampling
        self.max_position_embeddings: int = max_position_embeddings
        self.initializer_range: float = initializer_range
        self.positional_type: str = positional_type
        self.use_cache: bool = use_cache
        self.classifier_dropout: Optional[float] = classifier_dropout
        self.layer_norm_eps: float = layer_norm_eps
        self.pad_token_id: int = pad_token_id
class Transformer(BaseModel):
    """Encoder-decoder Transformer for time-series forecasting."""

    def __init__(self, predict_sequence_length: int = 1, config: Optional[TransformerConfig] = None) -> None:
        """Create the embedding, encoder and decoder sub-modules.

        Parameters
        ----------
        predict_sequence_length : int
            Number of future steps the decoder generates, by default 1
        config : TransformerConfig, optional
            Hyper-parameters; a default ``TransformerConfig()`` is used when
            omitted (or falsy)
        """
        super().__init__()
        self.config = config or TransformerConfig()
        self.predict_sequence_length = predict_sequence_length

        cfg = self.config
        self.encoder_embedding = DataEmbedding(cfg.hidden_size, positional_type=cfg.positional_type)

        # Hyper-parameters that the encoder and decoder stacks have in common.
        shared_kwargs = dict(
            hidden_size=cfg.hidden_size,
            num_attention_heads=cfg.num_attention_heads,
            attention_probs_dropout_prob=cfg.attention_probs_dropout_prob,
            ffn_intermediate_size=cfg.ffn_intermediate_size,
            hidden_dropout_prob=cfg.hidden_dropout_prob,
            layer_norm_eps=cfg.layer_norm_eps,
        )
        self.encoder = Encoder(num_hidden_layers=cfg.num_layers, **shared_kwargs)
        self.decoder = Decoder(
            predict_sequence_length=predict_sequence_length,
            num_decoder_layers=cfg.num_decoder_layers,
            **shared_kwargs,
        )

    def __call__(
        self,
        inputs: tf.Tensor,
        teacher: Optional[tf.Tensor] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Run the encoder over the history and decode the forecast.

        Parameters
        ----------
        inputs : tf.Tensor
            3D tensor for batch * seq_len * features; it is split into the
            target history, encoder features and decoder features by
            ``_prepare_3d_inputs``
        teacher : tf.Tensor, optional
            Ground-truth targets forwarded to the decoder, by default None
        output_hidden_states : bool, optional
            Accepted but currently ignored
        return_dict : bool, optional
            Accepted but currently ignored

        Returns
        -------
        tf.Tensor
            3D tensor for output, batch * output_seq * 1
        """
        x, encoder_feature, decoder_feature = self._prepare_3d_inputs(inputs, ignore_decoder_inputs=False)

        # batch * seq * embedding_size
        embedded_history = self.encoder_embedding(encoder_feature)
        memory = self.encoder(embedded_history, mask=None)

        # The decoder is seeded with the last time step of the first channel of x.
        seed = x[:, -1:, 0:1]
        return self.decoder(decoder_feature, init_input=seed, encoder_memory=memory, teacher=teacher)
class Encoder(tf.keras.layers.Layer):
    """Stack of post-norm Transformer encoder blocks (self-attention + feed-forward)."""

    def __init__(
        self,
        num_hidden_layers: int,
        hidden_size: int,
        num_attention_heads: int,
        attention_probs_dropout_prob: float,
        ffn_intermediate_size: int,
        hidden_dropout_prob: float,
        layer_norm_eps: float = 1e-9,
        **kwargs
    ):
        """Record the hyper-parameters; the sub-layers are created in ``build``."""
        super().__init__(**kwargs)
        self.num_hidden_layers = num_hidden_layers
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.ffn_intermediate_size = ffn_intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.layer_norm_eps = layer_norm_eps
        # One [attention, norm, ffn, norm] group per encoder block.
        self.layers: List[List[tf.keras.layers.Layer]] = []

    def build(self, input_shape: Tuple[int]) -> None:
        """Instantiate ``num_hidden_layers`` encoder blocks."""
        for _ in range(self.num_hidden_layers):
            # Construction order is kept (attention, ffn, norm, norm) so that
            # automatically generated layer names stay the same.
            self_attention = SelfAttention(
                self.hidden_size, self.num_attention_heads, self.attention_probs_dropout_prob
            )
            feed_forward = FeedForwardNetwork(
                self.hidden_size,
                intermediate_size=self.ffn_intermediate_size,
                hidden_dropout_prob=self.hidden_dropout_prob,
            )
            attention_norm = LayerNormalization(epsilon=self.layer_norm_eps, dtype="float32")
            feed_forward_norm = LayerNormalization(epsilon=self.layer_norm_eps, dtype="float32")
            self.layers.append([self_attention, attention_norm, feed_forward, feed_forward_norm])
        super().build(input_shape)

    def call(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None):
        """Transformer encoder

        Parameters
        ----------
        inputs : tf.Tensor
            Transformer encoder inputs, with dimension of (batch, seq_len, features)
        mask : tf.Tensor, optional
            encoder mask handed to every self-attention layer, by default None

        Returns
        -------
        tf.Tensor
            Transformer encoder output
        """
        hidden = inputs
        for self_attention, attention_norm, feed_forward, feed_forward_norm in self.layers:
            # Residual connection followed by layer norm around each sub-layer.
            hidden = attention_norm(hidden + self_attention(hidden, mask=mask))
            hidden = feed_forward_norm(hidden + feed_forward(hidden))
        return hidden

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        return {
            **super().get_config(),
            "num_hidden_layers": self.num_hidden_layers,
            "hidden_size": self.hidden_size,
            "num_attention_heads": self.num_attention_heads,
            "attention_probs_dropout_prob": self.attention_probs_dropout_prob,
            "ffn_intermediate_size": self.ffn_intermediate_size,
            "hidden_dropout_prob": self.hidden_dropout_prob,
            "layer_norm_eps": self.layer_norm_eps,
        }

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` unchanged."""
        return input_shape
class Decoder(tf.keras.layers.Layer):
    """Autoregressive Transformer decoder for multi-step forecasting.

    Starting from ``init_input`` the decoder produces one value per step for
    ``predict_sequence_length`` steps. At every step the whole sequence decoded
    so far is embedded, passed through the decoder stack together with the
    encoder memory, projected to one channel, and the last position of that
    projection is appended to the running sequence.
    """

    def __init__(
        self,
        predict_sequence_length: int,
        num_decoder_layers: int,
        hidden_size: int,
        num_attention_heads: int,
        attention_probs_dropout_prob: float,
        ffn_intermediate_size: int,
        hidden_dropout_prob: float,
        layer_norm_eps: float = 1e-9,
        **kwargs
    ) -> None:
        """Record the hyper-parameters; the sub-layers are created in ``build``."""
        super(Decoder, self).__init__(**kwargs)
        self.predict_sequence_length = predict_sequence_length
        self.num_decoder_layers = num_decoder_layers
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.ffn_intermediate_size = ffn_intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.layer_norm_eps = layer_norm_eps

    def build(self, input_shape):
        """Build the decoder layers."""
        super().build(input_shape)
        self.decoder_embedding = DataEmbedding(embed_size=self.hidden_size)
        self.decoder_layer = DecoderLayer(
            num_decoder_layers=self.num_decoder_layers,
            hidden_size=self.hidden_size,
            num_attention_heads=self.num_attention_heads,
            attention_probs_dropout_prob=self.attention_probs_dropout_prob,
            ffn_intermediate_size=self.ffn_intermediate_size,
            hidden_dropout_prob=self.hidden_dropout_prob,
            layer_norm_eps=self.layer_norm_eps,
        )
        # Single-channel output head, built eagerly for inputs of width hidden_size.
        self.projection = Dense(units=1, name="final_projection")
        self.projection.build([input_shape[0], self.hidden_size])
        self.built = True

    def call(
        self,
        decoder_features: tf.Tensor,
        init_input: tf.Tensor,
        encoder_memory: tf.Tensor,
        teacher: Optional[tf.Tensor] = None,
        scheduled_sampling: float = 0.0,
        training: bool = False,
        **kwargs
    ):
        """Transformer decoder

        Parameters
        ----------
        decoder_features : tf.Tensor
            Known future covariates, sliced as ``[:, : step + 1, :]`` at each
            step; may be None
        init_input : tf.Tensor
            Seed of the decoded sequence, one time step long (the caller in
            this module passes a (batch, 1, 1) slice)
        encoder_memory : tf.Tensor
            Encoder output attended to by the decoder stack
        teacher : tf.Tensor, optional
            Ground-truth targets of shape (batch, predict_sequence_length, 1)
            used for teacher forcing while training, by default None
        scheduled_sampling : float
            While training with a teacher, ground truth is fed when a uniform
            draw in [0, 1) exceeds this value, by default 0.0
        training : bool
            Whether the layer runs in training mode, by default False

        Returns
        -------
        tf.Tensor
            The decoded steps with the seed removed, i.e.
            ``predict_sequence_length`` positions along axis 1
        """
        input_x = init_input
        for i in range(self.predict_sequence_length):
            # The teacher stays rank-3 so it can be concatenated with the seed
            # and the decoder features along the time axis.
            input_tensor = self._get_input_for_step(
                input_x, decoder_features, i, teacher, scheduled_sampling, training
            )
            embed_input = self.decoder_embedding(input_tensor)
            decoder_output = self.decoder_layer(embed_input, encoder_memory)
            projected_output = self.projection(decoder_output)
            # Only the newest position is a fresh prediction; append it.
            input_x = tf.concat([input_x, projected_output[:, -1:, :]], axis=1)
        return input_x[:, 1:]  # Exclude the first token

    def _get_input_for_step(
        self,
        input_x: tf.Tensor,
        decoder_features: tf.Tensor,
        step: int,
        teachers: Optional[tf.Tensor],
        scheduled_sampling: float,
        training: bool,
    ) -> tf.Tensor:
        """Determine the input for each decoding step, considering teacher forcing and scheduled sampling.

        Parameters
        ----------
        input_x : tf.Tensor
            Sequence decoded so far; position 0 is the seed
        decoder_features : tf.Tensor
            Optional covariates concatenated on the last axis
        step : int
            Zero-based decoding step
        teachers : tf.Tensor, optional
            Rank-3 ground-truth targets, (batch, steps, 1)
        scheduled_sampling : float
            Threshold compared against a uniform draw
        training : bool
            Teacher forcing is only considered when True

        Returns
        -------
        tf.Tensor
            Decoder input covering ``step + 1`` time steps
        """
        # Default: feed back the model's own previous outputs.
        this_input = input_x[:, : step + 1]
        if training:
            # NOTE: the draw happens in Python via NumPy; under graph tracing
            # it is presumably evaluated at trace time rather than on every
            # executed step.
            p = np.random.uniform(low=0, high=1)
            if teachers is not None and p > scheduled_sampling:
                # Teacher forcing: keep the seed and replace the previous
                # predictions by the true targets of steps 0 .. step - 1, so
                # the input has the same rank and length as the free-running
                # one.
                previous_targets = tf.cast(teachers[:, :step], input_x.dtype)
                this_input = tf.concat([input_x[:, :1], previous_targets], axis=1)

        if decoder_features is not None:
            return tf.concat([this_input, decoder_features[:, : step + 1, :]], axis=-1)
        return this_input

    def get_causal_attention_mask(self, sequence_length: int) -> tf.Tensor:
        """Generate a causal attention mask to ensure each token only attends to previous tokens.

        Returns an int32 tensor of shape (1, sequence_length, sequence_length)
        holding 1 where the row index is >= the column index and 0 elsewhere.
        """
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        return tf.reshape(mask, (1, sequence_length, sequence_length))

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        config = super().get_config()
        config.update(
            {
                "predict_sequence_length": self.predict_sequence_length,
                "num_decoder_layers": self.num_decoder_layers,
                "hidden_size": self.hidden_size,
                "num_attention_heads": self.num_attention_heads,
                "attention_probs_dropout_prob": self.attention_probs_dropout_prob,
                "ffn_intermediate_size": self.ffn_intermediate_size,
                "hidden_dropout_prob": self.hidden_dropout_prob,
                "layer_norm_eps": self.layer_norm_eps,
            }
        )
        return config

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` unchanged.

        NOTE(review): ``call`` emits ``predict_sequence_length`` steps, which
        need not match the input shape — confirm nothing relies on this value.
        """
        return input_shape
class DecoderLayer(tf.keras.layers.Layer):
    """Stack of post-norm Transformer decoder blocks.

    Each block applies self-attention, cross-attention over the encoder memory
    and a feed-forward network, each wrapped in a residual connection followed
    by layer normalisation.
    """

    def __init__(
        self,
        num_decoder_layers: int,
        hidden_size: int,
        num_attention_heads: int,
        attention_probs_dropout_prob: float,
        ffn_intermediate_size: int,
        hidden_dropout_prob: float,
        layer_norm_eps: float = 1e-9,
        **kwargs
    ) -> None:
        """Record the hyper-parameters; the sub-layers are created in ``build``."""
        super(DecoderLayer, self).__init__(**kwargs)
        self.num_decoder_layers = num_decoder_layers
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.ffn_intermediate_size = ffn_intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.layer_norm_eps = layer_norm_eps
        # One [self-attn, cross-attn, ffn, norm, norm, norm] group per block.
        self.layers: List[List[tf.keras.layers.Layer]] = []

    def build(self, input_shape):
        """Instantiate ``num_decoder_layers`` decoder blocks."""
        for _ in range(self.num_decoder_layers):
            self_attention_layer = SelfAttention(
                self.hidden_size, self.num_attention_heads, self.attention_probs_dropout_prob
            )
            cross_attention_layer = Attention(
                self.hidden_size, self.num_attention_heads, self.attention_probs_dropout_prob
            )
            # Same argument layout as the encoder: the model width comes first
            # and the inner width is passed by keyword. Passing them swapped
            # breaks the residual add in `call` whenever
            # ffn_intermediate_size != hidden_size.
            ffn_layer = FeedForwardNetwork(
                self.hidden_size,
                intermediate_size=self.ffn_intermediate_size,
                hidden_dropout_prob=self.hidden_dropout_prob,
            )
            ln_layer1 = LayerNormalization(epsilon=self.layer_norm_eps, dtype="float32")
            ln_layer2 = LayerNormalization(epsilon=self.layer_norm_eps, dtype="float32")
            ln_layer3 = LayerNormalization(epsilon=self.layer_norm_eps, dtype="float32")
            self.layers.append(
                [self_attention_layer, cross_attention_layer, ffn_layer, ln_layer1, ln_layer2, ln_layer3]
            )
        super(DecoderLayer, self).build(input_shape)

    def call(
        self,
        decoder_inputs: tf.Tensor,
        encoder_memory: tf.Tensor,
        tgt_mask: Optional[tf.Tensor] = None,
        cross_mask: Optional[tf.Tensor] = None,
    ) -> tf.Tensor:
        """Forward pass through the decoder layer.

        Parameters
        ----------
        decoder_inputs : tf.Tensor
            Embedded decoder sequence
        encoder_memory : tf.Tensor
            Encoder output; passed as the second and third argument of the
            cross-attention layer (presumably key and value)
        tgt_mask : tf.Tensor, optional
            Mask handed to the self-attention layers, by default None
        cross_mask : tf.Tensor, optional
            Mask handed to the cross-attention layers, by default None

        Returns
        -------
        tf.Tensor
            Output of the last decoder block
        """
        x = decoder_inputs
        for self_attention_layer, attention_layer, ffn_layer, ln_layer1, ln_layer2, ln_layer3 in self.layers:
            x = ln_layer1(x + self_attention_layer(x, mask=tgt_mask))
            x = ln_layer2(x + attention_layer(x, encoder_memory, encoder_memory, mask=cross_mask))
            x = ln_layer3(x + ffn_layer(x))
        return x

    def get_config(self):
        """Return the constructor arguments merged into the base layer config.

        Keys match the ``__init__`` parameter names so the layer can be
        re-created from its config.
        """
        config = super(DecoderLayer, self).get_config()
        config.update(
            {
                "num_decoder_layers": self.num_decoder_layers,
                "hidden_size": self.hidden_size,
                "num_attention_heads": self.num_attention_heads,
                "attention_probs_dropout_prob": self.attention_probs_dropout_prob,
                "ffn_intermediate_size": self.ffn_intermediate_size,
                "hidden_dropout_prob": self.hidden_dropout_prob,
                "layer_norm_eps": self.layer_norm_eps,
            }
        )
        return config

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` unchanged."""
        return input_shape
class TransformerBlock(tf.keras.layers.Layer):
    """Basic Transformer block with attention and feed-forward layers."""

    def __init__(
        self,
        embed_dim: int,
        feat_dim: int,
        num_heads: int,
        ffn_intermediate_size: int,
        rate: float = 0.1,
        layer_norm_eps: float = 1e-9,
        **kwargs
    ) -> None:
        """Record the hyper-parameters; the sub-layers are created in ``build``.

        Parameters
        ----------
        embed_dim : int
            ``key_dim`` of the multi-head attention layer
        feat_dim : int
            Output width of the feed-forward network; it is added to the
            attention output, so it has to match the input's last dimension
        num_heads : int
            Number of attention heads
        ffn_intermediate_size : int
            Width of the feed-forward inner layer
        rate : float
            Dropout rate applied after attention and after the feed-forward
            network, by default 0.1
        layer_norm_eps : float
            Epsilon of both layer normalisations, by default 1e-9
        **kwargs
            Standard Keras layer arguments (e.g. ``name``), forwarded to the
            base class
        """
        super(TransformerBlock, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.feat_dim = feat_dim
        self.num_heads = num_heads
        self.ffn_intermediate_size = ffn_intermediate_size
        self.rate = rate
        self.layer_norm_eps = layer_norm_eps

    def build(self, input_shape):
        """Build the Transformer block layers."""
        super().build(input_shape)
        self.att = MultiHeadAttention(num_heads=self.num_heads, key_dim=self.embed_dim)
        self.ffn = tf.keras.Sequential([Dense(self.ffn_intermediate_size, activation="gelu"), Dense(self.feat_dim)])
        self.layernorm1 = LayerNormalization(epsilon=self.layer_norm_eps)
        self.layernorm2 = LayerNormalization(epsilon=self.layer_norm_eps)
        self.dropout1 = Dropout(self.rate)
        self.dropout2 = Dropout(self.rate)

    def call(self, inputs: tf.Tensor, training: bool) -> tf.Tensor:
        """Forward pass through a Transformer block for time series.

        Parameters
        ----------
        inputs : tf.Tensor
            Input sequence, used as both arguments of the attention layer
        training : bool
            Forwarded to the two dropout layers

        Returns
        -------
        tf.Tensor
            Block output after the second residual connection and layer norm
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "feat_dim": self.feat_dim,
                "num_heads": self.num_heads,
                "ffn_intermediate_size": self.ffn_intermediate_size,
                "rate": self.rate,
                "layer_norm_eps": self.layer_norm_eps,
            }
        )
        return config