Saba-python/tests/test_outgoing_secure_flow.py
2026-03-27 19:20:38 +03:30

109 lines
3.5 KiB
Python

import unittest
from pathlib import Path
import uuid
from secure_sms.application.services import SecureMessagingService
from secure_sms.core.security import PasswordManager, StorageCipher
from secure_sms.infrastructure.database import Database
class OutgoingSecureFlowTests(unittest.TestCase):
    """Tests for ``SecureMessagingService.prepare_outgoing_message``.

    Every test builds its own service on top of a throwaway database file
    created next to this module; the file is removed (best effort) when the
    test finishes.
    """

    # Password used to create the security metadata and derive the storage key.
    _ADMIN_PASSWORD = "admin-password-123"
    # Leading marker of a symmetric secure frame. Everything after it is the
    # payload that is handed to ``crypto.decrypt_symmetric``.
    _SECURE_FRAME_HEADER = "@S:SYM|"
    # Full prefix every secure frame produced in these tests must start with.
    _SECURE_FRAME_PREFIX = _SECURE_FRAME_HEADER + "h1:"

    def _build_service(self) -> SecureMessagingService:
        """Return a service backed by a fresh, uniquely named database file.

        The database receives security metadata created from the admin
        password, and the service's ``cipher`` is replaced by a
        ``StorageCipher`` keyed with the key derived from that password and
        the metadata salt.
        """
        db_path = (
            Path(__file__).resolve().parent
            / f"__tmp_outgoing_secure_flow_{uuid.uuid4().hex}.db"
        )

        def cleanup() -> None:
            # Best-effort removal: a missing file is already the desired
            # state, and a PermissionError (presumably the database file is
            # still open, e.g. on Windows -- TODO confirm) must not fail
            # the test.
            try:
                db_path.unlink()
            except (FileNotFoundError, PermissionError):
                pass

        # Register cleanup before touching the database so the file is
        # removed even if the remaining setup raises.
        self.addCleanup(cleanup)

        db = Database(str(db_path))
        service = SecureMessagingService(db)

        password_manager = PasswordManager()
        meta = password_manager.create_metadata(self._ADMIN_PASSWORD)
        db.set_security_metadata(meta)
        service.cipher = StorageCipher(
            password_manager.derive_key(self._ADMIN_PASSWORD, meta.salt)
        )
        return service

    def _secure_payload(self, frames) -> str:
        """Assert that ``frames[0]`` is a secure frame and return its payload.

        Fails with an assertion (rather than an ``IndexError``) when no frame
        was produced. The returned payload is the first frame with the
        ``"@S:SYM|"`` header stripped.
        """
        self.assertGreaterEqual(len(frames), 1, "no outgoing frame was produced")
        frame = frames[0]
        self.assertTrue(frame.startswith(self._SECURE_FRAME_PREFIX))
        return frame[len(self._SECURE_FRAME_HEADER):]

    def test_prepare_outgoing_message_builds_flutter_compatible_secure_frame(self):
        """A message sent with a symmetric key yields one decryptable secure frame."""
        service = self._build_service()
        phone = "09121234567"
        shared_key = "shared-key-123"
        message = "hello from raspberry"

        service.add_or_update_contact("Test Contact", phone)
        service.set_symmetric_key(phone, shared_key)

        frames, mode = service.prepare_outgoing_message(
            phone, message, symmetric_key=shared_key
        )

        self.assertEqual(mode, "secure")
        self.assertEqual(len(frames), 1)
        payload = self._secure_payload(frames)
        self.assertEqual(
            service.crypto.decrypt_symmetric(payload, shared_key),
            message,
        )

    def test_explicit_send_key_is_cached_for_follow_up_secure_messages(self):
        """An explicit key is stored on the contact but is not applied implicitly.

        The first send with an explicit key must be secure and must record
        the key and secure state on the contact. A follow-up send *without* a
        key must go out as a normal message, and passing the key again must
        produce a secure frame once more.
        """
        service = self._build_service()
        phone = "09121234567"
        shared_key = "shared-key-123"

        service.add_or_update_contact("Test Contact", phone)

        first_frames, first_mode = service.prepare_outgoing_message(
            phone,
            "hello from python",
            symmetric_key=shared_key,
        )
        self.assertEqual(first_mode, "secure")
        # Only the frame shape is checked here; the payload is not decrypted.
        self._secure_payload(first_frames)

        contact = service.get_contact(phone)
        self.assertIsNotNone(contact)
        self.assertEqual(contact.symmetric_key, shared_key)
        self.assertEqual(contact.mode, "secure")
        self.assertEqual(contact.secure_state, "ready")
        self.assertIsNotNone(contact.last_secure_at)

        follow_up_frames, follow_up_mode = service.prepare_outgoing_message(
            phone,
            "second normal ping",
        )
        # Bug fix verification: it should now be "normal" if no key is passed,
        # even if a key was used before.
        self.assertEqual(follow_up_mode, "normal")
        self.assertEqual(follow_up_frames[0], "second normal ping")

        # But we can still send secure if we provide the key again.
        third_frames, third_mode = service.prepare_outgoing_message(
            phone,
            "third secure ping",
            symmetric_key=shared_key,
        )
        self.assertEqual(third_mode, "secure")
        payload = self._secure_payload(third_frames)
        self.assertEqual(
            service.crypto.decrypt_symmetric(payload, shared_key),
            "third secure ping",
        )
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()