# Test attention mask expansion mask_2d = torch.tensor([[0, 0, 1, 1]]) # batch=1, key_len=4 expanded = patch_attention_mask(mask_2d, query_len=3, key_len=4, dtype=torch.float32) print(f"Expanded mask shape: expanded.shape") # (1,1,3,4) print(expanded) | Issue | Before patch | After patch | |-------|--------------|--------------| | Rotary cache | Recomputes every call, wastes memory | Only recomputes when seqlen changes | | Mask expansion | Only supports 2D masks | Supports 2D/3D/4D, correct broadcast | | Cross-attention | Mask shape mismatch | Proper (batch,1,q_len,k_len) | If you meant a firmware patch for an MPT controller (like in automotive or industrial PLCs), I can write a .bin patching script in Python or C. Just clarify the target.
batch = attention_mask.size(0)
def forward(self, x: torch.Tensor, seq_len: int) -> Tuple[torch.Tensor, torch.Tensor]: self._update_cache(seq_len, x.device, x.dtype) return self._cached_cos[:seq_len], self._cached_sin[:seq_len] 2. Patch Attention Mask Expansion (for cross-attention) ---------------------------------------------------------------------- def patch_attention_mask( attention_mask: torch.Tensor, query_length: int, key_length: int, dtype: torch.dtype, ) -> torch.Tensor: """ Expand mask from (batch, 1, key_len) or (batch, seq_len) to (batch, 1, query_len, key_len) for MPT attention. """ if attention_mask is None: return None patch mpt
class PatchedRotaryEmbedding(nn.Module): """Rotary embedding with cache reset on seqlen change.""" def (self, dim: int, max_seq_len: int = 2048, base: int = 10000): super(). init () self.dim = dim self.max_seq_len = max_seq_len self.base = base self._cached_cos = None self._cached_sin = None self._cached_seq_len = None
# Case: (batch, 1, key_len) elif attention_mask.dim() == 3 and attention_mask.size(1) == 1: mask = attention_mask[:, :, None, :] else: raise ValueError(f"Unexpected mask shape: attention_mask.shape") # Test attention mask expansion mask_2d = torch
# If already 4D, assume correct if attention_mask.dim() == 4: return attention_mask.to(dtype)
def _update_cache(self, seq_len: int, device: torch.device, dtype: torch.dtype): if seq_len == self._cached_seq_len: return inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float() / self.dim)) t = torch.arange(seq_len, device=device, dtype=inv_freq.dtype) freqs = torch.einsum("i,j->ij", t, inv_freq) emb = torch.cat((freqs, freqs), dim=-1) self._cached_cos = emb.cos().to(dtype) self._cached_sin = emb.sin().to(dtype) self._cached_seq_len = seq_len torch.Tensor: """ Expand mask from (batch
# Broadcast to query_len mask = mask.expand(batch, 1, query_length, key_length)