Saba-python/secure_sms/infrastructure/gsm.py
2026-03-27 19:20:38 +03:30

344 lines
12 KiB
Python

import abc
import re
import threading
import time
from typing import Callable, Optional
import serial
# Matches text whose last non-blank line is exactly "OK" or "ERROR".
# Extended results such as "+CMS ERROR: <n>" are NOT matched by this pattern,
# because the status word must start its own line.
_TERMINAL_STATUS_RE = re.compile(r"(?:^|\r?\n)(OK|ERROR)\r?\n?\s*$", re.DOTALL)
# Matches every character that is NOT part of the protocol alphabet
# (ASCII letters, digits and @ : | - _ = + /); substituting matches with ""
# therefore strips everything outside that alphabet.
_PROTOCOL_ALLOWED_RE = re.compile(r"[^A-Za-z0-9@:\|\-_=+/]")
def _has_non_ascii(text: str) -> bool:
    """Return True when *text* holds at least one character outside ASCII."""
    return not text.isascii()
def _to_ucs2_hex(text: str) -> str:
    """Return *text* as upper-case hex digits of its UTF-16 big-endian bytes."""
    encoded = text.encode("utf-16-be")
    return "".join(f"{byte:02X}" for byte in encoded)
def _decode_ucs2_hex(hex_str: str) -> str:
    """Decode the leading run of 4-digit hex groups as UTF-16 big-endian text.

    Anything after the last complete 4-digit group is ignored.  The input is
    returned unchanged when it has no such leading run or when decoding fails
    for any reason.
    """
    try:
        prefix = re.match(r"^([0-9A-Fa-f]{4})+", hex_str)
        decoded = (
            bytes.fromhex(prefix.group(0)).decode("utf-16-be")
            if prefix
            else hex_str
        )
    except Exception:
        # Best effort: hand the raw text back rather than fail the caller.
        return hex_str
    return decoded
def _has_terminal_status(text: str) -> bool:
    """Tell whether *text* ends with a final ``OK`` or ``ERROR`` status line.

    A status word that appears earlier in the text does not count; it has to
    be the last non-blank line.
    """
    return _TERMINAL_STATUS_RE.search(text) is not None
def _sanitize_protocol_body(text: str) -> str:
    """Strip control bytes and stray characters from protocol SMS bodies.

    A body counts as a protocol message when, after dropping control
    characters and all whitespace, it starts with ``@S:`` or ``@G:``.  Such a
    body is returned in that compacted form with every character outside the
    protocol alphabet removed.  Any other body is returned exactly as given.
    """
    stripped = re.sub(r"[\x00-\x1F\x7F]", "", text)
    squeezed = "".join(stripped.split())
    if not squeezed.startswith(("@S:", "@G:")):
        return text
    return _PROTOCOL_ALLOWED_RE.sub("", squeezed)
class IMessageGateway(abc.ABC):
    """Abstract interface every message gateway implementation must provide."""

    @property
    @abc.abstractmethod
    def is_connected(self) -> bool:
        """Report whether the gateway currently has a usable connection."""

    @abc.abstractmethod
    def connect(self) -> bool:
        """Establish the connection; the result is reported as a bool."""

    @abc.abstractmethod
    def disconnect(self) -> None:
        """Tear the connection down."""

    @abc.abstractmethod
    def send_frames(self, phone: str, frames: list[str]) -> bool:
        """Send ``frames`` to ``phone``; the result is reported as a bool."""
class GSMGateway(IMessageGateway):
    """Message gateway that drives a GSM modem with AT commands over serial.

    ``connect`` opens the port, configures the modem for text-mode SMS and
    starts two daemon threads:

    * a reader that appends every received byte to a shared buffer and reacts
      to unsolicited ``+CMTI`` new-message notifications, and
    * a poller that lists the stored messages once a minute as a fallback.

    Each received message is read from its storage slot, passed to
    ``message_callback(sender, body)`` and then deleted from the modem.
    """

    # Modem setup sequence written by connect(); replies are not inspected.
    _INIT_COMMANDS = (
        "AT\r\n",                        # wake-up / sanity ping
        "ATE0\r\n",                      # command echo off
        "AT+CMGF=1\r\n",                 # SMS text mode
        'AT+CPMS="ME","ME","ME"\r\n',    # use "ME" message storage
        "AT+CNMI=2,1,0,0,0\r\n",         # announce new messages via +CMTI
        "AT+IFC=0,0\r\n",                # no serial flow control
    )
    # A +CMTI notification whose slot index is known to be complete, i.e. the
    # digits are already followed by some non-digit character.
    _CMTI_RE = re.compile(r'\+CMTI:\s*".*?",(\d+)(?=\D)')
    # How much unmatched text the reader keeps so that a notification split
    # across two serial reads can still be recognised.
    _CMTI_TAIL = 64

    def __init__(
        self,
        port: str,
        baudrate: int,
        message_callback: Optional[Callable[[str, str], None]] = None,
    ):
        """Store the connection settings; no I/O happens until ``connect``.

        Args:
            port: Serial device name handed to ``serial.Serial``.
            baudrate: Serial speed handed to ``serial.Serial``.
            message_callback: Called as ``callback(sender, body)`` for every
                received message; ``None`` disables dispatching.
        """
        self.port = port
        self.baudrate = baudrate
        self.message_callback = message_callback
        self.serial_conn = None
        self.is_running = False
        self.read_thread = None
        self.poll_thread = None
        # Serialises whole AT command exchanges on the port.
        self.lock = threading.Lock()
        # Guards _unread_buffer, which the reader thread fills.
        self.buffer_lock = threading.Lock()
        self._unread_buffer: bytes = b""
        # Guards _processing_indexes (slots currently being read/dispatched).
        self._processing_lock = threading.Lock()
        self._processing_indexes: set[int] = set()
        # Set by disconnect() so the poller's one-minute wait ends at once.
        self._stop_event = threading.Event()

    @property
    def is_connected(self) -> bool:
        """True while a serial connection exists and reports itself open."""
        return bool(self.serial_conn and self.serial_conn.is_open)

    def connect(self) -> bool:
        """Open the port, initialise the modem and start the worker threads.

        Returns:
            True when every step completed, False when any step raised.  On
            failure a port that was already opened is closed again.
        """
        try:
            print(f"[GSM] Connecting to port={self.port} baudrate={self.baudrate}")
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1,
                xonxoff=False,
                rtscts=False,
                dsrdtr=False,
            )
            self._stop_event.clear()
            self.is_running = True
            for init_cmd in self._INIT_COMMANDS:
                self._send_raw(init_cmd)
                time.sleep(0.5)  # give the modem time to act on the command
            self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
            self.read_thread.start()
            self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
            self.poll_thread.start()
            return True
        except Exception as exc:
            print(f"[GSM] Connection failed: {exc}")
            half_open = self.serial_conn
            self.serial_conn = None
            self.is_running = False
            self._stop_event.set()
            # Do not leak the OS handle when setup failed after the port opened.
            if half_open is not None:
                try:
                    half_open.close()
                except Exception as close_exc:
                    print(f"[GSM] Connection failed: {close_exc}")
            return False

    def disconnect(self) -> None:
        """Stop both worker threads and close the serial port."""
        self.is_running = False
        self._stop_event.set()
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.close()
        # disconnect() may be called from the callback, i.e. from a worker
        # thread itself; a thread must never try to join itself.
        caller = threading.current_thread()
        for worker in (self.read_thread, self.poll_thread):
            if worker and worker is not caller and worker.is_alive():
                worker.join(timeout=1.5)

    def _send_raw(self, cmd: str):
        """Write *cmd* to the port as-is; silently does nothing when closed."""
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.write(cmd.encode("utf-8"))

    def _send_at_simple(self, command: str, wait: float = 2.0) -> str:
        """Send an AT command and wait for a complete modem response.

        The shared buffer is emptied first, so the returned text only holds
        what arrived after the command was written.

        Args:
            command: Command text without the trailing CR/LF.
            wait: Base timeout in seconds; the loop allows five more seconds
                on top of it before giving up.

        Returns:
            Everything received until a final ``OK``/``ERROR`` line showed up
            or the timeout elapsed; ``""`` when not connected.
        """
        with self.lock:
            if not self.is_connected:
                return ""
            with self.buffer_lock:
                self._unread_buffer = b""
            print(f"[GSM] CMD: {command}")
            self.serial_conn.write((command + "\r\n").encode("ascii", errors="ignore"))
            start_time = time.time()
            while (time.time() - start_time) < (wait + 5):
                time.sleep(0.1)
                with self.buffer_lock:
                    text = self._unread_buffer.decode("ascii", errors="replace")
                if _has_terminal_status(text):
                    break
            with self.buffer_lock:
                final_resp = self._unread_buffer.decode("utf-8", errors="replace")
                self._unread_buffer = b""
            return final_resp

    def send_at_cmd(
        self,
        command: str,
        expected_response: str = "OK",
        timeout: float = 2.0,
    ) -> tuple[bool, list[str]]:
        """Run one AT command and report whether it produced the expected text.

        Returns:
            ``(success, lines)`` where *success* requires both a final status
            line and *expected_response* somewhere in the reply, and *lines*
            holds the non-empty, stripped response lines.
        """
        resp = self._send_at_simple(command, wait=timeout)
        lines = [line.strip() for line in resp.split("\n") if line.strip()]
        return _has_terminal_status(resp) and expected_response in resp, lines

    def send_frames(self, phone: str, frames: list[str]) -> bool:
        """Send every frame to *phone* as its own SMS, stopping at the first failure.

        Returns:
            True when all frames were sent, False when not connected or when
            any frame failed.
        """
        if not self.is_connected:
            return False
        for frame in frames:
            if not self._send_single_sms(phone, frame):
                return False
            time.sleep(1.0)  # pause between consecutive messages
        return True

    def _send_single_sms(self, phone: str, body: str) -> bool:
        """Send one SMS, switching to UCS2 when the body is not pure ASCII.

        Returns:
            True only when the reply to the send command itself ends with a
            final ``OK`` line; False otherwise, including on any exception.
        """
        with self.lock:
            try:
                if not self.is_connected:
                    return False
                needs_ucs2 = _has_non_ascii(body)
                print(f"[GSM] Sending SMS to {phone}, ucs2={needs_ucs2}")
                self.serial_conn.write(b'AT+CSCS="UCS2"\r\n' if needs_ucs2 else b'AT+CSCS="GSM"\r\n')
                time.sleep(1)
                # Last CSMP field is the data coding scheme: 8 = UCS2, 0 = default.
                self.serial_conn.write(b'AT+CSMP=17,167,0,8\r\n' if needs_ucs2 else b'AT+CSMP=17,167,0,0\r\n')
                time.sleep(1)
                self.serial_conn.write(b"AT+CMGF=1\r\n")
                time.sleep(1)
                # Drop the replies to the setup commands: their "OK" lines
                # must not be mistaken for the result of the send itself.
                with self.buffer_lock:
                    self._unread_buffer = b""
                # With the UCS2 character set the number is hex-encoded too.
                target = _to_ucs2_hex(phone) if needs_ucs2 else phone
                self.serial_conn.write(f'AT+CMGS="{target}"\r\n'.encode("utf-8"))
                time.sleep(1.5)
                payload = _to_ucs2_hex(body) if needs_ucs2 else body
                self.serial_conn.write(payload.encode("utf-8"))
                time.sleep(0.5)
                self.serial_conn.write(bytes([26]))  # Ctrl-Z ends the message text
                time.sleep(8)
                with self.buffer_lock:
                    resp = self._unread_buffer.decode("utf-8", errors="replace")
                    self._unread_buffer = b""
                # The final status line decides: a reply that ends in ERROR is
                # a failure even if "OK" occurs somewhere earlier in it.
                if _has_terminal_status(resp) and resp.rstrip().endswith("OK"):
                    print("[GSM] SMS sent successfully.")
                    return True
                print(f"[GSM] SMS fail or uncertain. Response: {repr(resp)}")
                return False
            except Exception as exc:
                print(f"[GSM] Exception during SMS: {exc}")
                return False

    def _read_loop(self):
        """Continuously read serial and react to unsolicited +CMTI notifications.

        Every received byte is appended to the shared buffer for the command
        methods.  Each complete ``+CMTI`` notification starts a daemon thread
        that processes the announced storage slot.
        """
        carry = ""  # unmatched tail of earlier reads (may hold a partial +CMTI)
        while self.is_running:
            try:
                if self.is_connected and self.serial_conn.in_waiting > 0:
                    data = self.serial_conn.read(self.serial_conn.in_waiting)
                    if data:
                        with self.buffer_lock:
                            self._unread_buffer += data
                        text = carry + data.decode("ascii", errors="replace")
                        consumed = 0
                        # One read may carry several notifications.
                        for match in self._CMTI_RE.finditer(text):
                            consumed = match.end()
                            index = int(match.group(1))
                            print(f"[GSM] New SMS notification at index {index}")
                            threading.Thread(
                                target=self._process_incoming_sms,
                                args=(index,),
                                daemon=True,
                            ).start()
                        carry = text[consumed:][-self._CMTI_TAIL:]
            except Exception as exc:
                if self.is_running:
                    print(f"[GSM] Read error: {exc}")
            time.sleep(0.1)  # avoid spinning while the line is idle

    def _poll_loop(self):
        """Fail-safe polling in case +CMTI notifications are missed.

        Once a minute the stored messages are listed and every listed slot is
        processed.  The wait ends immediately when ``disconnect`` is called.
        """
        while self.is_running:
            # wait() returns True as soon as disconnect() sets the event.
            if self._stop_event.wait(60):
                break
            if not self.is_connected:
                continue
            try:
                success, lines = self.send_at_cmd('AT+CMGL="ALL"', timeout=5)
            except Exception as exc:
                print(f"[GSM] Polling error: {exc}")
                continue
            if not success:
                continue
            for line in lines:
                header = re.match(r"\+CMGL:\s*(\d+)", line)
                if not header:
                    continue
                index = int(header.group(1))
                print(f"[GSM] Found message during polling at index {index}")
                try:
                    self._process_incoming_sms(index)
                except Exception as exc:
                    # One bad message must not hide itself nor stop the rest.
                    print(f"[GSM] Polling error: {exc}")

    def _process_incoming_sms(self, index: int):
        """Read, decode, sanitize, and dispatch a message from one modem slot.

        A slot that is already being processed by another thread is skipped.
        Nothing is dispatched or deleted when the modem's reply contains no
        ``+CMGR:`` header (for example an empty slot or an error reply).

        Args:
            index: Storage slot number on the modem.
        """
        with self._processing_lock:
            if index in self._processing_indexes:
                return
            self._processing_indexes.add(index)
        try:
            time.sleep(0.25)
            resp = self._send_at_simple(f"AT+CMGR={index}", wait=4)
            if not _has_terminal_status(resp):
                return

            sender = "unknown"
            body_lines = []
            reading_body = False
            for line in resp.split("\n"):
                line = line.strip()
                if line.startswith("+CMGR:"):
                    parts = line.split(",")
                    if len(parts) >= 2:
                        sender = parts[1].strip('"')
                        # A sender that looks like UCS2 hex is decoded to text.
                        if sender.startswith("00") and len(sender) >= 8:
                            sender = _decode_ucs2_hex(sender)
                    reading_body = True
                    continue
                if reading_body:
                    if line == "OK":
                        break
                    if line:
                        body_lines.append(line)

            if not reading_body:
                # No message header in the reply: there is nothing to hand to
                # the callback and nothing that should be deleted.
                return

            full_body = "\n".join(body_lines)
            clean_body = "".join(full_body.split())
            if clean_body.startswith(("@S:", "@G:")):
                print(f"[GSM-Debug] Raw Payload Hex: {full_body.encode('utf-8', errors='replace').hex()[:100]}...")
                sanitized_body = _sanitize_protocol_body(full_body)
                if sanitized_body != full_body:
                    print("[GSM] Sanitized protocol body to remove modem control artifacts.")
                    full_body = sanitized_body
                    clean_body = "".join(full_body.split())

            # A non-protocol body made only of hex digits is treated as UCS2
            # hex and decoded; a trailing incomplete group is dropped first.
            if not clean_body.startswith(("@S:", "@G:")):
                if len(clean_body) >= 4 and re.fullmatch(r"[0-9A-Fa-f]+", clean_body):
                    trimmed = clean_body[: (len(clean_body) // 4) * 4]
                    if trimmed:
                        decoded = _decode_ucs2_hex(trimmed)
                        if decoded and decoded != trimmed:
                            full_body = decoded

            print(f"[GSM] Message from {sender}: {full_body[:50]}...")
            if self.message_callback:
                self.message_callback(sender, full_body)
            # Delete only after the callback returned without raising.
            self._send_at_simple(f"AT+CMGD={index}", wait=1)
        finally:
            with self._processing_lock:
                self._processing_indexes.discard(index)