412 lines
17 KiB
Python
412 lines
17 KiB
Python
import platform
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from secure_sms.infrastructure.database import Database, utc_now
|
|
from secure_sms.core.models import ContactDetails, ContactSummary, MessageView, PendingPacketView, SecureEventView
|
|
from secure_sms.core.protocol import (
|
|
build_control_frames,
|
|
build_message_frames,
|
|
decode_control_payload,
|
|
decode_plain_body,
|
|
encode_plain_body,
|
|
parse_frame,
|
|
)
|
|
from secure_sms.core.security import ECCCryptoService, PasswordManager, StorageCipher
|
|
|
|
|
|
# Display name used when a contact has no usable stored name: it is the default
# for contacts created implicitly and the fallback when a decrypted name is empty.
# The Persian text reads "unknown contact".
SYSTEM_CONTACT_LABEL = "مخاطب ناشناس"
|
|
|
|
|
|
class SecureMessagingService:
    """Application service combining storage, password handling and ECC messaging.

    The service starts locked.  ``bootstrap`` (first run) or ``unlock`` (later
    runs) derives a storage key from the master password and loads the local
    identity; from then on ``cipher`` encrypts/decrypts everything persisted
    through ``db`` and ``identity`` holds the local key pair and fingerprint.
    """

    def __init__(self, db: "Database"):
        """Create a locked service bound to the given database."""
        self.db = db
        self.password_manager = PasswordManager()
        self.crypto = ECCCryptoService()
        # Both stay None until bootstrap()/unlock() succeeds.
        self.cipher: Optional["StorageCipher"] = None
        self.identity: Optional[dict] = None

    # ------------------------------------------------------------------
    # Lifecycle: bootstrap / unlock / password rotation
    # ------------------------------------------------------------------

    @property
    def unlocked(self) -> bool:
        """Return True when both the storage cipher and the identity are loaded."""
        return self.cipher is not None and self.identity is not None

    def is_bootstrapped(self) -> bool:
        """Return whether the database reports that first-run setup was done."""
        return self.db.is_bootstrapped()

    @staticmethod
    def _default_modem_port() -> str:
        """Return the default serial port name for the current operating system."""
        import os  # local import: the module does not import os at file level

        return "/dev/ttyS0" if os.name != "nt" else "COM1"

    def _load_identity(self, cipher) -> dict:
        """Read the stored identity row and decrypt it with ``cipher``.

        Raises:
            ValueError: if the database holds no identity row.
        """
        row = self.db.get_identity_row()
        if row is None:
            raise ValueError("Secure identity was not found.")
        return {
            "private_key": cipher.decrypt_text(row["private_key_enc"]),
            "public_key": cipher.decrypt_text(row["public_key_enc"]),
            "fingerprint": row["fingerprint"],
        }

    def bootstrap(self, password: str):
        """Perform first-run setup and leave the service unlocked.

        Creates password metadata, generates a new identity, stores both
        together with default modem settings (115200 baud), and logs an
        ``app_bootstrap`` event.

        Raises:
            ValueError: if the application is already configured.
        """
        if self.db.is_bootstrapped():
            raise ValueError("Application is already configured.")
        meta = self.password_manager.create_metadata(password)
        key = self.password_manager.derive_key(password, meta.salt)
        cipher = StorageCipher(key)
        private_key, public_key, fingerprint = self.crypto.generate_identity()
        self.db.set_security_metadata(meta)
        self.db.save_identity(
            private_key_enc=cipher.encrypt_text(private_key),
            public_key_enc=cipher.encrypt_text(public_key),
            fingerprint=fingerprint,
        )
        self.db.set_connection_settings(self._default_modem_port(), 115200)
        # Publish the unlocked state only after everything was persisted, so a
        # failed write never leaves a cipher behind on a service without identity.
        self.cipher = cipher
        self.identity = {
            "private_key": private_key,
            "public_key": public_key,
            "fingerprint": fingerprint,
        }
        self.db.log_secure_event(None, "app_bootstrap", self._enc("راهاندازی اولیه برنامه انجام شد."))

    def unlock(self, password: str) -> bool:
        """Unlock the service with the master password.

        Returns:
            True on success, False when the password does not verify.

        Raises:
            ValueError: if the application is not configured or the identity
                row is missing.  In both cases the service stays locked.
        """
        meta = self.db.get_security_metadata()
        if not meta:
            raise ValueError("Application is not configured.")
        if not self.password_manager.verify_password(password, meta):
            return False
        cipher = StorageCipher(self.password_manager.derive_key(password, meta.salt))
        # Load the identity before touching self.*: a failure here must not
        # leave the service with a usable cipher but no identity.
        identity = self._load_identity(cipher)
        self.cipher = cipher
        self.identity = identity
        return True

    def verify_password(self, password: str) -> bool:
        """Check a password against the stored metadata (False if none exists)."""
        meta = self.db.get_security_metadata()
        if not meta:
            return False
        return self.password_manager.verify_password(password, meta)

    def change_master_password(self, current_password: str, new_password: str):
        """Re-encrypt stored payloads under a key derived from ``new_password``.

        Raises:
            ValueError: if ``current_password`` does not verify, or if the
                identity row is missing after rotation.
        """
        meta = self.db.get_security_metadata()
        if not meta or not self.password_manager.verify_password(current_password, meta):
            raise ValueError("رمز فعلی درست نیست.")
        old_key = self.password_manager.derive_key(current_password, meta.salt)
        new_meta = self.password_manager.create_metadata(new_password)
        new_key = self.password_manager.derive_key(new_password, new_meta.salt)
        old_cipher = StorageCipher(old_key)
        new_cipher = StorageCipher(new_key)
        # NOTE(review): rotation and the metadata write are two separate calls;
        # whether they are atomic depends on the Database implementation.
        self.db.rotate_encrypted_payloads(old_cipher, new_cipher)
        self.db.set_security_metadata(new_meta)
        # Stored data is under the new key from here on, so switch first.
        self.cipher = new_cipher
        self.identity = self._load_identity(new_cipher)
        self.db.log_secure_event(None, "password_changed", self._enc("رمز اصلی برنامه تغییر کرد."))

    # ------------------------------------------------------------------
    # Storage encryption helpers
    # ------------------------------------------------------------------

    def _enc(self, value: Optional[str]) -> Optional[str]:
        """Encrypt ``value`` with the storage cipher; RuntimeError when locked."""
        if self.cipher is None:
            raise RuntimeError("Application is locked.")
        return self.cipher.encrypt_text(value)

    def _dec(self, value: Optional[str]) -> Optional[str]:
        """Decrypt ``value`` with the storage cipher; RuntimeError when locked."""
        if self.cipher is None:
            raise RuntimeError("Application is locked.")
        return self.cipher.decrypt_text(value)

    def _add_system_message(self, phone: str, text: str) -> None:
        """Store a local, system-authored notice in the conversation with ``phone``."""
        self.db.add_message(
            phone=phone,
            direction="system",
            body_enc=self._enc(text),
            mode="system",
            transport_state="local",
        )

    # ------------------------------------------------------------------
    # Contacts and message history
    # ------------------------------------------------------------------

    def add_or_update_contact(self, name: str, phone: str):
        """Insert or update a contact, storing its name encrypted."""
        self.db.upsert_contact(phone, self._enc(name))

    def delete_contact(self, phone: str):
        """Delete the contact identified by ``phone``."""
        self.db.delete_contact(phone)

    def ensure_contact(self, phone: str, fallback_name: Optional[str] = None):
        """Make sure a contact row exists, using the system label if no name is given."""
        fallback = fallback_name or SYSTEM_CONTACT_LABEL
        self.db.ensure_contact_exists(phone, self._enc(fallback))

    @staticmethod
    def _preview(text: Optional[str], limit: int = 38) -> str:
        """Flatten ``text`` to one line and shorten it to ``limit`` characters plus "..."."""
        if not text:
            return ""
        flat = text.replace("\n", " ").strip()
        if len(flat) > limit:
            return flat[:limit] + "..."
        return flat

    def list_contacts(self) -> "list[ContactSummary]":
        """Return a summary of every contact with a short last-message preview."""
        contacts = []
        for row in self.db.list_contact_rows():
            last_body = self._dec(row["last_body_enc"]) if row["last_body_enc"] else ""
            contacts.append(
                ContactSummary(
                    phone=row["phone"],
                    name=self._dec(row["name_enc"]) or SYSTEM_CONTACT_LABEL,
                    mode=row["mode"],
                    secure_state=row["secure_state"],
                    has_peer_key=bool(row["peer_public_key_enc"]),
                    last_message_preview=self._preview(last_body),
                )
            )
        return contacts

    def get_contact(self, phone: str) -> "Optional[ContactDetails]":
        """Return the details of one contact, or None when it does not exist."""
        row = self.db.get_contact_row(phone)
        if not row:
            return None
        return ContactDetails(
            phone=row["phone"],
            name=self._dec(row["name_enc"]) or SYSTEM_CONTACT_LABEL,
            mode=row["mode"],
            secure_state=row["secure_state"],
            peer_fingerprint=row["peer_fingerprint"],
            has_peer_key=bool(row["peer_public_key_enc"]),
            last_secure_at=row["last_secure_at"],
        )

    def get_messages(self, phone: str) -> "list[MessageView]":
        """Return the decrypted messages stored for ``phone``."""
        return [
            MessageView(
                id=row["id"],
                phone=row["phone"],
                direction=row["direction"],
                body=self._dec(row["body_enc"]) or "",
                mode=row["mode"],
                transport_state=row["transport_state"],
                created_at=row["created_at"],
            )
            for row in self.db.list_message_rows(phone)
        ]

    def get_public_identity(self) -> dict:
        """Return the local public key and fingerprint; RuntimeError when locked."""
        if not self.identity:
            raise RuntimeError("Application is locked.")
        return {
            "public_key": self.identity["public_key"],
            "fingerprint": self.identity["fingerprint"],
        }

    # ------------------------------------------------------------------
    # Outgoing traffic
    # ------------------------------------------------------------------

    def prepare_outgoing_message(self, phone: str, text: str) -> tuple[list[str], str]:
        """Encode ``text`` into transport frames according to the contact's mode.

        Returns:
            ``(frames, mode)`` where mode is ``"secure"`` or ``"normal"``.

        Raises:
            ValueError: if the contact does not exist, or it is in secure mode
                but no peer key is stored for it.
        """
        contact = self.db.get_contact_row(phone)
        if not contact:
            raise ValueError("مخاطب پیدا نشد.")
        if contact["mode"] == "secure":
            peer_key = self._dec(contact["peer_public_key_enc"])
            if not peer_key:
                raise ValueError("برای این مخاطب کلید امن وجود ندارد.")
            encoded_payload = self.crypto.encrypt_for_peer(text, peer_key)
            return build_message_frames("S", encoded_payload), "secure"
        encoded_payload = encode_plain_body(text)
        return build_message_frames("N", encoded_payload), "normal"

    def store_outgoing_message(self, phone: str, text: str, mode: str, transport_state: str) -> int:
        """Persist an outgoing message (body encrypted) and return the database result."""
        return self.db.add_message(
            phone=phone,
            direction="out",
            body_enc=self._enc(text),
            mode=mode,
            transport_state=transport_state,
        )

    def request_secure_channel(self, phone: str) -> list[str]:
        """Build the ``hello`` control frames that offer our public key to ``phone``.

        Marks the contact as pending and records the request locally.

        Raises:
            RuntimeError: if the service is locked (raised before any write).
        """
        # Resolve the identity first so a locked service fails before any
        # database write instead of part-way through.
        identity = self.get_public_identity()
        self.ensure_contact(phone)
        payload = {
            "type": "hello",
            "public_key": identity["public_key"],
            "fingerprint": identity["fingerprint"],
            "ts": utc_now(),
        }
        self.db.update_contact_security(phone, mode="normal", secure_state="pending")
        self.db.log_secure_event(phone, "hello_sent", self._enc("درخواست ارتباط امن ارسال شد."))
        self._add_system_message(phone, "درخواست ارتباط امن برای مخاطب ارسال شد.")
        return build_control_frames(payload)

    def request_normal_mode(self, phone: str) -> list[str]:
        """Switch the contact back to normal mode and build the matching control frames."""
        self.ensure_contact(phone)
        payload = {
            "type": "normal_mode",
            "ts": utc_now(),
        }
        self.db.update_contact_security(phone, mode="normal", secure_state="ready")
        self.db.log_secure_event(phone, "normal_mode_sent", self._enc("بازگشت به حالت عادی برای مخاطب ارسال شد."))
        self._add_system_message(phone, "گفتگو به حالت عادی برگشت.")
        return build_control_frames(payload)

    # ------------------------------------------------------------------
    # Incoming traffic
    # ------------------------------------------------------------------

    def process_incoming_sms(self, sender: str, raw_text: str) -> tuple[str, Optional[list[str]]]:
        """Handle one received SMS.

        Text that does not parse as a frame is stored verbatim as a normal
        message.  Framed text is reassembled; once complete it is dispatched
        as a control payload or a message payload.

        Returns:
            ``(sender, reply_frames)``; ``reply_frames`` is None unless a
            control payload produced frames to send back.
        """
        self.ensure_contact(sender)
        frame = parse_frame(raw_text)
        if not frame:
            self.db.add_message(
                phone=sender,
                direction="in",
                body_enc=self._enc(raw_text),
                mode="normal",
                transport_state="received_raw",
            )
            return sender, None

        payload = self._store_or_assemble_frame(sender, frame)
        if payload is None:
            # More parts are still expected for this packet.
            self.db.log_secure_event(
                sender,
                "packet_fragment_received",
                self._enc(f"بسته {frame.packet_id} در حال تکمیل است ({frame.part_no}/{frame.total_parts})."),
            )
            return sender, None

        if frame.category == "control":
            return sender, self._handle_control_payload(sender, payload)
        self._handle_message_payload(sender, frame.mode or "N", payload)
        return sender, None

    def _store_or_assemble_frame(self, sender: str, frame) -> Optional[str]:
        """Return the full payload once every part of a packet is present, else None.

        Single-part frames are returned directly; multi-part frames are saved
        and joined when the stored fragment count reaches ``total_parts``.
        """
        if frame.total_parts == 1:
            return frame.chunk
        self.db.save_fragment(
            sender,
            frame.packet_id,
            frame.category,
            frame.mode,
            frame.part_no,
            frame.total_parts,
            frame.chunk,
        )
        fragments = self.db.get_packet_fragments(sender, frame.packet_id)
        if len(fragments) < frame.total_parts:
            return None
        # NOTE(review): this relies on the database returning fragments in part
        # order and storing each part at most once -- confirm in Database.
        payload = "".join(fragment["chunk"] for fragment in fragments)
        self.db.delete_packet_fragments(sender, frame.packet_id)
        return payload

    def _accept_peer_key(self, sender: str, data: dict, event_type: str, event_text: str, notice_text: str) -> bool:
        """Store the peer key carried by a hello/hello_ack payload.

        Returns:
            True when the key was stored and the contact switched to secure
            mode; False when the payload had no usable public key, in which
            case only a ``handshake_rejected`` event is logged.
        """
        public_key = data.get("public_key")
        if not public_key or not isinstance(public_key, str):
            # Without a key the contact would be marked secure but every
            # outgoing secure message would fail, so ignore the frame.
            self.db.log_secure_event(
                sender,
                "handshake_rejected",
                self._enc("پیام برقراری ارتباط امن بدون کلید عمومی معتبر دریافت و نادیده گرفته شد."),
            )
            return False
        # NOTE(review): a fingerprint supplied by the sender is stored as-is and
        # is not checked against the received key -- confirm this is intended.
        fingerprint = data.get("fingerprint") or self.crypto.fingerprint_public_key(public_key)
        self.db.update_contact_security(
            sender,
            mode="secure",
            secure_state="ready",
            peer_public_key_enc=self._enc(public_key),
            peer_fingerprint=fingerprint,
            last_secure_at=utc_now(),
        )
        self.db.log_secure_event(sender, event_type, self._enc(event_text))
        self.db.log_secure_event(sender, "secure_established", self._enc("ارتباط امن برقرار شد."))
        self._add_system_message(sender, notice_text)
        return True

    def _handle_control_payload(self, sender: str, payload: str) -> Optional[list[str]]:
        """Apply a decoded control payload and return reply frames, if any.

        ``hello`` stores the peer key and answers with ``hello_ack`` frames;
        ``hello_ack`` stores the peer key; ``normal_mode`` switches the
        contact back to normal mode.  Other types are ignored.
        """
        data = decode_control_payload(payload)
        action = data.get("type")
        if action == "hello":
            # Resolve our identity before writing anything for this handshake.
            identity = self.get_public_identity()
            accepted = self._accept_peer_key(
                sender,
                data,
                "hello_received",
                "درخواست ارتباط امن دریافت شد.",
                "ارتباط امن با این مخاطب فعال شد.",
            )
            if not accepted:
                return None
            reply = {
                "type": "hello_ack",
                "public_key": identity["public_key"],
                "fingerprint": identity["fingerprint"],
                "ts": utc_now(),
            }
            return build_control_frames(reply)
        if action == "hello_ack":
            self._accept_peer_key(
                sender,
                data,
                "hello_ack_received",
                "پاسخ ارتباط امن دریافت شد.",
                "ارتباط امن آماده استفاده است.",
            )
            return None
        if action == "normal_mode":
            self.db.update_contact_security(sender, mode="normal", secure_state="ready")
            self.db.log_secure_event(sender, "normal_mode_received", self._enc("مخاطب گفتگو را به حالت عادی برگرداند."))
            self._add_system_message(sender, "مخاطب گفتگو را به حالت عادی برگرداند.")
            return None
        return None

    def _handle_message_payload(self, sender: str, mode_marker: str, payload: str):
        """Decode an incoming message payload and store it.

        ``"S"`` payloads are decrypted with the local private key; if that
        fails a placeholder body is stored and a ``decrypt_failed`` event is
        logged.  Any other marker is decoded as a plain body.
        """
        if mode_marker == "S":
            mode = "secure"
            try:
                body = self.crypto.decrypt_from_peer(payload, self.identity["private_key"])
                transport_state = "received_secure"
            except Exception:
                # Deliberate best-effort: keep a visible trace of the message
                # instead of failing the whole receive path.
                body = "پیام امن دریافت شد اما بازگشایی نشد."
                transport_state = "decrypt_failed"
                self.db.log_secure_event(sender, "decrypt_failed", self._enc("بازگشایی پیام امن ناموفق بود."))
        else:
            body = decode_plain_body(payload)
            mode = "normal"
            transport_state = "received"

        self.db.add_message(
            phone=sender,
            direction="in",
            body_enc=self._enc(body),
            mode=mode,
            transport_state=transport_state,
        )

    # ------------------------------------------------------------------
    # Administration
    # ------------------------------------------------------------------

    def get_admin_snapshot(self) -> dict:
        """Collect statistics, decrypted events, pending packets and system info.

        Raises:
            RuntimeError: if the service is locked.
        """
        stats = self.db.collect_stats()
        identity = self.get_public_identity()
        events = [
            SecureEventView(
                created_at=row["created_at"],
                event_type=row["event_type"],
                phone=row["phone"] or "-",
                details=self._dec(row["details_enc"]) or "",
            )
            for row in self.db.list_secure_event_rows()
        ]
        pending_packets = [
            PendingPacketView(
                phone=row["phone"],
                packet_id=row["packet_id"],
                packet_kind=row["packet_kind"],
                packet_mode=row["packet_mode"],
                received_parts=row["received_parts"],
                total_parts=row["total_parts"],
                first_seen=row["first_seen"],
            )
            for row in self.db.list_pending_packets()
        ]
        # Read the settings once so port and baud rate come from the same read.
        connection = self.db.get_connection_settings()
        return {
            "stats": stats,
            "events": events,
            "pending_packets": pending_packets,
            "system_info": {
                "platform": platform.platform(),
                "python": platform.python_version(),
                "db_path": str(Path(self.db.db_path).resolve()),
                "modem_port": connection[0],
                "baudrate": connection[1],
                "fingerprint": identity["fingerprint"],
            },
        }

    def get_connection_settings(self) -> tuple[str, int]:
        """Return the stored modem connection settings from the database."""
        return self.db.get_connection_settings()

    def update_connection_settings(self, port: str, baudrate: int):
        """Store new modem settings and log the change as a secure event."""
        self.db.set_connection_settings(port, baudrate)
        self.db.log_secure_event(None, "connection_settings_changed", self._enc(f"تنظیمات مودم به {port}/{baudrate} تغییر کرد."))
|