95 lines
2.7 KiB
Python
95 lines
2.7 KiB
Python
import json
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
from secure_sms.core.security import b64u_decode, b64u_encode
|
|
|
|
|
|
# Literal marker that every frame built by this module starts with;
# parse_frame() rejects any text that does not begin with it.
FRAME_PREFIX = "@SSM1"
|
|
# Maximum number of encoded-payload characters carried by a single frame
# (the slice width used by _split_payload).
# NOTE(review): presumably sized so a whole frame fits in one SMS -- confirm.
FRAME_CHUNK_SIZE = 92
|
|
|
|
|
|
@dataclass
class ParsedFrame:
    """One frame as decoded by ``parse_frame``.

    Field order is part of the interface (positional construction), so it
    must not be rearranged.
    """

    # "control" for CTL frames, "message" for MSG frames.
    category: str
    # Identifier shared by every frame of the same packet.
    packet_id: str
    # Position of this frame within the packet (the builders count from 1).
    part_no: int
    # Number of frames the packet was split into.
    total_parts: int
    # This frame's slice of the encoded payload.
    chunk: str
    # Mode field of a MSG frame; stays None for control frames.
    mode: Optional[str] = None
|
|
|
|
|
|
def _split_payload(encoded_payload: str) -> list[str]:
    """Cut ``encoded_payload`` into consecutive pieces of at most FRAME_CHUNK_SIZE characters.

    An empty payload yields ``[""]`` so that callers always get at least
    one chunk (and therefore emit at least one frame).
    """
    pieces = []
    for start in range(0, len(encoded_payload), FRAME_CHUNK_SIZE):
        pieces.append(encoded_payload[start:start + FRAME_CHUNK_SIZE])
    if not pieces:
        return [""]
    return pieces
|
|
|
|
|
|
def encode_plain_body(text: str) -> str:
    """Encode ``text`` as UTF-8 and return the result of ``b64u_encode`` on those bytes."""
    utf8_bytes = text.encode("utf-8")
    return b64u_encode(utf8_bytes)
|
|
|
|
|
|
def decode_plain_body(encoded_text: str) -> str:
    """Run ``encoded_text`` through ``b64u_decode`` and decode the result as UTF-8 text."""
    raw_bytes = b64u_decode(encoded_text)
    return raw_bytes.decode("utf-8")
|
|
|
|
|
|
def encode_control_payload(payload: dict) -> str:
    """Serialise ``payload`` to compact JSON and return it passed through ``b64u_encode``.

    The JSON is written without whitespace after separators and with
    non-ASCII characters kept as-is (then UTF-8 encoded), which keeps the
    encoded text as short as possible.
    """
    compact_json = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    return b64u_encode(compact_json.encode("utf-8"))
|
|
|
|
|
|
def decode_control_payload(encoded_payload: str) -> dict:
    """Decode ``encoded_payload`` with ``b64u_decode``, read it as UTF-8 and parse the JSON."""
    json_text = b64u_decode(encoded_payload).decode("utf-8")
    return json.loads(json_text)
|
|
|
|
|
|
def build_control_frames(payload: dict, packet_id: Optional[str] = None) -> list[str]:
    """Encode ``payload`` and wrap it in one or more CTL frames.

    Args:
        payload: Dictionary to send; encoded with ``encode_control_payload``.
        packet_id: Identifier shared by all frames of this packet. When
            missing or empty, the first 10 hex characters of a fresh
            UUID4 are used.

    Returns:
        Frame strings of the form
        ``PREFIX|CTL|packet_id|part_no|total_parts|chunk`` with ``part_no``
        counting from 1.
    """
    if not packet_id:
        packet_id = uuid.uuid4().hex[:10]
    chunks = _split_payload(encode_control_payload(payload))
    total = len(chunks)
    frames = []
    for part_no, chunk in enumerate(chunks, start=1):
        frames.append(f"{FRAME_PREFIX}|CTL|{packet_id}|{part_no}|{total}|{chunk}")
    return frames
|
|
|
|
|
|
def build_message_frames(mode: str, encoded_payload: str, packet_id: Optional[str] = None) -> list[str]:
    """Wrap an already-encoded payload in one or more MSG frames.

    Args:
        mode: Value written verbatim into the mode field of every frame.
        encoded_payload: Payload text; it is only split here, not encoded.
        packet_id: Identifier shared by all frames of this packet. When
            missing or empty, the first 10 hex characters of a fresh
            UUID4 are used.

    Returns:
        Frame strings of the form
        ``PREFIX|MSG|mode|packet_id|part_no|total_parts|chunk`` with
        ``part_no`` counting from 1.
    """
    if not packet_id:
        packet_id = uuid.uuid4().hex[:10]
    chunks = _split_payload(encoded_payload)
    total = len(chunks)
    frames = []
    for part_no, chunk in enumerate(chunks, start=1):
        frames.append(f"{FRAME_PREFIX}|MSG|{mode}|{packet_id}|{part_no}|{total}|{chunk}")
    return frames
|
|
|
|
|
|
def parse_frame(raw_text: str) -> Optional[ParsedFrame]:
    """Parse a raw text into a ``ParsedFrame``; return None if it is not a valid frame.

    Two "|"-separated layouts are recognised, both starting with FRAME_PREFIX:

    * control: ``PREFIX|CTL|packet_id|part_no|total_parts|chunk``
    * message: ``PREFIX|MSG|mode|packet_id|part_no|total_parts|chunk``

    Args:
        raw_text: The received text to inspect.

    Returns:
        The parsed frame, or None when the text does not start with a known
        frame header, has too few fields, or carries a ``part_no`` /
        ``total_parts`` that is not an integer.
    """
    if raw_text.startswith(f"{FRAME_PREFIX}|CTL|"):
        category, field_count = "control", 6
    elif raw_text.startswith(f"{FRAME_PREFIX}|MSG|"):
        category, field_count = "message", 7
    else:
        # Covers both "no prefix at all" and "prefix with an unknown frame type".
        return None

    # Bounding the split keeps any "|" inside the trailing chunk intact.
    fields = raw_text.split("|", field_count - 1)
    if len(fields) != field_count:
        return None

    # Only message frames carry a mode, right after the frame-type tag.
    mode = fields[2] if category == "message" else None
    # Both layouts end with the same four fields.
    packet_id, part_no, total_parts, chunk = fields[-4:]

    try:
        part_no_value = int(part_no)
        total_parts_value = int(total_parts)
    except ValueError:
        # Incoming text is untrusted: non-numeric counters mean a malformed
        # frame, reported like every other malformed frame instead of raising.
        return None

    return ParsedFrame(
        category=category,
        packet_id=packet_id,
        part_no=part_no_value,
        total_parts=total_parts_value,
        chunk=chunk,
        mode=mode,
    )
|