Skip to content

System#

acoupi.system #

Functions that manage Acoupi system.

This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.

Classes#

AcoupiStatus #

Bases: BaseModel

Model representing the status of the Acoupi system.

Attributes:

Name Type Description
acoupi_service ServiceStatus

The status of the acoupi systemd service.

beat_service ServiceStatus

The status of the beat systemd service.

celery CeleryStatus

The status of the Celery workers.

program ProgramStatus

The status of the acoupi program.

Attributes#
celery: CeleryStatus instance-attribute #
deployment: Optional[Deployment] instance-attribute #
program: ProgramState instance-attribute #
services: ServicesStatus instance-attribute #

Settings #

Bases: BaseSettings

Settings for acoupi system.

Attributes#
acoupi_beat_service_file: str = 'acoupi-beat.service' class-attribute instance-attribute #
acoupi_service_file: str = 'acoupi.service' class-attribute instance-attribute #
app_name: str = 'app' class-attribute instance-attribute #
beat_script_path: Path = home / 'bin' / 'acoupi-beat.sh' class-attribute instance-attribute #
celery_config_file: Path = home / 'config' / 'celery.json' class-attribute instance-attribute #
celery_pool_type: Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes'] = 'threads' class-attribute instance-attribute #
deployment_file: Path = home / 'config' / 'deployment.json' class-attribute instance-attribute #
env_file: Path = home / 'config' / 'env' class-attribute instance-attribute #
home: Path = Path.home() / '.acoupi' class-attribute instance-attribute #
log_dir: Path = home / 'log' class-attribute instance-attribute #
log_level: str = 'INFO' class-attribute instance-attribute #
model_config = SettingsConfigDict(env_prefix='ACOUPI_') class-attribute instance-attribute #
program_config_file: Path = home / 'config' / 'program.json' class-attribute instance-attribute #
program_file: Path = home / app_name + '.py' class-attribute instance-attribute #
program_name_file: Path = home / 'config' / 'name' class-attribute instance-attribute #
restart_script_path: Path = home / 'bin' / 'acoupi-workers-restart.sh' class-attribute instance-attribute #
run_dir: Path = home / 'run' class-attribute instance-attribute #
start_script_path: Path = home / 'bin' / 'acoupi-workers-start.sh' class-attribute instance-attribute #
stop_script_path: Path = home / 'bin' / 'acoupi-workers-stop.sh' class-attribute instance-attribute #

Functions#

delete_recording(recording) #

Delete the recording.

disable_services(settings, path=None, **kwargs) #

Disable acoupi services.

dump_config(config, indent=2) #

Dump a configuration object to a JSON string.

enable_services(settings, path=None, **kwargs) #

Enable acoupi services.

end_deployment(settings) #

End current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The ended deployment.

Raises:

Type Description
DeploymentError

If the deployment has already ended or has not been started yet.

get_celery_app(settings) #

Get the currently setup celery app.

get_celery_status(settings) #

get_config_field(config, field) #

Retrieve a field or nested field from a configuration object.

This function allows you to access configuration values by specifying the field name, including nested fields using dot notation (e.g., 'section.subsection.value'). If no field is provided, the entire configuration object is returned.

It is possible to get nested fields by using a dot notation.

Parameters:

Name Type Description Default
config BaseModel

The configuration object, an instance of a Pydantic model.

required
field str

The name of the field to retrieve. Use dot notation for nested fields.

required

Returns:

Type Description
Any

The value of the specified field. The type depends on the field's definition in the Pydantic model.

Raises:

Type Description
AttributeError

If the specified field does not exist within the configuration schema.

IndexError

If the field refers to an index in a list or tuple that is out of bounds.

NotImplementedError

If the field points to a data type or structure that this function does not currently support for retrieval.

Examples:

Accessing a root-level field:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> get_config_field(config, "a")
>>> 1

Accessing a nested field:

>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> get_config_field(config, "c.d")
>>> True

Accessing an element from a list:

>>> class Config(BaseModel):
...     a: List[int]
>>> config = Config(a=[1, 2, 3])
>>> get_config_field(config, "a.0")
>>> 1

get_current_deployment(settings) #

Get current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The current deployment.

Raises:

Type Description
DeploymentError

If the deployment file does not exist or the deployment has already ended.

get_status(settings) #

Get the current status of the Acoupi system.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration paths.

required

Returns:

Type Description
AcoupiStatus

An object containing the status of various components of the Acoupi system.

get_task_list(program, include_celery_tasks=False) #

Return a list of all the tasks registered in the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to get the tasks from.

required
include_celery_tasks bool

Whether to include celery tasks in the list. Defaults to False.

False

Returns:

Type Description
List[str]

List of the names of available tasks.

Notes

Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.

get_temp_file_id(path) #

Get the temporary recording UUID from the path.

get_temp_files(path=TEMP_PATH) #

Get the list of temporary recordings.

Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.

is_configured(settings) #

Check if acoupi is configured.

load_config(path, schema) #

Load config from file.

Parameters:

Name Type Description Default
path Path

Path to the config file.

required
schema Type[S]

Pydantic model to validate the config.

required

Returns:

Type Description
S

The loaded config.

Raises:

Type Description
ConfigurationError

If the configuration is invalid.

FileNotFoundError

If the config file does not exist.

load_config_schema(settings) #

Load the configuration schema for the program.

load_program(settings) #

Load a fully configured acoupi program.

This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.

Parameters:

Name Type Description Default
settings Settings

The settings object containing the paths to the program and celery config files.

required

Returns:

Type Description
AcoupiProgram

A fully configured instance of the acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

load_program_class(program_name) #

Load the acoupi program class from a specified module.

This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.

Parameters:

Name Type Description Default
program_name str

The name of the module containing the acoupi program class.

required

Returns:

Type Description
Type[AcoupiProgram]

The acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the specified program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

Notes

When multiple AcoupiProgram classes are found within the module, this function will attempt to select one that is not further subclassed. If multiple such "final" classes exist, a warning will be issued, and the first one encountered will be selected. It is recommended that when exposing AcoupiProgram classes in a module, only one "final" (non-subclassable) class be made available.

move_recording(recording, dest, logger=None) #

Move the recording to the destination.

parse_config_from_args(schema, args=None, prompt=True) #

Parse configurations from user provided arguments.

This function will parse the configurations from the user provided in the command line arguments. It will return the parsed configuration.

It will override the default configuration with the user provided configuration, and ask the user for any missing configuration.

This function will raise an error if the user provided configuration is invalid.

Args: schema: The configuration schema to use. args: The arguments to parse. If None, will use an empty list. prompt: Whether to prompt the user for missing configuration.

Returns:

Type Description
config: The parsed configuration.

profile_task(program, task_name) #

Profile a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to profile the task from.

required
task_name str

The name of the task to profile.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and does not send the task to the Celery workers, so the profiling will only show the performance of the task without the overhead of the Celery workers.

purge_queues(settings) #

Purge all messages from all queues.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

restart_workers(settings, log_level=None) #

Restart the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

run_celery_command(settings, args, with_app=True, quiet=False, **kwargs) #

run_task(program, task_name, recording=None) #

Run a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to run the task from.

required
task_name str

The name of the task to run.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.

services_are_installed(settings, path=None) #

Check if acoupi services are installed.

set_config_field(config, field, value, is_json=False, strict=False, from_attributes=False) #

Set a specific field in a config object.

This function enables you to update configuration values by specifying the field name, including nested fields using dot notation (e.g., 'section.subsection.value'). The provided value is validated against the field's type definition in the Pydantic model to ensure data integrity.

Parameters:

Name Type Description Default
config S

The configuration object to be modified. Must be an instance of a Pydantic model.

required
field str

The name of the field to set. Use dot notation for nested fields. If an empty string, the entire configuration is replaced. This is useful when setting the configuration from a JSON string.

required
value Any

The new value to assign to the specified field. The type should be compatible with the field's definition in the Pydantic model.

required
is_json bool

If True, the value is treated as a JSON string and parsed before being assigned. Default is False.

False
strict bool

If True, type coercion is disabled, and the value must match the field's type exactly. If False (default), type coercion is attempted if possible.

False
from_attributes bool

Relevant only when setting a field that is itself a Pydantic model. If True, the value is assumed to be an object whose attributes will be used to populate the model. See Pydantic's model_validate documentation for details. Default is False.

False

Returns:

Type Description
S

A new configuration object with the specified field modified. The original config object remains unchanged.

Raises:

Type Description
ValidationError

If the provided value fails validation against the field's type definition in the Pydantic model.

AttributeError

If the specified field does not exist within the configuration schema.

IndexError

If the field refers to an index in a list or tuple that is out of bounds.

NotImplementedError

If the field points to a data type or structure that this function does not currently support for setting.

Examples:

Setting a root-level field:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> new_config = set_config_field(config, "a", 3)
>>> new_config.a
>>> 3

Setting a nested field:

>>> class NestedConfig(BaseModel):
...     d: bool
>>> class Config(BaseModel):
...     a: int
...     b: str
...     c: NestedConfig
>>> config = Config(a=1, b="2", c=NestedConfig(d=True))
>>> new_config = set_config_field(config, "c.d", False)
>>> new_config.c.d

Setting an element in a list:

>>> class Config(BaseModel):
...     a: List[int]
>>> config = Config(a=[1, 2, 3])
>>> new_config = set_config_field(config, "a.0", 4)
>>> new_config.a[0]
>>> 4

Setting a field from a JSON string:

>>> class Config(BaseModel):
...     a: int
...     b: str
>>> config = Config(a=1, b="2")
>>> new_config = set_config_field(
...     config,
...     "",
...     '{"a": 3, "b": "4"}',
...     is_json=True,
... )
>>> new_config.a
>>> 3

setup_program(settings, program_name, args=None, prompt=False) #

Set up an Acoupi Program.

start_deployment(settings, name, latitude=None, longitude=None) #

Start a new deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required
name str

The name of the deployment.

required

Returns:

Type Description
Deployment

Raises:

Type Description
DeploymentError

If Acoupi has already been deployed.

ValueError

If the deployment data is invalid. i.e. the latitude or longitude is invalid.

start_program(settings, name, latitude=None, longitude=None) #

start_services(settings, path=None, **kwargs) #

Start acoupi services.

start_workers(settings, pool='threads', log_level=None) #

Start the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
pool Literal

The pool type for the workers, by default "threads".

'threads'
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

status_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_program(settings) #

stop_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_workers(settings, log_level=None) #

Stop the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

write_config(config, path) #

Write config to file.

write_program_file(program_name, settings) #

Write the python script with the celery app to run.

acoupi.system.apps #

System functions for managing celery apps.

Classes#

Functions#

get_celery_app(settings) #

Get the currently setup celery app.

acoupi.system.celery #

Classes#

CeleryState #

Bases: str, Enum

Attributes#
AVAILABLE = 'available' class-attribute instance-attribute #
ERROR = 'error' class-attribute instance-attribute #
UNAVAILABLE = 'unavailable' class-attribute instance-attribute #

CeleryStatus #

Bases: BaseModel

Attributes#
state: CeleryState instance-attribute #
workers: List[WorkerStatus] = Field(default_factory=list) class-attribute instance-attribute #

WorkerState #

Bases: str, Enum

Attributes#
NOTOK = 'notok' class-attribute instance-attribute #
OK = 'ok' class-attribute instance-attribute #

WorkerStatus #

Bases: BaseModel

Attributes#
state: WorkerState instance-attribute #
worker_name: str instance-attribute #

Functions#

get_celery_bin() #

Return the path to the celery binary.

get_celery_status(settings) #

purge_queue(settings, queue_name) #

Purge all messages from the specified queue.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required
queue_name str

The name of the queue to purge.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

purge_queues(settings) #

Purge all messages from all queues.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration for Celery.

required

Returns:

Type Description
CompletedProcess

The result of the command execution.

restart_workers(settings, log_level=None) #

Restart the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

run_celery_command(settings, args, with_app=True, quiet=False, **kwargs) #

start_workers(settings, pool='threads', log_level=None) #

Start the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
pool Literal

The pool type for the workers, by default "threads".

'threads'
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

stop_workers(settings, log_level=None) #

Stop the Celery workers.

Parameters:

Name Type Description Default
settings Settings

The current acoupi settings.

required
log_level Optional[Literal]

The log level for the workers, by default None.

None

Returns:

Type Description
CompletedProcess

The result of the subprocess.run function.

acoupi.system.constants #

Path constants for acoupi system.

Classes#

CeleryConfig #

Bases: BaseModel

Configuration settings for Celery in Acoupi.

This class defines the settings used to configure the Celery task queue specifically for the Acoupi application.

Attributes#
accept_content: List[str] = Field(default_factory=lambda: ['pickle']) class-attribute instance-attribute #
broker_connection_retry_on_startup: bool = True class-attribute instance-attribute #

Retry to establish the connection to the AMQP broker on startup.

Automatically try to re-establish the connection to the AMQP broke if lost after the initial connection is made.

broker_url: str = 'pyamqp://guest@localhost//' class-attribute instance-attribute #

The URL of the message broker used by Celery.

Acoupi uses RabbitMQ with the default guest user. You may need to update this if your RabbitMQ setup is different.

enable_utc: bool = True class-attribute instance-attribute #

Whether to enable UTC for Celery.

It's generally recommended to keep this enabled for consistency across different Acoupi deployments.

result_backend: str = 'rpc://' class-attribute instance-attribute #

The URL for storing task results.

'rpc://' indicates that results are sent back directly to the client.

result_persistent: bool = False class-attribute instance-attribute #

Whether to persist task results.

In Acoupi deployments, task results are not typically needed after the task has completed, as all essential data is stored in an independent database. This setting helps to avoid unnecessary storage overhead.

result_serializer: str = 'pickle' class-attribute instance-attribute #
task_acks_late: bool = True class-attribute instance-attribute #

Whether to acknowledge tasks after they have been executed.

True means that tasks are acknowledged after they have been executed, not right before.

task_serializer: str = 'pickle' class-attribute instance-attribute #
task_soft_time_limit: int = 30 class-attribute instance-attribute #

The soft time limit (in seconds) for task execution.

If a task exceeds this limit, it will receive a warning.

If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.

task_time_limit: int = 60 class-attribute instance-attribute #

The hard time limit (in seconds) for task execution.

If a task exceeds this limit, it will be terminated.

If you have tasks that are expected to run longer than this limit, you should increase this value or specify the time limit directly.

timezone: str = 'UTC' class-attribute instance-attribute #

The timezone to use for Celery.

worker_prefetch_multiplier: int = 1 class-attribute instance-attribute #

The number of tasks a worker can prefetch.

Setting this to 1 prevents tasks from being delayed due to other tasks in the queue. Celery defaults to prefetching tasks in batches, which can cause a fast task to wait for a slower one in the same batch.

Settings #

Bases: BaseSettings

Settings for acoupi system.

Attributes#
acoupi_beat_service_file: str = 'acoupi-beat.service' class-attribute instance-attribute #
acoupi_service_file: str = 'acoupi.service' class-attribute instance-attribute #
app_name: str = 'app' class-attribute instance-attribute #
beat_script_path: Path = home / 'bin' / 'acoupi-beat.sh' class-attribute instance-attribute #
celery_config_file: Path = home / 'config' / 'celery.json' class-attribute instance-attribute #
celery_pool_type: Literal['threads', 'prefork', 'eventlet', 'gevent', 'solo', 'processes'] = 'threads' class-attribute instance-attribute #
deployment_file: Path = home / 'config' / 'deployment.json' class-attribute instance-attribute #
env_file: Path = home / 'config' / 'env' class-attribute instance-attribute #
home: Path = Path.home() / '.acoupi' class-attribute instance-attribute #
log_dir: Path = home / 'log' class-attribute instance-attribute #
log_level: str = 'INFO' class-attribute instance-attribute #
model_config = SettingsConfigDict(env_prefix='ACOUPI_') class-attribute instance-attribute #
program_config_file: Path = home / 'config' / 'program.json' class-attribute instance-attribute #
program_file: Path = home / app_name + '.py' class-attribute instance-attribute #
program_name_file: Path = home / 'config' / 'name' class-attribute instance-attribute #
restart_script_path: Path = home / 'bin' / 'acoupi-workers-restart.sh' class-attribute instance-attribute #
run_dir: Path = home / 'run' class-attribute instance-attribute #
start_script_path: Path = home / 'bin' / 'acoupi-workers-start.sh' class-attribute instance-attribute #
stop_script_path: Path = home / 'bin' / 'acoupi-workers-stop.sh' class-attribute instance-attribute #

acoupi.system.deployments #

Functions that manage Acoupi system.

This module contains utility functions for acoupi programs such as loading programs and getting celery apps from programs.

Classes#

Functions#

end_deployment(settings) #

End current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The ended deployment.

Raises:

Type Description
DeploymentError

If the deployment has already ended or has not been started yet.

get_current_deployment(settings) #

Get current deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required

Returns:

Type Description
Deployment

The current deployment.

Raises:

Type Description
DeploymentError

If the deployment file does not exist or the deployment has already ended.

load_deployment_from_file(path) #

Load deployment from file.

The deployment is loaded from a JSON file.

Parameters:

Name Type Description Default
path Path

The path to load the deployment from.

required

Returns:

Type Description
Deployment

The loaded deployment.

Raises:

Type Description
ValueError

If the deployment data is invalid.

FileNotFoundError

If the deployment file does not exist.

start_deployment(settings, name, latitude=None, longitude=None) #

Start a new deployment.

Parameters:

Name Type Description Default
settings Settings

The settings to use.

required
name str

The name of the deployment.

required

Returns:

Type Description
Deployment

Raises:

Type Description
DeploymentError

If Acoupi has already been deployed.

ValueError

If the deployment data is invalid. i.e. the latitude or longitude is invalid.

write_deployment_to_file(deployment, path) #

Write deployment to file.

The deployment is written as a JSON file.

Parameters:

Name Type Description Default
deployment Deployment

The deployment to write to file.

required
path Path

The path to write the deployment to.

required

acoupi.system.exceptions #

Custom Exceptions for Acoupi system.

Classes#

ConfigurationError(message, help=None) #

Bases: Exception

Exception raised when a configuration is invalid.

Parameters:

Name Type Description Default
message str

The error message.

required
help str

An optional help message on how to fix the error.

None
Attributes#
help = help instance-attribute #
message = message instance-attribute #

DeploymentError(message) #

Bases: Exception

Exception raised when a deployment fails.

Attributes#
message = message instance-attribute #

HealthCheckError(message) #

Bases: Exception

Exception raised when a health check fails.

Attributes#
message = message instance-attribute #

InvalidProgramError(program) #

Bases: Exception

Exception raised when a program file is invalid.

Attributes#
program = program instance-attribute #

ParameterError(value, message, help=None) #

Bases: Exception

Exception raised when a parameter is invalid.

Parameters:

Name Type Description Default
value str

The value that caused the error.

required
message str

The error message.

required
help str

An optional help message on how to fix the error.

None
Attributes#
help = help instance-attribute #
message = message instance-attribute #
value = value instance-attribute #

ProgramNotFoundError(program) #

Bases: Exception

Exception raised when program is not found.

Attributes#
program = program instance-attribute #

acoupi.system.files #

Functions to handle files.

Attributes#

DEFAULT_AUDIO_STORAGE = Path.home() / 'audio' module-attribute #

TEMP_PATH = Path('/run/shm/') module-attribute #

logger = logging.getLogger(__name__) module-attribute #

Functions#

delete_recording(recording) #

Delete the recording.

get_temp_dir(in_memory=True) #

Get a temporary directory where to store recordings.

Recordings are usually stored in a staging temporary directory before being moved to the final storage location. This function returns the path of the directory where the temporary recordings are stored.

Parameters:

Name Type Description Default
in_memory bool

If True, the temporary files will be stored in memory.

True
Notes

It is recommended to store temporary files in memory to reduce the number of writes to the SD card. It can also be useful in scenarios where recordings should not be kept in disk for legal reasons.

If in_memory is set to True but the system does not support in-memory storage, the function will return the default temporary and show a warning.

get_temp_file(path) #

Get the temporary recording UUID from the path.

get_temp_file_id(path) #

Get the temporary recording UUID from the path.

get_temp_files(path=TEMP_PATH) #

Get the list of temporary recordings.

Temporary recordings are stored in memory to avoid unnecessary writes to the SD card.

get_temp_files_paths(path) #

Get the temporary recording path.

move_recording(recording, dest, logger=None) #

Move the recording to the destination.

acoupi.system.lifecycle #

System functions for managing Acoupi programs.

Classes#

Functions#

setup_program(settings, program_name, args=None, prompt=False) #

Set up an Acoupi Program.

start_program(settings, name, latitude=None, longitude=None) #

stop_program(settings) #

acoupi.system.programs #

System functions for managing Acoupi programs.

Attributes#

Classes#

ProgramState #

Bases: str, Enum

Attributes#
ERROR = 'error' class-attribute instance-attribute #
OK = 'ok' class-attribute instance-attribute #
UNHEALTHY = 'unhealthy' class-attribute instance-attribute #

Functions#

get_program_state(settings) #

load_config_schema(settings) #

Load the configuration schema for the program.

load_program(settings) #

Load a fully configured acoupi program.

This function loads the acoupi program class from the specified module, instantiates it, and configures it with the specified config files.

Parameters:

Name Type Description Default
settings Settings

The settings object containing the paths to the program and celery config files.

required

Returns:

Type Description
AcoupiProgram

A fully configured instance of the acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

load_program_class(program_name) #

Load the acoupi program class from a specified module.

This function searches the given module for a valid AcoupiProgram class. If multiple such classes exist within the module, it prioritizes classes that are not further subclassed.

Parameters:

Name Type Description Default
program_name str

The name of the module containing the acoupi program class.

required

Returns:

Type Description
Type[AcoupiProgram]

The acoupi program class.

Raises:

Type Description
ProgramNotFoundError

If the specified program module is not found.

InvalidProgramError

If the loaded class is not a valid acoupi program class.

Notes

When multiple AcoupiProgram classes are found within the module, this function will attempt to select one that is not further subclassed. If multiple such "final" classes exist, a warning will be issued, and the first one encountered will be selected. It is recommended that when exposing AcoupiProgram classes in a module, only one "final" (non-subclassable) class be made available.

load_worker_config(settings) #

Load the configuration schema for the program.

write_program_file(program_name, settings) #

Write the python script with the celery app to run.

acoupi.system.scripts #

Classes#

Functions#

get_celery_bin() #

Return the path to the celery binary.

give_executable_permissions(path) #

Give executable permissions to a file.

write_beat_script(settings, celery_bin=None) #

Write the beat script.

write_scripts(config, settings, celery_bin=None) #

Write the worker scripts.

write_workers_restart_script(config, settings, celery_bin=None) #

Write the worker restart script.

write_workers_start_script(config, settings, celery_bin=None) #

Write the worker start script.

write_workers_stop_script(config, settings, celery_bin=None) #

Write the worker stop script.

acoupi.system.services #

System functions to manage acoupi services.

Classes#

ServiceStatus #

Bases: str, Enum

Service status.

Attributes#
ACTIVATING = 'activating' class-attribute instance-attribute #
ACTIVE = 'active' class-attribute instance-attribute #
DEACTIVATING = 'deactivating' class-attribute instance-attribute #
FAILED = 'failed' class-attribute instance-attribute #
INACTIVE = 'inactive' class-attribute instance-attribute #
RELOADING = 'reloading' class-attribute instance-attribute #
UNINSTALLED = 'uninstalled' class-attribute instance-attribute #
UNKNOWN = 'unknown' class-attribute instance-attribute #

Functions#

disable_services(settings, path=None, **kwargs) #

Disable acoupi services.

enable_services(settings, path=None, **kwargs) #

Enable acoupi services.

get_acoupi_beat_service_status(settings, path=None) #

Get the status of acoupi services.

get_acoupi_service_status(settings, path=None) #

Get the status of acoupi services.

get_service_status(service_name) #

get_user_unit_dir() #

Get the user unit directory.

Returns:

Type Description
Path

The user unit directory.

install_services(settings, path=None) #

Install acoupi services.

services_are_installed(settings, path=None) #

Check if acoupi services are installed.

start_services(settings, path=None, **kwargs) #

Start acoupi services.

status_services(settings, path=None, **kwargs) #

Stop acoupi services.

stop_services(settings, path=None, **kwargs) #

Stop acoupi services.

uninstall_services(settings, path=None) #

Uninstall acoupi services.

acoupi.system.state #

Functions for accessing the state of the Acoupi system.

Classes#

AcoupiStatus #

Bases: BaseModel

Model representing the status of the Acoupi system.

Attributes:

Name Type Description
acoupi_service ServiceStatus

The status of the acoupi systemd service.

beat_service ServiceStatus

The status of the beat systemd service.

celery CeleryStatus

The status of the Celery workers.

program ProgramStatus

The status of the acoupi program.

Attributes#
celery: CeleryStatus instance-attribute #
deployment: Optional[Deployment] instance-attribute #
program: ProgramState instance-attribute #
services: ServicesStatus instance-attribute #

ServicesStatus #

Bases: BaseModel

Attributes#
acoupi: ServiceStatus instance-attribute #
beat: ServiceStatus instance-attribute #

Functions#

get_status(settings) #

Get the current status of the Acoupi system.

Parameters:

Name Type Description Default
settings Settings

The settings object containing configuration paths.

required

Returns:

Type Description
AcoupiStatus

An object containing the status of various components of the Acoupi system.

is_configured(settings) #

Check if acoupi is configured.

acoupi.system.tasks #

Acoupi system tasks module.

This module provides a set of functions to help manage and interact with the tasks of the currently configured acoupi program.

Classes#

Functions#

get_task_list(program, include_celery_tasks=False) #

Return a list of all the tasks registered in the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to get the tasks from.

required
include_celery_tasks bool

Whether to include celery tasks in the list. Defaults to False.

False

Returns:

Type Description
List[str]

List of the names of available tasks.

Notes

Celery registers a number of tasks by default, which can be excluded from the list by setting include_celery_tasks to False.

profile_task(program, task_name) #

Profile a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to profile the task from.

required
task_name str

The name of the task to profile.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function uses cProfile to profile the task. The output can be saved to a file by providing the output parameter. The task is run in the current Python process and does not send the task to the Celery workers, so the profiling will only show the performance of the task without the overhead of the Celery workers.

run_task(program, task_name, recording=None) #

Run a task from the current program.

Parameters:

Name Type Description Default
program AcoupiProgram

The AcoupiProgram instance to run the task from.

required
task_name str

The name of the task to run.

required

Raises:

Type Description
ValueError

If the specified task is not found.

Notes

This function runs the task in the current Python process and does not send the task to the Celery workers. This can be helpful for testing a task without setting up Celery workers. To run the task through Celery workers, use the acoupi celery call <task_name> command.

acoupi.system.templates #

Jinja2 environment for rendering templates.

Attributes#

env = Environment(loader=PackageLoader('acoupi'), autoescape=select_autoescape()) module-attribute #

Functions#

render_template(template_name, **kwargs) #

Render a template with the given name and keyword arguments.