diff --git a/a.ps1 b/a.ps1 index 69c0076..646dbd1 100644 --- a/a.ps1 +++ b/a.ps1 @@ -1,11 +1,12 @@ # ----------------------------- # تنظیمات # ----------------------------- -$LocalPath = "C:\Users\Pars\Desktop\saba-python" # مسیر پروژه روی ویندوز +$LocalPath = $PSScriptRoot # مسیر پروژه روی ویندوز (همان پوشه‌ای که اسکریپت در آن است) $PiUser = "pars" # کاربر روی Raspberry Pi -$PiHost = "192.168.1.25" # آی‌پی Raspberry Pi -$RemotePath = "/home/pars/Desktop/" # مسیر پروژه روی Pi -$MainPy = "saba-python/main.py" # فایل اصلی پایتون +$PiHost = "10.63.136.150" # آی‌پی Raspberry Pi +$RemotePath = "/home/pars/Desktop" # مسیر پروژه روی Pi +$ProjectFolder = "saba-python" # نام پوشه پروژه +$MainPy = "main.py" # فایل اصلی پایتون $KeyPath = "$env:USERPROFILE\.ssh\id_ed25519" @@ -37,16 +38,11 @@ $FolderName = Split-Path $LocalPath -Leaf $DeployTar = "$ParentDir\deploy.tar" Set-Location $ParentDir -tar.exe -cf deploy.tar --exclude=".git" --exclude="__pycache__" $FolderName +tar.exe -cf deploy.tar --exclude=".git" --exclude="__pycache__" --exclude=".runtime-venv" $FolderName scp deploy.tar "$PiUser@$PiHost`:$RemotePath/deploy.tar" -ssh "$PiUser@$PiHost" "cd $RemotePath && tar -xf deploy.tar && rm deploy.tar" +ssh "$PiUser@$PiHost" "cd $RemotePath && rm -rf $ProjectFolder && tar -xf deploy.tar && rm deploy.tar" Remove-Item $DeployTar Set-Location $LocalPath # ----------------------------- # اجرای برنامه روی Raspberry Pi -# ----------------------------- -Write-Host "Running program on Raspberry Pi..." -ssh "$PiUser@$PiHost" "cd $RemotePath && python3 $MainPy" - -Write-Host "Deployment complete! Latest version is running on Raspberry Pi." -ForegroundColor Green \ No newline at end of file diff --git a/deploy.tar b/deploy.tar new file mode 100644 index 0000000..06d7405 Binary files /dev/null and b/deploy.tar differ diff --git a/diag_uart.py b/diag_uart.py new file mode 100644 index 0000000..1362815 --- /dev/null +++ b/diag_uart.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +"""UART Diagnostic - finds the correct serial port for Quectel M66.""" +import glob +import time + +try: + import serial +except ImportError: + print("ERROR: pyserial not installed. Run: pip install pyserial") + exit(1) + +# Discover all serial ports +ports = sorted(glob.glob("/dev/ttyS*") + glob.glob("/dev/ttyAMA*") + glob.glob("/dev/serial*")) +print(f"Available serial ports: {ports}\n") + +baudrates = [9600, 115200, 19200, 38400, 57600, 4800] + +for port in ports: + for baud in baudrates: + try: + ser = serial.Serial(port, baud, timeout=1.5) + ser.reset_input_buffer() + ser.write(b"AT\r\n") + time.sleep(0.5) + response = b"" + while ser.in_waiting: + response += ser.read(ser.in_waiting) + time.sleep(0.1) + ser.close() + resp_text = response.decode("ascii", errors="ignore").strip() + if resp_text: + status = "OK" if "OK" in resp_text else "RESPONSE" + print(f" ✅ {port} @ {baud} -> {status}: {repr(resp_text)}") + else: + print(f" ❌ {port} @ {baud} -> No response") + except Exception as e: + print(f" ⚠️ {port} @ {baud} -> Error: {e}") + +print("\n--- Done ---") +print("If no port responded, check:") +print(" 1. Modem power (M66 needs stable 3.8V-4.2V VBAT)") +print(" 2. TX/RX wiring (Pi TX -> M66 RX, Pi RX -> M66 TX)") +print(" 3. GND connection between Pi and M66") +print(" 4. On Pi 5: add 'dtoverlay=uart0-pi5' to /boot/firmware/config.txt") diff --git a/secure_sms/application/__pycache__/__init__.cpython-313.pyc b/secure_sms/application/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..9f9b1d8 Binary files /dev/null and b/secure_sms/application/__pycache__/__init__.cpython-313.pyc differ diff --git a/secure_sms/application/__pycache__/controller.cpython-312.pyc b/secure_sms/application/__pycache__/controller.cpython-312.pyc new file mode 100644 index 0000000..d650236 Binary files /dev/null and b/secure_sms/application/__pycache__/controller.cpython-312.pyc differ diff --git a/secure_sms/application/__pycache__/controller.cpython-313.pyc b/secure_sms/application/__pycache__/controller.cpython-313.pyc index 2bfa4a1..b59b9c6 100644 Binary files a/secure_sms/application/__pycache__/controller.cpython-313.pyc and b/secure_sms/application/__pycache__/controller.cpython-313.pyc differ diff --git a/secure_sms/application/__pycache__/services.cpython-312.pyc b/secure_sms/application/__pycache__/services.cpython-312.pyc new file mode 100644 index 0000000..b92b538 Binary files /dev/null and b/secure_sms/application/__pycache__/services.cpython-312.pyc differ diff --git a/secure_sms/application/__pycache__/services.cpython-313.pyc b/secure_sms/application/__pycache__/services.cpython-313.pyc index 0e93f75..cd51eae 100644 Binary files a/secure_sms/application/__pycache__/services.cpython-313.pyc and b/secure_sms/application/__pycache__/services.cpython-313.pyc differ diff --git a/secure_sms/application/controller.py b/secure_sms/application/controller.py index f7f17e9..5c3abc1 100644 --- a/secure_sms/application/controller.py +++ b/secure_sms/application/controller.py @@ -99,12 +99,16 @@ class AppController: self.service.delete_contact(phone) self._notify_ui() + def set_symmetric_key(self, phone: str, key: str): + self.service.set_symmetric_key(phone, key) + self._notify_ui() + def get_messages(self, phone: str): return self.service.get_messages(phone) - def send_message(self, phone: str, text: str) -> tuple[bool, str]: + def send_message(self, phone: str, text: str, symmetric_key: Optional[str] = None) -> tuple[bool, str]: try: - frames, mode = self.service.prepare_outgoing_message(phone, text) + frames, mode = self.service.prepare_outgoing_message(phone, text, symmetric_key=symmetric_key) except Exception as exc: return False, str(exc) @@ -114,16 +118,16 @@ class AppController: return True, "queued" def request_secure(self, phone: str) -> tuple[bool, str]: - frames = self.service.request_secure_channel(phone) - self._outbox.put({"phone": phone, "frames": frames, "msg_id": None}) - self._notify_ui(phone) - return True, "queued" + try: + ok, state = self.service.request_secure(phone) + self._notify_ui(phone) + return ok, state + except Exception as exc: + return False, str(exc) + + + - def switch_to_normal(self, phone: str) -> tuple[bool, str]: - frames = self.service.request_normal_mode(phone) - self._outbox.put({"phone": phone, "frames": frames, "msg_id": None}) - self._notify_ui(phone) - return True, "queued" def get_admin_snapshot(self) -> dict: snapshot = self.service.get_admin_snapshot() @@ -138,6 +142,9 @@ class AppController: finally: self._notify_ui(sender) + def decrypt_message_manually(self, message_id: int, key: str) -> bool: + return self.service.decrypt_message_manually(message_id, key) + def _notify_ui(self, phone: Optional[str] = None): if self.ui: self.ui.after(0, lambda: self.ui.handle_background_refresh(phone)) diff --git a/secure_sms/application/services.py b/secure_sms/application/services.py index 7e3b4c3..8754f75 100644 --- a/secure_sms/application/services.py +++ b/secure_sms/application/services.py @@ -1,3 +1,4 @@ +import os import platform from pathlib import Path from typing import Optional @@ -5,14 +6,13 @@ from typing import Optional from secure_sms.infrastructure.database import Database, utc_now from secure_sms.core.models import ContactDetails, ContactSummary, MessageView, PendingPacketView, SecureEventView from secure_sms.core.protocol import ( - build_control_frames, - build_message_frames, - decode_control_payload, - decode_plain_body, - encode_plain_body, - parse_frame, + build_nack, + build_normal_mode, + build_symmetric_msg, + parse_incoming, ) -from secure_sms.core.security import ECCCryptoService, PasswordManager, StorageCipher +from secure_sms.core.security import SymmetricCryptoService, PasswordManager, StorageCipher +from secure_sms.core.utils import normalize_phone SYSTEM_CONTACT_LABEL = "مخاطب ناشناس" @@ -22,13 +22,12 @@ class SecureMessagingService: def __init__(self, db: Database): self.db = db self.password_manager = PasswordManager() - self.crypto = ECCCryptoService() + self.crypto = SymmetricCryptoService() self.cipher: Optional[StorageCipher] = None - self.identity = None @property def unlocked(self) -> bool: - return self.cipher is not None and self.identity is not None + return self.cipher is not None def is_bootstrapped(self) -> bool: return self.db.is_bootstrapped() @@ -39,21 +38,15 @@ class SecureMessagingService: meta = self.password_manager.create_metadata(password) key = self.password_manager.derive_key(password, meta.salt) self.cipher = StorageCipher(key) - private_key, public_key, fingerprint = self.crypto.generate_identity() + self.db.set_security_metadata(meta) self.db.save_identity( - private_key_enc=self.cipher.encrypt_text(private_key), - public_key_enc=self.cipher.encrypt_text(public_key), - fingerprint=fingerprint, + private_key_enc=None, + public_key_enc=None, + fingerprint="SYMMETRIC_ONLY", ) - import os - self.db.set_connection_settings("/dev/ttyS0" if os.name != "nt" else "COM1", 115200) - self.identity = { - "private_key": private_key, - "public_key": public_key, - "fingerprint": fingerprint, - } - self.db.log_secure_event(None, "app_bootstrap", self._enc("راه‌اندازی اولیه برنامه انجام شد.")) + self.db.set_connection_settings("/dev/serial0" if os.name != "nt" else "COM1", 9600) + self.db.log_secure_event(None, "app_bootstrap", self._enc("راه‌اندازی اولیه صبا (نسخه متقارن AES-256) انجام شد.")) def unlock(self, password: str) -> bool: meta = self.db.get_security_metadata() @@ -63,14 +56,13 @@ class SecureMessagingService: return False key = self.password_manager.derive_key(password, meta.salt) self.cipher = StorageCipher(key) - identity_row = self.db.get_identity_row() - if identity_row is None: - raise ValueError("Secure identity was not found.") - self.identity = { - "private_key": self.cipher.decrypt_text(identity_row["private_key_enc"]), - "public_key": self.cipher.decrypt_text(identity_row["public_key_enc"]), - "fingerprint": identity_row["fingerprint"], - } + + # Migrate any existing legacy phone numbers to canonical format + try: + self._migrate_all_data() + except Exception as e: + print(f"[Migration] Error during data unification: {e}") + return True def verify_password(self, password: str) -> bool: @@ -91,12 +83,6 @@ class SecureMessagingService: self.db.rotate_encrypted_payloads(old_cipher, new_cipher) self.db.set_security_metadata(new_meta) self.cipher = new_cipher - identity_row = self.db.get_identity_row() - self.identity = { - "private_key": self.cipher.decrypt_text(identity_row["private_key_enc"]), - "public_key": self.cipher.decrypt_text(identity_row["public_key_enc"]), - "fingerprint": identity_row["fingerprint"], - } self.db.log_secure_event(None, "password_changed", self._enc("رمز اصلی برنامه تغییر کرد.")) def _enc(self, value: Optional[str]) -> Optional[str]: @@ -110,12 +96,17 @@ class SecureMessagingService: return self.cipher.decrypt_text(value) def add_or_update_contact(self, name: str, phone: str): + phone = normalize_phone(phone) self.db.upsert_contact(phone, self._enc(name)) def delete_contact(self, phone: str): + phone = normalize_phone(phone) self.db.delete_contact(phone) def ensure_contact(self, phone: str, fallback_name: Optional[str] = None): + phone = normalize_phone(phone) + if not phone: + return fallback = fallback_name or SYSTEM_CONTACT_LABEL self.db.ensure_contact_exists(phone, self._enc(fallback)) @@ -131,13 +122,14 @@ class SecureMessagingService: name=self._dec(row["name_enc"]) or SYSTEM_CONTACT_LABEL, mode=row["mode"], secure_state=row["secure_state"], - has_peer_key=bool(row["peer_public_key_enc"]), + has_peer_key=False, # Asymmetric is gone last_message_preview=(preview[:38] + "...") if preview and len(preview) > 38 else (preview or ""), ) ) return contacts def get_contact(self, phone: str) -> Optional[ContactDetails]: + phone = normalize_phone(phone) row = self.db.get_contact_row(phone) if not row: return None @@ -147,11 +139,13 @@ class SecureMessagingService: mode=row["mode"], secure_state=row["secure_state"], peer_fingerprint=row["peer_fingerprint"], - has_peer_key=bool(row["peer_public_key_enc"]), + has_peer_key=False, + symmetric_key=self._dec(row["symmetric_key_enc"]) if row["symmetric_key_enc"] else None, last_secure_at=row["last_secure_at"], ) def get_messages(self, phone: str) -> list[MessageView]: + phone = normalize_phone(phone) return [ MessageView( id=row["id"], @@ -165,29 +159,59 @@ class SecureMessagingService: for row in self.db.list_message_rows(phone) ] - def get_public_identity(self) -> dict: - if not self.identity: - raise RuntimeError("Application is locked.") - return { - "public_key": self.identity["public_key"], - "fingerprint": self.identity["fingerprint"], - } + # ── Outgoing Messages ────────────────────────────────────────── + + def prepare_outgoing_message(self, phone: str, text: str, symmetric_key: Optional[str] = None) -> tuple[list[str], str]: + """Prepare SMS frame(s) for an outgoing message.""" + if len(text) > 1000: + raise ValueError("طول پیام نباید بیشتر از ۱۰۰۰ کاراکتر باشد.") + phone = normalize_phone(phone) + self.ensure_contact(phone) - def prepare_outgoing_message(self, phone: str, text: str) -> tuple[list[str], str]: contact = self.db.get_contact_row(phone) - if not contact: - raise ValueError("مخاطب پیدا نشد.") - mode = contact["mode"] - if mode == "secure": - peer_key = self._dec(contact["peer_public_key_enc"]) - if not peer_key: - raise ValueError("برای این مخاطب کلید امن وجود ندارد.") - encoded_payload = self.crypto.encrypt_for_peer(text, peer_key) - return build_message_frames("S", encoded_payload), "secure" - encoded_payload = encode_plain_body(text) - return build_message_frames("N", encoded_payload), "normal" + saved_symmetric_key = ( + self._dec(contact["symmetric_key_enc"]) + if contact and contact["symmetric_key_enc"] + else None + ) + + # If the operator provides a key while sending, treat it as the contact's + # active shared key so the next secure message and any reply can reuse it. + if symmetric_key is not None: + symmetric_key = symmetric_key.strip() + if symmetric_key: + if ( + symmetric_key != saved_symmetric_key + or not contact + or contact["mode"] != "secure" + or contact["secure_state"] != "ready" + ): + self.db.update_contact_security( + phone, + mode="secure", + secure_state="ready", + symmetric_key_enc=self._enc(symmetric_key), + last_secure_at=utc_now(), + ) + else: + symmetric_key = None + + # If no key is provided for this specific message, default to normal mode. + # This allows the user to choice between "Normal" (plain text) and "Secure" + # for every outgoing message, regardless of the conversation state. + if not symmetric_key: + symmetric_key = None + + if symmetric_key: + encrypted_payload = self.crypto.encrypt_symmetric(text, symmetric_key) + frames = build_symmetric_msg(encrypted_payload) + return frames, "secure" + + # Normal mode: send plain text directly + return [text], "normal" def store_outgoing_message(self, phone: str, text: str, mode: str, transport_state: str) -> int: + phone = normalize_phone(phone) return self.db.add_message( phone=phone, direction="out", @@ -196,168 +220,89 @@ class SecureMessagingService: transport_state=transport_state, ) - def request_secure_channel(self, phone: str) -> list[str]: - self.ensure_contact(phone) - payload = { - "type": "hello", - "public_key": self.identity["public_key"], - "fingerprint": self.identity["fingerprint"], - "ts": utc_now(), - } - self.db.update_contact_security(phone, mode="normal", secure_state="pending") - self.db.log_secure_event(phone, "hello_sent", self._enc("درخواست ارتباط امن ارسال شد.")) - self.db.add_message( - phone=phone, - direction="system", - body_enc=self._enc("درخواست ارتباط امن برای مخاطب ارسال شد."), - mode="system", - transport_state="local", - ) - return build_control_frames(payload) - def request_normal_mode(self, phone: str) -> list[str]: + """Switch contact back to normal mode.""" + phone = normalize_phone(phone) self.ensure_contact(phone) - payload = { - "type": "normal_mode", - "ts": utc_now(), - } + sms = build_normal_mode() self.db.update_contact_security(phone, mode="normal", secure_state="ready") self.db.log_secure_event(phone, "normal_mode_sent", self._enc("بازگشت به حالت عادی برای مخاطب ارسال شد.")) - self.db.add_message( - phone=phone, - direction="system", - body_enc=self._enc("گفتگو به حالت عادی برگشت."), - mode="system", - transport_state="local", - ) - return build_control_frames(payload) + return [sms] + + def request_secure(self, phone: str) -> tuple[bool, str]: + """Switch contact to secure mode (symmetric).""" + phone = normalize_phone(phone) + self.ensure_contact(phone) + contact = self.db.get_contact_row(phone) + symmetric_key = self._dec(contact["symmetric_key_enc"]) if contact and contact["symmetric_key_enc"] else None + + if not symmetric_key: + raise ValueError("ابتدا باید کلید متقارن را برای این مخاطب تنظیم کنید.") + + self.db.update_contact_security(phone, mode="secure", secure_state="ready") + self.db.log_secure_event(phone, "secure_mode_enabled", self._enc("حالت امن (متقارن) فعال شد.")) + return True, "ready" + + def set_symmetric_key(self, phone: str, key: str): + phone = normalize_phone(phone) + self.ensure_contact(phone) + self.db.update_contact_security(phone, mode="secure", symmetric_key_enc=self._enc(key)) + self.db.log_secure_event(phone, "symmetric_key_set", self._enc("کلید متقارن تنظیم شد.")) def process_incoming_sms(self, sender: str, raw_text: str) -> tuple[str, Optional[list[str]]]: + """Parse and process an incoming SMS.""" + sender = normalize_phone(sender) self.ensure_contact(sender) - frame = parse_frame(raw_text) - if not frame: + msg = parse_incoming(raw_text) + + if msg.msg_type == "plain": self.db.add_message( phone=sender, direction="in", - body_enc=self._enc(raw_text), + body_enc=self._enc(msg.plain_text or ""), mode="normal", - transport_state="received_raw", + transport_state="received", ) return sender, None - payload = self._store_or_assemble_frame(sender, frame) - if payload is None: - self.db.log_secure_event( - sender, - "packet_fragment_received", - self._enc(f"بسته {frame.packet_id} در حال تکمیل است ({frame.part_no}/{frame.total_parts})."), - ) - return sender, None - - if frame.category == "control": - return sender, self._handle_control_payload(sender, payload) - else: - self._handle_message_payload(sender, frame.mode or "N", payload) - return sender, None - - def _store_or_assemble_frame(self, sender: str, frame) -> Optional[str]: - if frame.total_parts == 1: - return frame.chunk - self.db.save_fragment( - sender, - frame.packet_id, - frame.category, - frame.mode, - frame.part_no, - frame.total_parts, - frame.chunk, - ) - fragments = self.db.get_packet_fragments(sender, frame.packet_id) - if len(fragments) < frame.total_parts: - return None - payload = "".join(fragment["chunk"] for fragment in fragments) - self.db.delete_packet_fragments(sender, frame.packet_id) - return payload - - def _handle_control_payload(self, sender: str, payload: str) -> Optional[list[str]]: - data = decode_control_payload(payload) - action = data.get("type") - if action == "hello": - public_key = data.get("public_key") - fingerprint = data.get("fingerprint") or self.crypto.fingerprint_public_key(public_key) - self.db.update_contact_security( - sender, - mode="secure", - secure_state="ready", - peer_public_key_enc=self._enc(public_key), - peer_fingerprint=fingerprint, - last_secure_at=utc_now(), - ) - self.db.log_secure_event(sender, "hello_received", self._enc("درخواست ارتباط امن دریافت شد.")) - self.db.log_secure_event(sender, "secure_established", self._enc("ارتباط امن برقرار شد.")) - self.db.add_message( - phone=sender, - direction="system", - body_enc=self._enc("ارتباط امن با این مخاطب فعال شد."), - mode="system", - transport_state="local", - ) - reply = { - "type": "hello_ack", - "public_key": self.identity["public_key"], - "fingerprint": self.identity["fingerprint"], - "ts": utc_now(), - } - return build_control_frames(reply) - elif action == "hello_ack": - public_key = data.get("public_key") - fingerprint = data.get("fingerprint") or self.crypto.fingerprint_public_key(public_key) - self.db.update_contact_security( - sender, - mode="secure", - secure_state="ready", - peer_public_key_enc=self._enc(public_key), - peer_fingerprint=fingerprint, - last_secure_at=utc_now(), - ) - self.db.log_secure_event(sender, "hello_ack_received", self._enc("پاسخ ارتباط امن دریافت شد.")) - self.db.log_secure_event(sender, "secure_established", self._enc("ارتباط امن برقرار شد.")) - self.db.add_message( - phone=sender, - direction="system", - body_enc=self._enc("ارتباط امن آماده استفاده است."), - mode="system", - transport_state="local", - ) - return None - elif action == "normal_mode": + if msg.msg_type == "norm": self.db.update_contact_security(sender, mode="normal", secure_state="ready") self.db.log_secure_event(sender, "normal_mode_received", self._enc("مخاطب گفتگو را به حالت عادی برگرداند.")) - self.db.add_message( - phone=sender, - direction="system", - body_enc=self._enc("مخاطب گفتگو را به حالت عادی برگرداند."), - mode="system", - transport_state="local", - ) - return None - return None + return sender, None - def _handle_message_payload(self, sender: str, mode_marker: str, payload: str): - if mode_marker == "S": - try: - body = self.crypto.decrypt_from_peer(payload, self.identity["private_key"]) - mode = "secure" - transport_state = "received_secure" - except Exception: - body = "پیام امن دریافت شد اما بازگشایی نشد." - mode = "secure" - transport_state = "decrypt_failed" - self.db.log_secure_event(sender, "decrypt_failed", self._enc("بازگشایی پیام امن ناموفق بود.")) - else: - body = decode_plain_body(payload) - mode = "normal" - transport_state = "received" + if (msg.msg_type == "sym") or (msg.msg_type == "gsym"): + self._handle_symmetric_message(sender, msg.encrypted_payload or "", is_group=msg.is_group) + return sender, None + + if msg.msg_type == "sfra": + self._handle_fragment(sender, msg) + return sender, None + + if msg.msg_type == "nack": + print(f"[PROTO] NACK received from {sender}: pkt={msg.packet_id} part={msg.missing_part}") + return sender, None + + return sender, None + + # ── Symmetric Message Handler ────────────────────────────────── + + def _handle_symmetric_message(self, sender: str, payload: str, is_group: bool = False): + """Decrypt and store a single symmetric message (private or group).""" + try: + contact = self.db.get_contact_row(sender) + symmetric_key = self._dec(contact["symmetric_key_enc"]) if contact and contact["symmetric_key_enc"] else None + if not symmetric_key: + raise ValueError("No symmetric key") + body = self.crypto.decrypt_symmetric(payload, symmetric_key) + mode = "secure" + transport_state = "received_group" if is_group else "received_secure" + metadata_enc = None + except Exception: + body = "پیام رمزنگاری شده صبا دریافت شد اما بازگشایی نشد." + mode = "secure" + transport_state = "decrypt_failed" + metadata_enc = self._enc(payload) # Store raw payload for manual retry + self.db.log_secure_event(sender, "decrypt_failed_sym", self._enc("بازگشایی پیام ناموفق بود. کلید نامعتبر است.")) self.db.add_message( phone=sender, @@ -365,11 +310,86 @@ class SecureMessagingService: body_enc=self._enc(body), mode=mode, transport_state=transport_state, + metadata_enc=metadata_enc ) + def decrypt_message_manually(self, message_id: int, key: str) -> bool: + """Attempt to decrypt a specific message using a manually provided key.""" + row = self.db.get_message_row(message_id) + if not row or not row["metadata_enc"]: + return False + + payload = self._dec(row["metadata_enc"]) + try: + body = self.crypto.decrypt_symmetric(payload, key) + # If successful, update the main body and status + self.db.update_message_body(message_id, self._enc(body), "received_secure") + self.db.update_contact_security( + row["phone"], + mode="secure", + symmetric_key_enc=self._enc(key), + secure_state="ready", + ) + self.db.log_secure_event( + row["phone"], + "manual_decrypt_success", + self._enc("پیام با کلید وارد شده بازگشایی شد و کلید برای مخاطب ذخیره شد."), + ) + return True + except Exception: + return False + + # ── Fragment Handler ─────────────────────────────────────────── + + def _handle_fragment(self, sender: str, msg) -> Optional[list[str]]: + """Store a fragment and assemble when all parts arrive (always symmetric).""" + self.db.save_fragment( + sender, + msg.packet_id, + "message", + "SYM", + msg.part_no, + msg.total_parts, + msg.chunk, + ) + fragments = self.db.get_packet_fragments(sender, msg.packet_id) + if len(fragments) < msg.total_parts: + self.db.log_secure_event( + sender, + "fragment_received", + self._enc(f"قطعه {msg.part_no}/{msg.total_parts} دریافت شد (بسته {msg.packet_id})."), + ) + return None + + # All fragments received — assemble and decrypt + full_payload = "".join(f["chunk"] for f in fragments) + self.db.delete_packet_fragments(sender, msg.packet_id) + self._handle_symmetric_message(sender, full_payload) + return None + + def _migrate_all_data(self): + """Scan and fix any non-normalized phone numbers in the system.""" + rows = self.db.list_contact_rows() + for row in rows: + orig = row["phone"] + canonical = normalize_phone(orig) + if orig == canonical: + continue + + print(f"[Migration] Unifying legacy phone {orig} -> {canonical}") + self.db.migrate_messages_phone(orig, canonical) + self.db.migrate_fragments_phone(orig, canonical) + self.db.migrate_events_phone(orig, canonical) + + if self.db.get_contact_row(canonical): + self.db.delete_contact(orig) + else: + self.db.rename_contact_phone(orig, canonical) + + # ── Admin & Settings ─────────────────────────────────────────── + def get_admin_snapshot(self) -> dict: stats = self.db.collect_stats() - identity = self.get_public_identity() return { "stats": stats, "events": [ @@ -399,7 +419,7 @@ class SecureMessagingService: "db_path": str(Path(self.db.db_path).resolve()), "modem_port": self.db.get_connection_settings()[0], "baudrate": self.db.get_connection_settings()[1], - "fingerprint": identity["fingerprint"], + "fingerprint": "SYMMETRIC_ONLY", }, } diff --git a/secure_sms/core/__pycache__/__init__.cpython-313.pyc b/secure_sms/core/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..7449916 Binary files /dev/null and b/secure_sms/core/__pycache__/__init__.cpython-313.pyc differ diff --git a/secure_sms/core/__pycache__/models.cpython-313.pyc b/secure_sms/core/__pycache__/models.cpython-313.pyc new file mode 100644 index 0000000..44af84f Binary files /dev/null and b/secure_sms/core/__pycache__/models.cpython-313.pyc differ diff --git a/secure_sms/core/__pycache__/protocol.cpython-313.pyc b/secure_sms/core/__pycache__/protocol.cpython-313.pyc new file mode 100644 index 0000000..39287a9 Binary files /dev/null and b/secure_sms/core/__pycache__/protocol.cpython-313.pyc differ diff --git a/secure_sms/core/__pycache__/security.cpython-313.pyc b/secure_sms/core/__pycache__/security.cpython-313.pyc new file mode 100644 index 0000000..c644784 Binary files /dev/null and b/secure_sms/core/__pycache__/security.cpython-313.pyc differ diff --git a/secure_sms/core/__pycache__/utils.cpython-313.pyc b/secure_sms/core/__pycache__/utils.cpython-313.pyc new file mode 100644 index 0000000..518f5dd Binary files /dev/null and b/secure_sms/core/__pycache__/utils.cpython-313.pyc differ diff --git a/secure_sms/core/models.py b/secure_sms/core/models.py index dc07b0e..9e83f0a 100644 --- a/secure_sms/core/models.py +++ b/secure_sms/core/models.py @@ -20,6 +20,7 @@ class ContactDetails: secure_state: str peer_fingerprint: Optional[str] has_peer_key: bool + symmetric_key: Optional[str] last_secure_at: Optional[str] diff --git a/secure_sms/core/protocol.py b/secure_sms/core/protocol.py index b401e5e..c8f80b4 100644 --- a/secure_sms/core/protocol.py +++ b/secure_sms/core/protocol.py @@ -1,94 +1,126 @@ -import json +""" +Secure SMS Protocol v3 (Simplified Symmetric) + +Message Types: + @S:NORM| — Switch to normal mode + @S:SFRA|||| — Fragment of multi-part message (symmetric) + @S:SYM| — Single encrypted message (symmetric) + @S:NACK|| — Retransmission request + (no prefix) — Plain text message (normal mode) +""" import uuid from dataclasses import dataclass from typing import Optional -from secure_sms.core.security import b64u_decode, b64u_encode +PREFIX = "@S:" +G_PREFIX = "@G:" - -FRAME_PREFIX = "@SSM1" -FRAME_CHUNK_SIZE = 92 +# Maximum SMS body size in characters +MAX_SMS_CHARS = 140 +FRAG_OVERHEAD = 25 +FRAG_CHUNK_SIZE = MAX_SMS_CHARS - FRAG_OVERHEAD @dataclass -class ParsedFrame: - category: str - packet_id: str - part_no: int - total_parts: int - chunk: str - mode: Optional[str] = None +class ParsedMessage: + """Result of parsing an incoming SMS.""" + msg_type: str # "norm", "sym", "sfra", "nack", "plain", "gsym" + is_group: bool = False + encrypted_payload: Optional[str] = None + packet_id: Optional[str] = None + part_no: Optional[int] = None + total_parts: Optional[int] = None + chunk: Optional[str] = None + missing_part: Optional[int] = None + plain_text: Optional[str] = None -def _split_payload(encoded_payload: str) -> list[str]: - return [ - encoded_payload[index:index + FRAME_CHUNK_SIZE] - for index in range(0, len(encoded_payload), FRAME_CHUNK_SIZE) +# ── Builders ──────────────────────────────────────────────────────── + +def build_normal_mode() -> str: + return f"{PREFIX}NORM|" + + +def build_symmetric_msg(encrypted_payload: str) -> list[str]: + """Build one or more SMS for a symmetrically encrypted payload.""" + full = f"{PREFIX}SYM|{encrypted_payload}" + if len(full) <= MAX_SMS_CHARS: + return [full] + return build_fragments(encrypted_payload) + + +def build_group_msg(encrypted_payload: str) -> str: + """Build a single SMS for a group encrypted payload.""" + return f"{G_PREFIX}SYM|{encrypted_payload}" + + +def build_fragments(payload: str, packet_id: Optional[str] = None) -> list[str]: + """Split a payload into numbered fragments (always symmetric in v3).""" + pkt_id = packet_id or uuid.uuid4().hex[:10] + prefix = f"{PREFIX}SFRA" + chunks = [ + payload[i:i + FRAG_CHUNK_SIZE] + for i in range(0, len(payload), FRAG_CHUNK_SIZE) ] or [""] - - -def encode_plain_body(text: str) -> str: - return b64u_encode(text.encode("utf-8")) - - -def decode_plain_body(encoded_text: str) -> str: - return b64u_decode(encoded_text).decode("utf-8") - - -def encode_control_payload(payload: dict) -> str: - packed = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8") - return b64u_encode(packed) - - -def decode_control_payload(encoded_payload: str) -> dict: - return json.loads(b64u_decode(encoded_payload).decode("utf-8")) - - -def build_control_frames(payload: dict, packet_id: Optional[str] = None) -> list[str]: - packet_id = packet_id or uuid.uuid4().hex[:10] - encoded_payload = encode_control_payload(payload) - parts = _split_payload(encoded_payload) + total = len(chunks) return [ - f"{FRAME_PREFIX}|CTL|{packet_id}|{index + 1}|{len(parts)}|{chunk}" - for index, chunk in enumerate(parts) + f"{prefix}|{pkt_id}|{idx + 1}|{total}|{chunk}" + for idx, chunk in enumerate(chunks) ] -def build_message_frames(mode: str, encoded_payload: str, packet_id: Optional[str] = None) -> list[str]: - packet_id = packet_id or uuid.uuid4().hex[:10] - parts = _split_payload(encoded_payload) - return [ - f"{FRAME_PREFIX}|MSG|{mode}|{packet_id}|{index + 1}|{len(parts)}|{chunk}" - for index, chunk in enumerate(parts) - ] +def build_nack(packet_id: str, missing_part: int) -> str: + return f"{PREFIX}NACK|{packet_id}|{missing_part}" -def parse_frame(raw_text: str) -> Optional[ParsedFrame]: - if not raw_text.startswith(FRAME_PREFIX): - return None - if raw_text.startswith(f"{FRAME_PREFIX}|CTL|"): - parts = raw_text.split("|", 5) - if len(parts) != 6: - return None - _, _, packet_id, part_no, total_parts, chunk = parts - return ParsedFrame( - category="control", - packet_id=packet_id, - part_no=int(part_no), - total_parts=int(total_parts), - chunk=chunk, +# ── Parser ────────────────────────────────────────────────────────── + +def parse_incoming(raw_text: str) -> ParsedMessage: + """Parse an incoming SMS and determine its type.""" + is_group = raw_text.startswith(G_PREFIX) + if not raw_text.startswith(PREFIX) and not is_group: + return ParsedMessage(msg_type="plain", plain_text=raw_text) + + # Strip prefix + prefix_len = len(G_PREFIX) if is_group else len(PREFIX) + body = raw_text[prefix_len:] + + if body.startswith("NORM|"): + return ParsedMessage(msg_type="norm", is_group=is_group) + + elif body.startswith("SYM|"): + return ParsedMessage( + msg_type="sym" if not is_group else "gsym", + is_group=is_group, + encrypted_payload=body[4:], ) - if raw_text.startswith(f"{FRAME_PREFIX}|MSG|"): - parts = raw_text.split("|", 6) - if len(parts) != 7: - return None - _, _, mode, packet_id, part_no, total_parts, chunk = parts - return ParsedFrame( - category="message", - mode=mode, - packet_id=packet_id, - part_no=int(part_no), - total_parts=int(total_parts), - chunk=chunk, - ) - return None + + elif body.startswith("SFRA|"): + parts = body[5:].split("|", 3) + if len(parts) == 4: + try: + return ParsedMessage( + msg_type="sfra", + is_group=is_group, + packet_id=parts[0], + part_no=int(parts[1]), + total_parts=int(parts[2]), + chunk=parts[3], + ) + except ValueError: + pass + + elif body.startswith("NACK|"): + parts = body[5:].split("|", 1) + if len(parts) == 2: + try: + return ParsedMessage( + msg_type="nack", + is_group=is_group, + packet_id=parts[0], + missing_part=int(parts[1]), + ) + except ValueError: + pass + + return ParsedMessage(msg_type="plain", plain_text=raw_text) diff --git a/secure_sms/core/security.py b/secure_sms/core/security.py index e90ac55..7a7fa46 100644 --- a/secure_sms/core/security.py +++ b/secure_sms/core/security.py @@ -1,23 +1,88 @@ import base64 import hashlib import os +import re +import unicodedata from dataclasses import dataclass from typing import Optional -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.ciphers.aead import AESGCM -from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.kdf.scrypt import Scrypt +_LEGACY_INVISIBLE_CHARS = dict.fromkeys(map(ord, "\u200c\u200d\u200e\u200f\ufeff"), None) +_ARABIC_VARIANT_TRANSLATION = str.maketrans( + { + "ك": "ک", + "ي": "ی", + "ى": "ی", + "ة": "ه", + "ۀ": "ه", + "٠": "0", + "١": "1", + "٢": "2", + "٣": "3", + "٤": "4", + "٥": "5", + "٦": "6", + "٧": "7", + "٨": "8", + "٩": "9", + "۰": "0", + "۱": "1", + "۲": "2", + "۳": "3", + "۴": "4", + "۵": "5", + "۶": "6", + "۷": "7", + "۸": "8", + "۹": "9", + } +) + + def b64u_encode(data: bytes) -> str: + """URL-safe Base64 encoding without padding, matching Flutter's implementation.""" return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") +def _decode_transport_payload(value: str) -> bytes: + """Decode either legacy Base64URL payloads or the new hex transport format.""" + if value.startswith("h1:"): + clean_hex = re.sub(r"[^0-9A-Fa-f]", "", value[3:]) + if len(clean_hex) % 2 != 0: + clean_hex = clean_hex[:-1] + return bytes.fromhex(clean_hex) if clean_hex else b"" + return b64u_decode(value) + + def b64u_decode(value: str) -> bytes: - padding = "=" * (-len(value) % 4) - return base64.urlsafe_b64decode((value + padding).encode("ascii")) + """Universal Base64 decode with robust cleanup and padding repair. + Handles both standard and URL-safe Base64, ensuring both English (short) + and Persian (long) messages are decoded correctly. + """ + # 1. Strip all whitespace and modem artifacts + clean = "".join(value.split()) + # 2. Normalize characters (URL-safe to Standard) + clean = clean.replace("-", "+").replace("_", "/") + # 3. Strip existing padding to avoid double-padding issues + clean = clean.split("=")[0] + # 4. Filter only valid Base64 characters (A-Z, a-z, 0-9, +, /) + clean = re.sub(r'[^A-Za-z0-9+/]', '', clean) + + # 5. Add correct padding for decoding + padding_len = (4 - len(clean) % 4) % 4 + padded = clean + ("=" * padding_len) + + # DEBUG: Show what happened to the payload + # print(f"[B64] Original: {value[:16]}... Padded: {padded[:16]}... (len={len(padded)})") + + try: + return base64.b64decode(padded.encode("ascii")) + except Exception as e: + print(f"[B64] ERROR decoding: {e}") + return b"" @dataclass @@ -73,61 +138,79 @@ class StorageCipher: return plaintext.decode("utf-8") -class ECCCryptoService: - INFO = b"sms-secure-channel-v2" +class SymmetricCryptoService: + """ + Symmetric encryption service using AES-GCM-256. + Matches the Flutter implementation while keeping compatibility with + older Python builds that normalized keys before hashing. + """ - def generate_identity(self) -> tuple[str, str, str]: - private_key = x25519.X25519PrivateKey.generate() - public_key = private_key.public_key() - private_raw = private_key.private_bytes( - encoding=serialization.Encoding.Raw, - format=serialization.PrivateFormat.Raw, - encryption_algorithm=serialization.NoEncryption(), - ) - public_raw = public_key.public_bytes( - encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw, - ) - public_b64 = b64u_encode(public_raw) - return b64u_encode(private_raw), public_b64, self.fingerprint_public_key(public_b64) + def _clean_key_variants(self, password: str) -> list[tuple[str, str]]: + raw = password.strip() + legacy_nfc = unicodedata.normalize("NFC", raw) + visual_safe = raw.translate(_LEGACY_INVISIBLE_CHARS).translate(_ARABIC_VARIANT_TRANSLATION) + legacy_nfc_visual_safe = legacy_nfc.translate(_LEGACY_INVISIBLE_CHARS).translate(_ARABIC_VARIANT_TRANSLATION) - def fingerprint_public_key(self, public_key_b64: str) -> str: - digest = hashlib.sha256(b64u_decode(public_key_b64)).hexdigest() - return digest[:16].upper() + variants: list[tuple[str, str]] = [] + seen: set[str] = set() + for label, value in ( + ("flutter_raw", raw), + ("legacy_python_nfc", legacy_nfc), + ("visual_safe", visual_safe), + ("legacy_python_nfc_visual_safe", legacy_nfc_visual_safe), + ): + if value not in seen: + variants.append((label, value)) + seen.add(value) + return variants - def encrypt_for_peer(self, message: str, peer_public_key_b64: str) -> str: - peer_public = x25519.X25519PublicKey.from_public_bytes(b64u_decode(peer_public_key_b64)) - ephemeral_private = x25519.X25519PrivateKey.generate() - ephemeral_public = ephemeral_private.public_key().public_bytes( - encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw, - ) - shared_key = ephemeral_private.exchange(peer_public) - derived_key = HKDF( - algorithm=hashes.SHA256(), - length=32, - salt=None, - info=self.INFO, - ).derive(shared_key) + def _derive_symmetric_key_from_text(self, key_text: str, *, debug: bool = False) -> bytes: + key = hashlib.sha256(key_text.encode("utf-8")).digest() + if debug: + print(f"[Crypto] Derived key fingerprint: {key.hex()[:4]}...") + return key + + def _derive_symmetric_key(self, password: str) -> bytes: + """Derive the primary 32-byte key exactly like Flutter: SHA-256(trimmed text).""" + raw_trimmed = password.strip() + return self._derive_symmetric_key_from_text(raw_trimmed, debug=True) + + def encrypt_symmetric(self, message: str, password: str) -> str: + """Encrypt message using AES-GCM with a password-derived key.""" + key = self._derive_symmetric_key(password) + aesgcm = AESGCM(key) nonce = os.urandom(12) - ciphertext = AESGCM(derived_key).encrypt(nonce, message.encode("utf-8"), None) - return b64u_encode(ephemeral_public + nonce + ciphertext) + ciphertext = aesgcm.encrypt(nonce, message.encode("utf-8"), None) + # SMS-safe transport: explicit hex payload, still backward-compatible on decode. + return "h1:" + (nonce + ciphertext).hex() - def decrypt_from_peer(self, payload_b64: str, private_key_b64: str) -> str: - payload = b64u_decode(payload_b64) - if len(payload) < 60: - raise ValueError("Secure payload is too short.") - ephemeral_public_raw = payload[:32] - nonce = payload[32:44] - ciphertext = payload[44:] - private_key = x25519.X25519PrivateKey.from_private_bytes(b64u_decode(private_key_b64)) - ephemeral_public = x25519.X25519PublicKey.from_public_bytes(ephemeral_public_raw) - shared_key = private_key.exchange(ephemeral_public) - derived_key = HKDF( - algorithm=hashes.SHA256(), - length=32, - salt=None, - info=self.INFO, - ).derive(shared_key) - plaintext = AESGCM(derived_key).decrypt(nonce, ciphertext, None) - return plaintext.decode("utf-8") + def decrypt_symmetric(self, payload_b64: str, password: str) -> str: + """Decrypt message using AES-GCM with a password-derived key.""" + payload = _decode_transport_payload(payload_b64) + + if len(payload) < 28: + print(f"[Symmetric] ERROR: Payload too short ({len(payload)})") + raise ValueError("Symmetric payload is too short.") + + nonce = payload[:12] + ciphertext_with_tag = payload[12:] + + last_error: Optional[Exception] = None + tried_labels: list[str] = [] + for label, key_text in self._clean_key_variants(password): + tried_labels.append(label) + key = self._derive_symmetric_key_from_text(key_text, debug=(label == "flutter_raw")) + aesgcm = AESGCM(key) + try: + plaintext = aesgcm.decrypt(nonce, ciphertext_with_tag, None) + if label != "flutter_raw": + print(f"[Symmetric] Compatibility decrypt succeeded using: {label}") + return plaintext.decode("utf-8") + except Exception as exc: + last_error = exc + + print(f"[Symmetric] Decryption FAILED after trying: {', '.join(tried_labels)}") + print("[Symmetric] Check: Key and payload must match bit-perfectly.") + if last_error is not None: + raise last_error + raise ValueError("Symmetric decryption failed.") diff --git a/secure_sms/core/utils.py b/secure_sms/core/utils.py new file mode 100644 index 0000000..0ea5e59 --- /dev/null +++ b/secure_sms/core/utils.py @@ -0,0 +1,23 @@ +def normalize_phone(phone: str) -> str: + """Normalize phone number to a canonical 09XXXXXXXXX format for Iranian numbers. + Handles +98, 0098, 9, and 09 prefixes by focusing on the last 10 digits. + """ + if not phone: + return "" + + # Remove all non-digit characters + digits = "".join(filter(str.isdigit, phone)) + + # If no digits are found, return the original string stripped of whitespace. + # This handles cases like "abc" -> "abc", " " -> "" + if not digits: + return phone.strip() + + # Standard Iranian mobile numbers end with 10 digits starting with '9' + # Example: 9123456789. We want to normalize this to 09123456789. + if len(digits) >= 10: + suffix = digits[-10:] + if suffix.startswith("9"): + return "0" + suffix + + return digits diff --git a/secure_sms/infrastructure/__pycache__/__init__.cpython-313.pyc b/secure_sms/infrastructure/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..f32cca7 Binary files /dev/null and b/secure_sms/infrastructure/__pycache__/__init__.cpython-313.pyc differ diff --git a/secure_sms/infrastructure/__pycache__/database.cpython-313.pyc b/secure_sms/infrastructure/__pycache__/database.cpython-313.pyc index 228c0cf..3dc5d7c 100644 Binary files a/secure_sms/infrastructure/__pycache__/database.cpython-313.pyc and b/secure_sms/infrastructure/__pycache__/database.cpython-313.pyc differ diff --git a/secure_sms/infrastructure/__pycache__/gsm.cpython-313.pyc b/secure_sms/infrastructure/__pycache__/gsm.cpython-313.pyc new file mode 100644 index 0000000..c7792ac Binary files /dev/null and b/secure_sms/infrastructure/__pycache__/gsm.cpython-313.pyc differ diff --git a/secure_sms/infrastructure/database.py b/secure_sms/infrastructure/database.py index bcc0696..5f61d79 100644 --- a/secure_sms/infrastructure/database.py +++ b/secure_sms/infrastructure/database.py @@ -54,6 +54,7 @@ class Database: secure_state TEXT NOT NULL DEFAULT 'none', peer_public_key_enc TEXT, peer_fingerprint TEXT, + symmetric_key_enc TEXT, last_secure_at TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL @@ -103,6 +104,22 @@ class Database: ) conn.commit() + # --- Migrations --- + # Ensure columns exist in older databases + cursor = conn.cursor() + cursor.execute("PRAGMA table_info(contacts)") + cols = [c["name"] for c in cursor.fetchall()] + if "symmetric_key_enc" not in cols: + conn.execute("ALTER TABLE contacts ADD COLUMN symmetric_key_enc TEXT") + if "last_secure_at" not in cols: + conn.execute("ALTER TABLE contacts ADD COLUMN last_secure_at TEXT") + + cursor.execute("PRAGMA table_info(messages)") + cols = [c["name"] for c in cursor.fetchall()] + if "metadata_enc" not in cols: + conn.execute("ALTER TABLE messages ADD COLUMN metadata_enc TEXT") + conn.commit() + def is_bootstrapped(self) -> bool: return self.get_security_metadata() is not None @@ -138,11 +155,13 @@ class Database: def get_connection_settings(self) -> tuple[str, int]: import os - default_port = "/dev/ttyS0" if os.name != "nt" else "COM1" + default_port = "/dev/serial0" if os.name != "nt" else "COM1" port = self.get_config("gsm_port") if not port or port == "COM1": port = default_port - baudrate = int(self.get_config("gsm_baudrate", "115200") or "115200") + # Strictly default to 9600 + raw_baud = self.get_config("gsm_baudrate", "9600") + baudrate = int(raw_baud if raw_baud else "9600") return port, baudrate def set_connection_settings(self, port: str, baudrate: int): @@ -212,6 +231,22 @@ class Database: conn.execute("DELETE FROM secure_events WHERE phone = ?", (phone,)) conn.commit() + def rename_contact_phone(self, old_phone: str, new_phone: str): + with self._connect() as conn: + conn.execute("UPDATE contacts SET phone = ? WHERE phone = ?", (new_phone, old_phone)) + + def migrate_messages_phone(self, old_phone: str, new_phone: str): + with self._connect() as conn: + conn.execute("UPDATE messages SET phone = ? WHERE phone = ?", (new_phone, old_phone)) + + def migrate_fragments_phone(self, old_phone: str, new_phone: str): + with self._connect() as conn: + conn.execute("UPDATE packet_fragments SET phone = ? WHERE phone = ?", (new_phone, old_phone)) + + def migrate_events_phone(self, old_phone: str, new_phone: str): + with self._connect() as conn: + conn.execute("UPDATE secure_events SET phone = ? WHERE phone = ?", (new_phone, old_phone)) + def list_contact_rows(self) -> list[sqlite3.Row]: with self._connect() as conn: cursor = conn.cursor() @@ -241,6 +276,7 @@ class Database: secure_state: Optional[str] = None, peer_public_key_enc: Optional[str] = None, peer_fingerprint: Optional[str] = None, + symmetric_key_enc: Optional[str] = None, last_secure_at: Optional[str] = None, ): updates = [] @@ -257,6 +293,9 @@ class Database: if peer_fingerprint is not None: updates.append("peer_fingerprint = ?") values.append(peer_fingerprint) + if symmetric_key_enc is not None: + updates.append("symmetric_key_enc = ?") + values.append(symmetric_key_enc) if last_secure_at is not None: updates.append("last_secure_at = ?") values.append(last_secure_at) @@ -299,6 +338,20 @@ class Database: ) conn.commit() + def update_message_body(self, message_id: int, body_enc: str, transport_state: str): + with self._connect() as conn: + conn.execute( + "UPDATE messages SET body_enc = ?, transport_state = ? WHERE id = ?", + (body_enc, transport_state, message_id), + ) + conn.commit() + + def get_message_row(self, message_id: int) -> Optional[sqlite3.Row]: + with self._connect() as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM messages WHERE id = ?", (message_id,)) + return cursor.fetchone() + def list_message_rows(self, phone: str) -> list[sqlite3.Row]: with self._connect() as conn: cursor = conn.cursor() @@ -400,7 +453,7 @@ class Database: def rotate_encrypted_payloads(self, old_cipher: StorageCipher, new_cipher: StorageCipher): table_map = { - "contacts": ("phone", ["name_enc", "peer_public_key_enc"]), + "contacts": ("phone", ["name_enc", "peer_public_key_enc", "symmetric_key_enc"]), "messages": ("id", ["body_enc", "metadata_enc"]), "secure_events": ("id", ["details_enc"]), "identity": ("id", ["private_key_enc", "public_key_enc"]), diff --git a/secure_sms/infrastructure/gsm.py b/secure_sms/infrastructure/gsm.py index 0d807bf..0b059f1 100644 --- a/secure_sms/infrastructure/gsm.py +++ b/secure_sms/infrastructure/gsm.py @@ -7,19 +7,67 @@ from typing import Callable, Optional import serial +_TERMINAL_STATUS_RE = re.compile(r"(?:^|\r?\n)(OK|ERROR)\r?\n?\s*$", re.DOTALL) +_PROTOCOL_ALLOWED_RE = re.compile(r"[^A-Za-z0-9@:\|\-_=+/]") + + +def _has_non_ascii(text: str) -> bool: + """Check if text contains non-ASCII characters.""" + try: + text.encode("ascii") + return False + except UnicodeEncodeError: + return True + + +def _to_ucs2_hex(text: str) -> str: + """Encode text as UCS2 hex using utf-16-be.""" + return text.encode("utf-16-be").hex().upper() + + +def _decode_ucs2_hex(hex_str: str) -> str: + """Decode UCS2 hex text and ignore junk after the valid prefix.""" + try: + match = re.match(r"^([0-9A-Fa-f]{4})+", hex_str) + if not match: + return hex_str + valid_hex = match.group(0) + return bytes.fromhex(valid_hex).decode("utf-16-be") + except Exception: + return hex_str + + +def _has_terminal_status(text: str) -> bool: + """Only treat OK/ERROR as complete when it is the final modem status line.""" + return bool(_TERMINAL_STATUS_RE.search(text)) + + +def _sanitize_protocol_body(text: str) -> str: + """Remove control bytes and modem junk from protocol SMS bodies.""" + no_controls = re.sub(r"[\x00-\x1F\x7F]", "", text) + compact = "".join(no_controls.split()) + if compact.startswith(("@S:", "@G:")): + return _PROTOCOL_ALLOWED_RE.sub("", compact) + return text + + class IMessageGateway(abc.ABC): @property @abc.abstractmethod - def is_connected(self) -> bool: pass + def is_connected(self) -> bool: + pass @abc.abstractmethod - def connect(self) -> bool: pass + def connect(self) -> bool: + pass @abc.abstractmethod - def disconnect(self) -> None: pass + def disconnect(self) -> None: + pass @abc.abstractmethod - def send_frames(self, phone: str, frames: list[str]) -> bool: pass + def send_frames(self, phone: str, frames: list[str]) -> bool: + pass class GSMGateway(IMessageGateway): @@ -35,7 +83,12 @@ class GSMGateway(IMessageGateway): self.serial_conn = None self.is_running = False self.read_thread = None + self.poll_thread = None self.lock = threading.Lock() + self.buffer_lock = threading.Lock() + self._unread_buffer: bytes = b"" + self._processing_lock = threading.Lock() + self._processing_indexes: set[int] = set() @property def is_connected(self) -> bool: @@ -43,19 +96,38 @@ class GSMGateway(IMessageGateway): def connect(self) -> bool: try: + print(f"[GSM] Connecting to port={self.port} baudrate={self.baudrate}") self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, - timeout=1, + timeout=0.1, + xonxoff=False, + rtscts=False, + dsrdtr=False, ) self.is_running = True - self.send_at_cmd("AT") - self.send_at_cmd("AT+CMGF=1") - self.send_at_cmd('AT+CNMI=2,1,0,0,0') + + self._send_raw("AT\r\n") + time.sleep(0.5) + self._send_raw("ATE0\r\n") + time.sleep(0.5) + self._send_raw("AT+CMGF=1\r\n") + time.sleep(0.5) + self._send_raw('AT+CPMS="ME","ME","ME"\r\n') + time.sleep(0.5) + self._send_raw("AT+CNMI=2,1,0,0,0\r\n") + time.sleep(0.5) + self._send_raw("AT+IFC=0,0\r\n") + time.sleep(0.5) + self.read_thread = threading.Thread(target=self._read_loop, daemon=True) self.read_thread.start() + + self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True) + self.poll_thread.start() return True - except Exception: + except Exception as exc: + print(f"[GSM] Connection failed: {exc}") self.serial_conn = None self.is_running = False return False @@ -67,24 +139,44 @@ class GSMGateway(IMessageGateway): if self.read_thread: self.read_thread.join(timeout=1.5) - def send_at_cmd(self, command: str, expected_response: str = "OK", timeout: float = 2.0) -> tuple[bool, list[str]]: + def _send_raw(self, cmd: str): + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.write(cmd.encode("utf-8")) + + def _send_at_simple(self, command: str, wait: float = 2.0) -> str: + """Send an AT command and wait for a complete modem response.""" with self.lock: if not self.is_connected: - return False, ["offline"] - self.serial_conn.reset_input_buffer() - self.serial_conn.write((command + "\r\n").encode("ascii")) - start = time.time() - lines = [] - while time.time() - start < timeout: - if self.serial_conn.in_waiting: - line = self.serial_conn.readline().decode("ascii", errors="ignore").strip() - if line: - lines.append(line) - if expected_response in line or "ERROR" in line: - break - else: - time.sleep(0.05) - return expected_response in "\n".join(lines), lines + return "" + + with self.buffer_lock: + self._unread_buffer = b"" + + print(f"[GSM] CMD: {command}") + self.serial_conn.write((command + "\r\n").encode("ascii", errors="ignore")) + + start_time = time.time() + while (time.time() - start_time) < (wait + 5): + time.sleep(0.1) + with self.buffer_lock: + text = self._unread_buffer.decode("ascii", errors="replace") + if _has_terminal_status(text): + break + + with self.buffer_lock: + final_resp = self._unread_buffer.decode("utf-8", errors="replace") + self._unread_buffer = b"" + return final_resp + + def send_at_cmd( + self, + command: str, + expected_response: str = "OK", + timeout: float = 2.0, + ) -> tuple[bool, list[str]]: + resp = self._send_at_simple(command, wait=timeout) + lines = [line.strip() for line in resp.split("\n") if line.strip()] + return _has_terminal_status(resp) and expected_response in resp, lines def send_frames(self, phone: str, frames: list[str]) -> bool: if not self.is_connected: @@ -92,89 +184,160 @@ class GSMGateway(IMessageGateway): for frame in frames: if not self._send_single_sms(phone, frame): return False - time.sleep(0.8) + time.sleep(1.0) return True def _send_single_sms(self, phone: str, body: str) -> bool: with self.lock: try: - print(f"[GSM] Sending SMS to {phone}, payload_len={len(body)}") - self.serial_conn.reset_input_buffer() - self.serial_conn.write(f'AT+CMGS="{phone}"\r\n'.encode("ascii")) - start = time.time() - prompt_ready = False - while time.time() - start < 5.0: - if self.serial_conn.in_waiting: - char = self.serial_conn.read().decode("ascii", errors="ignore") - if char == ">": - prompt_ready = True - print("[GSM] Received '>' prompt.") - break - time.sleep(0.05) - if not prompt_ready: - print("[GSM] Error: Did not receive '>' prompt in time. Canceling.") - self.serial_conn.write(chr(27).encode("ascii")) - return False - self.serial_conn.write(body.encode("ascii") + chr(26).encode("ascii")) - start = time.time() - while time.time() - start < 45.0: - if self.serial_conn.in_waiting: - line = self.serial_conn.readline().decode("ascii", errors="ignore").strip() - if line: - print(f"[GSM] Modem response: {line}") - if "OK" in line: - print("[GSM] SMS sent successfully.") - return True - if "ERROR" in line: - print("[GSM] SMS encountered an ERROR.") - return False - else: - time.sleep(0.1) - print("[GSM] SMS Send Timed Out waiting for OK/ERROR (45s).") - return False - except Exception as e: - print(f"[GSM] Exception during SMS transmission: {e}") - return False + needs_ucs2 = _has_non_ascii(body) + print(f"[GSM] Sending SMS to {phone}, ucs2={needs_ucs2}") + self.serial_conn.write(b'AT+CSCS="UCS2"\r\n' if needs_ucs2 else b'AT+CSCS="GSM"\r\n') + time.sleep(1) + + self.serial_conn.write(b'AT+CSMP=17,167,0,8\r\n' if needs_ucs2 else b'AT+CSMP=17,167,0,0\r\n') + time.sleep(1) + + self.serial_conn.write(b"AT+CMGF=1\r\n") + time.sleep(1) + + target = _to_ucs2_hex(phone) if needs_ucs2 else phone + self.serial_conn.write(f'AT+CMGS="{target}"\r\n'.encode("utf-8")) + time.sleep(1.5) + + payload = _to_ucs2_hex(body) if needs_ucs2 else body + self.serial_conn.write(payload.encode("utf-8")) + time.sleep(0.5) + self.serial_conn.write(bytes([26])) + + time.sleep(8) + with self.buffer_lock: + resp = self._unread_buffer.decode("utf-8", errors="replace") + self._unread_buffer = b"" + + if _has_terminal_status(resp) and "OK" in resp: + print("[GSM] SMS sent successfully.") + return True + + print(f"[GSM] SMS fail or uncertain. Response: {repr(resp)}") + return False + except Exception as exc: + print(f"[GSM] Exception during SMS: {exc}") + return False def _read_loop(self): + """Continuously read serial and react to unsolicited +CMTI notifications.""" while self.is_running: try: - if self.is_connected and self.serial_conn.in_waiting and self.lock.acquire(blocking=False): - try: - line = self.serial_conn.readline().decode("ascii", errors="ignore").strip() - if line.startswith("+CMTI:"): - match = re.search(r'\+CMTI:\s*".*?",(\d+)', line) - if match: - index = int(match.group(1)) - threading.Thread( - target=self._process_incoming_sms, - args=(index,), - daemon=True, - ).start() - finally: - self.lock.release() - except Exception: - pass + if self.is_connected and self.serial_conn.in_waiting > 0: + data = self.serial_conn.read(self.serial_conn.in_waiting) + if data: + with self.buffer_lock: + self._unread_buffer += data + + try: + text = data.decode("ascii", errors="replace") + if "+CMTI:" in text: + match = re.search(r'\+CMTI:\s*".*?",(\d+)', text) + if match: + index = int(match.group(1)) + print(f"[GSM] New SMS notification at index {index}") + threading.Thread( + target=self._process_incoming_sms, + args=(index,), + daemon=True, + ).start() + except Exception: + pass + except Exception as exc: + if self.is_running: + print(f"[GSM] Read error: {exc}") time.sleep(0.1) - def _process_incoming_sms(self, index: int): - time.sleep(0.7) - ok, lines = self.send_at_cmd(f"AT+CMGR={index}", expected_response="OK", timeout=3) - if not ok: - return - sender = "ناشناس" - body_lines = [] - reading_body = False - for line in lines: - if line.startswith("+CMGR:"): - parts = line.split(",") - if len(parts) >= 2: - sender = parts[1].strip('"') - reading_body = True + def _poll_loop(self): + """Fail-safe polling in case +CMTI notifications are missed.""" + while self.is_running: + time.sleep(60) + if not self.is_connected: continue - if reading_body and line not in {"OK", "ERROR"}: - body_lines.append(line) - if self.message_callback: - self.message_callback(sender, "\n".join(body_lines)) - self.send_at_cmd(f"AT+CMGD={index}") + + try: + success, lines = self.send_at_cmd('AT+CMGL="ALL"', timeout=5) + if success: + for line in lines: + if line.startswith("+CMGL:"): + parts = line.split(",") + if len(parts) >= 1: + try: + index_str = parts[0].split(":")[1].strip() + index = int(index_str) + print(f"[GSM] Found message during polling at index {index}") + self._process_incoming_sms(index) + except Exception: + pass + except Exception as exc: + print(f"[GSM] Polling error: {exc}") + + def _process_incoming_sms(self, index: int): + """Read, decode, sanitize, and dispatch a message from one modem slot.""" + with self._processing_lock: + if index in self._processing_indexes: + return + self._processing_indexes.add(index) + + try: + time.sleep(0.25) + resp = self._send_at_simple(f"AT+CMGR={index}", wait=4) + if not _has_terminal_status(resp): + return + + sender = "unknown" + body_lines = [] + reading_body = False + + for line in resp.split("\n"): + line = line.strip() + if line.startswith("+CMGR:"): + parts = line.split(",") + if len(parts) >= 2: + sender = parts[1].strip('"') + if sender.startswith("00") and len(sender) >= 8: + sender = _decode_ucs2_hex(sender) + reading_body = True + continue + + if reading_body: + if line == "OK": + break + if line: + body_lines.append(line) + + full_body = "\n".join(body_lines) + clean_body = "".join(full_body.split()) + + if clean_body.startswith(("@S:", "@G:")): + print(f"[GSM-Debug] Raw Payload Hex: {full_body.encode('utf-8', errors='replace').hex()[:100]}...") + + sanitized_body = _sanitize_protocol_body(full_body) + if sanitized_body != full_body: + print("[GSM] Sanitized protocol body to remove modem control artifacts.") + full_body = sanitized_body + clean_body = "".join(full_body.split()) + + if not clean_body.startswith(("@S:", "@G:")): + if len(clean_body) >= 4 and re.fullmatch(r"[0-9A-Fa-f]+", clean_body): + trimmed = clean_body[: (len(clean_body) // 4) * 4] + if trimmed: + decoded = _decode_ucs2_hex(trimmed) + if decoded and decoded != trimmed: + full_body = decoded + + print(f"[GSM] Message from {sender}: {full_body[:50]}...") + if self.message_callback: + self.message_callback(sender, full_body) + + self._send_at_simple(f"AT+CMGD={index}", wait=1) + finally: + with self._processing_lock: + self._processing_indexes.discard(index) diff --git a/secure_sms/ui/__pycache__/main_window.cpython-312.pyc b/secure_sms/ui/__pycache__/main_window.cpython-312.pyc new file mode 100644 index 0000000..b9e77fd Binary files /dev/null and b/secure_sms/ui/__pycache__/main_window.cpython-312.pyc differ diff --git a/secure_sms/ui/__pycache__/main_window.cpython-313.pyc b/secure_sms/ui/__pycache__/main_window.cpython-313.pyc index 737a6c8..23b2c6e 100644 Binary files a/secure_sms/ui/__pycache__/main_window.cpython-313.pyc and b/secure_sms/ui/__pycache__/main_window.cpython-313.pyc differ diff --git a/secure_sms/ui/main_window.py b/secure_sms/ui/main_window.py index 4dd30dd..5a56b71 100644 --- a/secure_sms/ui/main_window.py +++ b/secure_sms/ui/main_window.py @@ -1,8 +1,10 @@ -import os +import os import re from tkinter import TclError +from typing import Optional import customtkinter as ctk +from secure_sms.core.utils import normalize_phone try: import arabic_reshaper @@ -151,6 +153,8 @@ class RTLEntry(_RTLTextMixin, ctk.CTkEntry): class RTLTextbox(ctk.CTkTextbox): def __init__(self, *args, **kwargs): + self.max_length = kwargs.pop("max_length", None) + self.on_change = kwargs.pop("on_change", None) super().__init__(*args, **kwargs) bg_color = kwargs.get("fg_color", INPUT_BG) try: @@ -170,6 +174,20 @@ class RTLTextbox(ctk.CTkTextbox): def _sync_display(self, text=None): raw = text if text is not None else self.get("1.0", "end-1c") + + # Enforce max_length if defined + if self.max_length is not None and len(raw) > self.max_length: + truncated = raw[:self.max_length] + # Temporarily unbind or use a flag to avoid recursion if needed, + # but delete/insert will trigger sync_display again. + # A simple way is to check if it's already truncated. + self.delete("1.0", "end") + super().insert("1.0", truncated) # Use super().insert to avoid immediate recursion + raw = truncated + + if self.on_change: + self.on_change(raw) + if raw: self._display_label.configure(text=ui_text(raw) + " |") else: @@ -229,7 +247,7 @@ class SecureSmsApp(ctk.CTk): self._touch_start_y = None self._active_scroll_frame = None - self._show_lock_screen() + self._show_mobile_launcher() self.after(50, self._enable_touch_kiosk_mode) def _enable_touch_kiosk_mode(self): @@ -675,6 +693,51 @@ class SecureSmsApp(ctk.CTk): for widget in self.root_frame.winfo_children(): widget.destroy() + def _show_mobile_launcher(self): + """Simulates a mobile home screen with a Saba icon.""" + self._clear_root() + + # We can use a slightly different background for the 'wallpaper' feel + # Using a dark gradient-like feel or just a premium dark surface + wallpaper_color = "#1A1C1E" + + wallpaper = ctk.CTkFrame(self.root_frame, fg_color=wallpaper_color, corner_radius=0) + wallpaper.place(relx=0, rely=0, relwidth=1, relheight=1) + + # Icon container + container = ctk.CTkFrame(wallpaper, fg_color="transparent") + container.place(relx=0.5, rely=0.4, anchor="center") + + # Large premium icon + # We use a rounded button to represent the app icon + icon_size = 92 + RTLButton( + container, + text='📨', + width=icon_size, + height=icon_size, + corner_radius=22, + fg_color=PRIMARY, + hover_color=PRIMARY_DARK, + font=ctk.CTkFont(family=FONT_TITLE, size=46), + command=self._show_lock_screen + ).pack(pady=(0, 12)) + + RTLLabel( + container, + text='صبا', + text_color="white", + font=ctk.CTkFont(family=FONT_TITLE, size=20, weight="bold") + ).pack() + + # Add a subtle 'mobile' hint at the bottom + RTLLabel( + wallpaper, + text='برای ورود روی آیکون بزنید', + text_color="#636A73", + font=ctk.CTkFont(family=FONT_BODY, size=13) + ).place(relx=0.5, rely=0.9, anchor="center") + def _show_lock_screen(self): self._clear_root() frame = ctk.CTkFrame(self.root_frame, fg_color=CARD, corner_radius=16, border_width=1, border_color=BORDER) @@ -1027,19 +1090,6 @@ class SecureSmsApp(ctk.CTk): self.header_card.grid(row=0, column=0, sticky="ew") self.header_card.grid_columnconfigure(0, weight=1) self.header_card.grid_columnconfigure(1, weight=0) - self.header_card.grid_columnconfigure(2, weight=0) - - self.mode_badge = RTLLabel( - self.header_card, - text='عادی', - fg_color=PRIMARY_SOFT, - text_color=PRIMARY, - corner_radius=6, - padx=14, - pady=6, - font=ctk.CTkFont(family=FONT_BODY, size=12, weight="bold"), - ) - self.mode_badge.grid(row=0, column=0, rowspan=2, padx=14, sticky="w") self.chat_title = RTLLabel( self.header_card, @@ -1047,21 +1097,12 @@ class SecureSmsApp(ctk.CTk): text_color=TEXT, font=ctk.CTkFont(family=FONT_TITLE, size=16 if self.is_portrait else 18, weight="bold"), ) - self.chat_title.grid(row=0, column=1, padx=8, pady=(10, 2), sticky="e") - self.chat_subtitle = RTLLabel( - self.header_card, - text='در اینجا فقط دو حالت داری: عادی یا امن', - text_color=MUTED, - font=ctk.CTkFont(family=FONT_BODY, size=12), - ) - self.chat_subtitle.grid(row=1, column=1, padx=8, pady=(0, 10), sticky="e") - def open_contact_page(e=None): if self.current_contact_phone: self._show_contact_details_page() + self.chat_title.grid(row=0, column=0, padx=16, pady=20, sticky="e") self.chat_title.bind("", open_contact_page, add="+") - self.chat_subtitle.bind("", open_contact_page, add="+") RTLButton( self.header_card, @@ -1074,7 +1115,7 @@ class SecureSmsApp(ctk.CTk): text_color=TEXT, font=ctk.CTkFont(family=FONT_BODY, size=14, weight="bold"), command=self._show_home_screen - ).grid(row=0, column=2, rowspan=2, padx=(4, 14), sticky="e") + ).grid(row=0, column=1, padx=(4, 14), sticky="e") content = ctk.CTkFrame(self.main_panel, fg_color=BACKGROUND, corner_radius=0) content.grid(row=1, column=0, padx=outer_pad, pady=(0, inner_pad), sticky="nsew") @@ -1119,18 +1160,10 @@ class SecureSmsApp(ctk.CTk): font=ctk.CTkFont(family=FONT_BODY, size=16), ) self.profile_phone.grid(row=2, column=0, padx=18, pady=(0, 18), sticky="e") - self.profile_hint = RTLLabel( + + self.profile_key_btn = RTLButton( self.profile_card, - text='برای این مخاطب هنوز حالت امن فعال نشده است.', - wraplength=220, - justify="right", - text_color=MUTED, - font=ctk.CTkFont(family=FONT_BODY, size=15), - ) - self.profile_hint.grid(row=3, column=0, padx=18, pady=(0, 18), sticky="e") - self.secure_button = RTLButton( - self.profile_card, - text='فعال\u200cسازی ارتباط امن', + text='تنظیم کلید متقارن', corner_radius=21, fg_color=CARD, hover_color=PRIMARY_SOFT, @@ -1138,25 +1171,10 @@ class SecureSmsApp(ctk.CTk): border_color=PRIMARY, border_width=2, font=ctk.CTkFont(family=FONT_BODY, size=15, weight="bold"), - command=self._toggle_secure_mode, + command=self._handle_profile_key_setup, height=42, ) - self.secure_button.grid(row=4, column=0, padx=16, pady=(0, 8), sticky="ew") - self.normal_button = RTLButton( - self.profile_card, - text='بازگشت به حالت عادی', - corner_radius=20, - fg_color=CARD, - hover_color="#F4F4F4", - text_color=TEXT, - border_color=BORDER, - border_width=2, - font=ctk.CTkFont(family=FONT_BODY, size=15, weight="bold"), - command=self._switch_to_normal, - height=40, - ) - self.normal_button.grid(row=5, column=0, padx=16, pady=(0, 14), sticky="ew") - self._configure_profile_card_layout() + self.profile_key_btn.grid(row=3, column=0, padx=16, pady=(0, 14), sticky="ew") composer = ctk.CTkFrame(self.main_panel, fg_color=CARD, corner_radius=0, border_width=0) composer.grid(row=2, column=0, sticky="ew") @@ -1170,10 +1188,20 @@ class SecureSmsApp(ctk.CTk): corner_radius=10, font=ctk.CTkFont(family=FONT_BODY, size=15), wrap="word", + max_length=1000, + on_change=self._update_message_counter ) self.message_entry.grid(row=0, column=0, padx=(10, 6), pady=8, sticky="ew") actions = ctk.CTkFrame(composer, fg_color="transparent") actions.grid(row=0, column=1, padx=(0, 10), pady=8, sticky="ns") + self.char_counter_label = RTLLabel( + actions, + text="0/1000", + text_color=MUTED, + font=ctk.CTkFont(family=FONT_BODY, size=10), + ) + self.char_counter_label.pack(pady=(0, 2)) + self.send_state_label = RTLLabel( actions, text="", @@ -1248,6 +1276,15 @@ class SecureSmsApp(ctk.CTk): color = "#2E7D62" if modem['connected'] else "#B6465F" self.drawer_modem_label.configure(text=text, text_color=color) + def _update_message_counter(self, text): + count = len(text) + limit = 1000 + if hasattr(self, 'char_counter_label'): + self.char_counter_label.configure( + text=f"{count}/{limit}", + text_color=DANGER if count >= limit else MUTED + ) + def _refresh_contacts(self): for widget in self.contacts_frame.winfo_children(): widget.destroy() @@ -1310,35 +1347,41 @@ class SecureSmsApp(ctk.CTk): def _refresh_current_chat(self): if not self.current_contact_phone: self.chat_title.configure(text='یک مخاطب را انتخاب کن') - self.chat_subtitle.configure(text='در اینجا فقط دو حالت داری: عادی یا امن') - self.mode_badge.configure(text='عادی', fg_color=PRIMARY_SOFT, text_color=PRIMARY) self.profile_name.configure(text='نام مخاطب') self.profile_phone.configure(text='شماره') - self.profile_hint.configure(text='برای شروع، یک مخاطب از ستون سمت راست انتخاب کن.') self._render_messages([]) return contact = self.controller.get_contact(self.current_contact_phone) messages = self.controller.get_messages(self.current_contact_phone) self.chat_title.configure(text=contact.name) - self.chat_subtitle.configure(text=contact.phone) self.profile_name.configure(text=contact.name) self.profile_phone.configure(text=contact.phone) - if contact.secure_state == "pending": - self.mode_badge.configure(text='در انتظار', fg_color="#FCEBD7", text_color="#9A6C3C") - self.profile_hint.configure(text='درخواست ارتباط امن ارسال شده و برنامه منتظر پاسخ طرف مقابل است.') - elif contact.mode == "secure": - self.mode_badge.configure(text='امن', fg_color="#D9F5E8", text_color="#0F8A5F") - self.profile_hint.configure(text='ارتباط امن فعال است. هر زمان بخواهی می\u200cتوانی به حالت عادی برگردی.') - else: - self.mode_badge.configure(text='عادی', fg_color=PRIMARY_SOFT, text_color=PRIMARY) - if contact.has_peer_key: - self.profile_hint.configure(text='کلید این مخاطب آماده است. اگر بخواهی می\u200cتوانی دوباره ارتباط امن را فعال کنی.') - else: - self.profile_hint.configure(text='برای امن شدن گفتگو، فقط روی دکمه فعال\u200cسازی ارتباط امن بزن.') - self.secure_button.configure(state="normal") - self.normal_button.configure(state="normal" if contact.mode == "secure" or contact.secure_state == "pending" else "disabled") + + # Update key button text + has_key = contact.symmetric_key is not None and contact.symmetric_key != "" + btn_text = 'تغییر کلید متقارن' if has_key else 'تنظیم کلید متقارن' + self.profile_key_btn.configure(text=btn_text) + self._render_messages(messages) + def handle_background_refresh(self, phone: Optional[str] = None): + """Called by controller when background events (like incoming SMS) occur.""" + self._refresh_contacts() + if phone and self.current_contact_phone == phone: + self._refresh_current_chat() + + if phone: + contact = self.controller.get_contact(phone) + name = contact.name if contact else phone + self._show_toast(f"پیام جدید از {name}") + + def _show_toast(self, message: str): + """Show a temporary notification at the top of the screen.""" + toast = ctk.CTkFrame(self, fg_color=PRIMARY, corner_radius=20) + toast.place(relx=0.5, rely=0.08, anchor="center") + RTLLabel(toast, text=message, text_color="white", font=ctk.CTkFont(family=FONT_BODY, size=15, weight="bold")).pack(padx=20, pady=10) + self.after(3000, toast.destroy) + def _render_messages(self, messages): for widget in self.chat_container.winfo_children(): widget.destroy() @@ -1378,7 +1421,10 @@ class SecureSmsApp(ctk.CTk): bubble.pack(anchor=anchor, padx=8, pady=3, fill="none") state_val = getattr(message, 'transport_state', 'unknown').lower() - if state_val in ["sent"]: state_text = "وضعیت: ارسال شده ✓" + is_failed = state_val == "decrypt_failed" + if is_failed: + state_text = "🔒 وضعیت: رمزنگاری شده (نیاز به کلید)" + elif state_val in ["sent"]: state_text = "وضعیت: ارسال شده ✓" elif state_val in ["delivered", "read"]: state_text = "وضعیت: تحویل داده شده ✓✓" elif state_val in ["failed", "error"]: state_text = "وضعیت: ارسال ناموفق ✗" elif state_val in ["queued", "pending"]: state_text = "وضعیت: در صف ارسال ⏳" @@ -1388,7 +1434,7 @@ class SecureSmsApp(ctk.CTk): status_label = RTLLabel( bubble, text=state_text, - text_color="#6B8E85" if is_out else MUTED, + text_color=DANGER if is_failed else ("#6B8E85" if is_out else MUTED), font=ctk.CTkFont(family=FONT_BODY, size=11, weight="bold"), justify="right" ) @@ -1403,6 +1449,20 @@ class SecureSmsApp(ctk.CTk): ) text_label.pack(padx=12, pady=(8, 2), anchor="e") + if is_failed: + lock_btn = RTLButton( + bubble, + text="🔓 بازگشایی دستی پیام", + width=120, + height=28, + corner_radius=8, + fg_color=DANGER, + hover_color="#C0392B", + font=ctk.CTkFont(family=FONT_BODY, size=11, weight="bold"), + command=lambda m=message: self._show_manual_decrypt_overlay(m) + ) + lock_btn.pack(padx=12, pady=(2, 6), anchor="e") + badge_text = f"🛡️ {message.created_at}" if message.mode == "secure" else message.created_at badge_label = RTLLabel( bubble, @@ -1436,49 +1496,194 @@ class SecureSmsApp(ctk.CTk): if not self.current_contact_phone: self.send_state_label.configure(text='اول یک مخاطب را انتخاب کن.', text_color=DANGER) return + text = self.message_entry.get("1.0", "end-1c").strip() if not text: self.send_state_label.configure(text='متن پیام خالی است.', text_color=DANGER) return - ok, state = self.controller.send_message(self.current_contact_phone, text) + + # Show Send Mode Choice Overlay + self._show_overlay() + self._build_overlay_header("ارسال پیام", "نحوه ارسال را انتخاب کنید") + + container = ctk.CTkFrame(self.overlay_frame, fg_color="transparent") + container.pack(expand=True, fill="both", padx=20, pady=20) + + def on_send_normal(): + self._hide_overlay() + self._execute_send(self.current_contact_phone, text) + + def on_send_secure(): + self._hide_overlay() + self._prompt_symmetric_key_and_send(self.current_contact_phone, text) + + RTLButton( + container, + text='ارسال به صورت عادی', + corner_radius=27, + fg_color=BACKGROUND, + hover_color=PRIMARY_SOFT, + text_color=PRIMARY_DARK, + border_color=PRIMARY, + border_width=2, + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + height=54, + command=on_send_normal, + ).pack(pady=10, fill="x") + + RTLButton( + container, + text='ارسال به صورت امن (رمزنگاری متقارن)', + corner_radius=27, + fg_color=BACKGROUND, + hover_color=PRIMARY_SOFT, + text_color=PRIMARY_DARK, + border_color=PRIMARY, + border_width=2, + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + height=54, + command=on_send_secure, + ).pack(pady=10, fill="x") + + RTLButton( + container, + text='لغو', + corner_radius=27, + fg_color=BACKGROUND, + hover_color="#F4F4F4", + text_color=TEXT, + border_color=BORDER, + border_width=2, + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + height=54, + command=self._hide_overlay, + ).pack(pady=10, fill="x") + + def _handle_profile_key_setup(self): + if not self.current_contact_phone: return + contact = self.controller.get_contact(self.current_contact_phone) + self._show_symmetric_key_dialog(contact.phone, contact.symmetric_key or "") + + def _prompt_symmetric_key_and_send(self, phone: str, text: str): + contact = self.controller.get_contact(phone) + saved_key = contact.symmetric_key or "" + + self._show_overlay() + self._build_overlay_header("رمزنگاری متقارن", "کلید متقارن shared key را وارد کنید") + + container = ctk.CTkFrame(self.overlay_frame, fg_color="transparent") + container.pack(expand=True, fill="both", padx=20, pady=20) + + entry = RTLEntry(container, placeholder_text='کلید متقارن', height=48) + entry.pack(pady=10, fill="x") + entry.insert(0, saved_key) + self._register_text_input(entry, title="کلید متقارن", layout="fa") + + def submit(): + key = entry.get().strip() + if not key: + return + self._hide_overlay() + self._execute_send(phone, text, symmetric_key=key) + + RTLButton( + container, + text='تایید و ارسال', + corner_radius=27, + fg_color=PRIMARY, + text_color="white", + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + height=54, + command=submit, + ).pack(pady=10, fill="x") + + RTLButton( + container, + text='لغو', + corner_radius=27, + fg_color=BACKGROUND, + hover_color="#F4F4F4", + text_color=TEXT, + border_color=BORDER, + border_width=2, + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + height=54, + command=self._hide_overlay, + ).pack(pady=10, fill="x") + + def _execute_send(self, phone, text, symmetric_key=None): + ok, state = self.controller.send_message(phone, text, symmetric_key=symmetric_key) if ok: self.message_entry.delete("1.0", "end") - label = 'ارسال شد.' if state == "sent" else 'در حالت آفلاین، پیام به صورت شبیه\u200cسازی ثبت شد.' + label = 'در صف ارسال...' if state == "queued" else 'ارسال شد.' self.send_state_label.configure(text=label, text_color=PRIMARY) + self._refresh_contacts() + self._refresh_current_chat() else: self.send_state_label.configure(text=state, text_color=DANGER) self.refresh_all() - def _toggle_secure_mode(self): - if not self.current_contact_phone: - return - ok, state = self.controller.request_secure(self.current_contact_phone) - if ok: - self.send_state_label.configure( - text='درخواست ارتباط امن ارسال شد.' if state == "sent" else 'در حالت آفلاین، درخواست امن به صورت محلی ثبت شد.', - text_color=PRIMARY, - ) - else: - self.send_state_label.configure(text='ارسال درخواست امن ناموفق بود.', text_color=DANGER) - self.refresh_all() + def _show_manual_decrypt_overlay(self, message): + """Show an integrated overlay to manually enter a symmetric key for a specific message.""" + self._show_overlay() + header = self._build_overlay_header( + 'بازگشایی دستی پیام', + 'کلید متقارن برای بازگشایی این پیام را وارد کنید. در صورت صحت کلید، پیام رمزگشایی و ذخیره می\u200cشود.', + ) + header.pack(fill="x", padx=16, pady=(16, 10)) - def _switch_to_normal(self): - if not self.current_contact_phone: - return - ok, state = self.controller.switch_to_normal(self.current_contact_phone) - if ok: - self.send_state_label.configure( - text='گفتگو به حالت عادی برگشت.' if state == "sent" else 'در حالت آفلاین، بازگشت به عادی محلی ثبت شد.', - text_color=PRIMARY, - ) - else: - self.send_state_label.configure(text='بازگشت به حالت عادی انجام نشد.', text_color=DANGER) - self.refresh_all() + body = ctk.CTkFrame(self.overlay_frame, fg_color=SURFACE, corner_radius=22) + body.pack(fill="both", expand=True, padx=16, pady=(0, 16)) + + RTLLabel( + body, + text='کلید متقارن را وارد کن', + text_color=TEXT, + font=ctk.CTkFont(family=FONT_BODY, size=18, weight="bold"), + ).pack(anchor="e", padx=18, pady=(22, 10)) + + key_entry = RTLEntry( + body, + placeholder_text='مثلاً: my_secret_key_123', + height=46, + font=ctk.CTkFont(family=FONT_BODY, size=15), + ) + key_entry.pack(fill="x", padx=18, pady=6) + self._register_text_input(key_entry, title="کلید متقارن", layout="en") + + def handle_submit(): + key = key_entry.get().strip() + if not key: return + success = self.controller.decrypt_message_manually(message.id, key) + if success: + self._hide_overlay() + self._show_toast("پیام با موفقیت بازگشایی شد.") + self.refresh_all() + else: + self._show_toast("خطا: کلید نامعتبر است.") - def _open_contact_dialog(self): + RTLButton( + body, + text='تایید و بازگشایی', + height=48, + corner_radius=24, + fg_color=PRIMARY, + text_color="white", + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), + command=handle_submit + ).pack(fill="x", padx=18, pady=20) + + + + def _open_contact_dialog(self, phone: str = "", name: str = ""): self.contact_form_message.configure(text="") self.contact_name_entry.delete(0, "end") self.contact_phone_entry.delete(0, "end") + if name and name != "مخاطب ناشناس": + self.contact_name_entry.insert(0, name) + if phone: + self.contact_phone_entry.insert(0, phone) + self.sidebar.grid_remove() self.add_contact_panel.grid(row=0, column=0, sticky="nsew") self.add_contact_panel.lift() @@ -1496,11 +1701,15 @@ class SecureSmsApp(ctk.CTk): self.contact_form_message.configure(text='نام و شماره هر دو لازم هستند.') return + # Translate Persian digits to English persian_to_english = str.maketrans('۰۱۲۳۴۵۶۷۸۹', '0123456789') phone = phone.translate(persian_to_english) - if len(phone) != 11 or not phone.isdigit() or not phone.startswith("09"): - self.contact_form_message.configure(text='شماره باید ۱۱ عدد باشد و با 09 شروع شود.') + # Normalize to canonical format + phone = normalize_phone(phone) + + if len(phone) != 11: + self.contact_form_message.configure(text='شماره نامعتبر است. باید ۱۱ رقم باشد (مثل 0912).') return self.controller.save_contact(name, phone) @@ -1609,7 +1818,42 @@ class SecureSmsApp(ctk.CTk): RTLLabel(avatar, text=initial, text_color="white", font=ctk.CTkFont(family=FONT_TITLE, size=36, weight="bold")).place(relx=0.5, rely=0.5, anchor="center") RTLLabel(body, text=contact.name, text_color=TEXT, font=ctk.CTkFont(family=FONT_TITLE, size=24, weight="bold")).pack(pady=(12, 2)) - RTLLabel(body, text=contact.phone, text_color=MUTED, font=ctk.CTkFont(family=FONT_BODY, size=16)).pack(pady=(0, 30)) + RTLLabel(body, text=contact.phone, text_color=MUTED, font=ctk.CTkFont(family=FONT_BODY, size=16)).pack(pady=(0, 6)) + + # Mode indicator + if contact.symmetric_key: + mode_text = "ارتباط امن (کلید متقارن)" + mode_color = "#2E7D32" # Dark Green + elif contact.mode == "secure": + mode_text = "ارتباط امن (ECC)" + mode_color = PRIMARY + else: + mode_text = "حالت عادی" + mode_color = MUTED + + RTLLabel(body, text=mode_text, text_color=mode_color, + font=ctk.CTkFont(family=FONT_BODY, size=14, weight="bold")).pack(pady=(0, 20)) + + # Add to Contacts button for unknown senders + if contact.name == "مخاطب ناشناس": + RTLButton( + body, text='افزودن به لیست مخاطبین', + corner_radius=24, + fg_color=PRIMARY, hover_color=PRIMARY_DARK, text_color="white", + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), height=52, + command=lambda: (self._hide_contact_details_page(), self._open_contact_dialog(phone=contact.phone)) + ).pack(fill="x", padx=18, pady=(0, 15)) + + + # Symmetric Key button + RTLButton( + body, text='تنظیم کلید متقارن (Shared Key)', + corner_radius=24, + fg_color=BACKGROUND, hover_color=ACCENT, text_color=ACCENT_DARK, border_color=ACCENT, border_width=2, + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), height=48, + command=lambda: self._show_symmetric_key_dialog(contact.phone, contact.symmetric_key or "") + ).pack(fill="x", padx=18, pady=(0, 8)) + RTLButton( body, text='پاک کردن پروفایل', @@ -1617,12 +1861,41 @@ class SecureSmsApp(ctk.CTk): fg_color=BACKGROUND, hover_color="#FDE8E8", text_color=DANGER, border_color=DANGER, border_width=2, font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), height=48, command=lambda: self._show_delete_contact_dialog(contact.phone, contact.name) - ).pack(fill="x", padx=18, pady=(24, 8)) + ).pack(fill="x", padx=18, pady=(0, 8)) def _hide_contact_details_page(self): self.contact_details_panel.grid_remove() self._show_chat_screen() + def _show_symmetric_key_dialog(self, phone, current_key): + self._show_overlay() + header = self._build_overlay_header('تنظیم کلید متقارن', 'کلیدی که در هر دو دستگاه وارد می‌شود را اینجا وارد کنید.') + header.pack(fill="x", padx=16, pady=(16, 10)) + + body = ctk.CTkFrame(self.overlay_frame, fg_color=SURFACE, corner_radius=22) + body.pack(fill="both", expand=True, padx=16, pady=(0, 16)) + + entry = RTLEntry(body, placeholder_text='مثلاً: saba123', height=48, font=ctk.CTkFont(family=FONT_BODY, size=16)) + entry.pack(fill="x", padx=36, pady=(30, 10)) + entry.insert(0, current_key) + + def save(): + key = entry.get().strip() + self.controller.set_symmetric_key(phone, key) + self._hide_overlay() + self._show_contact_details_page() + + RTLButton( + body, text='ذخیره', + corner_radius=24, + fg_color=PRIMARY, hover_color=PRIMARY_DARK, text_color="white", + font=ctk.CTkFont(family=FONT_BODY, size=16, weight="bold"), height=48, + command=save + ).pack(fill="x", padx=36, pady=(10, 20)) + + self._register_text_input(entry, title="کلید متقارن", layout="en", submit=save) + self.after(80, lambda: self._focus_registered_input(entry)) + def _open_settings_panel(self): self._show_overlay() header = self._build_overlay_header( diff --git a/tests/__pycache__/test_gsm_gateway.cpython-313.pyc b/tests/__pycache__/test_gsm_gateway.cpython-313.pyc new file mode 100644 index 0000000..5a0a3a9 Binary files /dev/null and b/tests/__pycache__/test_gsm_gateway.cpython-313.pyc differ diff --git a/tests/__pycache__/test_outgoing_secure_flow.cpython-313.pyc b/tests/__pycache__/test_outgoing_secure_flow.cpython-313.pyc new file mode 100644 index 0000000..ec0bb13 Binary files /dev/null and b/tests/__pycache__/test_outgoing_secure_flow.cpython-313.pyc differ diff --git a/tests/__pycache__/test_symmetric_crypto.cpython-313.pyc b/tests/__pycache__/test_symmetric_crypto.cpython-313.pyc new file mode 100644 index 0000000..8ca8c08 Binary files /dev/null and b/tests/__pycache__/test_symmetric_crypto.cpython-313.pyc differ diff --git a/tests/__tmp_outgoing_secure_flow.db b/tests/__tmp_outgoing_secure_flow.db new file mode 100644 index 0000000..a3429e7 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow.db differ diff --git a/tests/__tmp_outgoing_secure_flow_1c591e35ff344426aae96c4b1f2c4610.db b/tests/__tmp_outgoing_secure_flow_1c591e35ff344426aae96c4b1f2c4610.db new file mode 100644 index 0000000..0e49d83 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_1c591e35ff344426aae96c4b1f2c4610.db differ diff --git a/tests/__tmp_outgoing_secure_flow_4cc013125e1c48ffa1d99090a7ad7356.db b/tests/__tmp_outgoing_secure_flow_4cc013125e1c48ffa1d99090a7ad7356.db new file mode 100644 index 0000000..db9195f Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_4cc013125e1c48ffa1d99090a7ad7356.db differ diff --git a/tests/__tmp_outgoing_secure_flow_51959d14b0af4620b8300d93b70c84c4.db b/tests/__tmp_outgoing_secure_flow_51959d14b0af4620b8300d93b70c84c4.db new file mode 100644 index 0000000..94cdee6 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_51959d14b0af4620b8300d93b70c84c4.db differ diff --git a/tests/__tmp_outgoing_secure_flow_56dd9996c9c846caaf383cd8ee095bc7.db b/tests/__tmp_outgoing_secure_flow_56dd9996c9c846caaf383cd8ee095bc7.db new file mode 100644 index 0000000..68c3dc2 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_56dd9996c9c846caaf383cd8ee095bc7.db differ diff --git a/tests/__tmp_outgoing_secure_flow_5c43d3fe0ae84c5b811f65e7e2536b3c.db b/tests/__tmp_outgoing_secure_flow_5c43d3fe0ae84c5b811f65e7e2536b3c.db new file mode 100644 index 0000000..c23b76e Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_5c43d3fe0ae84c5b811f65e7e2536b3c.db differ diff --git a/tests/__tmp_outgoing_secure_flow_6b2ac2e3d0a64e808e9fedbbe2b87934.db b/tests/__tmp_outgoing_secure_flow_6b2ac2e3d0a64e808e9fedbbe2b87934.db new file mode 100644 index 0000000..ccb54d6 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_6b2ac2e3d0a64e808e9fedbbe2b87934.db differ diff --git a/tests/__tmp_outgoing_secure_flow_9a295a01ca6e4a8eaf46939f211106b1.db b/tests/__tmp_outgoing_secure_flow_9a295a01ca6e4a8eaf46939f211106b1.db new file mode 100644 index 0000000..dbfd060 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_9a295a01ca6e4a8eaf46939f211106b1.db differ diff --git a/tests/__tmp_outgoing_secure_flow_af5096e592964d9ba68836b78e51238b.db b/tests/__tmp_outgoing_secure_flow_af5096e592964d9ba68836b78e51238b.db new file mode 100644 index 0000000..f607777 Binary files /dev/null and b/tests/__tmp_outgoing_secure_flow_af5096e592964d9ba68836b78e51238b.db differ diff --git a/tests/test_gsm_gateway.py b/tests/test_gsm_gateway.py new file mode 100644 index 0000000..a077f2f --- /dev/null +++ b/tests/test_gsm_gateway.py @@ -0,0 +1,21 @@ +import unittest + +from secure_sms.infrastructure.gsm import _has_terminal_status, _sanitize_protocol_body + + +class GsmGatewayParsingTests(unittest.TestCase): + def test_terminal_status_requires_final_ok_line(self): + partial = '\r\n+CMGR: "REC READ","+9891","",""\r\n@S:SYM|abcOKxyz' + complete = partial + "\r\n\r\nOK\r\n" + + self.assertFalse(_has_terminal_status(partial)) + self.assertTrue(_has_terminal_status(complete)) + + def test_protocol_body_sanitizer_removes_control_bytes(self): + corrupted = "@S:SYM|abc\x11\x11DEF-_=" + + self.assertEqual(_sanitize_protocol_body(corrupted), "@S:SYM|abcDEF-_=") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_outgoing_secure_flow.py b/tests/test_outgoing_secure_flow.py new file mode 100644 index 0000000..5876480 --- /dev/null +++ b/tests/test_outgoing_secure_flow.py @@ -0,0 +1,108 @@ +import unittest +from pathlib import Path +import uuid + +from secure_sms.application.services import SecureMessagingService +from secure_sms.core.security import PasswordManager, StorageCipher +from secure_sms.infrastructure.database import Database + + +class OutgoingSecureFlowTests(unittest.TestCase): + def _build_service(self) -> SecureMessagingService: + db_path = ( + Path(__file__).resolve().parent + / f"__tmp_outgoing_secure_flow_{uuid.uuid4().hex}.db" + ) + + def cleanup(): + try: + if db_path.exists(): + db_path.unlink() + except PermissionError: + pass + + self.addCleanup(cleanup) + + db = Database(str(db_path)) + service = SecureMessagingService(db) + password_manager = PasswordManager() + meta = password_manager.create_metadata("admin-password-123") + db.set_security_metadata(meta) + service.cipher = StorageCipher( + password_manager.derive_key("admin-password-123", meta.salt) + ) + return service + + def test_prepare_outgoing_message_builds_flutter_compatible_secure_frame(self): + service = self._build_service() + + phone = "09121234567" + shared_key = "shared-key-123" + message = "hello from raspberry" + + service.add_or_update_contact("Test Contact", phone) + service.set_symmetric_key(phone, shared_key) + + frames, mode = service.prepare_outgoing_message(phone, message, symmetric_key=shared_key) + + self.assertEqual(mode, "secure") + self.assertEqual(len(frames), 1) + self.assertTrue(frames[0].startswith("@S:SYM|h1:")) + + payload = frames[0][7:] + self.assertEqual( + service.crypto.decrypt_symmetric(payload, shared_key), + message, + ) + + def test_explicit_send_key_is_cached_for_follow_up_secure_messages(self): + service = self._build_service() + + phone = "09121234567" + shared_key = "shared-key-123" + + service.add_or_update_contact("Test Contact", phone) + + first_frames, first_mode = service.prepare_outgoing_message( + phone, + "hello from python", + symmetric_key=shared_key, + ) + + self.assertEqual(first_mode, "secure") + self.assertTrue(first_frames[0].startswith("@S:SYM|h1:")) + + contact = service.get_contact(phone) + self.assertIsNotNone(contact) + self.assertEqual(contact.symmetric_key, shared_key) + self.assertEqual(contact.mode, "secure") + self.assertEqual(contact.secure_state, "ready") + self.assertIsNotNone(contact.last_secure_at) + + follow_up_frames, follow_up_mode = service.prepare_outgoing_message( + phone, + "second normal ping", + ) + + # Bug fix verification: it should now be "normal" if no key is passed, + # even if a key was used before. + self.assertEqual(follow_up_mode, "normal") + self.assertEqual(follow_up_frames[0], "second normal ping") + + # But we can still send secure if we provide the key again + third_frames, third_mode = service.prepare_outgoing_message( + phone, + "third secure ping", + symmetric_key=shared_key + ) + self.assertEqual(third_mode, "secure") + self.assertTrue(third_frames[0].startswith("@S:SYM|h1:")) + payload = third_frames[0][7:] + self.assertEqual( + service.crypto.decrypt_symmetric(payload, shared_key), + "third secure ping", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_symmetric_crypto.py b/tests/test_symmetric_crypto.py new file mode 100644 index 0000000..084c96d --- /dev/null +++ b/tests/test_symmetric_crypto.py @@ -0,0 +1,51 @@ +import hashlib +import unittest +import unicodedata + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + +from secure_sms.core.security import SymmetricCryptoService, b64u_encode + + +def _encrypt_for_key_text(message: str, key_text: str) -> str: + key = hashlib.sha256(key_text.encode("utf-8")).digest() + nonce = bytes(range(12)) + ciphertext = AESGCM(key).encrypt(nonce, message.encode("utf-8"), None) + return b64u_encode(nonce + ciphertext) + + +class SymmetricCryptoInteropTests(unittest.TestCase): + def setUp(self): + self.service = SymmetricCryptoService() + + def test_encrypts_new_hex_transport(self): + payload = self.service.encrypt_symmetric("hex transport", "shared-key-123") + + self.assertTrue(payload.startswith("h1:")) + self.assertEqual( + self.service.decrypt_symmetric(payload, "shared-key-123"), + "hex transport", + ) + + def test_decrypts_flutter_style_payload(self): + key = " shared-key-123 " + payload = _encrypt_for_key_text("hello from flutter", key.strip()) + + self.assertEqual( + self.service.decrypt_symmetric(payload, key), + "hello from flutter", + ) + + def test_decrypts_legacy_python_nfc_payload(self): + raw_key = "Cafe\u0301-123" + legacy_python_key = unicodedata.normalize("NFC", raw_key.strip()) + payload = _encrypt_for_key_text("legacy payload", legacy_python_key) + + self.assertEqual( + self.service.decrypt_symmetric(payload, raw_key), + "legacy payload", + ) + + +if __name__ == "__main__": + unittest.main()