109 lines
3.5 KiB
Python
109 lines
3.5 KiB
Python
import unittest
|
|
from pathlib import Path
|
|
import uuid
|
|
|
|
from secure_sms.application.services import SecureMessagingService
|
|
from secure_sms.core.security import PasswordManager, StorageCipher
|
|
from secure_sms.infrastructure.database import Database
|
|
|
|
|
|
class OutgoingSecureFlowTests(unittest.TestCase):
    """Check the frames and mode produced by ``prepare_outgoing_message``.

    Every test builds its own ``SecureMessagingService`` on top of a uniquely
    named database path and sends messages with and without an explicit
    symmetric key.
    """

    # Used both to create the security metadata and to derive the storage
    # key; the two calls must agree, so the literal is defined exactly once.
    _ADMIN_PASSWORD = "admin-password-123"

    # Leading part of a secure frame that the tests strip before handing the
    # remainder to ``decrypt_symmetric``.
    _SECURE_ENVELOPE = "@S:SYM|"
    # Full prefix the tests expect on a secure frame. The trailing "h1:" tag
    # is kept as part of the payload passed to ``decrypt_symmetric``.
    _SECURE_FRAME_PREFIX = _SECURE_ENVELOPE + "h1:"

    def _build_service(self) -> SecureMessagingService:
        """Return a service bound to a fresh database path and storage cipher.

        The database path sits next to this test module under a unique name.
        Its removal is registered with ``addCleanup`` before the database is
        opened, so it is attempted even if the rest of the setup fails.
        """
        db_path = (
            Path(__file__).resolve().parent
            / f"__tmp_outgoing_secure_flow_{uuid.uuid4().hex}.db"
        )

        def cleanup() -> None:
            # Best effort: the file may never have been created, or may still
            # be locked (presumably by an open database handle on Windows --
            # TODO confirm). Leaving it behind is preferred over failing the
            # test during teardown.
            try:
                db_path.unlink()
            except (FileNotFoundError, PermissionError):
                pass

        self.addCleanup(cleanup)

        db = Database(str(db_path))
        service = SecureMessagingService(db)

        password_manager = PasswordManager()
        meta = password_manager.create_metadata(self._ADMIN_PASSWORD)
        db.set_security_metadata(meta)
        service.cipher = StorageCipher(
            password_manager.derive_key(self._ADMIN_PASSWORD, meta.salt)
        )
        return service

    def _assert_secure_frame(self, frame: str) -> str:
        """Assert *frame* has the secure prefix and return its payload.

        The payload is everything after the envelope, i.e. it still starts
        with the "h1:" tag, which is what the tests pass to
        ``decrypt_symmetric``.
        """
        self.assertTrue(
            frame.startswith(self._SECURE_FRAME_PREFIX),
            f"{frame!r} does not start with {self._SECURE_FRAME_PREFIX!r}",
        )
        # Slice by the envelope's length instead of a hard-coded index so the
        # offset cannot drift away from the prefix literal.
        return frame[len(self._SECURE_ENVELOPE):]

    def test_prepare_outgoing_message_builds_flutter_compatible_secure_frame(self):
        """An explicit key yields one secure frame that decrypts to the text."""
        service = self._build_service()

        phone = "09121234567"
        shared_key = "shared-key-123"
        message = "hello from raspberry"

        service.add_or_update_contact("Test Contact", phone)
        service.set_symmetric_key(phone, shared_key)

        frames, mode = service.prepare_outgoing_message(
            phone, message, symmetric_key=shared_key
        )

        self.assertEqual(mode, "secure")
        self.assertEqual(len(frames), 1)

        payload = self._assert_secure_frame(frames[0])
        self.assertEqual(
            service.crypto.decrypt_symmetric(payload, shared_key),
            message,
        )

    def test_explicit_send_key_is_cached_for_follow_up_secure_messages(self):
        """An explicit key is stored on the contact but not applied implicitly.

        NOTE(review): the method name suggests follow-up messages become
        secure automatically; the assertions below pin the opposite. The name
        is kept so existing test IDs stay stable.
        """
        service = self._build_service()

        phone = "09121234567"
        shared_key = "shared-key-123"

        service.add_or_update_contact("Test Contact", phone)

        first_frames, first_mode = service.prepare_outgoing_message(
            phone,
            "hello from python",
            symmetric_key=shared_key,
        )

        self.assertEqual(first_mode, "secure")
        self._assert_secure_frame(first_frames[0])

        # Sending with an explicit key records it on the contact.
        contact = service.get_contact(phone)
        self.assertIsNotNone(contact)
        self.assertEqual(contact.symmetric_key, shared_key)
        self.assertEqual(contact.mode, "secure")
        self.assertEqual(contact.secure_state, "ready")
        self.assertIsNotNone(contact.last_secure_at)

        follow_up_frames, follow_up_mode = service.prepare_outgoing_message(
            phone,
            "second normal ping",
        )

        # Bug fix verification: it should now be "normal" if no key is passed,
        # even if a key was used before.
        self.assertEqual(follow_up_mode, "normal")
        self.assertEqual(follow_up_frames[0], "second normal ping")

        # But we can still send secure if we provide the key again.
        third_frames, third_mode = service.prepare_outgoing_message(
            phone,
            "third secure ping",
            symmetric_key=shared_key,
        )

        self.assertEqual(third_mode, "secure")
        payload = self._assert_secure_frame(third_frames[0])
        self.assertEqual(
            service.crypto.decrypt_symmetric(payload, shared_key),
            "third secure ping",
        )
|
|
|
|
|
|
# Allow running this test module directly as a script, in addition to
# discovery by a test runner.
if __name__ == "__main__":
    unittest.main()
|