344 lines
12 KiB
Python
344 lines
12 KiB
Python
import abc
|
|
import re
|
|
import threading
|
|
import time
|
|
from typing import Callable, Optional
|
|
|
|
import serial
|
|
|
|
|
|
_TERMINAL_STATUS_RE = re.compile(r"(?:^|\r?\n)(OK|ERROR)\r?\n?\s*$", re.DOTALL)
|
|
_PROTOCOL_ALLOWED_RE = re.compile(r"[^A-Za-z0-9@:\|\-_=+/]")
|
|
|
|
|
|
def _has_non_ascii(text: str) -> bool:
|
|
"""Check if text contains non-ASCII characters."""
|
|
try:
|
|
text.encode("ascii")
|
|
return False
|
|
except UnicodeEncodeError:
|
|
return True
|
|
|
|
|
|
def _to_ucs2_hex(text: str) -> str:
|
|
"""Encode text as UCS2 hex using utf-16-be."""
|
|
return text.encode("utf-16-be").hex().upper()
|
|
|
|
|
|
def _decode_ucs2_hex(hex_str: str) -> str:
|
|
"""Decode UCS2 hex text and ignore junk after the valid prefix."""
|
|
try:
|
|
match = re.match(r"^([0-9A-Fa-f]{4})+", hex_str)
|
|
if not match:
|
|
return hex_str
|
|
valid_hex = match.group(0)
|
|
return bytes.fromhex(valid_hex).decode("utf-16-be")
|
|
except Exception:
|
|
return hex_str
|
|
|
|
|
|
def _has_terminal_status(text: str) -> bool:
|
|
"""Only treat OK/ERROR as complete when it is the final modem status line."""
|
|
return bool(_TERMINAL_STATUS_RE.search(text))
|
|
|
|
|
|
def _sanitize_protocol_body(text: str) -> str:
|
|
"""Remove control bytes and modem junk from protocol SMS bodies."""
|
|
no_controls = re.sub(r"[\x00-\x1F\x7F]", "", text)
|
|
compact = "".join(no_controls.split())
|
|
if compact.startswith(("@S:", "@G:")):
|
|
return _PROTOCOL_ALLOWED_RE.sub("", compact)
|
|
return text
|
|
|
|
|
|
class IMessageGateway(abc.ABC):
|
|
@property
|
|
@abc.abstractmethod
|
|
def is_connected(self) -> bool:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def connect(self) -> bool:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def disconnect(self) -> None:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def send_frames(self, phone: str, frames: list[str]) -> bool:
|
|
pass
|
|
|
|
|
|
class GSMGateway(IMessageGateway):
|
|
def __init__(
|
|
self,
|
|
port: str,
|
|
baudrate: int,
|
|
message_callback: Optional[Callable[[str, str], None]] = None,
|
|
):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.message_callback = message_callback
|
|
self.serial_conn = None
|
|
self.is_running = False
|
|
self.read_thread = None
|
|
self.poll_thread = None
|
|
self.lock = threading.Lock()
|
|
self.buffer_lock = threading.Lock()
|
|
self._unread_buffer: bytes = b""
|
|
self._processing_lock = threading.Lock()
|
|
self._processing_indexes: set[int] = set()
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
return bool(self.serial_conn and self.serial_conn.is_open)
|
|
|
|
def connect(self) -> bool:
|
|
try:
|
|
print(f"[GSM] Connecting to port={self.port} baudrate={self.baudrate}")
|
|
self.serial_conn = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baudrate,
|
|
timeout=0.1,
|
|
xonxoff=False,
|
|
rtscts=False,
|
|
dsrdtr=False,
|
|
)
|
|
self.is_running = True
|
|
|
|
self._send_raw("AT\r\n")
|
|
time.sleep(0.5)
|
|
self._send_raw("ATE0\r\n")
|
|
time.sleep(0.5)
|
|
self._send_raw("AT+CMGF=1\r\n")
|
|
time.sleep(0.5)
|
|
self._send_raw('AT+CPMS="ME","ME","ME"\r\n')
|
|
time.sleep(0.5)
|
|
self._send_raw("AT+CNMI=2,1,0,0,0\r\n")
|
|
time.sleep(0.5)
|
|
self._send_raw("AT+IFC=0,0\r\n")
|
|
time.sleep(0.5)
|
|
|
|
self.read_thread = threading.Thread(target=self._read_loop, daemon=True)
|
|
self.read_thread.start()
|
|
|
|
self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
|
|
self.poll_thread.start()
|
|
return True
|
|
except Exception as exc:
|
|
print(f"[GSM] Connection failed: {exc}")
|
|
self.serial_conn = None
|
|
self.is_running = False
|
|
return False
|
|
|
|
def disconnect(self):
|
|
self.is_running = False
|
|
if self.serial_conn and self.serial_conn.is_open:
|
|
self.serial_conn.close()
|
|
if self.read_thread:
|
|
self.read_thread.join(timeout=1.5)
|
|
|
|
def _send_raw(self, cmd: str):
|
|
if self.serial_conn and self.serial_conn.is_open:
|
|
self.serial_conn.write(cmd.encode("utf-8"))
|
|
|
|
def _send_at_simple(self, command: str, wait: float = 2.0) -> str:
|
|
"""Send an AT command and wait for a complete modem response."""
|
|
with self.lock:
|
|
if not self.is_connected:
|
|
return ""
|
|
|
|
with self.buffer_lock:
|
|
self._unread_buffer = b""
|
|
|
|
print(f"[GSM] CMD: {command}")
|
|
self.serial_conn.write((command + "\r\n").encode("ascii", errors="ignore"))
|
|
|
|
start_time = time.time()
|
|
while (time.time() - start_time) < (wait + 5):
|
|
time.sleep(0.1)
|
|
with self.buffer_lock:
|
|
text = self._unread_buffer.decode("ascii", errors="replace")
|
|
if _has_terminal_status(text):
|
|
break
|
|
|
|
with self.buffer_lock:
|
|
final_resp = self._unread_buffer.decode("utf-8", errors="replace")
|
|
self._unread_buffer = b""
|
|
return final_resp
|
|
|
|
def send_at_cmd(
|
|
self,
|
|
command: str,
|
|
expected_response: str = "OK",
|
|
timeout: float = 2.0,
|
|
) -> tuple[bool, list[str]]:
|
|
resp = self._send_at_simple(command, wait=timeout)
|
|
lines = [line.strip() for line in resp.split("\n") if line.strip()]
|
|
return _has_terminal_status(resp) and expected_response in resp, lines
|
|
|
|
def send_frames(self, phone: str, frames: list[str]) -> bool:
|
|
if not self.is_connected:
|
|
return False
|
|
for frame in frames:
|
|
if not self._send_single_sms(phone, frame):
|
|
return False
|
|
time.sleep(1.0)
|
|
return True
|
|
|
|
def _send_single_sms(self, phone: str, body: str) -> bool:
|
|
with self.lock:
|
|
try:
|
|
needs_ucs2 = _has_non_ascii(body)
|
|
print(f"[GSM] Sending SMS to {phone}, ucs2={needs_ucs2}")
|
|
|
|
self.serial_conn.write(b'AT+CSCS="UCS2"\r\n' if needs_ucs2 else b'AT+CSCS="GSM"\r\n')
|
|
time.sleep(1)
|
|
|
|
self.serial_conn.write(b'AT+CSMP=17,167,0,8\r\n' if needs_ucs2 else b'AT+CSMP=17,167,0,0\r\n')
|
|
time.sleep(1)
|
|
|
|
self.serial_conn.write(b"AT+CMGF=1\r\n")
|
|
time.sleep(1)
|
|
|
|
target = _to_ucs2_hex(phone) if needs_ucs2 else phone
|
|
self.serial_conn.write(f'AT+CMGS="{target}"\r\n'.encode("utf-8"))
|
|
time.sleep(1.5)
|
|
|
|
payload = _to_ucs2_hex(body) if needs_ucs2 else body
|
|
self.serial_conn.write(payload.encode("utf-8"))
|
|
time.sleep(0.5)
|
|
self.serial_conn.write(bytes([26]))
|
|
|
|
time.sleep(8)
|
|
with self.buffer_lock:
|
|
resp = self._unread_buffer.decode("utf-8", errors="replace")
|
|
self._unread_buffer = b""
|
|
|
|
if _has_terminal_status(resp) and "OK" in resp:
|
|
print("[GSM] SMS sent successfully.")
|
|
return True
|
|
|
|
print(f"[GSM] SMS fail or uncertain. Response: {repr(resp)}")
|
|
return False
|
|
except Exception as exc:
|
|
print(f"[GSM] Exception during SMS: {exc}")
|
|
return False
|
|
|
|
def _read_loop(self):
|
|
"""Continuously read serial and react to unsolicited +CMTI notifications."""
|
|
while self.is_running:
|
|
try:
|
|
if self.is_connected and self.serial_conn.in_waiting > 0:
|
|
data = self.serial_conn.read(self.serial_conn.in_waiting)
|
|
if data:
|
|
with self.buffer_lock:
|
|
self._unread_buffer += data
|
|
|
|
try:
|
|
text = data.decode("ascii", errors="replace")
|
|
if "+CMTI:" in text:
|
|
match = re.search(r'\+CMTI:\s*".*?",(\d+)', text)
|
|
if match:
|
|
index = int(match.group(1))
|
|
print(f"[GSM] New SMS notification at index {index}")
|
|
threading.Thread(
|
|
target=self._process_incoming_sms,
|
|
args=(index,),
|
|
daemon=True,
|
|
).start()
|
|
except Exception:
|
|
pass
|
|
except Exception as exc:
|
|
if self.is_running:
|
|
print(f"[GSM] Read error: {exc}")
|
|
time.sleep(0.1)
|
|
|
|
def _poll_loop(self):
|
|
"""Fail-safe polling in case +CMTI notifications are missed."""
|
|
while self.is_running:
|
|
time.sleep(60)
|
|
if not self.is_connected:
|
|
continue
|
|
|
|
try:
|
|
success, lines = self.send_at_cmd('AT+CMGL="ALL"', timeout=5)
|
|
if success:
|
|
for line in lines:
|
|
if line.startswith("+CMGL:"):
|
|
parts = line.split(",")
|
|
if len(parts) >= 1:
|
|
try:
|
|
index_str = parts[0].split(":")[1].strip()
|
|
index = int(index_str)
|
|
print(f"[GSM] Found message during polling at index {index}")
|
|
self._process_incoming_sms(index)
|
|
except Exception:
|
|
pass
|
|
except Exception as exc:
|
|
print(f"[GSM] Polling error: {exc}")
|
|
|
|
def _process_incoming_sms(self, index: int):
|
|
"""Read, decode, sanitize, and dispatch a message from one modem slot."""
|
|
with self._processing_lock:
|
|
if index in self._processing_indexes:
|
|
return
|
|
self._processing_indexes.add(index)
|
|
|
|
try:
|
|
time.sleep(0.25)
|
|
resp = self._send_at_simple(f"AT+CMGR={index}", wait=4)
|
|
if not _has_terminal_status(resp):
|
|
return
|
|
|
|
sender = "unknown"
|
|
body_lines = []
|
|
reading_body = False
|
|
|
|
for line in resp.split("\n"):
|
|
line = line.strip()
|
|
if line.startswith("+CMGR:"):
|
|
parts = line.split(",")
|
|
if len(parts) >= 2:
|
|
sender = parts[1].strip('"')
|
|
if sender.startswith("00") and len(sender) >= 8:
|
|
sender = _decode_ucs2_hex(sender)
|
|
reading_body = True
|
|
continue
|
|
|
|
if reading_body:
|
|
if line == "OK":
|
|
break
|
|
if line:
|
|
body_lines.append(line)
|
|
|
|
full_body = "\n".join(body_lines)
|
|
clean_body = "".join(full_body.split())
|
|
|
|
if clean_body.startswith(("@S:", "@G:")):
|
|
print(f"[GSM-Debug] Raw Payload Hex: {full_body.encode('utf-8', errors='replace').hex()[:100]}...")
|
|
|
|
sanitized_body = _sanitize_protocol_body(full_body)
|
|
if sanitized_body != full_body:
|
|
print("[GSM] Sanitized protocol body to remove modem control artifacts.")
|
|
full_body = sanitized_body
|
|
clean_body = "".join(full_body.split())
|
|
|
|
if not clean_body.startswith(("@S:", "@G:")):
|
|
if len(clean_body) >= 4 and re.fullmatch(r"[0-9A-Fa-f]+", clean_body):
|
|
trimmed = clean_body[: (len(clean_body) // 4) * 4]
|
|
if trimmed:
|
|
decoded = _decode_ucs2_hex(trimmed)
|
|
if decoded and decoded != trimmed:
|
|
full_body = decoded
|
|
|
|
print(f"[GSM] Message from {sender}: {full_body[:50]}...")
|
|
if self.message_callback:
|
|
self.message_callback(sender, full_body)
|
|
|
|
self._send_at_simple(f"AT+CMGD={index}", wait=1)
|
|
finally:
|
|
with self._processing_lock:
|
|
self._processing_indexes.discard(index)
|