127 lines
4.2 KiB
Python
127 lines
4.2 KiB
Python
"""
|
|
Secure SMS Protocol v3 (Simplified Symmetric)

Message Types:
  @S:NORM|                                 — Switch to normal mode
  @S:SFRA|<pkt_id>|<part>|<total>|<data>   — Fragment of multi-part message (symmetric)
  @S:SYM|<encrypted_payload>               — Single encrypted message (symmetric)
  @S:NACK|<pkt_id>|<missing_part>          — Retransmission request
  @G:SYM|<encrypted_payload>               — Single encrypted group message
  (no prefix)                              — Plain text message (normal mode)
|
|
"""
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
# Markers that open a protocol message: PREFIX for regular traffic,
# G_PREFIX for group traffic. Text starting with neither is plain text.
PREFIX = "@S:"
G_PREFIX = "@G:"

# Largest SMS body, counted in characters, that the builders aim for.
MAX_SMS_CHARS = 140

# Characters set aside per fragment for the "SFRA" header
# (prefix, packet id, part number, total and the separators).
FRAG_OVERHEAD = 25

# Payload characters carried by a single fragment.
FRAG_CHUNK_SIZE = MAX_SMS_CHARS - FRAG_OVERHEAD
|
|
|
|
|
|
@dataclass
class ParsedMessage:
    """Outcome of classifying one incoming SMS.

    Only ``msg_type`` is mandatory. Every other field defaults to
    ``False``/``None`` and is filled in only for the message kinds that
    carry it.
    """

    # Message kind: "norm", "sym", "sfra", "nack", "plain" or "gsym".
    msg_type: str

    # True when the message carried the group prefix.
    is_group: bool = False

    # Encrypted body of a "sym" / "gsym" message.
    encrypted_payload: Optional[str] = None

    # Packet identifier shared by the fragments of one message ("sfra", "nack").
    packet_id: Optional[str] = None

    # Fragment number and fragment count of an "sfra" message.
    part_no: Optional[int] = None
    total_parts: Optional[int] = None

    # Payload slice carried by an "sfra" fragment.
    chunk: Optional[str] = None

    # Fragment number requested again by a "nack" message.
    missing_part: Optional[int] = None

    # Original text of a "plain" message.
    plain_text: Optional[str] = None
|
|
|
|
|
|
# ── Builders ────────────────────────────────────────────────────────
|
|
|
|
def build_normal_mode() -> str:
    """Return the control message that switches the peer to normal mode."""
    return PREFIX + "NORM|"
|
|
|
|
|
|
def build_symmetric_msg(encrypted_payload: str) -> list[str]:
    """Wrap a symmetrically encrypted payload into one or more SMS bodies.

    A payload whose ``SYM`` message fits within ``MAX_SMS_CHARS`` is sent
    as a single message; anything longer is handed to ``build_fragments``.

    Args:
        encrypted_payload: The already-encrypted text to transmit.

    Returns:
        A list of SMS bodies, in the order they should be sent.
    """
    single_sms = f"{PREFIX}SYM|{encrypted_payload}"
    if len(single_sms) > MAX_SMS_CHARS:
        # Too long for one SMS: fall back to numbered fragments.
        return build_fragments(encrypted_payload)
    return [single_sms]
|
|
|
|
|
|
def build_group_msg(encrypted_payload: str) -> str:
    """Wrap a group-encrypted payload into a single group ``SYM`` message.

    The payload is not length-checked or fragmented here; the result is
    always exactly one string.

    Args:
        encrypted_payload: The already-encrypted text to transmit.

    Returns:
        The SMS body, starting with the group prefix.
    """
    header = G_PREFIX + "SYM|"
    return f"{header}{encrypted_payload}"
|
|
|
|
|
|
def build_fragments(payload: str, packet_id: Optional[str] = None) -> list[str]:
    """Split a payload into numbered ``SFRA`` fragments (always symmetric in v3).

    Every fragment has the form ``<PREFIX>SFRA|<pkt_id>|<part>|<total>|<chunk>``
    with 1-based part numbers. Chunks are ``FRAG_CHUNK_SIZE`` characters
    long; the chunk size is reduced only when a fragment would otherwise
    exceed ``MAX_SMS_CHARS`` (a long ``packet_id`` or a three-digit part
    count makes the header larger than the fixed overhead budget).

    Args:
        payload: Text to split. An empty payload still yields one fragment
            with an empty chunk.
        packet_id: Identifier shared by all fragments. When omitted or
            empty, the first 10 hex characters of a random UUID4 are used.

    Returns:
        The fragments, in part order.

    Raises:
        ValueError: If ``packet_id`` contains ``"|"`` (the field separator),
            or is so long that the header alone fills the SMS.
    """
    pkt_id = packet_id or uuid.uuid4().hex[:10]
    if "|" in pkt_id:
        # The receiver splits fields on "|", so such an id could never be
        # parsed back into a fragment.
        raise ValueError("packet_id must not contain '|'")
    header_base = f"{PREFIX}SFRA|{pkt_id}"

    chunk_size = FRAG_CHUNK_SIZE
    while True:
        chunks = [
            payload[start:start + chunk_size]
            for start in range(0, len(payload), chunk_size)
        ] or [""]
        total = len(chunks)
        fragments = [
            f"{header_base}|{part}|{total}|{chunk}"
            for part, chunk in enumerate(chunks, start=1)
        ]
        overflow = max(map(len, fragments)) - MAX_SMS_CHARS
        if overflow <= 0:
            return fragments
        if overflow >= chunk_size:
            # Even a one-character chunk would not fit next to this header.
            raise ValueError("packet_id is too long to fit in a fragment")
        # Shrink and retry; a smaller chunk may add parts and widen the
        # header again, hence the loop. chunk_size strictly decreases, so
        # the loop terminates.
        chunk_size -= overflow
|
|
|
|
|
|
def build_nack(packet_id: str, missing_part: int) -> str:
    """Build a retransmission request for one fragment of a packet.

    Args:
        packet_id: Identifier of the packet the fragment belongs to.
        missing_part: Number of the fragment being requested again.

    Returns:
        The ``NACK`` message body.
    """
    header = PREFIX + "NACK"
    return f"{header}|{packet_id}|{missing_part}"
|
|
|
|
|
|
# ── Parser ──────────────────────────────────────────────────────────
|
|
|
|
def parse_incoming(raw_text: str) -> ParsedMessage:
    """Parse an incoming SMS and determine its type.

    Text that starts with neither ``PREFIX`` nor ``G_PREFIX`` is plain
    text. Otherwise the part after the prefix is matched against the
    ``NORM|``, ``SYM|``, ``SFRA|`` and ``NACK|`` forms. Anything that is
    unrecognised or malformed (wrong field count, non-integer numbers, or
    a fragment whose part number is not within ``1..total``) is returned
    as a plain message carrying the untouched ``raw_text``.

    Args:
        raw_text: The SMS body exactly as received.

    Returns:
        A ``ParsedMessage`` whose ``msg_type`` is one of ``"norm"``,
        ``"sym"``, ``"gsym"``, ``"sfra"``, ``"nack"`` or ``"plain"``.
    """
    is_group = raw_text.startswith(G_PREFIX)
    if not is_group and not raw_text.startswith(PREFIX):
        return ParsedMessage(msg_type="plain", plain_text=raw_text)

    # Drop whichever prefix matched.
    body = raw_text[len(G_PREFIX if is_group else PREFIX):]

    if body.startswith("NORM|"):
        return ParsedMessage(msg_type="norm", is_group=is_group)

    if body.startswith("SYM|"):
        return ParsedMessage(
            msg_type="gsym" if is_group else "sym",
            is_group=is_group,
            encrypted_payload=body[len("SYM|"):],
        )

    if body.startswith("SFRA|"):
        # maxsplit=3 keeps any "|" inside the chunk itself intact.
        fields = body[len("SFRA|"):].split("|", 3)
        if len(fields) == 4:
            packet_id, part_field, total_field, chunk = fields
            try:
                part_no = int(part_field)
                total_parts = int(total_field)
            except ValueError:
                pass
            else:
                # Fragments are numbered 1..total; the text comes from an
                # untrusted sender, so reject impossible numbering here
                # rather than hand it on to reassembly.
                if 1 <= part_no <= total_parts:
                    return ParsedMessage(
                        msg_type="sfra",
                        is_group=is_group,
                        packet_id=packet_id,
                        part_no=part_no,
                        total_parts=total_parts,
                        chunk=chunk,
                    )

    elif body.startswith("NACK|"):
        fields = body[len("NACK|"):].split("|", 1)
        if len(fields) == 2:
            try:
                missing_part = int(fields[1])
            except ValueError:
                pass
            else:
                return ParsedMessage(
                    msg_type="nack",
                    is_group=is_group,
                    packet_id=fields[0],
                    missing_part=missing_part,
                )

    # Prefixed but unrecognised or malformed: surface it as ordinary text.
    return ParsedMessage(msg_type="plain", plain_text=raw_text)
|