Source code for jam.utils.codec.composite.arrays

"""
Array codec implementation for JAM specification.

Arrays are encoded by concatenating their encoded elements with no length prefix.
The length is known at compile-time in Rust; in Python we enforce it at runtime.
Maximum array size is 1000 elements as per specification.
"""

from typing import Iterable, List, Tuple, Union, Sequence

from jam.utils.codec.codec import Codec
from jam.utils.codec.errors import EncodeError, DecodeError
from jam.utils.codec.utils import check_buffer_size
from jam.utils.codec.codable import Codable


[docs] class ArrayCodec(Codec[Sequence[Codable]]): """ Codec for fixed-length arrays/sequences. Arrays are encoded by concatenating their encoded elements in order. The length is fixed and known at encoding/decoding time. """ MAX_SIZE = 1000 length: int
[docs] def __init__(self, length: int): if length > self.MAX_SIZE: raise ValueError( f"Array length {length} exceeds maximum allowed size {self.MAX_SIZE}" ) if length < 0: raise ValueError(f"Array length cannot be negative: {length}") self.length = length
[docs] def encode_size(self, value: Sequence[Codable]) -> int: if len(value) != self.length: raise EncodeError( self.length, len(value), f"Array length mismatch: expected {self.length}, got {len(value)}", ) if not isinstance(value, Iterable): raise EncodeError(0, 0, f"Expected Iterable, got {type(value)}") size = 0 for item in value: if not isinstance(item, Codable): raise EncodeError(0, 0, f"Expected Codable, got {type(item)}") size += item.encode_size() return size
[docs] def encode_into( self, value: Sequence[Codable], buffer: bytearray, offset: int = 0 ) -> int: if len(value) != self.length: raise EncodeError( self.length, len(value), f"Array length mismatch: expected {self.length}, got {len(value)}", ) total_size = self.encode_size(value) check_buffer_size(buffer, total_size, offset) current_offset = offset for item in value: written = item.encode_into(buffer, current_offset) current_offset += written return current_offset - offset
[docs] @staticmethod def decode_from( length: int, codable_class: type[Codable], buffer: Union[bytes, bytearray, memoryview], offset: int = 0, ) -> Tuple[List, int]: result = [] current_offset = offset bytes_read = 0 try: for _ in range(length): item, size = codable_class.decode_from(buffer, current_offset) result.append(item) current_offset += size bytes_read += size except DecodeError as e: raise DecodeError( 0, 0, f"Failed to decode array element {len(result)}: {str(e)}" ) return result, bytes_read