System#
acoupi.system
#
Functions that manage the Acoupi system.
This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.
Classes#
AcoupiStatus
#
Bases: BaseModel
Model representing the status of the Acoupi system.
Attributes:
Name | Type | Description |
---|---|---|
acoupi_service | ServiceStatus | The status of the acoupi systemd service. |
beat_service | ServiceStatus | The status of the beat systemd service. |
celery | CeleryStatus | The status of the Celery workers. |
program | ProgramStatus | The status of the acoupi program. |
Settings
#
Bases: BaseSettings
Settings for acoupi system.
Attributes#
acoupi_beat_service_file: str = 'acoupi-beat.service'
class-attribute
instance-attribute
#
acoupi_service_file: str = 'acoupi.service'
class-attribute
instance-attribute
#
app_name: str = 'app'
class-attribute
instance-attribute
#
beat_script_path: Path = home / 'bin' / 'acoupi-beat.sh'
class-attribute
instance-attribute
#
celery_config_file: Path = home / 'config' / 'celery.json'
class-attribute
instance-attribute
#
celery_pool_type: Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes'] = 'threads'
class-attribute
instance-attribute
#
deployment_file: Path = home / 'config' / 'deployment.json'
class-attribute
instance-attribute
#
env_file: Path = home / 'config' / 'env'
class-attribute
instance-attribute
#
home: Path = Path.home() / '.acoupi'
class-attribute
instance-attribute
#
log_dir: Path = home / 'log'
class-attribute
instance-attribute
#
log_level: str = 'INFO'
class-attribute
instance-attribute
#
model_config = SettingsConfigDict(env_prefix='ACOUPI_')
class-attribute
instance-attribute
#
program_config_file: Path = home / 'config' / 'program.json'
class-attribute
instance-attribute
#
program_file: Path = home / (app_name + '.py')
class-attribute
instance-attribute
#
program_name_file: Path = home / 'config' / 'name'
class-attribute
instance-attribute
#
restart_script_path: Path = home / 'bin' / 'acoupi-workers-restart.sh'
class-attribute
instance-attribute
#
run_dir: Path = home / 'run'
class-attribute
instance-attribute
#
start_script_path: Path = home / 'bin' / 'acoupi-workers-start.sh'
class-attribute
instance-attribute
#
stop_script_path: Path = home / 'bin' / 'acoupi-workers-stop.sh'
class-attribute
instance-attribute
#
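As a rough illustration of how the default paths listed above relate to each other, the sketch below rebuilds part of the layout with plain `pathlib`. It does not use the real `Settings` class, and the `default_layout` helper is a hypothetical name introduced only for this example.

```python
from pathlib import Path


def default_layout(home: Path, app_name: str = "app") -> dict:
    """Return part of the default file layout (hypothetical helper)."""
    config = home / "config"
    scripts = home / "bin"
    return {
        # The file name is assembled first, then joined to the home directory.
        "program_file": home / (app_name + ".py"),
        "program_config_file": config / "program.json",
        "celery_config_file": config / "celery.json",
        "deployment_file": config / "deployment.json",
        "start_script_path": scripts / "acoupi-workers-start.sh",
        "log_dir": home / "log",
        "run_dir": home / "run",
    }


layout = default_layout(Path.home() / ".acoupi")
for name, path in layout.items():
    print(f"{name}: {path}")
```

Because `model_config` sets `env_prefix='ACOUPI_'`, pydantic-settings reads overrides from environment variables named after the fields, for example `ACOUPI_LOG_LEVEL` for `log_level`.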
Functions#
delete_recording(recording)
#
Delete the recording.
disable_services(settings, path=None, **kwargs)
#
Disable acoupi services.
dump_config(config, indent=2)
#
Dump a configuration object to a JSON string.
enable_services(settings, path=None, **kwargs)
#
Enable acoupi services.
end_deployment(settings)
#
End current deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
Returns:
Type | Description |
---|---|
Deployment | The ended deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If the deployment has already ended or has not been started yet. |
get_celery_app(settings)
#
Get the currently setup celery app.
get_celery_status(settings)
#
get_config_field(config, field)
#
Retrieve a field or nested field from a configuration object.
This function lets you access configuration values by field name. Nested fields are addressed with dot notation (e.g., 'section.subsection.value'). If no field is provided, the entire configuration object is returned.
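The dot-notation lookup described above can be sketched with the standard library alone. This is not acoupi's implementation: `get_by_path` is a hypothetical helper, and dataclasses stand in for Pydantic models so the example has no third-party dependencies.

```python
from dataclasses import dataclass, field
from typing import Any, List


def get_by_path(obj: Any, path: str) -> Any:
    """Follow a dotted path through attributes, dict keys, and list indices."""
    if not path:
        return obj  # no field requested: return the whole object
    current = obj
    for part in path.split("."):
        if isinstance(current, (list, tuple)):
            current = current[int(part)]  # IndexError if out of range
        elif isinstance(current, dict):
            current = current[part]
        else:
            current = getattr(current, part)  # AttributeError if missing
    return current


@dataclass
class Nested:
    d: bool = True


@dataclass
class Config:
    a: int = 1
    c: Nested = field(default_factory=Nested)
    items: List[int] = field(default_factory=lambda: [10, 20, 30])


config = Config()
print(get_by_path(config, "c.d"))
print(get_by_path(config, "items.1"))
```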
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | BaseModel | The configuration object, an instance of a Pydantic model. | required |
field | str | The name of the field to retrieve. Use dot notation for nested fields. | required |
Returns:
Type | Description |
---|---|
Any | The value of the specified field. The type depends on the field's definition in the Pydantic model. |
Raises:
Type | Description |
---|---|
AttributeError
|
If the specified |
IndexError
|
If the |
NotImplementedError
|
If the |
Examples:
Accessing a root-level field:
>>> from pydantic import BaseModel
>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> get_config_field(config, "a")
1
Accessing a nested field:
>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> get_config_field(config, "c.d")
True
Accessing an element from a list:
get_current_deployment(settings)
#
Get current deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
Returns:
Type | Description |
---|---|
Deployment | The current deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If the deployment file does not exist or the deployment has already ended. |
get_status(settings)
#
Get the current status of the Acoupi system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing configuration paths. | required |
Returns:
Type | Description |
---|---|
AcoupiStatus | An object containing the status of various components of the Acoupi system. |
get_task_list(program, include_celery_tasks=False)
#
Return a list of all the tasks registered in the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to get the tasks from. | required |
include_celery_tasks | bool | Whether to include celery tasks in the list. Defaults to False. | False |
Returns:
Type | Description |
---|---|
List[str] | List of the names of available tasks. |
Notes
Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.
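The filtering described in the note can be illustrated as follows. The sketch assumes that Celery's built-in tasks are recognisable by their `celery.` name prefix; the `filter_task_names` helper and the program task names are hypothetical, and acoupi may implement the check differently.

```python
from typing import Iterable, List


def filter_task_names(names: Iterable[str], include_celery_tasks: bool = False) -> List[str]:
    """Return task names, optionally dropping Celery's built-in ones."""
    return [
        name
        for name in names
        if include_celery_tasks or not name.startswith("celery.")
    ]


# Two built-in Celery task names plus two made-up program tasks.
registered = ["celery.chord", "celery.backend_cleanup", "recording_task", "detection_task"]
print(filter_task_names(registered))
```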
get_temp_file_id(path)
#
Get the temporary recording UUID from the path.
get_temp_files(path=TEMP_PATH)
#
Get the list of temporary recordings.
Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.
is_configured(settings)
#
Check if acoupi is configured.
load_config(path, schema)
#
Load config from file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path | Path to the config file. | required |
schema | Type[S] | Pydantic model to validate the config. | required |
Returns:
Type | Description |
---|---|
S | The loaded config. |
Raises:
Type | Description |
---|---|
ConfigurationError | If the configuration is invalid. |
FileNotFoundError | If the config file does not exist. |
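A minimal sketch of the load-and-validate pattern described above, using only the standard library. The names `ConfigError`, `RecorderConfig`, and `load_json_config` are hypothetical stand-ins: the real function validates against a Pydantic model and raises acoupi's own `ConfigurationError`.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


class ConfigError(Exception):
    """Stand-in for acoupi's ConfigurationError (hypothetical)."""


@dataclass
class RecorderConfig:
    """Toy schema standing in for a Pydantic model."""

    samplerate: int
    channels: int = 1


def load_json_config(path: Path, schema):
    """Read a JSON file and build a `schema` instance from it."""
    if not path.exists():
        raise FileNotFoundError(path)
    try:
        return schema(**json.loads(path.read_text()))
    except (TypeError, ValueError) as error:
        # TypeError: missing or unexpected keys; ValueError: malformed JSON.
        raise ConfigError(f"invalid configuration in {path}") from error


with tempfile.TemporaryDirectory() as tmp:
    config_path = Path(tmp) / "program.json"
    config_path.write_text(json.dumps({"samplerate": 48000}))
    print(load_json_config(config_path, RecorderConfig))
```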
load_config_schema(settings)
#
Load the configuration schema for the program.
load_program(settings)
#
Load a fully configured acoupi program.
This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing the paths to the program and celery config files. | required |
Returns:
Type | Description |
---|---|
AcoupiProgram | A fully configured instance of the acoupi program class. |
Raises:
Type | Description |
---|---|
ProgramNotFoundError | If the program module is not found. |
InvalidProgramError | If the loaded class is not a valid acoupi program class. |
load_program_class(program_name)
#
Load the acoupi program class from a specified module.
This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program_name | str | The name of the module containing the acoupi program class. | required |
Returns:
Type | Description |
---|---|
Type[AcoupiProgram] | The acoupi program class. |
Raises:
Type | Description |
---|---|
ProgramNotFoundError | If the specified program module is not found. |
InvalidProgramError | If the loaded class is not a valid acoupi program class. |
Notes
When multiple AcoupiProgram classes are found within the module, this function attempts to select one that is not further subclassed. If several such "final" classes exist, a warning is issued and the first one encountered is selected. When exposing AcoupiProgram classes in a module, it is recommended to make only one "final" class (one that no other class in the module subclasses) available.
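The selection rule in the note can be sketched as below. This is an illustration under assumptions rather than acoupi's code: `Program` stands in for `AcoupiProgram`, and `pick_final_class` and the recorder classes are hypothetical.

```python
import warnings
from typing import List, Type


class Program:
    """Stand-in for AcoupiProgram (hypothetical base class)."""


class BaseRecorder(Program):
    pass


class FieldRecorder(BaseRecorder):
    pass


def pick_final_class(candidates: List[Type[Program]]) -> Type[Program]:
    """Pick a candidate that no other candidate subclasses."""
    finals = [
        cls
        for cls in candidates
        if not any(other is not cls and issubclass(other, cls) for other in candidates)
    ]
    if not finals:
        raise ValueError("no program class found")
    if len(finals) > 1:
        warnings.warn("multiple final classes found; using the first one")
    return finals[0]


print(pick_final_class([BaseRecorder, FieldRecorder]).__name__)
```

Here `BaseRecorder` is skipped because `FieldRecorder` subclasses it, so `FieldRecorder` is the only "final" candidate.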
move_recording(recording, dest, logger=None)
#
Move the recording to the destination.
parse_config_from_args(schema, args=None, prompt=True)
#
Parse configurations from user provided arguments.
This function parses the configuration from the command line arguments provided by the user and returns the parsed configuration.
User-provided values override the defaults and, when prompt is True, the user is asked for any missing values.
An error is raised if the user-provided configuration is invalid.
Parameters:
Name | Description | Default |
---|---|---|
schema | The configuration schema to use. | required |
args | The arguments to parse. If None, will use an empty list. | None |
prompt | Whether to prompt the user for missing configuration. | True |
Returns:
Name | Description |
---|---|
config | The parsed configuration. |
profile_task(program, task_name)
#
Profile a task from the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to profile the task from. | required |
task_name | str | The name of the task to profile. | required |
Raises:
Type | Description |
---|---|
ValueError | If the specified task is not found. |
Notes
This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and is not sent to the Celery workers, so the profile shows the performance of the task itself, without the overhead of the Celery workers.
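For readers unfamiliar with cProfile, the following sketch shows the general technique of profiling a callable in the current process. It is not the body of `profile_task`; `profile_callable` and `example_task` are hypothetical names used only here.

```python
import cProfile
import io
import pstats


def profile_callable(func, *args, **kwargs) -> str:
    """Run `func` under cProfile and return a text report."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func(*args, **kwargs)
    finally:
        profiler.disable()
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    # Show the ten entries with the highest cumulative time.
    stats.sort_stats("cumulative").print_stats(10)
    return buffer.getvalue()


def example_task():
    """Hypothetical task body used only for this illustration."""
    return sum(i * i for i in range(100_000))


print(profile_callable(example_task))
```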
purge_queues(settings)
#
Purge all messages from all queues.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing configuration for Celery. | required |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the command execution. |
restart_workers(settings, log_level=None)
#
Restart the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
run_celery_command(settings, args, with_app=True, quiet=False, **kwargs)
#
run_task(program, task_name, recording=None)
#
Run a task from the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to run the task from. | required |
task_name | str | The name of the task to run. | required |
Raises:
Type | Description |
---|---|
ValueError | If the specified task is not found. |
Notes
This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.
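The behaviour described above (look a task up by name, fail with `ValueError` if it is unknown, otherwise call it synchronously) can be sketched as follows. The dictionary registry and the `run_by_name` helper are assumptions made for this example; how acoupi stores its tasks is not shown in this reference.

```python
from typing import Callable, Dict


def run_by_name(tasks: Dict[str, Callable[[], object]], task_name: str):
    """Call a task synchronously, in the current process."""
    if task_name not in tasks:
        raise ValueError(f"task {task_name!r} not found; available: {sorted(tasks)}")
    return tasks[task_name]()


tasks = {"heartbeat": lambda: "alive"}  # hypothetical task registry
print(run_by_name(tasks, "heartbeat"))
```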
services_are_installed(settings, path=None)
#
Check if acoupi services are installed.
set_config_field(config, field, value, is_json=False, strict=False, from_attributes=False)
#
Set a specific field in a config object.
This function enables you to update configuration values by specifying the field name, including nested fields using dot notation (e.g., 'section.subsection.value'). The provided value is validated against the field's type definition in the Pydantic model to ensure data integrity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
S
|
The configuration object to be modified. Must be an instance of a Pydantic model. |
required |
field |
str
|
The name of the field to set. Use dot notation for nested fields. If an empty string, the entire configuration is replaced. This is useful when setting the configuration from a JSON string. |
required |
value |
Any
|
The new value to assign to the specified field. The type should be compatible with the field's definition in the Pydantic model. |
required |
is_json |
bool
|
If True, the |
False
|
strict |
bool
|
If True, type coercion is disabled, and the |
False
|
from_attributes |
bool
|
Relevant only when setting a field that is itself a Pydantic model. If
True, the |
False
|
Returns:
Type | Description |
---|---|
S
|
A new configuration object with the specified field modified. The
original |
Raises:
Type | Description |
---|---|
ValidationError
|
If the provided |
AttributeError
|
If the specified |
IndexError
|
If the |
NotImplementedError
|
If the |
Examples:
Setting a root-level field:
>>> from typing import List
>>> from pydantic import BaseModel
>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> new_config = set_config_field(config, "a", 3)
>>> new_config.a
3
Setting a nested field:
>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> new_config = set_config_field(config, "c.d", False)
>>> new_config.c.d
False
Setting an element in a list:
>>> class Config(BaseModel):
...     a: List[int]
>>> config = Config(a=[1, 2, 3])
>>> new_config = set_config_field(config, "a.0", 4)
>>> new_config.a[0]
4
Setting a field from a JSON string:
setup_program(settings, program_name, args=None, prompt=False)
#
Set up an Acoupi Program.
start_deployment(settings, name, latitude=None, longitude=None)
#
Start a new deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
name | str | The name of the deployment. | required |
Returns:
Type | Description |
---|---|
Deployment | The newly started deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If Acoupi has already been deployed. |
ValueError | If the deployment data is invalid, i.e. the latitude or longitude is invalid. |
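As an illustration of the kind of check that leads to the `ValueError` above, the sketch below validates coordinates against the usual geographic ranges. The `validate_coordinates` helper is hypothetical; acoupi's own validation may be implemented differently.

```python
from typing import Optional


def validate_coordinates(latitude: Optional[float], longitude: Optional[float]) -> None:
    """Raise ValueError if a provided coordinate is outside its valid range."""
    if latitude is not None and not -90 <= latitude <= 90:
        raise ValueError(f"latitude must be within [-90, 90], got {latitude}")
    if longitude is not None and not -180 <= longitude <= 180:
        raise ValueError(f"longitude must be within [-180, 180], got {longitude}")


validate_coordinates(51.5, -0.13)  # valid coordinates pass silently
validate_coordinates(None, None)  # both coordinates are optional

try:
    validate_coordinates(120.0, 0.0)
except ValueError as error:
    print(error)
```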
start_program(settings, name, latitude=None, longitude=None)
#
start_services(settings, path=None, **kwargs)
#
Start acoupi services.
start_workers(settings, pool='threads', log_level=None)
#
Start the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
pool | Literal | The pool type for the workers, by default "threads". | 'threads' |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
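The worker functions return a `subprocess.CompletedProcess`. The sketch below shows how such a result is typically inspected; the command is a stand-in that only prints a message, since the exact command acoupi runs is not shown in this reference.

```python
import subprocess
import sys

# A stand-in command; acoupi would run one of its own worker commands instead.
command = [sys.executable, "-c", "print('workers started')"]

result = subprocess.run(command, capture_output=True, text=True, check=False)

if result.returncode == 0:
    print("ok:", result.stdout.strip())
else:
    print("failed:", result.stderr.strip())
```

With `check=False`, a non-zero exit status does not raise an exception, so the caller decides how to react to `result.returncode`.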
status_services(settings, path=None, **kwargs)
#
Get the status of acoupi services.
stop_program(settings)
#
stop_services(settings, path=None, **kwargs)
#
Stop acoupi services.
stop_workers(settings, log_level=None)
#
Stop the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
write_config(config, path)
#
Write config to file.
write_program_file(program_name, settings)
#
Write the python script with the celery app to run.
acoupi.system.apps
#
acoupi.system.celery
#
Classes#
CeleryState
#
CeleryStatus
#
WorkerState
#
WorkerStatus
#
Functions#
get_celery_bin()
#
Return the path to the celery binary.
get_celery_status(settings)
#
purge_queue(settings, queue_name)
#
Purge all messages from the specified queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing configuration for Celery. | required |
queue_name | str | The name of the queue to purge. | required |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the command execution. |
purge_queues(settings)
#
Purge all messages from all queues.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing configuration for Celery. | required |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the command execution. |
restart_workers(settings, log_level=None)
#
Restart the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
run_celery_command(settings, args, with_app=True, quiet=False, **kwargs)
#
start_workers(settings, pool='threads', log_level=None)
#
Start the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
pool | Literal | The pool type for the workers, by default "threads". | 'threads' |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
stop_workers(settings, log_level=None)
#
Stop the Celery workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The current acoupi settings. | required |
log_level | Optional[Literal] | The log level for the workers, by default None. | None |
Returns:
Type | Description |
---|---|
CompletedProcess | The result of the subprocess.run function. |
acoupi.system.constants
#
Path constants for acoupi system.
Classes#
CeleryConfig
#
Bases: BaseModel
Configuration settings for Celery in Acoupi.
This class defines the settings used to configure the Celery task queue specifically for the Acoupi application.
Attributes#
accept_content: List[str] = Field(default_factory=lambda: ['pickle'])
class-attribute
instance-attribute
#
broker_connection_retry_on_startup: bool = True
class-attribute
instance-attribute
#
Whether to retry establishing the connection to the AMQP broker on startup.
Automatically try to re-establish the connection to the AMQP broker if it is lost after the initial connection is made.
broker_url: str = 'pyamqp://guest@localhost//'
class-attribute
instance-attribute
#
The URL of the message broker used by Celery.
Acoupi uses RabbitMQ with the default guest user. You may need to update this if your RabbitMQ setup is different.
enable_utc: bool = True
class-attribute
instance-attribute
#
Whether to enable UTC for Celery.
It's generally recommended to keep this enabled for consistency across different Acoupi deployments.
result_backend: str = 'rpc://'
class-attribute
instance-attribute
#
The URL for storing task results.
'rpc://' indicates that results are sent back directly to the client.
result_persistent: bool = False
class-attribute
instance-attribute
#
Whether to persist task results.
In Acoupi deployments, task results are not typically needed after the task has completed, as all essential data is stored in an independent database. This setting helps to avoid unnecessary storage overhead.
result_serializer: str = 'pickle'
class-attribute
instance-attribute
#
task_acks_late: bool = True
class-attribute
instance-attribute
#
Whether to acknowledge tasks after they have been executed.
True means that tasks are acknowledged after they have been executed, not right before.
task_serializer: str = 'pickle'
class-attribute
instance-attribute
#
task_soft_time_limit: int = 30
class-attribute
instance-attribute
#
The soft time limit (in seconds) for task execution.
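If a task exceeds this limit, Celery raises a SoftTimeLimitExceeded exception inside the task, which gives it a chance to clean up before the hard time limit applies.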
If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.
task_time_limit: int = 60
class-attribute
instance-attribute
#
The hard time limit (in seconds) for task execution.
If a task exceeds this limit, it will be terminated.
If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.
timezone: str = 'UTC'
class-attribute
instance-attribute
#
The timezone to use for Celery.
worker_prefetch_multiplier: int = 1
class-attribute
instance-attribute
#
The number of tasks a worker can prefetch.
Setting this to 1 prevents tasks from being delayed due to other tasks in the queue. Celery defaults to prefetching tasks in batches, which can cause a fast task to wait for a slower one in the same batch.
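To make the interplay of the two time limits concrete, the sketch below copies a few of the defaults listed above into a plain dictionary and applies an override for a program with slower tasks. The override values and the idea of merging dictionaries this way are assumptions for illustration; the sketch does not show how acoupi stores or applies its Celery configuration.

```python
import json

# Defaults copied from the attribute list above.
celery_defaults = {
    "broker_url": "pyamqp://guest@localhost//",
    "result_backend": "rpc://",
    "task_soft_time_limit": 30,
    "task_time_limit": 60,
    "worker_prefetch_multiplier": 1,
    "task_acks_late": True,
}

# Hypothetical override for a program whose tasks run longer than a minute.
overrides = {"task_soft_time_limit": 120, "task_time_limit": 180}
config = {**celery_defaults, **overrides}

# The soft limit should leave the task time to clean up before the hard limit.
assert config["task_soft_time_limit"] < config["task_time_limit"]

print(json.dumps(config, indent=2))
```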
Settings
#
Bases: BaseSettings
Settings for acoupi system.
Attributes#
acoupi_beat_service_file: str = 'acoupi-beat.service'
class-attribute
instance-attribute
#
acoupi_service_file: str = 'acoupi.service'
class-attribute
instance-attribute
#
app_name: str = 'app'
class-attribute
instance-attribute
#
beat_script_path: Path = home / 'bin' / 'acoupi-beat.sh'
class-attribute
instance-attribute
#
celery_config_file: Path = home / 'config' / 'celery.json'
class-attribute
instance-attribute
#
celery_pool_type: Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes'] = 'threads'
class-attribute
instance-attribute
#
deployment_file: Path = home / 'config' / 'deployment.json'
class-attribute
instance-attribute
#
env_file: Path = home / 'config' / 'env'
class-attribute
instance-attribute
#
home: Path = Path.home() / '.acoupi'
class-attribute
instance-attribute
#
log_dir: Path = home / 'log'
class-attribute
instance-attribute
#
log_level: str = 'INFO'
class-attribute
instance-attribute
#
model_config = SettingsConfigDict(env_prefix='ACOUPI_')
class-attribute
instance-attribute
#
program_config_file: Path = home / 'config' / 'program.json'
class-attribute
instance-attribute
#
program_file: Path = home / (app_name + '.py')
class-attribute
instance-attribute
#
program_name_file: Path = home / 'config' / 'name'
class-attribute
instance-attribute
#
restart_script_path: Path = home / 'bin' / 'acoupi-workers-restart.sh'
class-attribute
instance-attribute
#
run_dir: Path = home / 'run'
class-attribute
instance-attribute
#
start_script_path: Path = home / 'bin' / 'acoupi-workers-start.sh'
class-attribute
instance-attribute
#
stop_script_path: Path = home / 'bin' / 'acoupi-workers-stop.sh'
class-attribute
instance-attribute
#
acoupi.system.deployments
#
Functions that manage Acoupi system.
This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.
Classes#
Functions#
end_deployment(settings)
#
End current deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
Returns:
Type | Description |
---|---|
Deployment | The ended deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If the deployment has already ended or has not been started yet. |
get_current_deployment(settings)
#
Get current deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
Returns:
Type | Description |
---|---|
Deployment | The current deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If the deployment file does not exist or the deployment has already ended. |
load_deployment_from_file(path)
#
Load deployment from file.
The deployment is loaded from a JSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path | The path to load the deployment from. | required |
Returns:
Type | Description |
---|---|
Deployment | The loaded deployment. |
Raises:
Type | Description |
---|---|
ValueError | If the deployment data is invalid. |
FileNotFoundError | If the deployment file does not exist. |
start_deployment(settings, name, latitude=None, longitude=None)
#
Start a new deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings to use. | required |
name | str | The name of the deployment. | required |
Returns:
Type | Description |
---|---|
Deployment | The newly started deployment. |
Raises:
Type | Description |
---|---|
DeploymentError | If Acoupi has already been deployed. |
ValueError | If the deployment data is invalid, i.e. the latitude or longitude is invalid. |
write_deployment_to_file(deployment, path)
#
Write deployment to file.
The deployment is written as a JSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | Deployment | The deployment to write to file. | required |
path | Path | The path to write the deployment to. | required |
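The write and load functions above form a JSON round trip. A minimal sketch of that pattern follows, with a dataclass standing in for acoupi's `Deployment` model; the `DeploymentRecord` fields and the two helper names are assumptions made for this example.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class DeploymentRecord:
    """Toy stand-in for acoupi's Deployment model (fields are assumptions)."""

    name: str
    latitude: Optional[float] = None
    longitude: Optional[float] = None


def write_record(record: DeploymentRecord, path: Path) -> None:
    """Serialise the record as JSON, creating parent directories if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(asdict(record), indent=2))


def load_record(path: Path) -> DeploymentRecord:
    """Load a record, raising ValueError if the stored data is invalid."""
    if not path.exists():
        raise FileNotFoundError(path)
    try:
        return DeploymentRecord(**json.loads(path.read_text()))
    except (TypeError, json.JSONDecodeError) as error:
        raise ValueError(f"invalid deployment data in {path}") from error


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "config" / "deployment.json"
    write_record(DeploymentRecord("garden", 51.5, -0.13), target)
    print(load_record(target))
```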
acoupi.system.exceptions
#
Custom Exceptions for Acoupi system.
Classes#
ConfigurationError(message, help=None)
#
DeploymentError(message)
#
HealthCheckError(message)
#
InvalidProgramError(program)
#
ParameterError(value, message, help=None)
#
acoupi.system.files
#
Functions to handle files.
Attributes#
DEFAULT_AUDIO_STORAGE = Path.home() / 'audio'
module-attribute
#
TEMP_PATH = Path('/run/shm/')
module-attribute
#
logger = logging.getLogger(__name__)
module-attribute
#
Functions#
delete_recording(recording)
#
Delete the recording.
get_temp_dir(in_memory=True)
#
Get a temporary directory in which to store recordings.
Recordings are usually stored in a staging temporary directory before being moved to the final storage location. This function returns the path of the directory where the temporary recordings are stored.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
in_memory | bool | If True, the temporary files will be stored in memory. | True |
Notes
It is recommended to store temporary files in memory to reduce the number of writes to the SD card. It can also be useful in scenarios where recordings should not be kept on disk for legal reasons.
If in_memory is set to True but the system does not support in-memory storage, the function will return the default temporary directory and show a warning.
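The fallback behaviour described in the note can be sketched as follows. This is an assumption-laden illustration, not the body of `get_temp_dir`: `choose_temp_dir` is a hypothetical helper, and the check used to detect in-memory support (the directory simply existing) is a guess.

```python
import tempfile
import warnings
from pathlib import Path

IN_MEMORY_DIR = Path("/run/shm/")  # same value as TEMP_PATH above


def choose_temp_dir(in_memory: bool = True) -> Path:
    """Prefer the in-memory directory; fall back to the OS default with a warning."""
    if in_memory:
        if IN_MEMORY_DIR.is_dir():
            return IN_MEMORY_DIR
        warnings.warn(
            "in-memory storage is not available; using the default temporary directory"
        )
    return Path(tempfile.gettempdir())


print(choose_temp_dir(in_memory=False))
```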
get_temp_file(path)
#
Get the temporary recording UUID from the path.
get_temp_file_id(path)
#
Get the temporary recording UUID from the path.
get_temp_files(path=TEMP_PATH)
#
Get the list of temporary recordings.
Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.
get_temp_files_paths(path)
#
Get the temporary recording path.
move_recording(recording, dest, logger=None)
#
Move the recording to the destination.
acoupi.system.lifecycle
#
acoupi.system.programs
#
System functions for managing Acoupi programs.
Attributes#
Classes#
ProgramState
#
Functions#
get_program_state(settings)
#
load_config_schema(settings)
#
Load the configuration schema for the program.
load_program(settings)
#
Load a fully configured acoupi program.
This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing the paths to the program and celery config files. | required |
Returns:
Type | Description |
---|---|
AcoupiProgram | A fully configured instance of the acoupi program class. |
Raises:
Type | Description |
---|---|
ProgramNotFoundError | If the program module is not found. |
InvalidProgramError | If the loaded class is not a valid acoupi program class. |
load_program_class(program_name)
#
Load the acoupi program class from a specified module.
This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program_name | str | The name of the module containing the acoupi program class. | required |
Returns:
Type | Description |
---|---|
Type[AcoupiProgram] | The acoupi program class. |
Raises:
Type | Description |
---|---|
ProgramNotFoundError | If the specified program module is not found. |
InvalidProgramError | If the loaded class is not a valid acoupi program class. |
Notes
When multiple AcoupiProgram classes are found within the module, this function attempts to select one that is not further subclassed. If several such "final" classes exist, a warning is issued and the first one encountered is selected. When exposing AcoupiProgram classes in a module, it is recommended to make only one "final" class (one that no other class in the module subclasses) available.
load_worker_config(settings)
#
Load the worker configuration for the program.
write_program_file(program_name, settings)
#
Write the python script with the celery app to run.
acoupi.system.scripts
#
Classes#
Functions#
get_celery_bin()
#
Return the path to the celery binary.
give_executable_permissions(path)
#
Give executable permissions to a file.
write_beat_script(settings, celery_bin=None)
#
Write the beat script.
write_scripts(config, settings, celery_bin=None)
#
Write the worker scripts.
write_workers_restart_script(config, settings, celery_bin=None)
#
Write the worker restart script.
write_workers_start_script(config, settings, celery_bin=None)
#
Write the worker start script.
write_workers_stop_script(config, settings, celery_bin=None)
#
Write the worker stop script.
acoupi.system.services
#
System functions to manage acoupi services.
Classes#
ServiceStatus
#
Service status.
Attributes#
ACTIVATING = 'activating'
class-attribute
instance-attribute
#
ACTIVE = 'active'
class-attribute
instance-attribute
#
DEACTIVATING = 'deactivating'
class-attribute
instance-attribute
#
FAILED = 'failed'
class-attribute
instance-attribute
#
INACTIVE = 'inactive'
class-attribute
instance-attribute
#
RELOADING = 'reloading'
class-attribute
instance-attribute
#
UNINSTALLED = 'uninstalled'
class-attribute
instance-attribute
#
UNKNOWN = 'unknown'
class-attribute
instance-attribute
#
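Most of the values above match the unit states reported by systemd (for example by `systemctl is-active`). The sketch below shows one way a raw state string could be mapped onto such an enumeration with a fallback for unrecognised values. The `Status` enum is a reduced stand-in for `ServiceStatus`, and `parse_status` is a hypothetical helper; how acoupi actually queries systemd is not shown in this reference.

```python
from enum import Enum


class Status(Enum):
    """Subset of the states listed above (stand-in for ServiceStatus)."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    FAILED = "failed"
    UNKNOWN = "unknown"


def parse_status(raw: str) -> Status:
    """Map a raw state string to a Status, defaulting to UNKNOWN."""
    try:
        return Status(raw.strip().lower())
    except ValueError:
        return Status.UNKNOWN


print(parse_status("active\n").name)
print(parse_status("something-else").name)
```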
Functions#
disable_services(settings, path=None, **kwargs)
#
Disable acoupi services.
enable_services(settings, path=None, **kwargs)
#
Enable acoupi services.
get_acoupi_beat_service_status(settings, path=None)
#
Get the status of the acoupi beat service.
get_acoupi_service_status(settings, path=None)
#
Get the status of the acoupi service.
get_service_status(service_name)
#
get_user_unit_dir()
#
install_services(settings, path=None)
#
Install acoupi services.
services_are_installed(settings, path=None)
#
Check if acoupi services are installed.
start_services(settings, path=None, **kwargs)
#
Start acoupi services.
status_services(settings, path=None, **kwargs)
#
Get the status of acoupi services.
stop_services(settings, path=None, **kwargs)
#
Stop acoupi services.
uninstall_services(settings, path=None)
#
Uninstall acoupi services.
acoupi.system.state
#
Functions for accessing the state of the Acoupi system.
Classes#
AcoupiStatus
#
Bases: BaseModel
Model representing the status of the Acoupi system.
Attributes:
Name | Type | Description |
---|---|---|
acoupi_service | ServiceStatus | The status of the acoupi systemd service. |
beat_service | ServiceStatus | The status of the beat systemd service. |
celery | CeleryStatus | The status of the Celery workers. |
program | ProgramStatus | The status of the acoupi program. |
ServicesStatus
#
Functions#
get_status(settings)
#
Get the current status of the Acoupi system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings | Settings | The settings object containing configuration paths. | required |
Returns:
Type | Description |
---|---|
AcoupiStatus | An object containing the status of various components of the Acoupi system. |
is_configured(settings)
#
Check if acoupi is configured.
acoupi.system.tasks
#
Acoupi system tasks module.
This module provides a set of functions to help manage and interact with the tasks of the currently configured acoupi program.
Classes#
Functions#
get_task_list(program, include_celery_tasks=False)
#
Return a list of all the tasks registered in the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to get the tasks from. | required |
include_celery_tasks | bool | Whether to include celery tasks in the list. Defaults to False. | False |
Returns:
Type | Description |
---|---|
List[str] | List of the names of available tasks. |
Notes
Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.
profile_task(program, task_name)
#
Profile a task from the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to profile the task from. | required |
task_name | str | The name of the task to profile. | required |
Raises:
Type | Description |
---|---|
ValueError | If the specified task is not found. |
Notes
This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and is not sent to the Celery workers, so the profile shows the performance of the task itself, without the overhead of the Celery workers.
run_task(program, task_name, recording=None)
#
Run a task from the current program.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
program | AcoupiProgram | The AcoupiProgram instance to run the task from. | required |
task_name | str | The name of the task to run. | required |
Raises:
Type | Description |
---|---|
ValueError | If the specified task is not found. |
Notes
This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.