Source code for jam.utils.codec.primitives.integers

"""
Integer codec implementations for JAM protocol encoding specification.

Implements both fixed-width integers and the general variable-length integer 
encoding scheme specified in JAM graypaper Appendix C.

Fixed width integers are encoded in little-endian format.
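Variable-length ("general") integers in the range 0 <= x < 2**64 use a
prefix-byte scheme:
- x < 2**7: the value itself (1 byte)
- 2**(7*l) <= x < 2**(7*(l+1)) for l in 1..7: one prefix byte equal to
  2**8 - 2**(8-l) + (x >> 8*l), followed by the low l bytes of x in
  little-endian order (l + 1 bytes in total)
- 2**56 <= x < 2**64: the byte 0xFF followed by x as an 8-byte
  little-endian value (9 bytes)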
"""

from typing import Union, Tuple
from jam.utils.codec.codec import Codec
from jam.utils.codec.errors import EncodeError, DecodeError
from jam.utils.codec.utils import check_buffer_size, ensure_size

import math
from decimal import Decimal


def encode(value: int, byte_size: int) -> bytes:
    """
    Serialize an unsigned integer as little-endian bytes.

    Args:
        value: Non-negative integer to serialize
        byte_size: Exact number of bytes to produce

    Returns:
        ``byte_size`` bytes holding ``value``, least significant byte first

    Raises:
        OverflowError: If value is negative or does not fit in byte_size bytes
    """
    little_endian = value.to_bytes(
        length=byte_size, byteorder="little", signed=False
    )
    return little_endian
def decode(buffer: Union[bytes, bytearray, memoryview]) -> int:
    """
    Interpret an entire buffer as one little-endian unsigned integer.

    Args:
        buffer: Bytes to interpret; every byte of the buffer is consumed

    Returns:
        The unsigned integer value (0 for an empty buffer)
    """
    unsigned_value = int.from_bytes(buffer, byteorder="little", signed=False)
    return unsigned_value
class IntegerCodec(Codec):
    """
    Base codec for fixed-width unsigned integers.

    Values are encoded in little-endian byte order using exactly
    ``byte_size`` bytes. Only values in ``0 <= value < 2**(8 * byte_size)``
    are accepted by ``encode_into``; negative values are rejected.
    """

    def __init__(self, byte_size: int):
        """
        Initialize codec for a specific integer width.

        Args:
            byte_size: Number of bytes for encoded value
        """
        self.byte_size = byte_size

    def encode_size(self, value) -> int:
        """Get encoded size (fixed for given type, independent of value)."""
        return self.byte_size

    def encode_into(
        self, value: int, buffer: Union[bytes, bytearray, memoryview], offset: int = 0
    ) -> int:
        """
        Encode integer into buffer.

        Args:
            value: Integer to encode
            buffer: Target buffer
            offset: Starting offset

        Returns:
            Number of bytes written

        Raises:
            EncodeError: If value is not an integer or is out of bounds.
                Buffer-size validation is delegated to check_buffer_size.
        """
        # Reject non-integers up front, mirroring GeneralCodec, so callers get
        # an EncodeError instead of an AttributeError/TypeError from further down.
        if not isinstance(value, int):
            raise EncodeError(
                expected=int, actual=type(value), message="Value must be an integer"
            )

        if not 0 <= value < (2 ** (8 * self.byte_size)):
            raise EncodeError(
                expected=0, actual=value, message="Integer value out of bounds"
            )

        check_buffer_size(buffer, self.byte_size, offset)
        encoded_bytes = encode(value, self.byte_size)

        if isinstance(buffer, bytes):
            # NOTE(review): bytes is immutable, so this writes into a temporary
            # copy and the caller's buffer is left untouched. Kept as-is for
            # backward compatibility; confirm whether raising would be better.
            buffer = bytearray(buffer)

        buffer[offset : offset + self.byte_size] = encoded_bytes
        return self.byte_size

    @staticmethod
    def decode_from(
        _byte_size: int, buffer: Union[bytes, bytearray, memoryview], offset: int = 0
    ) -> Tuple[int, int]:
        """
        Decode integer from buffer.

        Args:
            _byte_size: Number of bytes to read
            buffer: Source buffer
            offset: Starting offset

        Returns:
            Tuple of (decoded value, bytes read)

        Raises:
            DecodeError: If buffer too small (presumably raised by
                ensure_size; verify against that helper)
        """
        ensure_size(buffer, _byte_size, offset)
        value = int.from_bytes(buffer[offset : offset + _byte_size], "little")
        return value, _byte_size
class GeneralCodec(Codec[int]):
    """
    Codec for variable-length integer encoding.

    Encodes integers in ``0 <= x < 2**64`` as follows:

    - ``x < 2**7``: a single byte holding ``x``.
    - ``2**(7*l) <= x < 2**(7*(l+1))`` for ``l`` in 1..7: one prefix byte
      ``2**8 - 2**(8-l) + (x >> 8*l)`` followed by the low ``l`` bytes of
      ``x`` in little-endian order.
    - ``2**56 <= x < 2**64``: the byte ``0xFF`` followed by ``x`` as an
      8-byte little-endian value.

    All arithmetic is done on Python integers so results are exact and do not
    depend on the ambient ``decimal`` context.
    """

    @staticmethod
    def l(x):
        """
        Return ``floor(log(x) / log(2**7))``, i.e. how many whole 7-bit groups
        lie below the most significant bit of ``x``.

        Args:
            x: Value >= 1 (converted with ``int()``)

        Returns:
            The largest ``l`` such that ``2**(7*l) <= x``

        Raises:
            ValueError: If x is smaller than 1 (the logarithm is undefined)
        """
        n = int(x)
        if n < 1:
            raise ValueError("l(x) is only defined for x >= 1")
        # bit_length() - 1 == floor(log2(n)), computed exactly.
        return (n.bit_length() - 1) // 7

    @staticmethod
    def _check_value(value) -> None:
        """Raise EncodeError unless value is an int in ``0 <= value < 2**64``."""
        if not isinstance(value, int):
            raise EncodeError(
                expected=int, actual=type(value), message="Value must be an integer"
            )
        if value < 0:
            raise EncodeError(
                expected=0, actual=value, message="Cannot encode negative values"
            )
        if value >= 2**64:
            raise EncodeError(
                expected=0, actual=value, message="Value too large for encoding"
            )

    def encode_size(self, value: int) -> int:
        """
        Calculate encoded size based on value magnitude.

        Args:
            value: Integer to measure

        Returns:
            Number of bytes the encoding occupies (1 to 9)

        Raises:
            EncodeError: If value is not an integer, is negative, or is
                2**64 or larger
        """
        self._check_value(value)
        if value < 2**7:
            return 1
        if value < 2 ** (7 * 8):
            return 1 + self.l(value)
        return 9

    def encode_into(self, value: int, buffer: bytearray, offset: int = 0) -> int:
        """
        Encode integer using variable-length scheme.

        Args:
            value: Integer to encode
            buffer: Target buffer
            offset: Starting offset

        Returns:
            Number of bytes written

        Raises:
            EncodeError: If value is not an integer, is negative, or is
                2**64 or larger. Buffer-size validation is delegated to
                check_buffer_size.
        """
        # encode_size validates the value, so no separate checks are needed.
        size = self.encode_size(value)
        # Check capacity before any write, including the single-byte case.
        check_buffer_size(buffer, size, offset)

        if size == 1:
            buffer[offset] = value
            return 1

        if value < 2 ** (7 * 8):
            trailing = size - 1  # equals l(value): number of little-endian bytes
            # Prefix byte: `trailing` leading one-bits, then the bits of value
            # above the low `trailing` bytes.
            prefix = 2**8 - 2 ** (8 - trailing) + (value >> (8 * trailing))
            buffer[offset : offset + 1] = IntegerCodec(1).encode(prefix)
            offset += 1
            buffer[offset : offset + trailing] = IntegerCodec(trailing).encode(
                value % 2 ** (trailing * 8)
            )
        else:
            # 2**56 <= value < 2**64: marker byte plus the full 8-byte value.
            buffer[offset : offset + 1] = IntegerCodec(1).encode(2**8 - 1)
            offset += 1
            buffer[offset : offset + 8] = IntegerCodec(8).encode(value)

        return size

    @staticmethod
    def decode_from(
        buffer: Union[bytes, bytearray, memoryview], offset: int = 0
    ) -> Tuple[int, int]:
        """
        Decode integer using variable-length scheme.

        Args:
            buffer: Source buffer
            offset: Starting offset

        Returns:
            Tuple of (decoded value, bytes read)

        Raises:
            DecodeError: If buffer too small (presumably raised by
                ensure_size; verify against that helper)
        """
        ensure_size(buffer, 1, offset)
        tag = buffer[offset]

        if tag < 2**7:
            return tag, 1

        if tag == 2**8 - 1:
            # Full 64-bit encoding
            ensure_size(buffer, 9, offset)
            value, _ = IntegerCodec.decode_from(8, buffer, offset + 1)
            return value, 9

        # Variable length encoding: the count of trailing bytes is
        # floor(8 - log2(2**8 - tag)), computed exactly as
        # 8 - ceil(log2(256 - tag)) via bit_length.
        trailing = 8 - (2**8 - 1 - tag).bit_length()
        ensure_size(buffer, trailing + 1, offset)
        alpha = tag + 2 ** (8 - trailing) - 2**8
        beta, _ = IntegerCodec.decode_from(trailing, buffer, offset + 1)
        return alpha * 2 ** (trailing * 8) + beta, trailing + 1