Skip to content

System#

acoupi.system #

Functions that manage Acoupi system.

This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.

Modules:

Name Description
apps

System functions for managing celery apps.

celery
config
constants

Path constants for acoupi system.

deployments

Functions that manage Acoupi system.

exceptions

Custom Exceptions for Acoupi system.

files

Functions to handle files.

lifecycle

System functions for managing Acoupi programs.

programs

System functions for managing Acoupi programs.

scripts
services

System functions to manage acoupi services.

state

Functions for accessing the state of the Acoupi system.

tasks

Acoupi system tasks module.

templates

Jinja2 environment for rendering templates.

Classes:

Name Description
AcoupiStatus

Model representing the status of the Acoupi system.

Settings

Settings for acoupi system.

Functions:

Name Description
delete_recording

Delete the recording.

disable_services

Disable acoupi services.

dump_config

Dump a configuration object to a JSON string.

enable_services

Enable acoupi services.

end_deployment

End current deployment.

get_celery_app

Get the currently setup celery app.

get_celery_status
get_config_field

Retrieve a field or nested field from a configuration object.

get_current_deployment

Get current deployment.

get_status

Get the current status of the Acoupi system.

get_task_list

Return a list of all the tasks registered in the current program.

get_temp_file_id

Get the temporary recording UUID from the path.

get_temp_files

Get the list of temporary recordings.

is_configured

Check if acoupi is configured.

load_config

Load config from file.

load_config_schema

Load the configuration schema for the program.

load_program

Load a fully configured acoupi program.

load_program_class

Load the acoupi program class from a specified module.

move_recording

Move the recording to the destination.

parse_config_from_args

Parse configurations from user provided arguments.

profile_task

Profile a task from the current program.

purge_queues

Purge all messages from all queues.

restart_workers

Restart the Celery workers.

run_celery_command
run_task

Run a task from the current program.

services_are_installed

Check if acoupi services are installed.

set_config_field

Set a specific field in a config object.

setup_program

Set up an Acoupi Program.

start_deployment

Start a new deployment.

start_program
start_services

Start acoupi services.

start_workers

Start the Celery workers.

status_services

Stop acoupi services.

stop_program
stop_services

Stop acoupi services.

stop_workers

Stop the Celery workers.

write_config

Write config to file.

write_program_file

Write the python script with the celery app to run.

Classes#

AcoupiStatus #

Bases: BaseModel

Model representing the status of the Acoupi system.

Attributes:

Name Type Description
acoupi_service ServiceStatus

The status of the acoupi systemd service.

beat_service ServiceStatus

The status of the beat systemd service.

celery CeleryStatus

The status of the Celery workers.

program ProgramStatus

The status of the acoupi program.

Attributes#
celery instance-attribute #
deployment instance-attribute #
program instance-attribute #
services instance-attribute #

Settings #

Bases: BaseSettings

Settings for acoupi system.

Attributes:

Name Type Description
acoupi_beat_service_file str
acoupi_service_file str
app_name str
beat_script_path Path
celery_config_file Path
celery_pool_type Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes']
deployment_file Path
env_file Path
home Path
log_dir Path
log_level str
model_config
program_config_file Path
program_file Path
program_name_file Path
restart_script_path Path
run_dir Path
start_script_path Path
stop_script_path Path
Attributes#
acoupi_beat_service_file = 'acoupi-beat.service' class-attribute instance-attribute #
acoupi_service_file = 'acoupi.service' class-attribute instance-attribute #
app_name = 'app' class-attribute instance-attribute #
beat_script_path = home / 'bin' / 'acoupi-beat.sh' class-attribute instance-attribute #
celery_config_file = home / 'config' / 'celery.json' class-attribute instance-attribute #
celery_pool_type = 'threads' class-attribute instance-attribute #
deployment_file = home / 'config' / 'deployment.json' class-attribute instance-attribute #
env_file = home / 'config' / 'env' class-attribute instance-attribute #
home = Path.home() / '.acoupi' class-attribute instance-attribute #
log_dir = home / 'log' class-attribute instance-attribute #
log_level = 'INFO' class-attribute instance-attribute #
model_config = SettingsConfigDict(env_prefix='ACOUPI_') class-attribute instance-attribute #
program_config_file = home / 'config' / 'program.json' class-attribute instance-attribute #
program_file = home / (app_name + '.py') class-attribute instance-attribute #
program_name_file = home / 'config' / 'name' class-attribute instance-attribute #
restart_script_path = home / 'bin' / 'acoupi-workers-restart.sh' class-attribute instance-attribute #
run_dir = home / 'run' class-attribute instance-attribute #
start_script_path = home / 'bin' / 'acoupi-workers-start.sh' class-attribute instance-attribute #
stop_script_path = home / 'bin' / 'acoupi-workers-stop.sh' class-attribute instance-attribute #

Functions#

delete_recording(recording) #

Delete the recording.

disable_services(settings, path=None, **kwargs) #

Disable acoupi services.

dump_config(config, indent=2) #

Dump a configuration object to a JSON string.

enable_services(settings, path=None, **kwargs) #

Enable acoupi services.

end_deployment(settings) #

End current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The ended deployment.

Raises:

Type Description
DeploymentError

If the deployment has already ended or has not been started yet.

get_celery_app(settings) #

Get the currently setup celery app.

get_celery_status(settings) #

get_config_field(config, field) #

Retrieve a field or nested field from a configuration object.

This function allows you to access configuration values by specifying the field name, including nested fields using dot notation (e.g., 'section.subsection.value'). If no field is provided, the entire configuration object is returned.

It is possible to get nested fields by using a dot notation.

Parameters:

Name Type Description Default
config BaseModel

The configuration object, an instance of a Pydantic model.

required
field str

The name of the field to retrieve. Use dot notation for nested fields.

required

Returns:

Type Description
Any

The value of the specified field. The type depends on the field's definition in the Pydantic model.

Raises:

Type Description
AttributeError

If the specified field does not exist within the configuration schema.

IndexError

If the field refers to an index in a list or tuple that is out of bounds.

NotImplementedError

If the field points to a data type or structure that this function does not currently support for retrieval.

Examples:

Accessing a root-level field:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> get_config_field(config, "a")
>>> 1

Accessing a nested field:

>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> get_config_field(config, "c.d")
>>> True

Accessing an element from a list:

>>> class Config(BaseModel):
...     a: List[int]
>>> config = Config(a=[1, 2, 3])
>>> get_config_field(config, "a.0")
>>> 1

get_current_deployment(settings) #

Get current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The current deployment.

Raises:

Type Description
DeploymentError

If the deployment file does not exist or the deployment has already ended.

get_status(settings) #

Get the current status of the Acoupi system.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration paths.

required

Returns:

Type Description
AcoupiStatus

An object containing the status of various components of the Acoupi system.

get_task_list(program, include_celery_tasks=False) #

Return a list of all the tasks registered in the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to get the tasks from.

required
include_celery_tasks bool

Whether to include celery tasks in the list. Defaults to False.

False

Returns:

Type Description
List[str]

List of the names of available tasks.

Notes

Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.

get_temp_file_id(path) #

Get the temporary recording UUID from the path.

get_temp_files(path=TEMP_PATH) #

Get the list of temporary recordings.

Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.

is_configured(settings) #

Check if acoupi is configured.

load_config(path, schema) #

Load config from file.

Parameters:

Name Type Description Default
path Path

Path to the config file.

required
schema Type[S]

Pydantic model to validate the config.

required

Returns:

Type Description
S

The loaded config.

Raises:

Type Description
ConfigurationError

If the configuration is invalid.

FileNotFoundError

If the config file does not exist.

load_config_schema(settings) #

Load the configuration schema for the program.

load_program(settings) #

Load a fully configured acoupi program.

This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.

Parameters:

Name Type Description Default
settings Settings

The settings object containing the paths to the program and celery config files.

required

Returns:

Type Description
AcoupiProgram

A fully configured instance of the acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

load_program_class(program_name) #

Load the acoupi program class from a specified module.

This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.

Parameters:

Name Type Description Default
program_name str

The name of the module containing the acoupi program class.

required

Returns:

Type Description
Type[AcoupiProgram]

The acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the specified program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

Notes

When multiple AcoupiProgram classes are found within the module, this function will attempt to select one that is not further subclassed. If multiple such "final" classes exist, a warning will be issued, and the first one encountered will be selected. It is recommended that when exposing AcoupiProgram classes in a module, only one "final" (non-subclassable) class be made available.

move_recording(recording, dest, logger=None) #

Move the recording to the destination.

parse_config_from_args(schema, args=None, prompt=True) #

Parse configurations from user provided arguments.

This function will parse the configurations from the user provided in the command line arguments. It will return the parsed configuration.

It will override the default configuration with the user provided configuration, and ask the user for any missing configuration.

This function will raise an error if the user provided configuration is invalid.

Args: schema: The configuration schema to use. args: The arguments to parse. If None, will use an empty list. prompt: Whether to prompt the user for missing configuration.

Returns:

Type Description
config: The parsed configuration.

profile_task(program, task_name) #

Profile a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to profile the task from.

required
task_name str

The name of the task to profile.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and does not send the task to the Celery workers, so the profiling will only show the performance of the task without the overhead of the Celery workers.

purge_queues(settings) #

Purge all messages from all queues.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

restart_workers(settings, log_level=None) #

Restart the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

run_celery_command(settings, args, with_app=True, quiet=False, **kwargs) #

run_task(program, task_name, recording=None) #

Run a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to run the task from.

required
task_name str

The name of the task to run.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.

services_are_installed(settings, path=None) #

Check if acoupi services are installed.

set_config_field(config, field, value, is_json=False, strict=False, from_attributes=False) #

Set a specific field in a config object.

This function enables you to update configuration values by specifying the field name, including nested fields using dot notation (e.g., 'section.subsection.value'). The provided value is validated against the field's type definition in the Pydantic model to ensure data integrity.

Parameters:

Name Type Description Default
config S

The configuration object to be modified. Must be an instance of a Pydantic model.

required
field str

The name of the field to set. Use dot notation for nested fields. If an empty string, the entire configuration is replaced. This is useful when setting the configuration from a JSON string.

required
value Any

The new value to assign to the specified field. The type should be compatible with the field's definition in the Pydantic model.

required
is_json bool

If True, the value is treated as a JSON string and parsed before being assigned. Default is False.

False
strict bool

If True, type coercion is disabled, and the value must match the field's type exactly. If False (default), type coercion is attempted if possible.

False
from_attributes bool

Relevant only when setting a field that is itself a Pydantic model. If True, the value is assumed to be an object whose attributes will be used to populate the model. See Pydantic's model_validate documentation for details. Default is False.

False

Returns:

Type Description
S

A new configuration object with the specified field modified. The original config object remains unchanged.

Raises:

Type Description
ValidationError

If the provided value fails validation against the field's type definition in the Pydantic model.

AttributeError

If the specified field does not exist within the configuration schema.

IndexError

If the field refers to an index in a list or tuple that is out of bounds.

NotImplementedError

If the field points to a data type or structure that this function does not currently support for setting.

Examples:

Setting a root-level field:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> new_config = set_config_field(config, "a", 3)
>>> new_config.a
>>> 3

Setting a nested field:

>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> new_config = set_config_field(config, "c.d", False)
>>> new_config.c.d

Setting an element in a list:

>>> class Config(BaseModel):
...     a: List[int]
>>> config = Config(a=[1, 2, 3])
>>> new_config = set_config_field(config, "a.0", 4)
>>> new_config.a[0]
>>> 4

Setting a field from a JSON string:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> new_config = set_config_field(
...     config,
...     "",
...     '{"a": 3, "b": "4"}',
...     is_json=True,
... )
>>> new_config.a
>>> 3

setup_program(settings, program_name, args=None, prompt=False) #

Set up an Acoupi Program.

start_deployment(settings, name, latitude=None, longitude=None) #

Start a new deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required
name str

The name of the deployment.

required

Returns:

Type Description
Deployment

Raises:

Type Description
DeploymentError

If Acoupi has already been deployed.

ValueError

If the deployment data is invalid. i.e. the latitude or longitude is invalid.

start_program(settings, name, latitude=None, longitude=None) #

start_services(settings, path=None, **kwargs) #

Start acoupi services.

start_workers(settings, pool='threads', log_level=None) #

Start the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
pool Literal

The pool type for the workers, by default "threads".

'threads'
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

status_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_program(settings) #

stop_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_workers(settings, log_level=None) #

Stop the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

write_config(config, path) #

Write config to file.

write_program_file(program_name, settings) #

Write the python script with the celery app to run.

acoupi.system.apps #

System functions for managing celery apps.

Functions:

Name Description
get_celery_app

Get the currently setup celery app.

Classes#

Functions#

get_celery_app(settings) #

Get the currently setup celery app.

acoupi.system.celery #

Classes:

Name Description
CeleryState
CeleryStatus
WorkerState
WorkerStatus

Functions:

Name Description
get_celery_bin

Return the path to the celery binary.

get_celery_status
restart_workers

Restart the Celery workers.

run_celery_command
start_workers

Start the Celery workers.

stop_workers

Stop the Celery workers.

Classes#

CeleryState #

Bases: str, Enum

Attributes:

Name Type Description
AVAILABLE
ERROR
UNAVAILABLE
Attributes#
AVAILABLE = 'available' class-attribute instance-attribute #
ERROR = 'error' class-attribute instance-attribute #
UNAVAILABLE = 'unavailable' class-attribute instance-attribute #

CeleryStatus #

Bases: BaseModel

Attributes:

Name Type Description
state CeleryState
workers List[WorkerStatus]
Attributes#
state instance-attribute #
workers = Field(default_factory=list) class-attribute instance-attribute #

WorkerState #

Bases: str, Enum

Attributes:

Name Type Description
NOTOK
OK
Attributes#
NOTOK = 'notok' class-attribute instance-attribute #
OK = 'ok' class-attribute instance-attribute #

WorkerStatus #

Bases: BaseModel

Attributes:

Name Type Description
state WorkerState
worker_name str
Attributes#
state instance-attribute #
worker_name instance-attribute #

Functions#

get_celery_bin() #

Return the path to the celery binary.

get_celery_status(settings) #

purge_queue(settings, queue_name) #

Purge all messages from the specified queue.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required
queue_name str

The name of the queue to purge.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

purge_queues(settings) #

Purge all messages from all queues.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

restart_workers(settings, log_level=None) #

Restart the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

run_celery_command(settings, args, with_app=True, quiet=False, **kwargs) #

start_workers(settings, pool='threads', log_level=None) #

Start the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
pool Literal

The pool type for the workers, by default "threads".

'threads'
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

stop_workers(settings, log_level=None) #

Stop the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

acoupi.system.constants #

Path constants for acoupi system.

Classes:

Name Description
CeleryConfig

Configuration settings for Celery in Acoupi.

Settings

Settings for acoupi system.

Classes#

CeleryConfig #

Bases: BaseModel

Configuration settings for Celery in Acoupi.

This class defines the settings used to configure the Celery task queue specifically for the Acoupi application.

Attributes:

Name Type Description
accept_content List[str]
broker_connection_retry_on_startup bool

Retry to establish the connection to the AMQP broker on startup.

broker_url str

The URL of the message broker used by Celery.

enable_utc bool

Whether to enable UTC for Celery.

result_backend str

The URL for storing task results.

result_persistent bool

Whether to persist task results.

result_serializer str
task_acks_late bool

Whether to acknowledge tasks after they have been executed.

task_serializer str
task_soft_time_limit int

The soft time limit (in seconds) for task execution.

task_time_limit int

The hard time limit (in seconds) for task execution.

timezone str

The timezone to use for Celery.

worker_prefetch_multiplier int

The number of tasks a worker can prefetch.

Attributes#
accept_content = Field(default_factory=(lambda: ['pickle'])) class-attribute instance-attribute #
broker_connection_retry_on_startup = True class-attribute instance-attribute #

Retry to establish the connection to the AMQP broker on startup.

Automatically try to re-establish the connection to the AMQP broke if lost after the initial connection is made.

broker_url = 'pyamqp://guest@localhost//' class-attribute instance-attribute #

The URL of the message broker used by Celery.

Acoupi uses RabbitMQ with the default guest user. You may need to update this if your RabbitMQ setup is different.

enable_utc = True class-attribute instance-attribute #

Whether to enable UTC for Celery.

It's generally recommended to keep this enabled for consistency across different Acoupi deployments.

result_backend = 'rpc://' class-attribute instance-attribute #

The URL for storing task results.

'rpc://' indicates that results are sent back directly to the client.

result_persistent = False class-attribute instance-attribute #

Whether to persist task results.

In Acoupi deployments, task results are not typically needed after the task has completed, as all essential data is stored in an independent database. This setting helps to avoid unnecessary storage overhead.

result_serializer = 'pickle' class-attribute instance-attribute #
task_acks_late = True class-attribute instance-attribute #

Whether to acknowledge tasks after they have been executed.

True means that tasks are acknowledged after they have been executed, not right before.

task_serializer = 'pickle' class-attribute instance-attribute #
task_soft_time_limit = 30 class-attribute instance-attribute #

The soft time limit (in seconds) for task execution.

If a task exceeds this limit, it will receive a warning.

If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.

task_time_limit = 60 class-attribute instance-attribute #

The hard time limit (in seconds) for task execution.

If a task exceeds this limit, it will be terminated.

If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.

timezone = 'UTC' class-attribute instance-attribute #

The timezone to use for Celery.

worker_prefetch_multiplier = 1 class-attribute instance-attribute #

The number of tasks a worker can prefetch.

Setting this to 1 prevents tasks from being delayed due to other tasks in the queue. Celery defaults to prefetching tasks in batches, which can cause a fast task to wait for a slower one in the same batch.

Settings #

Bases: BaseSettings

Settings for acoupi system.

Attributes:

Name Type Description
acoupi_beat_service_file str
acoupi_service_file str
app_name str
beat_script_path Path
celery_config_file Path
celery_pool_type Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes']
deployment_file Path
env_file Path
home Path
log_dir Path
log_level str
model_config
program_config_file Path
program_file Path
program_name_file Path
restart_script_path Path
run_dir Path
start_script_path Path
stop_script_path Path
Attributes#
acoupi_beat_service_file = 'acoupi-beat.service' class-attribute instance-attribute #
acoupi_service_file = 'acoupi.service' class-attribute instance-attribute #
app_name = 'app' class-attribute instance-attribute #
beat_script_path = home / 'bin' / 'acoupi-beat.sh' class-attribute instance-attribute #
celery_config_file = home / 'config' / 'celery.json' class-attribute instance-attribute #
celery_pool_type = 'threads' class-attribute instance-attribute #
deployment_file = home / 'config' / 'deployment.json' class-attribute instance-attribute #
env_file = home / 'config' / 'env' class-attribute instance-attribute #
home = Path.home() / '.acoupi' class-attribute instance-attribute #
log_dir = home / 'log' class-attribute instance-attribute #
log_level = 'INFO' class-attribute instance-attribute #
model_config = SettingsConfigDict(env_prefix='ACOUPI_') class-attribute instance-attribute #
program_config_file = home / 'config' / 'program.json' class-attribute instance-attribute #
program_file = home / (app_name + '.py') class-attribute instance-attribute #
program_name_file = home / 'config' / 'name' class-attribute instance-attribute #
restart_script_path = home / 'bin' / 'acoupi-workers-restart.sh' class-attribute instance-attribute #
run_dir = home / 'run' class-attribute instance-attribute #
start_script_path = home / 'bin' / 'acoupi-workers-start.sh' class-attribute instance-attribute #
stop_script_path = home / 'bin' / 'acoupi-workers-stop.sh' class-attribute instance-attribute #

acoupi.system.deployments #

Functions that manage Acoupi system.

This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.

Functions:

Name Description
end_deployment

End current deployment.

get_current_deployment

Get current deployment.

start_deployment

Start a new deployment.

Classes#

Functions#

end_deployment(settings) #

End current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The ended deployment.

Raises:

Type Description
DeploymentError

If the deployment has already ended or has not been started yet.

get_current_deployment(settings) #

Get current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The current deployment.

Raises:

Type Description
DeploymentError

If the deployment file does not exist or the deployment has already ended.

load_deployment_from_file(path) #

Load deployment from file.

The deployment is loaded from a JSON file.

Parameters:

Name Type Description Default
path Path

The path to load the deployment from.

required

Returns:

Type Description
Deployment

The loaded deployment.

Raises:

Type Description
ValueError

If the deployment data is invalid.

FileNotFoundError

If the deployment file does not exist.

start_deployment(settings, name, latitude=None, longitude=None) #

Start a new deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required
name str

The name of the deployment.

required

Returns:

Type Description
Deployment

Raises:

Type Description
DeploymentError

If Acoupi has already been deployed.

ValueError

If the deployment data is invalid. i.e. the latitude or longitude is invalid.

write_deployment_to_file(deployment, path) #

Write deployment to file.

The deployment is written as a JSON file.

Parameters:

Name Type Description Default
deployment Deployment

The deployment to write to file.

required
path Path

The path to write the deployment to.

required

acoupi.system.exceptions #

Custom Exceptions for Acoupi system.

Classes:

Name Description
ConfigurationError

Exception raised when a configuration is invalid.

DeploymentError

Exception raised when a deployment fails.

HealthCheckError

Exception raised when a health check fails.

InvalidProgramError

Exception raised when a program file is invalid.

ParameterError

Exception raised when a parameter is invalid.

ProgramNotFoundError

Exception raised when program is not found.

Classes#

ConfigurationError(message, help=None) #

Bases: Exception

Exception raised when a configuration is invalid.

Parameters:

Name Type Description Default
message str

The error message.

required
help str

An optional help message on how to fix the error.

None

Attributes:

Name Type Description
help
message
Attributes#
help = help instance-attribute #
message = message instance-attribute #

DeploymentError(message) #

Bases: Exception

Exception raised when a deployment fails.

Attributes:

Name Type Description
message
Attributes#
message = message instance-attribute #

HealthCheckError(message) #

Bases: Exception

Exception raised when a health check fails.

Attributes:

Name Type Description
message
Attributes#
message = message instance-attribute #

InvalidProgramError(program) #

Bases: Exception

Exception raised when a program file is invalid.

Attributes:

Name Type Description
program
Attributes#
program = program instance-attribute #

ParameterError(value, message, help=None) #

Bases: Exception

Exception raised when a parameter is invalid.

Parameters:

Name Type Description Default
value str

The value that caused the error.

required
message str

The error message.

required
help str

An optional help message on how to fix the error.

None

Attributes:

Name Type Description
help
message
value
Attributes#
help = help instance-attribute #
message = message instance-attribute #
value = value instance-attribute #

ProgramNotFoundError(program) #

Bases: Exception

Exception raised when program is not found.

Attributes:

Name Type Description
program
Attributes#
program = program instance-attribute #

acoupi.system.files #

Functions to handle files.

Functions:

Name Description
delete_recording

Delete the recording.

get_temp_dir

Get a temporary directory where to store recordings.

get_temp_file_id

Get the temporary recording UUID from the path.

get_temp_files

Get the list of temporary recordings.

get_temp_files_paths

Get the temporary recording path.

move_recording

Move the recording to the destination.

Attributes#

DEFAULT_AUDIO_STORAGE = Path.home() / 'audio' module-attribute #

TEMP_PATH = Path('/run/shm/') module-attribute #

logger = logging.getLogger(__name__) module-attribute #

Functions#

delete_recording(recording) #

Delete the recording.

get_temp_dir(in_memory=True) #

Get a temporary directory where to store recordings.

Recordings are usually stored in a staging temporary directory before being moved to the final storage location. This function returns the path of the directory where the temporary recordings are stored.

Parameters:

Name Type Description Default
in_memory bool

If True, the temporary files will be stored in memory.

True
Notes

It is recommended to store temporary files in memory to reduce the number of writes to the SD card. It can also be useful in scenarios where recordings should not be kept in disk for legal reasons.

If in_memory is set to True but the system does not support in-memory storage, the function will return the default temporary and show a warning.

get_temp_file(path) #

Get the temporary recording UUID from the path.

get_temp_file_id(path) #

Get the temporary recording UUID from the path.

get_temp_files(path=TEMP_PATH) #

Get the list of temporary recordings.

Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.

get_temp_files_paths(path) #

Get the temporary recording path.

move_recording(recording, dest, logger=None) #

Move the recording to the destination.

acoupi.system.lifecycle #

System functions for managing Acoupi programs.

Functions:

Name Description
setup_program

Set up an Acoupi Program.

start_program
stop_program

Classes#

Functions#

setup_program(settings, program_name, args=None, prompt=False) #

Set up an Acoupi Program.

start_program(settings, name, latitude=None, longitude=None) #

stop_program(settings) #

acoupi.system.programs #

System functions for managing Acoupi programs.

Functions:

Name Description
load_config_schema

Load the configuration schema for the program.

load_program

Load a fully configured acoupi program.

load_program_class

Load the acoupi program class from a specified module.

write_program_file

Write the python script with the celery app to run.

Attributes#

Classes#

ProgramState #

Bases: str, Enum

Attributes:

Name Type Description
ERROR
OK
UNHEALTHY
Attributes#
ERROR = 'error' class-attribute instance-attribute #
OK = 'ok' class-attribute instance-attribute #
UNHEALTHY = 'unhealthy' class-attribute instance-attribute #

Functions#

get_program_state(settings) #

load_config_schema(settings) #

Load the configuration schema for the program.

load_program(settings) #

Load a fully configured acoupi program.

This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.

Parameters:

Name Type Description Default
settings Settings

The settings object containing the paths to the program and celery config files.

required

Returns:

Type Description
AcoupiProgram

A fully configured instance of the acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

load_program_class(program_name) #

Load the acoupi program class from a specified module.

This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.

Parameters:

Name Type Description Default
program_name str

The name of the module containing the acoupi program class.

required

Returns:

Type Description
Type[AcoupiProgram]

The acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the specified program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

Notes

When multiple AcoupiProgram classes are found within the module, this function will attempt to select one that is not further subclassed. If multiple such "final" classes exist, a warning will be issued, and the first one encountered will be selected. It is recommended that when exposing AcoupiProgram classes in a module, only one "final" (non-subclassable) class be made available.

load_worker_config(settings) #

Load the configuration schema for the program.

write_program_file(program_name, settings) #

Write the python script with the celery app to run.

acoupi.system.scripts #

Functions:

Name Description
write_scripts

Write the worker scripts.

Classes#

Functions#

get_celery_bin() #

Return the path to the celery binary.

give_executable_permissions(path) #

Give executable permissions to a file.

write_beat_script(settings, celery_bin=None) #

Write the beat script.

write_scripts(config, settings, celery_bin=None) #

Write the worker scripts.

write_workers_restart_script(config, settings, celery_bin=None) #

Write the worker restart script.

write_workers_start_script(config, settings, celery_bin=None) #

Write the worker start script.

write_workers_stop_script(config, settings, celery_bin=None) #

Write the worker stop script.

acoupi.system.services #

System functions to manage acoupi services.

Functions:

Name Description
disable_services

Disable acoupi services.

enable_services

Enable acoupi services.

install_services

Install acoupi services.

services_are_installed

Check if acoupi services are installed.

start_services

Start acoupi services.

status_services

Stop acoupi services.

stop_services

Stop acoupi services.

uninstall_services

Uninstall acoupi services.

Classes#

ServiceStatus #

Bases: str, Enum

Service status.

Attributes:

Name Type Description
ACTIVATING
ACTIVE
DEACTIVATING
FAILED
INACTIVE
RELOADING
UNINSTALLED
UNKNOWN
Attributes#
ACTIVATING = 'activating' class-attribute instance-attribute #
ACTIVE = 'active' class-attribute instance-attribute #
DEACTIVATING = 'deactivating' class-attribute instance-attribute #
FAILED = 'failed' class-attribute instance-attribute #
INACTIVE = 'inactive' class-attribute instance-attribute #
RELOADING = 'reloading' class-attribute instance-attribute #
UNINSTALLED = 'uninstalled' class-attribute instance-attribute #
UNKNOWN = 'unknown' class-attribute instance-attribute #

Functions#

disable_services(settings, path=None, **kwargs) #

Disable acoupi services.

enable_services(settings, path=None, **kwargs) #

Enable acoupi services.

get_acoupi_beat_service_status(settings, path=None) #

Get the status of acoupi services.

get_acoupi_service_status(settings, path=None) #

Get the status of acoupi services.

get_service_status(service_name) #

get_user_unit_dir() #

Get the user unit directory.

Returns:

Type Description
Path

The user unit directory.

install_services(settings, path=None) #

Install acoupi services.

services_are_installed(settings, path=None) #

Check if acoupi services are installed.

start_services(settings, path=None, **kwargs) #

Start acoupi services.

status_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_services(settings, path=None, **kwargs) #

Stop acoupi services.

uninstall_services(settings, path=None) #

Uninstall acoupi services.

acoupi.system.state #

Functions for accessing the state of the Acoupi system.

Classes:

Name Description
AcoupiStatus

Model representing the status of the Acoupi system.

Functions:

Name Description
get_status

Get the current status of the Acoupi system.

is_configured

Check if acoupi is configured.

Classes#

AcoupiStatus #

Bases: BaseModel

Model representing the status of the Acoupi system.

Attributes:

Name Type Description
acoupi_service ServiceStatus

The status of the acoupi systemd service.

beat_service ServiceStatus

The status of the beat systemd service.

celery CeleryStatus

The status of the Celery workers.

program ProgramStatus

The status of the acoupi program.

Attributes#
celery instance-attribute #
deployment instance-attribute #
program instance-attribute #
services instance-attribute #

ServicesStatus #

Bases: BaseModel

Attributes:

Name Type Description
acoupi ServiceStatus
beat ServiceStatus
Attributes#
acoupi instance-attribute #
beat instance-attribute #

Functions#

get_status(settings) #

Get the current status of the Acoupi system.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration paths.

required

Returns:

Type Description
AcoupiStatus

An object containing the status of various components of the Acoupi system.

is_configured(settings) #

Check if acoupi is configured.

acoupi.system.tasks #

Acoupi system tasks module.

This module provides a set of functions to help manage and interact with the tasks of the currently configured acoupi program.

Functions:

Name Description
get_task_list

Return a list of all the tasks registered in the current program.

profile_task

Profile a task from the current program.

run_task

Run a task from the current program.

Classes#

Functions#

get_task_list(program, include_celery_tasks=False) #

Return a list of all the tasks registered in the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to get the tasks from.

required
include_celery_tasks bool

Whether to include celery tasks in the list. Defaults to False.

False

Returns:

Type Description
List[str]

List of the names of available tasks.

Notes

Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.

profile_task(program, task_name) #

Profile a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to profile the task from.

required
task_name str

The name of the task to profile.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and does not send the task to the Celery workers, so the profiling will only show the performance of the task without the overhead of the Celery workers.

run_task(program, task_name, recording=None) #

Run a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to run the task from.

required
task_name str

The name of the task to run.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.

acoupi.system.templates #

Jinja2 environment for rendering templates.

Functions:

Name Description
render_template

Render a template with the given name and keyword arguments.

Attributes#

env = Environment(loader=(PackageLoader('acoupi')), autoescape=(select_autoescape())) module-attribute #

Functions#

render_template(template_name, **kwargs) #

Render a template with the given name and keyword arguments.