from typing import Any, Generic, List, Optional, Sequence, Type, TypeVar, Union
from jam.utils.codec.codable import Codable
from jam.utils.codec.codec import Codec
from jam.utils.json import JsonSerde
from jam.utils.json.serde import JsonDeserializationError
T = TypeVar("T", bound=Codable)
[docs]
class BaseSequence(Codable[Sequence[T]], Sequence[T], JsonSerde, Generic[T]):
"""
Base class for sequence types.
Provides common functionality for sequence types that support codec operations.
All elements must be instances of the same Codable type.
"""
_element_type: Optional[Type[T]] = None
value: List[T]
[docs]
def __init__(self, initial: Sequence[T] = [], codec: Optional[Codec] = None):
"""
Initialize sequence.
Args:
initial: Initial values
codec: Optional codec
"""
# Make sure initial values are all of the same type
for value in initial:
self._validate_value(value)
self.value = initial
super().__init__(codec=codec)
[docs]
def __len__(self) -> int:
"""Get number of elements."""
return len(self.value)
[docs]
def __getitem__(self, index: Union[int, slice]) -> Union[T, Sequence[T]]:
"""Get item at index."""
return self.value[index]
[docs]
def __iter__(self):
"""Iterate over elements."""
return iter(self.value)
[docs]
def __repr__(self) -> str:
"""Get string representation."""
return f"{self.__class__.__name__}([{', '.join(f'{value!r}' for value in self.value)}])"
[docs]
def _validate_value(self, value: T) -> None:
"""
Validate that a value is of the correct type.
Args:
value: Value to validate
Raises:
TypeError: If value is not of the correct type
"""
if self._element_type is None:
self._element_type = type(value)
elif not str(type(value)) == str(self._element_type):
raise TypeError(
f"Value {value} must be instance of {self._element_type}. Debug: {str(type(value)) == str(self._element_type)}"
)
[docs]
def __eq__(self, other: object) -> bool:
"""Compare for equality."""
if isinstance(other, BaseSequence):
if len(self) == 0 and len(other) == 0:
return True
return (
self._element_type == other._element_type and self.value == other.value
)
if isinstance(other, list) or isinstance(other, tuple):
return all(x == y for x, y in zip(self.value, other))
return False
[docs]
def __gt__(self, other: object) -> bool:
"""Compare for greater than."""
if isinstance(other, BaseSequence):
return self.value > other.value
return False
[docs]
def __lt__(self, other: object) -> bool:
"""Compare for less than."""
if isinstance(other, BaseSequence):
return self.value < other.value
return False
[docs]
def __ge__(self, other: object) -> bool:
"""Compare for greater than or equal to."""
return self > other or self == other
[docs]
def __le__(self, other: object) -> bool:
"""Compare for less than or equal to."""
if isinstance(other, BaseSequence):
return self < other or self == other
return False
@property
def element_type(self) -> Optional[Type[T]]:
"""Get the type of elements in this sequence."""
return self._element_type
[docs]
def __setitem__(self, index: int, value: T) -> None:
"""Set item at index."""
if not 0 <= index < len(self.value):
raise IndexError(f"Index {index} out of range")
self._validate_value(value)
self.value[index] = value
[docs]
def append(self, value: T) -> None:
"""
Append value to end of vector.
Args:
value: Value to append. Must be instance of the same type as other elements.
Raises:
TypeError: If value is not of the correct type
"""
self._validate_value(value)
self.value.append(value)
[docs]
def pop(self, index: int = -1) -> T:
"""
Remove and return item at index.
Args:
index: Index of item to remove
Returns:
Removed item
Raises:
IndexError: If index out of range
"""
return self.value.pop(index)
[docs]
def insert(self, index: int, value: T) -> None:
"""
Insert value at index.
Args:
index: Index to insert at
value: Value to insert. Must be instance of the same type as other elements.
Raises:
TypeError: If value is not of the correct type
"""
self._validate_value(value)
self.value.insert(index, value)
[docs]
def remove(self, value: T) -> None:
"""
Remove first occurrence of value.
Args:
value: Value to remove
Raises:
ValueError: If value not found
"""
self.value.remove(value)
[docs]
def clear(self) -> None:
"""Clear all elements."""
self.value.clear()
self._element_type = None
[docs]
def count(self, value: T) -> int:
"""
Return number of occurrences of value.
Args:
value: Value to count
Returns:
Number of occurrences
"""
return self.value.count(value)
[docs]
def index(self, value: T, start: int = 0, stop: Optional[int] = None) -> int:
"""
Return first index of value.
Args:
value: Value to find
start: Start index for search
stop: Stop index for search
Returns:
Index of value
Raises:
ValueError: If value not found
"""
if stop is None:
stop = len(self)
return self.value.index(value, start, stop)
[docs]
def reverse(self) -> "BaseSequence[T]":
"""Reverse the vector in place."""
self.value = self.value.reverse()
return self
[docs]
def extend(self, values: Sequence[T]) -> None:
"""
Extend vector with values.
Args:
values: Values to add. Must all be instances of the same type as existing elements.
Raises:
TypeError: If values are not all of the correct type
"""
for value in values:
self.append(value)
def __bytes__(self) -> bytes:
# Combine bytes of all values in the vector
return b"".join(bytes(value) for value in self.value)
[docs]
def __add__(
self, other: Union["BaseSequence[T]", Sequence[T]]
) -> "BaseSequence[T]":
"""Add two sequences together, returning a new sequence."""
if isinstance(other, BaseSequence):
other_values = other.value
else:
other_values = other
new_sequence = self.__class__(self.value.copy())
new_sequence.extend(other_values)
return new_sequence
[docs]
def __iadd__(
self, other: Union["BaseSequence[T]", Sequence[T]]
) -> "BaseSequence[T]":
"""In-place addition of sequences."""
if isinstance(other, BaseSequence):
self.extend(other.value)
else:
self.extend(other)
return self
[docs]
def __mul__(self, n: int) -> "BaseSequence[T]":
"""Multiply sequence by an integer, returning a new sequence."""
if not isinstance(n, int):
raise TypeError("Can only multiply sequence by an integer")
new_sequence = self.__class__(self.value * n, codec=self.codec)
return new_sequence
[docs]
def __imul__(self, n: int) -> "BaseSequence[T]":
"""In-place multiplication of sequence."""
if not isinstance(n, int):
raise TypeError("Can only multiply sequence by an integer")
self.value *= n
return self
[docs]
def __contains__(self, item: T) -> bool:
"""Check if item is in sequence."""
return item in self.value
[docs]
def copy(self) -> "BaseSequence[T]":
"""Return a shallow copy of the sequence."""
return self.__class__(self.value.copy(), codec=self.codec)
[docs]
def sort(self, *, key=None, reverse=False) -> None:
"""
Sort the sequence in place.
Args:
key: Function of one argument that is used to extract a comparison key
reverse: If True, sort in descending order
"""
self.value.sort(key=key, reverse=reverse)
[docs]
def __reversed__(self):
"""Return a reverse iterator over the sequence."""
return reversed(self.value)
[docs]
def __delitem__(self, index: Union[int, slice]) -> None:
"""Delete item at index."""
del self.value[index]
[docs]
def __rmul__(self, n: int) -> "BaseSequence[T]":
"""Right multiplication (n * sequence)."""
return self.__mul__(n)
[docs]
def __hash__(self) -> None:
"""Sequences are mutable, so they should not be hashable."""
from hashlib import blake2b
return int.from_bytes(blake2b(bytes(self)).digest())
[docs]
def startswith(self, prefix: Sequence[T]) -> bool:
"""Check if sequence starts with prefix."""
return self.value[: len(prefix)] == prefix
[docs]
def endswith(self, suffix: Sequence[T]) -> bool:
"""Check if sequence ends with suffix."""
return self.value[-len(suffix) :] == suffix
[docs]
@classmethod
def from_json(cls, data: Any) -> "BaseSequence[T]":
"""Deserialize from JSON."""
if not isinstance(data, list):
raise JsonDeserializationError(
f"Expected list for {cls.__name__}, got {type(data)}"
)
return cls([cls._element_type.from_json(item) for item in data])