Saba-python/secure_sms/application/services.py
2026-03-27 19:20:38 +03:30

432 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import platform
from pathlib import Path
from typing import Optional
from secure_sms.infrastructure.database import Database, utc_now
from secure_sms.core.models import ContactDetails, ContactSummary, MessageView, PendingPacketView, SecureEventView
from secure_sms.core.protocol import (
build_nack,
build_normal_mode,
build_symmetric_msg,
parse_incoming,
)
from secure_sms.core.security import SymmetricCryptoService, PasswordManager, StorageCipher
from secure_sms.core.utils import normalize_phone
# Placeholder display name applied when a contact has no usable stored name
# (see the fallbacks in ensure_contact, list_contacts and get_contact).
SYSTEM_CONTACT_LABEL = "مخاطب ناشناس"


# Application-layer service that ties together the database, the master
# password manager, the symmetric crypto helper and the storage cipher.
class SecureMessagingService:
def __init__(self, db: Database):
    """Attach the service to its database and create the crypto helpers.

    The service starts locked: ``cipher`` stays ``None`` until ``bootstrap``,
    ``unlock`` or ``change_master_password`` installs a storage cipher.

    Args:
        db: Database handle used for all persistence.
    """
    self.cipher: Optional[StorageCipher] = None
    self.db = db
    self.password_manager = PasswordManager()
    self.crypto = SymmetricCryptoService()
@property
def unlocked(self) -> bool:
    """Tell whether a storage cipher is currently loaded."""
    active_cipher = self.cipher
    return active_cipher is not None
def is_bootstrapped(self) -> bool:
    """Return the database's answer to whether first-time setup was done."""
    database = self.db
    return database.is_bootstrapped()
def bootstrap(self, password: str):
    """Run first-time setup and leave the service unlocked.

    Creates password metadata, derives the storage key from it, stores a
    placeholder identity (no key pair, fingerprint ``SYMMETRIC_ONLY``),
    writes default modem settings and logs a bootstrap event.

    Args:
        password: Master password chosen by the operator.

    Raises:
        ValueError: If the database reports it is already bootstrapped.
    """
    if self.db.is_bootstrapped():
        raise ValueError("Application is already configured.")

    metadata = self.password_manager.create_metadata(password)
    storage_key = self.password_manager.derive_key(password, metadata.salt)
    # The cipher must be in place before _enc() is used for the event below.
    self.cipher = StorageCipher(storage_key)

    self.db.set_security_metadata(metadata)
    self.db.save_identity(private_key_enc=None, public_key_enc=None, fingerprint="SYMMETRIC_ONLY")

    # Default serial port depends on the OS family; 9600 is the default baudrate.
    default_port = "COM1" if os.name == "nt" else "/dev/serial0"
    self.db.set_connection_settings(default_port, 9600)

    bootstrap_note = self._enc("راه‌اندازی اولیه صبا (نسخه متقارن AES-256) انجام شد.")
    self.db.log_secure_event(None, "app_bootstrap", bootstrap_note)
def unlock(self, password: str) -> bool:
    """Verify the master password and load the storage cipher.

    After a successful unlock the phone-number migration is attempted on a
    best-effort basis: any error it raises is printed and otherwise ignored.

    Args:
        password: Master password to check.

    Returns:
        ``True`` when the password matched and the cipher was loaded,
        ``False`` when the password was rejected.

    Raises:
        ValueError: If no security metadata has been stored yet.
    """
    stored_meta = self.db.get_security_metadata()
    if not stored_meta:
        raise ValueError("Application is not configured.")

    password_ok = self.password_manager.verify_password(password, stored_meta)
    if not password_ok:
        return False

    storage_key = self.password_manager.derive_key(password, stored_meta.salt)
    self.cipher = StorageCipher(storage_key)

    # Bring legacy phone numbers to canonical form; failures must not block unlock.
    try:
        self._migrate_all_data()
    except Exception as e:
        print(f"[Migration] Error during data unification: {e}")
    return True
def verify_password(self, password: str) -> bool:
    """Check a password against the stored metadata without unlocking.

    Returns ``False`` when no security metadata exists.
    """
    stored_meta = self.db.get_security_metadata()
    return self.password_manager.verify_password(password, stored_meta) if stored_meta else False
def change_master_password(self, current_password: str, new_password: str):
    """Replace the master password and re-encrypt stored payloads.

    Derives the old and new storage keys, asks the database to rotate its
    encrypted payloads from the old cipher to the new one, stores the new
    metadata, switches the active cipher and logs the change.

    Args:
        current_password: Password that must match the stored metadata.
        new_password: Password to switch to.

    Raises:
        ValueError: If metadata is missing or the current password is wrong.
    """
    stored_meta = self.db.get_security_metadata()
    password_ok = bool(stored_meta) and self.password_manager.verify_password(current_password, stored_meta)
    if not password_ok:
        raise ValueError("رمز فعلی درست نیست.")

    manager = self.password_manager
    old_cipher_key = manager.derive_key(current_password, stored_meta.salt)
    fresh_meta = manager.create_metadata(new_password)
    new_cipher_key = manager.derive_key(new_password, fresh_meta.salt)

    previous_cipher = StorageCipher(old_cipher_key)
    replacement_cipher = StorageCipher(new_cipher_key)

    self.db.rotate_encrypted_payloads(previous_cipher, replacement_cipher)
    self.db.set_security_metadata(fresh_meta)
    # Switch ciphers first so the event below is encrypted with the new key.
    self.cipher = replacement_cipher
    self.db.log_secure_event(None, "password_changed", self._enc("رمز اصلی برنامه تغییر کرد."))
def _enc(self, value: Optional[str]) -> Optional[str]:
    """Encrypt ``value`` with the active storage cipher.

    Raises:
        RuntimeError: If no cipher is loaded (the application is locked).
    """
    active_cipher = self.cipher
    if active_cipher is None:
        raise RuntimeError("Application is locked.")
    return active_cipher.encrypt_text(value)
def _dec(self, value: Optional[str]) -> Optional[str]:
    """Decrypt ``value`` with the active storage cipher.

    Raises:
        RuntimeError: If no cipher is loaded (the application is locked).
    """
    active_cipher = self.cipher
    if active_cipher is None:
        raise RuntimeError("Application is locked.")
    return active_cipher.decrypt_text(value)
def add_or_update_contact(self, name: str, phone: str):
    """Upsert a contact under its normalized phone with an encrypted name."""
    canonical_phone = normalize_phone(phone)
    encrypted_name = self._enc(name)
    self.db.upsert_contact(canonical_phone, encrypted_name)
def delete_contact(self, phone: str):
    """Delete the contact stored under the normalized form of ``phone``."""
    self.db.delete_contact(normalize_phone(phone))
def ensure_contact(self, phone: str, fallback_name: Optional[str] = None):
    """Make sure a contact row exists for ``phone``.

    Does nothing when the normalized phone is empty. The name passed to the
    database is ``fallback_name`` when given, else the system placeholder.
    """
    canonical_phone = normalize_phone(phone)
    if canonical_phone:
        label = fallback_name if fallback_name else SYSTEM_CONTACT_LABEL
        self.db.ensure_contact_exists(canonical_phone, self._enc(label))
def list_contacts(self) -> list[ContactSummary]:
    """Build a summary for every contact row, with a short message preview.

    The preview is the decrypted last message body with newlines turned into
    spaces and surrounding whitespace removed; previews longer than 38
    characters are cut to 38 and suffixed with ``...``.
    """
    summaries = []
    for row in self.db.list_contact_rows():
        body_enc = row["last_body_enc"]
        snippet = self._dec(body_enc) if body_enc else ""
        # Decryption may yield nothing; treat that the same as "no preview".
        snippet = snippet.replace("\n", " ").strip() if snippet else ""
        if len(snippet) > 38:
            snippet = snippet[:38] + "..."

        display_name = self._dec(row["name_enc"]) or SYSTEM_CONTACT_LABEL
        summaries.append(
            ContactSummary(
                phone=row["phone"],
                name=display_name,
                mode=row["mode"],
                secure_state=row["secure_state"],
                has_peer_key=False,  # Asymmetric is gone
                last_message_preview=snippet,
            )
        )
    return summaries
def get_contact(self, phone: str) -> Optional[ContactDetails]:
    """Return decrypted details for one contact, or ``None`` if not stored."""
    record = self.db.get_contact_row(normalize_phone(phone))
    if not record:
        return None

    display_name = self._dec(record["name_enc"]) or SYSTEM_CONTACT_LABEL
    key_enc = record["symmetric_key_enc"]
    shared_key = self._dec(key_enc) if key_enc else None

    return ContactDetails(
        phone=record["phone"],
        name=display_name,
        mode=record["mode"],
        secure_state=record["secure_state"],
        peer_fingerprint=record["peer_fingerprint"],
        has_peer_key=False,
        symmetric_key=shared_key,
        last_secure_at=record["last_secure_at"],
    )
def get_messages(self, phone: str) -> list[MessageView]:
    """Return the stored messages of one contact with decrypted bodies.

    A body that decrypts to nothing is reported as an empty string.
    """
    views = []
    for row in self.db.list_message_rows(normalize_phone(phone)):
        views.append(
            MessageView(
                id=row["id"],
                phone=row["phone"],
                direction=row["direction"],
                body=self._dec(row["body_enc"]) or "",
                mode=row["mode"],
                transport_state=row["transport_state"],
                created_at=row["created_at"],
            )
        )
    return views
# ── Outgoing Messages ──────────────────────────────────────────
def prepare_outgoing_message(self, phone: str, text: str, symmetric_key: Optional[str] = None) -> tuple[list[str], str]:
    """Build the SMS frame(s) for an outgoing message.

    The operator chooses per message: passing a non-blank ``symmetric_key``
    sends the text encrypted, anything else sends it as plain text,
    regardless of the conversation's stored mode.

    Args:
        phone: Recipient number; normalized before use.
        text: Message text, at most 1000 characters.
        symmetric_key: Optional shared key for this message. Surrounding
            whitespace is stripped; a blank key counts as "no key".

    Returns:
        ``(frames, mode)`` where ``mode`` is ``"secure"`` or ``"normal"``.

    Raises:
        ValueError: If ``text`` is longer than 1000 characters.
    """
    if len(text) > 1000:
        raise ValueError("طول پیام نباید بیشتر از ۱۰۰۰ کاراکتر باشد.")

    phone = normalize_phone(phone)
    self.ensure_contact(phone)
    contact = self.db.get_contact_row(phone)

    stored_key = None
    if contact and contact["symmetric_key_enc"]:
        stored_key = self._dec(contact["symmetric_key_enc"])

    active_key = symmetric_key.strip() if symmetric_key is not None else None
    if not active_key:
        # Normal mode: send plain text directly.
        return [text], "normal"

    # A key supplied while sending becomes the contact's active shared key so
    # the next secure message and any reply can reuse it. Skip the write only
    # when the contact already holds this exact key in a ready secure state.
    already_active = (
        active_key == stored_key
        and contact
        and contact["mode"] == "secure"
        and contact["secure_state"] == "ready"
    )
    if not already_active:
        self.db.update_contact_security(
            phone,
            mode="secure",
            secure_state="ready",
            symmetric_key_enc=self._enc(active_key),
            last_secure_at=utc_now(),
        )

    encrypted_payload = self.crypto.encrypt_symmetric(text, active_key)
    return build_symmetric_msg(encrypted_payload), "secure"
def store_outgoing_message(self, phone: str, text: str, mode: str, transport_state: str) -> int:
    """Persist an outgoing message with an encrypted body.

    Returns:
        Whatever ``db.add_message`` returns (annotated as ``int`` here).
    """
    canonical_phone = normalize_phone(phone)
    encrypted_body = self._enc(text)
    return self.db.add_message(
        phone=canonical_phone,
        direction="out",
        body_enc=encrypted_body,
        mode=mode,
        transport_state=transport_state,
    )
def request_normal_mode(self, phone: str) -> list[str]:
    """Switch a contact back to normal mode and return the SMS to send.

    Returns:
        A one-element list holding the frame built by ``build_normal_mode``.
    """
    canonical_phone = normalize_phone(phone)
    self.ensure_contact(canonical_phone)
    frame = build_normal_mode()
    self.db.update_contact_security(canonical_phone, mode="normal", secure_state="ready")
    note = self._enc("بازگشت به حالت عادی برای مخاطب ارسال شد.")
    self.db.log_secure_event(canonical_phone, "normal_mode_sent", note)
    return [frame]
def request_secure(self, phone: str) -> tuple[bool, str]:
    """Switch a contact to secure (symmetric) mode.

    Returns:
        Always ``(True, "ready")`` on success.

    Raises:
        ValueError: If the contact has no stored symmetric key.
    """
    canonical_phone = normalize_phone(phone)
    self.ensure_contact(canonical_phone)
    record = self.db.get_contact_row(canonical_phone)

    key_enc = record["symmetric_key_enc"] if record else None
    # Both a missing ciphertext and a key that decrypts to nothing count as "no key".
    if not (key_enc and self._dec(key_enc)):
        raise ValueError("ابتدا باید کلید متقارن را برای این مخاطب تنظیم کنید.")

    self.db.update_contact_security(canonical_phone, mode="secure", secure_state="ready")
    note = self._enc("حالت امن (متقارن) فعال شد.")
    self.db.log_secure_event(canonical_phone, "secure_mode_enabled", note)
    return True, "ready"
def set_symmetric_key(self, phone: str, key: str):
    """Store the shared symmetric key for a contact and mark it secure.

    The key is stripped of surrounding whitespace before being stored, the
    same way ``prepare_outgoing_message`` strips a key supplied while
    sending, so both entry points persist the same value for the same input.

    Args:
        phone: Contact number; normalized before use.
        key: Shared key text. Must contain at least one non-blank character.

    Raises:
        ValueError: If ``key`` is ``None``, empty or only whitespace. Storing
            such a key would flip the contact to secure mode with nothing
            usable to encrypt or decrypt with.
    """
    cleaned_key = (key or "").strip()
    if not cleaned_key:
        raise ValueError("کلید متقارن نمی‌تواند خالی باشد.")

    phone = normalize_phone(phone)
    self.ensure_contact(phone)
    self.db.update_contact_security(phone, mode="secure", symmetric_key_enc=self._enc(cleaned_key))
    self.db.log_secure_event(phone, "symmetric_key_set", self._enc("کلید متقارن تنظیم شد."))
def process_incoming_sms(self, sender: str, raw_text: str) -> tuple[str, Optional[list[str]]]:
    """Parse an incoming SMS and act on its message type.

    Handled types: ``plain`` (stored as a normal message), ``norm`` (contact
    switched to normal mode), ``sym``/``gsym`` (decrypted and stored),
    ``sfra`` (fragment stored, assembled when complete) and ``nack`` (only
    printed). Any other type is ignored.

    Returns:
        ``(normalized_sender, None)``; the second element is always ``None``
        in this implementation.
    """
    sender = normalize_phone(sender)
    self.ensure_contact(sender)
    msg = parse_incoming(raw_text)
    kind = msg.msg_type

    if kind == "plain":
        self.db.add_message(
            phone=sender,
            direction="in",
            body_enc=self._enc(msg.plain_text or ""),
            mode="normal",
            transport_state="received",
        )
    elif kind == "norm":
        self.db.update_contact_security(sender, mode="normal", secure_state="ready")
        self.db.log_secure_event(sender, "normal_mode_received", self._enc("مخاطب گفتگو را به حالت عادی برگرداند."))
    elif kind in ("sym", "gsym"):
        self._handle_symmetric_message(sender, msg.encrypted_payload or "", is_group=msg.is_group)
    elif kind == "sfra":
        self._handle_fragment(sender, msg)
    elif kind == "nack":
        # NACKs are only reported; nothing is resent from here.
        print(f"[PROTO] NACK received from {sender}: pkt={msg.packet_id} part={msg.missing_part}")

    return sender, None
# ── Symmetric Message Handler ──────────────────────────────────
def _handle_symmetric_message(self, sender: str, payload: str, is_group: bool = False):
"""Decrypt and store a single symmetric message (private or group)."""
try:
contact = self.db.get_contact_row(sender)
symmetric_key = self._dec(contact["symmetric_key_enc"]) if contact and contact["symmetric_key_enc"] else None
if not symmetric_key:
raise ValueError("No symmetric key")
body = self.crypto.decrypt_symmetric(payload, symmetric_key)
mode = "secure"
transport_state = "received_group" if is_group else "received_secure"
metadata_enc = None
except Exception:
body = "پیام رمزنگاری شده صبا دریافت شد اما بازگشایی نشد."
mode = "secure"
transport_state = "decrypt_failed"
metadata_enc = self._enc(payload) # Store raw payload for manual retry
self.db.log_secure_event(sender, "decrypt_failed_sym", self._enc("بازگشایی پیام ناموفق بود. کلید نامعتبر است."))
self.db.add_message(
phone=sender,
direction="in",
body_enc=self._enc(body),
mode=mode,
transport_state=transport_state,
metadata_enc=metadata_enc
)
def decrypt_message_manually(self, message_id: int, key: str) -> bool:
    """Retry decryption of a stored message with an operator-supplied key.

    Works only on messages that kept their raw payload in ``metadata_enc``.
    On success the message body and state are updated, the key is saved as
    the contact's symmetric key and an event is logged.

    Returns:
        ``True`` on success; ``False`` when the message is missing, has no
        stored payload, or anything in the decrypt-and-update step raises.
    """
    record = self.db.get_message_row(message_id)
    if not record or not record["metadata_enc"]:
        return False

    raw_payload = self._dec(record["metadata_enc"])
    try:
        plain_body = self.crypto.decrypt_symmetric(raw_payload, key)
        # Decryption worked: replace the placeholder body and status.
        self.db.update_message_body(message_id, self._enc(plain_body), "received_secure")
        contact_phone = record["phone"]
        self.db.update_contact_security(
            contact_phone,
            mode="secure",
            symmetric_key_enc=self._enc(key),
            secure_state="ready",
        )
        success_note = self._enc("پیام با کلید وارد شده بازگشایی شد و کلید برای مخاطب ذخیره شد.")
        self.db.log_secure_event(record["phone"], "manual_decrypt_success", success_note)
    except Exception:
        return False
    return True
# ── Fragment Handler ───────────────────────────────────────────
def _handle_fragment(self, sender: str, msg) -> Optional[list[str]]:
    """Store one fragment and assemble the packet once all parts are present.

    Fragments are always saved as kind ``"message"`` / mode ``"SYM"``. While
    parts are missing only a ``fragment_received`` event is logged. When the
    stored count reaches ``msg.total_parts`` the chunks are concatenated in
    the order the database returns them (NOTE(review): presumably ordered by
    part number; confirm in ``get_packet_fragments``), the stored fragments
    are deleted and the payload is handed to ``_handle_symmetric_message``.

    Returns:
        Always ``None``.
    """
    self.db.save_fragment(
        sender, msg.packet_id, "message", "SYM", msg.part_no, msg.total_parts, msg.chunk
    )

    stored_parts = self.db.get_packet_fragments(sender, msg.packet_id)
    if len(stored_parts) >= msg.total_parts:
        # Every part has arrived: join, clean up, then decrypt and store.
        assembled = "".join(part["chunk"] for part in stored_parts)
        self.db.delete_packet_fragments(sender, msg.packet_id)
        self._handle_symmetric_message(sender, assembled)
    else:
        progress_note = self._enc(f"قطعه {msg.part_no}/{msg.total_parts} دریافت شد (بسته {msg.packet_id}).")
        self.db.log_secure_event(sender, "fragment_received", progress_note)
    return None
def _migrate_all_data(self):
    """Rewrite legacy, non-normalized phone numbers to their canonical form.

    For every contact whose stored phone differs from ``normalize_phone`` of
    it, messages, fragments and events are re-pointed at the canonical
    number. The contact row itself is renamed, or deleted when a contact
    already exists under the canonical number.

    Rows whose phone normalizes to an empty/falsy value are left untouched:
    migrating them would move all their data onto an empty phone key.
    """
    # Snapshot the rows first; contacts are renamed/deleted inside the loop.
    rows = list(self.db.list_contact_rows())
    for row in rows:
        orig = row["phone"]
        canonical = normalize_phone(orig)
        if not canonical or orig == canonical:
            continue

        print(f"[Migration] Unifying legacy phone {orig} -> {canonical}")
        self.db.migrate_messages_phone(orig, canonical)
        self.db.migrate_fragments_phone(orig, canonical)
        self.db.migrate_events_phone(orig, canonical)

        if self.db.get_contact_row(canonical):
            # A canonical contact already exists; drop the legacy duplicate.
            self.db.delete_contact(orig)
        else:
            self.db.rename_contact_phone(orig, canonical)
# ── Admin & Settings ───────────────────────────────────────────
def get_admin_snapshot(self) -> dict:
    """Collect the data shown on the admin screen.

    Returns:
        A dict with four keys:
        ``stats`` (whatever ``db.collect_stats`` returns),
        ``events`` (``SecureEventView`` items with decrypted details),
        ``pending_packets`` (``PendingPacketView`` items) and
        ``system_info`` (platform, Python version, resolved database path,
        modem port, baudrate and the fixed ``SYMMETRIC_ONLY`` fingerprint).
    """
    stats = self.db.collect_stats()
    # Read the modem settings once so port and baudrate come from the same
    # read instead of two separate database calls.
    connection = self.db.get_connection_settings()

    events = [
        SecureEventView(
            created_at=row["created_at"],
            event_type=row["event_type"],
            phone=row["phone"] or "-",  # events without a contact show "-"
            details=self._dec(row["details_enc"]) or "",
        )
        for row in self.db.list_secure_event_rows()
    ]
    pending_packets = [
        PendingPacketView(
            phone=row["phone"],
            packet_id=row["packet_id"],
            packet_kind=row["packet_kind"],
            packet_mode=row["packet_mode"],
            received_parts=row["received_parts"],
            total_parts=row["total_parts"],
            first_seen=row["first_seen"],
        )
        for row in self.db.list_pending_packets()
    ]

    return {
        "stats": stats,
        "events": events,
        "pending_packets": pending_packets,
        "system_info": {
            "platform": platform.platform(),
            "python": platform.python_version(),
            "db_path": str(Path(self.db.db_path).resolve()),
            "modem_port": connection[0],
            "baudrate": connection[1],
            "fingerprint": "SYMMETRIC_ONLY",
        },
    }
def get_connection_settings(self) -> tuple[str, int]:
    """Return the stored modem settings as ``(port, baudrate)``."""
    stored_settings = self.db.get_connection_settings()
    return stored_settings
def update_connection_settings(self, port: str, baudrate: int):
    """Save new modem settings and log the change as a secure event."""
    self.db.set_connection_settings(port, baudrate)
    change_note = self._enc(f"تنظیمات مودم به {port}/{baudrate} تغییر کرد.")
    self.db.log_secure_event(None, "connection_settings_changed", change_note)