4 from pathlib
import Path
6 from gi
.repository
import Gio
, GLib
7 from logging
import getLogger
9 from typing
import TYPE_CHECKING
, Optional
13 InvalidBootDeviceErrorType
,
14 SYSTEM_PARTITION_MOUNT_POINT
,
15 LUKS_HEADER_BACKUP_PATH
,
16 IO_ERRORS_FLAG_FILE_PATH
,
18 from tps
.configuration
import features
19 from tps
.configuration
.config_file
import ConfigFile
, InvalidStatError
20 from tps
.configuration
.feature
import Feature
, ConflictingProcessesError
21 from tps
.dbus
.errors
import (
22 InvalidConfigFileError
,
23 FailedPreconditionError
,
24 FeatureActivationFailedError
,
25 ActivationFailedError
,
26 DeactivationFailedError
,
27 FilesystemErrorsLeftUncorrectedError
,
28 IOErrorsDetectedError
,
30 from tps
.dbus
.object import DBusObject
31 from tps
.device
import (
35 InvalidBootDeviceError
,
36 InvalidPartitionError
,
37 PartitionNotUnlockedError
,
39 from tps
.job
import ServiceUsingJobs
43 DBUS_ROOT_OBJECT_PATH
,
44 DBUS_SERVICE_INTERFACE
,
46 TPS_BACKUP_MOUNT_POINT
,
47 ON_ACTIVATED_HOOKS_DIR
,
48 ON_DEACTIVATED_HOOKS_DIR
,
53 from tps
.job
import Job
55 logger
= getLogger(__name__
)
58 class AlreadyCreatedError(Exception):
62 class NotCreatedError(Exception):
66 class AlreadyUnlockedError(Exception):
70 class NotUnlockedError(Exception):
74 class Service(DBusObject
, ServiceUsingJobs
):
77 <interface name='org.boum.tails.PersistentStorage'>
79 <method name='Reload'/>
80 <method name='GetFeatures'>
81 <arg name='features' direction='out' type='as'/>
83 <method name='Create'>
84 <arg name='passphrase' direction='in' type='s'/>
86 <method name='CreateBackup'>
87 <arg name='passphrase' direction='in' type='s'/>
88 <arg name='device' direction='in' type='s'/>
90 <method name='ChangePassphrase'>
91 <arg name='passphrase' direction='in' type='s'/>
92 <arg name='new_passphrase' direction='in' type='s'/>
94 <method name='Delete'/>
95 <method name='Activate'/>
96 <method name='Unlock'>
97 <arg name='passphrase' direction='in' type='s'/>
98 <arg name='forceful_fsck' direction='in' type='b'/>
100 <method name='UpgradeLUKS'>
101 <arg name='passphrase' direction='in' type='s'/>
103 <method name='RepairFilesystem'/>
104 <method name='AbortRepairFilesystem'/>
105 <property name="State" type="s" access="read" />
106 <property name="Error" type="u" access="read" />
107 <property name="IsCreated" type="b" access="read"/>
108 <property name="IsUnlocked" type="b" access="read"/>
109 <property name="IsUpgraded" type="b" access="read"/>
110 <property name="CanDelete" type="b" access="read"/>
111 <property name="CanUnlock" type="b" access="read"/>
112 <property name="BootDeviceIsSupported" type="b" access="read"/>
113 <property name="Device" type="s" access="read"/>
114 <property name="Job" type="o" access="read"/>
119 dbus_path
= DBUS_ROOT_OBJECT_PATH
121 def __init__(self
, connection
: Gio
.DBusConnection
, loop
: GLib
.MainLoop
):
122 super().__init
__(connection
=connection
)
123 self
.connection
= connection
125 self
.object_manager
= None # type: Optional[Gio.DBusObjectManagerServer]
126 self
.config_file
= ConfigFile(TPS_MOUNT_POINT
)
128 self
.features
: list[Feature
] = []
129 self
._tps
_partition
= None # type: Optional[TPSPartition]
130 self
._cleartext
_device
= None
132 self
.state
= State
.UNKNOWN
134 self
._unlocked
= False
135 self
._can
_delete
= False
136 self
._can
_unlock
= False
137 self
._upgraded
= False
138 self
._created
= False
139 self
.enable_features_lock
= threading
.Lock()
140 self
._boot
_device
= None # type: Optional[BootDevice]
142 # Check if the boot device is valid for creating or using a Persistent
143 # Storage. We only do this once and not in refresh_state(),
144 # because we don't expect the boot device to change while the
145 # service is running. Unfortunately, this implies we'll keep
146 # a potentially obsolete error state as-is even if the user
147 # corrects the partition table (e.g. deleting a manually
148 # created second partition) without restarting the service.
150 self
._boot
_device
= BootDevice
.get_tails_boot_device()
151 except InvalidBootDeviceError
as e
:
152 logger
.warning("Invalid boot device: %s", e
)
153 self
.State
= State
.NOT_CREATED
155 # This will allow the UI to give the user more specific guidance.
156 self
.Error
= e
.error_type
161 # ----- Exported methods ----- #
163 def Quit_method_call_handler(
165 connection
: Gio
.DBusConnection
,
166 parameters
: GLib
.Variant
,
167 invocation
: Gio
.DBusMethodInvocation
,
169 """Terminate the Persistent Storage service."""
170 # Make the D-Bus method return first, else our main thread
171 # might exit before we can call return, resulting in a NoReply
172 # error on the client.
173 invocation
.return_value(None)
174 connection
.flush_sync()
175 logger
.info("Quit() was called, terminating...")
177 self
.unregister(self
.connection
)
178 self
.wait_for_method_calls_to_finish(True)
182 """Reload the state of the service and all features"""
184 self
.refresh_features()
186 def GetFeatures(self
) -> list[str]:
187 """List the IDs of all features"""
188 self
.refresh_features()
189 return [f
.Id
for f
in self
.features
]
191 def Create(self
, passphrase
: str):
192 """Create the Persistent Storage partition and activate the
195 logger
.info("Creating Persistent Storage...")
197 # Check if we can create the Persistent Storage
198 if self
.state
!= State
.NOT_CREATED
:
199 msg
= "Can't create Persistent Storage when state is '%s'" % self
.state
.name
200 raise FailedPreconditionError(msg
)
203 self
.do_create(passphrase
)
205 self
.refresh_state(overwrite_in_progress
=True)
206 self
.refresh_features()
208 logger
.info("Done creating Persistent Storage")
210 def do_create(self
, passphrase
: str):
211 self
.State
= State
.CREATING
212 with self
.new_job() as job
:
213 self
._tps
_partition
= TPSPartition
.create(job
, passphrase
)
215 # Activate all features that should be enabled by default
216 for feature
in (f
for f
in self
.features
if f
.enabled_by_default
):
218 feature
.do_activate(None, non_blocking
=True)
219 except ConflictingProcessesError
as e
:
220 # We can't automatically activate the feature, but
221 # lets not bother the user about that, because they
222 # did not explicitly enable the feature. If they try
223 # to enable it explicitly, they will see a message
224 # about the conflicting process.
227 feature
.refresh_state()
229 self
.run_on_activated_hooks()
231 def CreateBackup(self
, passphrase
: str, device
: str):
232 """Create a backup of the Persistent Storage partition"""
233 logger
.info(f
"Creating Persistent Storage backup on device {device}...")
235 if self
.state
!= State
.UNLOCKED
:
237 "Can't create backup of Persistent Storage when state is '%s'"
240 raise FailedPreconditionError(msg
)
242 dev_num
= os
.stat(device
).st_rdev
243 device
= BootDevice(udisks().get_block_for_dev(dev_num
).get_object())
244 partition
= TPSPartition
.create(None, passphrase
, device
)
246 # Mount the cleartext device
247 Path(TPS_BACKUP_MOUNT_POINT
).mkdir(parents
=True, exist_ok
=True)
248 cleartext_device_path
= partition
.get_cleartext_device().device_path
249 executil
.check_call(["mount", cleartext_device_path
, TPS_BACKUP_MOUNT_POINT
])
251 # Copy the data from the Persistent Storage to the new device
252 executil
.check_call(["/usr/local/lib/tails-backup-rsync"])
254 # Unmount the cleartext device. We don't do that in a finally
255 # block, because we want to keep the device mounted in case
256 # of an error, so that the user can inspect which files (if any)
258 partition
.get_cleartext_device().force_unmount()
260 # Delete the mountpoint
261 Path(TPS_BACKUP_MOUNT_POINT
).rmdir()
263 # Close the LUKS device
264 partition
._get
_encrypted
().call_lock_sync( # noqa: SLF001
265 arg_options
=GLib
.Variant("a{sv}", {}),
269 """Delete the Persistent Storage partition"""
270 # Check if we can delete the Persistent Storage
271 if self
.state
not in (State
.NOT_UNLOCKED
, State
.UNLOCKED
):
272 msg
= "Can't delete Persistent Storage when state is '%s'" % self
.state
.name
273 raise FailedPreconditionError(msg
)
275 logger
.info("Deleting Persistent Storage...")
278 # Disable all features first to ensure that no process is
279 # accessing any of the bindings
280 for feature
in self
.features
:
285 self
.refresh_state(overwrite_in_progress
=True)
286 self
.refresh_features()
288 logger
.info("Done deleting Persistent Storage")
291 # Delete the partition
292 self
.State
= State
.DELETING
293 self
._tps
_partition
.delete()
296 """Activate all Persistent Storage features which are currently
297 configured in the persistence.conf config file."""
299 logger
.info("Activating Persistent Storage...")
301 # Wait for all udev and UDisks events to finish
302 executil
.check_call(["udevadm", "settle"])
305 # Check if we can activate the Persistent Storage
306 if self
.state
!= State
.UNLOCKED
:
307 msg
= "Can't activate features when state is '%s'" % self
.state
.name
308 raise FailedPreconditionError(msg
)
312 except (InvalidBootDeviceError
, InvalidPartitionError
) as e
:
313 raise NotCreatedError("No Persistent Storage found") from e
319 self
.refresh_features()
321 logger
.info("Done activating Persistent Storage")
323 def do_activate(self
):
324 # Ensure that the config file exists
325 if not self
.config_file
.exists():
326 self
.config_file
.save([])
328 # Check that the config file has secure ownership and
329 # permissions. If not, disable the file and create an empty
332 self
.config_file
.check_file_stat()
333 except InvalidStatError
as e
:
334 logger
.warning(f
"Disabling invalid config file: {e}")
336 self
.config_file
.disable_and_create_empty()
337 self
.run_on_activated_hooks()
339 raise InvalidConfigFileError(e
) from e
341 self
.refresh_features()
342 failed_feature_names
= list()
343 for feature
in [f
for f
in self
.features
if f
.IsEnabled
]:
345 feature
.do_activate(None, non_blocking
=True)
347 logger
.exception("Failed to activate feature")
348 failed_feature_names
.append(feature
.translatable_name
)
350 feature
.refresh_state(emit_properties_changed_signal
=True)
352 self
.run_on_activated_hooks()
354 if any(failed_feature_names
):
355 # We want to show a translatable error message to the user
356 # but because the Service.Activate method is called in the
357 # Welcome Screen (and only there), only the Welcome Screen
358 # knows which language the user has currently selected.
359 # So we let the Welcome Screen translate the error message
360 # instead and make it easy for it by just passing the list
361 # of translatable feature names in the error message.
362 msg
= ":".join(failed_feature_names
)
363 raise FeatureActivationFailedError(msg
)
365 def Unlock(self
, passphrase
: str, forceful_fsck
: bool):
366 """Unlock and mount the Persistent Storage"""
368 logger
.info("Unlocking Persistent Storage...")
370 # Check if we can unlock the Persistent Storage. We also support
371 # unlocking when the state is UNLOCKED to allow the caller to
372 # retry unlocking after an incomplete unlock attempt (e.g. when
373 # filesystem errors were left uncorrected).
374 if self
.state
not in (State
.NOT_UNLOCKED
, State
.UNLOCKED
):
375 msg
= "Can't unlock when state is '%s'" % self
.state
.name
376 raise FailedPreconditionError(msg
)
379 self
.do_unlock(passphrase
, forceful_fsck
)
381 self
.refresh_state(overwrite_in_progress
=True)
382 # We don't refresh the features here to avoid that any errors
383 # caused by unexpected state of the Persistent Storage are
384 # shown to the user as "Failed to Unlock", which would be
385 # misleading because it was unlocked successfully.
386 # We expect the caller to call Activate next after a
387 # successful Unlock call and we refresh the features there,
388 # so it should be fine to skip it here.
390 logger
.info("Done unlocking Persistent Storage")
392 def do_unlock(self
, passphrase
: str, forceful_fsck
: bool):
393 self
.state
= State
.UNLOCKING
395 # Unlock the Persistent Storage
396 if not self
._tps
_partition
.is_unlocked():
397 self
._tps
_partition
.unlock(passphrase
)
399 # Mount the Persistent Storage
400 cleartext_device
= self
._tps
_partition
.get_cleartext_device()
402 if not cleartext_device
.is_mounted():
403 cleartext_device
.fsck(forceful_fsck
)
404 cleartext_device
.mount(forceful_fsck
)
405 except FilesystemErrorsLeftUncorrectedError
as e
:
406 # Check if there are I/O errors for the device, in which
407 # case we want to raise a different error
408 io_errors_flag_file
= Path(IO_ERRORS_FLAG_FILE_PATH
)
409 # If the flag file already exists, we don't need to check
410 # the journal for I/O errors
411 if io_errors_flag_file
.exists():
412 raise IOErrorsDetectedError() from e
413 # Check the journal messages for IO errors
414 executil
.check_call(["tails-detect-disk-ioerrors", "--oneshot"])
415 if io_errors_flag_file
.exists():
416 raise IOErrorsDetectedError() from e
417 # If there are no I/O errors, we raise the original error
420 # Remove the LUKS header backup if it exists. It's not needed
421 # anymore and we don't want to keep it around to avoid that the
422 # master key can be recovered from it (for example in case that
423 # the user changes the passphrase of the Persistent Storage).
425 # Just unlinking the header allows it to be recovered until the
426 # physical memory is overwritten. Secure deletion on flash storage
427 # is a hard problem, simply overwriting the logical blocks once
428 # is not enough to ensure that the data is gone because of wear
429 # leveling. We still overwrite the header once with random data
430 # because that's what cryptsetup does when deleting a LUKS header
431 # on a non-rotational device (as of cryptsetup 2.6.1), see
432 # https://salsa.debian.org/cryptsetup-team/cryptsetup/-/blob/e99903d881ad15abacf16ffcb23207b85c052d55/lib/utils_wipe.c#L237-237
433 luks_header_backup
= Path(LUKS_HEADER_BACKUP_PATH
)
434 if luks_header_backup
.exists():
435 with self
.ensure_system_partition_mounted_read_write():
436 self
.erase_luks_header_backup()
438 def UpgradeLUKS(self
, passphrase
: str):
439 """Upgrade the LUKS header and key derivation function.
441 Note that due to wear leveling, the old LUKS header can likely
442 be restored via forensic analysis (until the physical memory is
445 logger
.info("Upgrading Persistent Storage...")
447 # Check if we can upgrade the Persistent Storage
448 if self
.state
!= State
.NOT_UNLOCKED
:
449 msg
= "Can't upgrade when state is '%s'" % self
.state
.name
450 raise FailedPreconditionError(msg
)
453 # The backup header is stored on the system partition which is
454 # not mounted read-write by default, so we need to mount it
456 with self
.ensure_system_partition_mounted_read_write():
457 self
.do_upgrade_luks(passphrase
)
459 self
.refresh_state(overwrite_in_progress
=True)
461 logger
.info("Done upgrading Persistent Storage")
463 def do_upgrade_luks(self
, passphrase
: str):
464 # This is for debugging #19728 and #19734 during the Tails
465 # 5.15 cycle. Since debug logging is enabled the output of
466 # this command will be logged in full.
467 executil
.check_call(["free"])
469 self
.state
= State
.UPGRADING
471 # Check that the passphrase is correct and the header is intact
472 self
._tps
_partition
.test_passphrase(passphrase
)
474 # To avoid that the backup call below fails because the file
475 # already exists, we remove the backup file if it exists. We
476 # don't need the backup file because we just checked that the
478 luks_header_backup
= Path(LUKS_HEADER_BACKUP_PATH
)
479 if luks_header_backup
.exists():
480 self
.erase_luks_header_backup()
482 # Create a backup of the LUKS header in case something goes
483 # wrong during the upgrade. If we can't successfully unlock the
484 # Persistent Storage after the upgrade, the backup header is
485 # automatically restored by the Unlock method. It is removed
486 # when Persistent Storage was successfully unlocked.
487 self
._tps
_partition
.backup_luks_header()
489 # Check that the backup header is intact
491 self
._tps
_partition
.test_passphrase(passphrase
, luks_header_backup
)
492 except Exception as e
:
493 # Remove the backup header because it is not intact, so we
494 # can't restore it if something goes wrong during the upgrade.
495 self
.erase_luks_header_backup()
498 # Upgrade the LUKS header and key derivation function
499 self
._tps
_partition
.upgrade_luks2()
500 self
._tps
_partition
.convert_pbkdf_argon2id(passphrase
)
502 def ChangePassphrase(self
, passphrase
: str, new_passphrase
: str):
503 """Change the passphrase of the Persistent Storage encrypted
506 logger
.info("Changing passphrase...")
509 partition
= TPSPartition
.find()
510 except (InvalidBootDeviceError
, InvalidPartitionError
) as e
:
511 raise NotCreatedError("No Persistent Storage found") from e
513 partition
.change_passphrase(passphrase
, new_passphrase
)
515 logger
.info("Done changing passphrase")
517 def RepairFilesystem(self
):
518 """Do a forceful filesystem check (e2fsck -f -y). Requires the
519 Persistent Storage to be unlocked and not mounted."""
520 logger
.info("Repairing filesystem")
523 partition
= TPSPartition
.find()
524 except (InvalidBootDeviceError
, InvalidPartitionError
) as e
:
525 raise NotCreatedError("No Persistent Storage found") from e
528 self
._cleartext
_device
= partition
.get_cleartext_device()
529 except PartitionNotUnlockedError
as e
:
530 raise NotUnlockedError("Persistent Storage is not unlocked") from e
533 self
._cleartext
_device
.fsck(forceful
=True)
535 self
._cleartext
_device
= None
537 logger
.info("Done repairing filesystem")
539 def AbortRepairFilesystem(self
):
540 """Abort any ongoing filesystem check of the Persistent
544 self
._cleartext
_device
is None
545 or self
._cleartext
_device
.fsck_process
is None
548 "Attempted to abort reparation of filesystem while none was running"
552 logger
.info("Aborting reparation of filesystem")
554 self
._cleartext
_device
.abort_fsck()
556 logger
.info("Done aborting reparation of filesystem")
558 # ----- Exported properties ----- #
561 def State(self
) -> str:
562 """The state of the Persistent Storage"""
563 return self
.state
.name
566 def State(self
, state
: State
):
567 if self
.state
== state
:
571 changed_properties
= {"State": GLib
.Variant("s", state
.name
)}
572 self
.emit_properties_changed_signal(
574 DBUS_SERVICE_INTERFACE
,
579 def Error(self
) -> int:
580 """The error code, if State is ERROR"""
584 def Error(self
, code
: int):
585 if self
._error
== code
:
589 changed_properties
= {"Error": GLib
.Variant("u", code
)}
590 self
.emit_properties_changed_signal(
592 DBUS_SERVICE_INTERFACE
,
597 def IsCreated(self
) -> bool:
598 """Whether the Persistent Storage partition is created."""
602 def IsCreated(self
, value
: bool):
603 if self
._created
== value
:
606 self
._created
= value
607 changed_properties
= {"IsCreated": GLib
.Variant("b", value
)}
608 self
.emit_properties_changed_signal(
610 DBUS_SERVICE_INTERFACE
,
615 def IsUnlocked(self
) -> bool:
616 """Whether the Persistent Storage partition is unlocked and
617 mounted (we also require it to be mounted to avoid having to add
618 a separate property IsMounted)."""
619 # We could use the cached value, self._unlocked, here, but the
620 # cost of refreshing seem to be low here, and the benefit is
621 # that the value will be correct even if the user unlocked the
622 # partition in some other way than by using this service's
623 # Unlock() method (for example GNOME Disks or cryptsetup).
625 return self
._unlocked
628 def IsUnlocked(self
, value
: bool):
629 if self
._unlocked
== value
:
632 self
._unlocked
= value
633 changed_properties
= {"IsUnlocked": GLib
.Variant("b", value
)}
634 self
.emit_properties_changed_signal(
636 DBUS_SERVICE_INTERFACE
,
641 def IsUpgraded(self
) -> bool:
642 """Whether the LUKS header and key derivation function have
643 been upgraded to LUKS2 and argon2id"""
645 return self
._upgraded
648 def IsUpgraded(self
, value
: bool):
649 if self
._upgraded
== value
:
652 self
._upgraded
= value
653 changed_properties
= {"IsUpgraded": GLib
.Variant("b", value
)}
654 self
.emit_properties_changed_signal(
656 DBUS_SERVICE_INTERFACE
,
661 def CanUnlock(self
) -> bool:
662 """Whether the Persistent Storage can be unlocked"""
664 return self
._can
_unlock
667 def CanUnlock(self
, value
: bool):
668 if self
._can
_unlock
== value
:
671 self
._can
_unlock
= value
672 changed_properties
= {"CanUnlock": GLib
.Variant("b", value
)}
673 self
.emit_properties_changed_signal(
675 DBUS_SERVICE_INTERFACE
,
680 def CanDelete(self
) -> bool:
681 """Whether the Persistent Storage can be deleted"""
683 return self
._can
_delete
686 def CanDelete(self
, value
: bool):
687 if self
._can
_delete
== value
:
690 self
._can
_delete
= value
691 changed_properties
= {"CanDelete": GLib
.Variant("b", value
)}
692 self
.emit_properties_changed_signal(
694 DBUS_SERVICE_INTERFACE
,
699 def BootDeviceIsSupported(self
) -> bool:
700 return bool(self
._boot
_device
)
703 def Device(self
) -> str:
707 def Device(self
, value
: str):
708 if self
._device
== value
:
712 changed_properties
= {"Device": GLib
.Variant("s", value
)}
713 self
.emit_properties_changed_signal(
715 DBUS_SERVICE_INTERFACE
,
720 def Job(self
) -> str:
721 return self
._job
.dbus_path
if self
._job
else "/"
724 def Job(self
, job
: "Job"):
726 changed_properties
= {"Job": GLib
.Variant("s", self
.Job
)}
727 self
.emit_properties_changed_signal(
729 DBUS_SERVICE_INTERFACE
,
733 # ----- Non-exported functions ----- #
736 """Start the Persistent Storage service."""
738 self
.register(self
.connection
)
740 # Create the object manager
741 object_manager_path
= DBUS_FEATURES_PATH
742 self
.object_manager
= Gio
.DBusObjectManagerServer(
743 object_path
=object_manager_path
,
746 for FeatureClass
in features
.get_classes():
747 feature
= FeatureClass(self
)
748 feature
.register(self
.connection
)
749 self
.object_manager
.export(
750 Gio
.DBusObjectSkeleton
.new(feature
.dbus_path
)
752 self
.features
.append(feature
)
754 # Export the object manager on the connection. We do this
755 # after exporting the features above to avoid
756 # InterfacesAdded signals being emitted.
757 self
.object_manager
.set_connection(self
.connection
)
759 self
.refresh_features()
761 logger
.debug("Done registering objects")
768 logger
.debug("Exiting")
772 # Wait until all pending events on the main loop were handled
773 context
= self
.mainloop
.get_context() # type: GLib.MainContext
774 while context
.iteration(may_block
=False):
775 logger
.debug("Waiting for mainloop events to be handled")
778 def enable_feature(self
, feature
: Feature
):
779 with self
.enable_features_lock
:
780 enabled_features
= [ftr
for ftr
in self
.features
if ftr
.IsEnabled
]
781 self
.config_file
.save([*enabled_features
, feature
])
782 feature
.refresh_state(["IsEnabled"])
783 if not feature
.IsEnabled
:
784 msg
= f
"Failed to enable feature '{feature.Id}' in config file"
785 raise ActivationFailedError(msg
)
787 def disable_feature(self
, feature
: Feature
):
788 with self
.enable_features_lock
:
789 enabled_features
= [ftr
for ftr
in self
.features
if ftr
.IsEnabled
]
790 enabled_features
.remove(feature
)
791 self
.config_file
.save(enabled_features
)
792 feature
.refresh_state(["IsEnabled"])
793 if feature
.IsEnabled
:
794 msg
= f
"Failed to disable feature '{feature.Id}' in config file"
795 raise DeactivationFailedError(msg
)
797 def refresh_features(self
):
798 # Refresh custom features
800 if self
.config_file
.exists():
801 bindings
= self
.config_file
.parse()
803 binding
for feature
in self
.features
for binding
in feature
.Bindings
806 binding
for binding
in bindings
if binding
not in known_bindings
808 for i
, binding
in enumerate(unknown_bindings
):
810 class CustomFeature(Feature
):
811 Id
= f
"CustomFeature{i}"
812 translatable_name
= f
"Custom Feature ({binding.dest_orig})"
813 Description
= str(binding
.dest_orig
)
814 Bindings
= [binding
] # noqa: RUF012
816 custom_feature
= CustomFeature(self
, is_custom
=True)
817 custom_feature
.register(self
.connection
)
818 self
.object_manager
.export(
819 Gio
.DBusObjectSkeleton
.new(custom_feature
.dbus_path
)
821 self
.features
.append(custom_feature
)
823 # Remove the ones whose binding entry was removed from the config
825 custom_features
= [f
for f
in self
.features
if f
.is_custom
]
826 for known_custom_feature
in custom_features
:
827 if known_custom_feature
.Bindings
[0] not in bindings
:
828 known_custom_feature
.unregister(self
.connection
)
829 self
.object_manager
.unexport(known_custom_feature
.dbus_path
)
830 self
.features
.remove(known_custom_feature
)
832 # Refresh state of all features
834 for feature
in self
.features
:
836 feature
.refresh_state(emit_properties_changed_signal
=True)
837 except Exception as e
:
838 logger
.exception("Failed to refresh state of feature")
843 def refresh_state(self
, overwrite_in_progress
: bool = False):
844 if not self
._boot
_device
:
845 # The boot device doesn't exist, which is already handled
846 # in the __init__ method, so we don't change the state here
849 # Don't overwrite the state if we're in the middle of something.
850 # In all methods which set any of these states, we ensure that
851 # the state is set to something else when the operation is done
853 if not overwrite_in_progress
and self
.state
in IN_PROGRESS_STATES
:
856 # Check if the partition exists
858 self
._tps
_partition
= TPSPartition
.find()
859 except (InvalidBootDeviceError
, InvalidPartitionError
):
860 self
._tps
_partition
= None
861 self
.State
= State
.NOT_CREATED
863 self
.IsCreated
= False
864 self
.IsUnlocked
= False
865 self
.IsUpgraded
= False
867 num_partitions
= len(self
._boot
_device
.partition_table
.props
.partitions
)
868 if num_partitions
> 1:
869 logger
.error("Too many partitions: %i", num_partitions
)
870 self
._boot
_device
= None
871 self
.Error
= InvalidBootDeviceErrorType
.TOO_MANY_PARTITIONS
875 self
.Device
= self
._tps
_partition
.device_path
876 self
.IsCreated
= True
877 self
.IsUpgraded
= self
._tps
_partition
.is_upgraded()
879 # Check if the partition is unlocked and mounted
880 if not self
._tps
_partition
.is_unlocked_and_mounted():
881 self
.State
= State
.NOT_UNLOCKED
882 self
.IsUnlocked
= False
883 if self
._boot
_device
.block
.props
.read_only
:
884 logger
.error("Boot device is read-only")
885 self
.CanDelete
= False
886 self
.CanUnlock
= False
887 self
.Error
= InvalidBootDeviceErrorType
.READ_ONLY
889 self
.CanDelete
= True
890 self
.CanUnlock
= True
893 self
.State
= State
.UNLOCKED
894 self
.IsUnlocked
= True
897 def run_on_activated_hooks():
898 executil
.execute_hooks(ON_ACTIVATED_HOOKS_DIR
)
901 def erase_luks_header_backup():
902 luks_header_backup
= Path(LUKS_HEADER_BACKUP_PATH
)
910 str(luks_header_backup
),
914 @contextlib.contextmanager
915 def ensure_system_partition_mounted_read_write(self
):
916 """Mount the system partition read-write if it isn't already,
917 and remount it read-only when the context manager exits."""
919 executil
.check_output(
926 SYSTEM_PARTITION_MOUNT_POINT
,
932 already_mounted_read_write
= "rw" in mount_options
934 if not already_mounted_read_write
:
940 SYSTEM_PARTITION_MOUNT_POINT
,
947 if not already_mounted_read_write
:
953 SYSTEM_PARTITION_MOUNT_POINT
,