Programs#

acoupi.programs.core.workers #

Acoupi Workers Module.

This module provides the functionality for managing and configuring AcoupiWorkers, which are essential components of the Acoupi framework for building smart acoustic sensors. AcoupiWorkers are responsible for executing tasks related to acoustic sensing and analysis. Each Acoupi program relies on a set of AcoupiWorkers, each with a unique name and configuration.

The AcoupiWorker is implemented as a Celery worker, which listens to designated queues for task execution and coordination.

Attributes#

DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])]) module-attribute #

Classes#

AcoupiWorker #

Bases: BaseModel

AcoupiWorker Class.

Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.

Attributes#
concurrency: Optional[int] = None class-attribute instance-attribute #

Number of concurrent tasks the worker should run.

If None, the worker will run as many tasks as possible.

This setting should be set to 1 if at most one task can be run at a time. This is useful for tasks that are not thread-safe or that require a resource that can only be used by one task at a time.

name: str instance-attribute #

Name of the worker. Should be unique among different workers.

queues: List[str] = Field(default_factory=list) class-attribute instance-attribute #

Queues the worker should listen to.

If empty, the worker will listen to all queues.
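Because each AcoupiWorker is a Celery worker, its three fields correspond naturally to options of the `celery worker` command. The sketch below shows one plausible mapping. It is an illustration, not Acoupi's actual launch code: `celery_worker_args` is a hypothetical helper, and only the `--hostname`, `--queues`, and `--concurrency` options are standard Celery worker options.

```python
from typing import List, Optional


def celery_worker_args(
    name: str,
    queues: List[str],
    concurrency: Optional[int] = None,
) -> List[str]:
    """Build `celery worker` options from worker settings (illustrative only)."""
    # "%h" is expanded by Celery to the machine's hostname.
    args = ["worker", "--hostname", f"{name}@%h"]
    if queues:
        # An empty list adds no --queues option, so no queue restriction is set here.
        args += ["--queues", ",".join(queues)]
    if concurrency is not None:
        # None leaves the concurrency level to Celery's own default.
        args += ["--concurrency", str(concurrency)]
    return args


recording_args = celery_worker_args("recording", ["recording"], concurrency=1)
default_args = celery_worker_args("default", ["celery"])
```

With the two workers from DEFAULT_WORKER_CONFIG, only the `recording` worker receives a `--concurrency` option, since the `default` worker leaves `concurrency` unset.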

WorkerConfig #

Bases: BaseModel

WorkerConfig Class.

A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.

Attributes#
workers: List[AcoupiWorker] = Field(default_factory=list) class-attribute instance-attribute #
Functions#
unique_worker_names(workers) #

Validate that all workers have unique names.
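The uniqueness rule can be sketched without Pydantic. The following is a minimal illustration, not Acoupi's implementation: `Worker`, `find_duplicate_names`, and `validate_unique_names` are hypothetical stand-ins built on the standard library, and only the rule itself (no two workers share a name) comes from the description above.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Worker:
    """Hypothetical stand-in for AcoupiWorker (not the real Pydantic model)."""

    name: str
    queues: List[str] = field(default_factory=list)
    concurrency: Optional[int] = None


def find_duplicate_names(workers: List[Worker]) -> List[str]:
    """Return the worker names that appear more than once, sorted."""
    counts = Counter(worker.name for worker in workers)
    return sorted(name for name, count in counts.items() if count > 1)


def validate_unique_names(workers: List[Worker]) -> List[Worker]:
    """Raise ValueError if any two workers share a name."""
    duplicates = find_duplicate_names(workers)
    if duplicates:
        raise ValueError(f"Duplicate worker names: {duplicates}")
    return workers


workers = [
    Worker(name="recording", queues=["recording"], concurrency=1),
    Worker(name="default", queues=["celery"]),
]
validate_unique_names(workers)  # passes: the two names are distinct
```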

acoupi.programs.core.base #

Definition of what a program is.

Attributes#

B = TypeVar('B') module-attribute #

C = TypeVar('C', bound=BaseModel, covariant=False, contravariant=True) module-attribute #

ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel) module-attribute #

Classes#

AcoupiProgram(program_config, app) #

Bases: ABC, Generic[ProgramConfig]

A program is a collection of tasks.

Attributes#
app: Celery = app instance-attribute #
config: ProgramConfig = program_config instance-attribute #
config_schema: Type[ProgramConfig] instance-attribute #
logger: logging.Logger = get_task_logger(self.__class__.__name__) instance-attribute #
tasks = {} instance-attribute #
worker_config: Optional[WorkerConfig] = None class-attribute instance-attribute #
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None) #

Add a task to the program.

add_task_to_queue(task_name, queue) #

Add a task to a queue.

check(config) #

Check the configurations.

This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.

User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.

Ideally this method should be called before a deployment is made.
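As a rough sketch of what an overridden `check` might do, the example below validates two settings and raises on failure. Everything here is an assumption made for illustration: `HealthCheckError` is redefined locally so the snippet runs on its own, and the rule that a recording must not be longer than the interval is hypothetical, not something Acoupi is documented to enforce.

```python
class HealthCheckError(Exception):
    """Local stand-in for Acoupi's HealthCheckError so the sketch runs alone."""


def check_recording_settings(duration: int, interval: int) -> None:
    """Raise HealthCheckError when the (hypothetical) settings are inconsistent."""
    if duration <= 0:
        raise HealthCheckError("duration must be a positive number of seconds")
    if duration > interval:
        raise HealthCheckError(
            f"duration ({duration}s) exceeds the interval ({interval}s)"
        )


def run_check(duration: int, interval: int) -> str:
    """Run the check before 'deploying' and report the outcome as text."""
    try:
        check_recording_settings(duration, interval)
    except HealthCheckError as error:
        return f"check failed: {error}"
    return "check passed"
```

Raising a dedicated exception type lets the caller distinguish a failed health check from an unexpected crash in the check itself.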

get_config_schema() classmethod #

Get the config class.

get_queue_names() classmethod #

Get the queue names.

get_worker_config() classmethod #

Get the worker config class.

on_end(deployment) #

End a deployment.

Called when the user ends a deployment.

This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.

on_start(deployment) #

Start a deployment.

Called when the user starts a deployment.

This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.

setup(config) #

Set up the program.

InvalidAcoupiConfiguration #

Bases: ValueError

Raised when a configuration is invalid.

NoUserPrompt #

No user prompt annotation.

Use this class to annotate fields that should not be prompted to the user.

acoupi.programs #

Attributes#

DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])]) module-attribute #

ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel) module-attribute #

Classes#

AcoupiProgram(program_config, app) #

Bases: ABC, Generic[ProgramConfig]

A program is a collection of tasks.

Attributes#
app: Celery = app instance-attribute #
config: ProgramConfig = program_config instance-attribute #
config_schema: Type[ProgramConfig] instance-attribute #
logger: logging.Logger = get_task_logger(self.__class__.__name__) instance-attribute #
tasks = {} instance-attribute #
worker_config: Optional[WorkerConfig] = None class-attribute instance-attribute #
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None) #

Add a task to the program.

add_task_to_queue(task_name, queue) #

Add a task to a queue.

check(config) #

Check the configurations.

This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.

User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.

Ideally this method should be called before a deployment is made.

get_config_schema() classmethod #

Get the config class.

get_queue_names() classmethod #

Get the queue names.

get_worker_config() classmethod #

Get the worker config class.

on_end(deployment) #

End a deployment.

Called when the user ends a deployment.

This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.

on_start(deployment) #

Start a deployment.

Called when the user starts a deployment.

This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.

setup(config) #

Set up the program.

AcoupiWorker #

Bases: BaseModel

AcoupiWorker Class.

Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.

Attributes#
concurrency: Optional[int] = None class-attribute instance-attribute #

Number of concurrent tasks the worker should run.

If None, the worker will run as many tasks as possible.

This setting should be set to 1 if at most one task can be run at a time. This is useful for tasks that are not thread-safe or that require a resource that can only be used by one task at a time.

name: str instance-attribute #

Name of the worker. Should be unique among different workers.

queues: List[str] = Field(default_factory=list) class-attribute instance-attribute #

Queues the worker should listen to.

If empty, the worker will listen to all queues.

NoUserPrompt #

No user prompt annotation.

Use this class to annotate fields that should not be prompted to the user.

WorkerConfig #

Bases: BaseModel

WorkerConfig Class.

A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.

Attributes#
workers: List[AcoupiWorker] = Field(default_factory=list) class-attribute instance-attribute #
Functions#
unique_worker_names(workers) #

Validate that all workers have unique names.

acoupi.programs.templates #

Acoupi program templates.

This module provides base classes and configuration schemas to simplify the creation of Acoupi programs.

Available Templates:

  • Basic Program: Provides a foundation for building Acoupi programs, including features for audio recording, metadata storage, and file management.
    • Base class: BasicProgram
    • Configuration schema: BasicProgramConfiguration
  • Messaging Program: Extends the BasicProgram with messaging capabilities, enabling programs to send messages and heartbeats via HTTP or MQTT.
    • Base class: MessagingProgram
    • Configuration schema: MessagingProgramConfiguration
  • Detection Program: Extends the MessagingProgram with audio detection capabilities, allowing programs to run detection models on recordings and generate messages based on the results.
    • Base class: DetectionProgram
    • Configuration schema: DetectionProgramConfiguration

Each template includes a base class that provides core functionality and a configuration schema to define the program's settings.

For detailed usage instructions, customization options, and examples, refer to the individual template documentation.

Classes#

AudioConfiguration #

Bases: BaseModel

Audio configuration schema.

Attributes#
chunksize: Annotated[int, NoUserPrompt] = 8192 class-attribute instance-attribute #

Chunksize of audio recording.

duration: int = 3 class-attribute instance-attribute #

Duration of each audio recording in seconds.

interval: int = 10 class-attribute instance-attribute #

Interval between each audio recording in seconds.

schedule_end: datetime.time = Field(default=datetime.time(hour=22, minute=30, second=0)) class-attribute instance-attribute #

End time for recording schedule.

schedule_start: datetime.time = Field(default=datetime.time(hour=6, minute=0, second=0)) class-attribute instance-attribute #

Start time for recording schedule.
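To make the interplay of these fields concrete, the sketch below checks whether a time of day falls inside the schedule and estimates how many recordings fit in one day. It is a standard-library illustration, not Acoupi code: the helper names are hypothetical, the inclusive endpoints are an assumption, and the estimate assumes one recording starts every `interval` seconds.

```python
import datetime


def seconds_since_midnight(value: datetime.time) -> int:
    """Convert a time of day to whole seconds since midnight."""
    return value.hour * 3600 + value.minute * 60 + value.second


def is_within_schedule(
    now: datetime.time, start: datetime.time, end: datetime.time
) -> bool:
    """Return True if `now` lies between `start` and `end` (inclusive)."""
    if start <= end:
        return start <= now <= end
    # The window wraps past midnight, e.g. 22:00 to 04:00.
    return now >= start or now <= end


def recordings_per_day(
    start: datetime.time, end: datetime.time, interval: int
) -> int:
    """Estimate recordings per day, assuming one starts every `interval` seconds."""
    window = seconds_since_midnight(end) - seconds_since_midnight(start)
    if window < 0:
        window += 24 * 3600  # schedule wraps past midnight
    return window // interval


# The default schedule and interval documented above.
START = datetime.time(hour=6, minute=0, second=0)
END = datetime.time(hour=22, minute=30, second=0)
estimate = recordings_per_day(START, END, interval=10)
```

Under these assumptions the default window of 16.5 hours (59,400 seconds) with a 10-second interval gives an estimate of 5,940 recordings per day.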

BasicProgram(program_config, app) #

Bases: AcoupiProgram[ProgramConfig]

Basic Acoupi Program.

This class provides a base for creating basic Acoupi programs. It offers essential features for audio recording, metadata storage, and file management.

Components:

  • Audio Recorder: Records audio clips according to the program's configuration.
  • Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.

Tasks:

Using the components above, this class creates and manages the following tasks:

  • Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
  • File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.

Customization:

Customise the program's behavior by overriding these methods:

  • get_recording_conditions: Define the specific conditions that must be met for audio recording to continue when the recording task is triggered by the scheduler.
  • get_recording_filters: Add filters to determine which recordings to save.
  • get_recording_callbacks: Define actions to perform after a recording is made.

Examples:

import datetime
from acoupi import components, data
from acoupi.programs.templates import (
    BasicProgram,
    BasicProgramConfiguration,
)


class Config(BasicProgramConfiguration):
    pass


class Program(BasicProgram):
    config_schema = Config

    def get_recording_conditions(self, config: Config):
        # Get the default recording conditions
        conditions = super().get_recording_conditions(
            config
        )
        return [
            components.IsInInterval(
                data.TimeInterval(
                    start=datetime.time(hour=3),
                    end=datetime.time(hour=6),
                )
            ),
            *conditions,
        ]
Attributes#
recorder: types.AudioRecorder instance-attribute #
store: types.Store instance-attribute #
worker_config = DEFAULT_WORKER_CONFIG class-attribute instance-attribute #
Functions#
check(config) #

Check the program's components.

This method performs checks on the program's components to ensure they are functioning correctly. Currently, it only checks the PyAudio recorder if it is being used.

configure_recorder(config) #

Configure the audio recorder.

This method creates and configures an instance of the PyAudioRecorder based on the provided configuration.

Returns:

Type           Description
AudioRecorder  The configured audio recorder instance.

configure_store(config) #

Configure the metadata store.

This method creates and configures an instance of the SqliteStore based on the provided configuration.

Returns:

Type   Description
Store  The configured metadata store instance.

create_file_management_task(config) #

Create the file management task.

This method creates the task responsible for managing audio files.

Returns:

Type                Description
Callable[[], None]  The file management task.

create_recording_task(config) #

Create the recording task.

This method creates the task responsible for recording audio.

Returns:

Type                               Description
Callable[[], Optional[Recording]]  The recording task.

get_file_managers(config) #

Get the file managers.

This method defines how audio recordings should be saved and managed. It returns a list of file managers that are responsible for determining the final storage location of each recording.

When a recording is marked for saving, the program iterates through the list of file managers in order. Each manager can either:

  • Return a path where the recording should be saved.
  • Return None to indicate that it cannot handle the recording, allowing the next manager in the list to be used.

By default, this method returns a list containing a single DateFileManager, which saves recordings in a structured folder hierarchy based on the recording date.

Returns:

Type                          Description
list[RecordingSavingManager]  A list of file manager instances.
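The "first manager that returns a path wins" behaviour described above can be sketched as follows. This is an assumption-laden illustration rather than Acoupi's code: `Recording` is a simplified stand-in, and `label_manager`, `date_manager`, and `resolve_path` are hypothetical names. The date-based layout only imitates the idea of a date-structured folder hierarchy.

```python
import datetime
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional


@dataclass
class Recording:
    """Hypothetical, simplified stand-in for Acoupi's recording data."""

    created_on: datetime.datetime
    label: Optional[str] = None


FileManager = Callable[[Recording], Optional[Path]]


def label_manager(recording: Recording) -> Optional[Path]:
    """Handle only labelled recordings; return None to pass on the rest."""
    if recording.label is None:
        return None
    return Path(recording.label) / f"{recording.created_on:%H%M%S}.wav"


def date_manager(recording: Recording) -> Optional[Path]:
    """Always return a path based on the recording date (year/month/day)."""
    folder = Path(f"{recording.created_on:%Y/%m/%d}")
    return folder / f"{recording.created_on:%H%M%S}.wav"


def resolve_path(
    recording: Recording, managers: List[FileManager]
) -> Optional[Path]:
    """Return the path from the first manager that can handle the recording."""
    for manager in managers:
        path = manager(recording)
        if path is not None:
            return path
    return None
```

Placing the most specific manager first and a catch-all manager last means every recording ends up with a destination.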

get_recording_callbacks(config) #

Get the recording callbacks.

This method defines callbacks to be executed after a recording is completed. By default, it returns an empty list.

Returns:

Type                                         Description
list[Callable[[Optional[Recording]], None]]  A list of recording callbacks.

get_recording_conditions(config) #

Get the recording conditions.

This method defines the conditions under which audio recording should be performed. By default, it uses the schedule defined in the configuration.

Returns:

Type                Description
RecordingCondition  A recording condition.

get_recording_filters(config) #

Get the recording saving filters.

This method defines filters that determine which recordings should be saved permanently. By default, it returns an empty list, meaning all recordings are saved.

Returns:

Type                         Description
list[RecordingSavingFilter]  A list of recording saving filters.

get_required_models(config) #

Get the required models for a recording to be considered ready.

This method specifies which bioacoustic models must process a recording before it is considered "ready" to be moved from temporary storage.

By default, no models are required, meaning recordings are immediately considered ready. However, you can override this method to define specific models that must process the recordings based on the program's configuration.

Returns:

Type       Description
list[str]  A list of model names that are required to process a recording before it is considered ready.
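The readiness rule amounts to a subset test. The sketch below is a minimal illustration under that reading; `is_ready` is a hypothetical helper, and the model names are placeholders rather than models shipped with Acoupi.

```python
from typing import Iterable, List


def is_ready(processed_by: Iterable[str], required_models: List[str]) -> bool:
    """Return True once every required model has processed the recording."""
    return set(required_models).issubset(set(processed_by))


# With no required models (the documented default), a recording is ready at once.
ready_by_default = is_ready(processed_by=[], required_models=[])

# Placeholder model names: the recording still waits for "model_b".
still_waiting = not is_ready(
    processed_by=["model_a"], required_models=["model_a", "model_b"]
)
```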

on_end(deployment) #

Handle program end event.

This method is called when the program ends. It updates the deployment information in the metadata store and ensures that remaining tasks are completed before the program is stopped.

Tasks to check are:

  • file_management_task (if implemented): Check if there are remaining files in the temporary directory and move them to the correct directory.

on_start(deployment) #

Handle program start event.

This method is called when the program starts and stores the deployment information in the metadata store.

register_file_management_task(config) #

Register the file management task.

This method registers the file management task with the program's scheduler.

register_recording_task(config) #

Register the recording task.

This method registers the recording task with the program's scheduler.

setup(config) #

Set up the basic program.

This method initialises the program's components (audio recorder, store, and file manager), registers the recording and file management tasks, and performs necessary setup operations.

validate_dirs(config) #

Validate the directories used by the program.

This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.
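A directory check of this kind is typically a few `pathlib` calls. The sketch below is an illustration under assumptions, not the method's actual body: `ensure_dirs` is a hypothetical helper, its parameter names mirror the PathsConfiguration fields, and a temporary directory stands in for the real storage locations.

```python
import tempfile
from pathlib import Path


def ensure_dirs(recordings: Path, tmp_audio: Path, db_metadata: Path) -> None:
    """Create the storage directories if they do not exist yet."""
    # The database path points at a file, so its parent directory is created.
    for directory in (recordings, tmp_audio, db_metadata.parent):
        directory.mkdir(parents=True, exist_ok=True)


with tempfile.TemporaryDirectory() as root:
    base = Path(root) / "storages"
    ensure_dirs(
        recordings=base / "recordings",
        tmp_audio=Path(root) / "tmp",
        db_metadata=base / "metadata.db",
    )
    # Top-level entries created under the temporary root.
    created = sorted(path.name for path in Path(root).iterdir())
```

Passing `exist_ok=True` makes the call idempotent, so running the check on every start-up is harmless.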

BasicProgramConfiguration #

Bases: BaseModel

Configuration schema for a basic program.

Attributes#
microphone: MicrophoneConfig instance-attribute #

Microphone configuration.

paths: PathsConfiguration = Field(default_factory=PathsConfiguration) class-attribute instance-attribute #

Data configuration.

recording: AudioConfiguration = Field(default_factory=AudioConfiguration) class-attribute instance-attribute #

Audio configuration.

timezone: TimeZoneName = Field(default=TimeZoneName('Europe/London')) class-attribute instance-attribute #

Time zone where the device will be deployed.

DetectionProgram(program_config, app) #

Bases: MessagingProgram[C], ABC

Detection Program.

This abstract class extends the MessagingProgram to provide a foundation for detection programs. It includes functionality for configuring and running a detection model, processing model outputs, and generating messages based on detection results.

Components:

  • Detection Model: An audio detection model that processes recordings to identify specific sounds or events.

Tasks:

  • Detection Task: Runs the detection model on audio recordings and processes the results, including:
    • Cleaning the model output using get_output_cleaners. By default this includes a threshold cleaner that filters out detections below a specified threshold.
    • Filtering the processed output using get_processing_filters.
    • Generating messages based on the results using get_message_factories.

Inherited Components:

This class inherits the following components from MessagingProgram:

  • Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
  • Message Store: A database for storing messages before they are sent.

This class also inherits the following components from BasicProgram:

  • Audio Recorder: Records audio clips according to the program's configuration.
  • File Manager: Manages the storage of audio recordings, including saving them to permanent storage and handling temporary files.
  • Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.

Inherited Tasks:

This program inherits the following tasks from MessagingProgram:

  • Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
  • Send Messages Task: Periodically sends messages from the message store to the configured messenger.

This program also inherits the following tasks from BasicProgram:

  • Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
  • File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.

Customization:

You can customise the detection process by overriding the following methods:

  • configure_model: Required. Implement this method to configure and return an instance of your detection model.
  • get_output_cleaners: To clean up the model's raw output. Either override this method completely or call super().get_output_cleaners() to include the default threshold cleaner.
  • get_processing_filters: To filter recordings before processing.
  • get_message_factories: To customise the messages generated based on detection results.

Examples:

from acoupi.programs.templates import (
    DetectionProgram,
    DetectionProgramConfiguration,
)
from acoupi.components import types

# This import should be replaced with your actual model import
# This model does not exist in the acoupi package
from acoupi.models import SimpleBirdModel


class MyBirdDetectionConfiguration(
    DetectionProgramConfiguration
):
    # Add any configuration specific to bird detection
    pass


class MyBirdDetectionProgram(
    DetectionProgram[MyBirdDetectionConfiguration]
):
    config_schema = MyBirdDetectionConfiguration

    def configure_model(
        self, config: MyBirdDetectionConfiguration
    ) -> types.Model:
        return (
            SimpleBirdModel()
        )  # Replace with your actual model
Attributes#
model: types.Model instance-attribute #

The configured detection model instance.

Functions#
check(config) #

Check the program's components.

This method performs checks on the program's components, including the detection model, to ensure they are functioning correctly.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

configure_model(config) abstractmethod #

Configure the detection model.

This method must be implemented by subclasses to configure and return an instance of the detection model.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type   Description
Model  The configured detection model instance.

create_detection_task(config) #

Create the detection task.

This method creates the task responsible for running the detection model and processing its output.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type                         Description
Callable[[Recording], None]  The detection task.

Notes

This method uses the generate_detection_task function to create the detection task. You can override this method to customise the task creation process.
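The order of operations in a detection task (filter, run the model, clean, build messages) can be sketched with plain callables. This is not the `generate_detection_task` implementation: `run_detection` and the helper functions are hypothetical, recordings and detections are reduced to simple Python values, and requiring every processing filter to accept the recording is an assumption.

```python
from typing import Callable, List, Optional, Tuple

Detection = Tuple[str, float]  # (label, score); a simplified stand-in


def run_detection(
    recording: str,
    model: Callable[[str], List[Detection]],
    processing_filters: List[Callable[[str], bool]],
    output_cleaners: List[Callable[[List[Detection]], List[Detection]]],
    message_factories: List[Callable[[List[Detection]], Optional[str]]],
) -> List[str]:
    """Return the messages produced for one recording."""
    # Assumption: every filter must accept the recording, else the model is skipped.
    if not all(accept(recording) for accept in processing_filters):
        return []
    output = model(recording)
    # Cleaners are applied in order, each receiving the previous result.
    for clean in output_cleaners:
        output = clean(output)
    messages = []
    for build in message_factories:
        message = build(output)
        if message is not None:
            messages.append(message)
    return messages


def fake_model(recording: str) -> List[Detection]:
    """Pretend model that always reports the same two detections."""
    return [("bat", 0.9), ("bird", 0.1)]


def drop_low_scores(output: List[Detection]) -> List[Detection]:
    """Keep detections scoring at least 0.2."""
    return [item for item in output if item[1] >= 0.2]


def summarise(output: List[Detection]) -> Optional[str]:
    """Build one message listing the labels, or None if nothing was detected."""
    if not output:
        return None
    return ", ".join(label for label, _ in output)


messages = run_detection(
    "night.wav",
    fake_model,
    processing_filters=[lambda name: name.endswith(".wav")],
    output_cleaners=[drop_low_scores],
    message_factories=[summarise],
)
```

Running the filters first is what saves the cost of the model call when a recording is not worth processing.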

get_message_factories(config) #

Get the message factories.

This method can be overridden to define a list of message factories that will be used to generate messages based on the processed detection results.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type                  Description
List[MessageBuilder]  A list of message factories.

get_output_cleaners(config) #

Get the model output cleaners.

This method can be overridden to define a list of output cleaners that will be applied to the model's raw output to clean it up or extract relevant information. By default, it includes a threshold cleaner that filters out detections below a specified threshold.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type                      Description
List[ModelOutputCleaner]  A list of model output cleaners.

get_processing_filters(config) #

Get the processing filters.

This method can be overridden to define a list of processing filters that will be applied to each recording before it is processed by the model. These filters determine whether a recording should be processed at all.

This can be useful to avoid unnecessary model processing when it is not required by the context or based on simple heuristics on the recording content. Model processing can be computationally expensive, so it is beneficial to avoid it if possible.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type                    Description
List[ProcessingFilter]  A list of processing filters.

get_recording_callbacks(config) #

Get the recording callbacks.

This method adds the detection task as a callback to be executed after each recording is completed.

Parameters:

Name    Type  Description                   Default
config  C     The program's configuration.  required

Returns:

Type            Description
List[Callable]  A list of recording callbacks, including the detection task.

get_required_models(config) #
on_end(deployment) #

Handle program end event.

This method is called when the program ends. It updates the deployment information in the metadata store and ensures that remaining tasks are completed before the program is stopped.

Tasks to check are:

  • file_management_task (if implemented): Check if there are remaining files in the temporary directory and move them to the correct directory.
  • detection_task (if implemented): Check if there are remaining files to be processed and process them.

setup(config) #

Set up the Detection Program.

This method initialises the detection model and performs any necessary setup.

DetectionProgramConfiguration #

Bases: MessagingProgramConfiguration

Detection Program Configuration schema.

This schema extends the MessagingProgramConfiguration to include any additional settings required for detection programs.

Attributes#
detections: DetectionsConfiguration = Field(default_factory=DetectionsConfiguration) class-attribute instance-attribute #

Detection settings.

DetectionsConfiguration #

Bases: BaseModel

Detection settings schema.

Attributes#
threshold: float = 0.2 class-attribute instance-attribute #

Detections with a score below this threshold will be ignored.
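The effect of the threshold can be shown with a short filter. This is a sketch of the rule as stated, not Acoupi's threshold cleaner: `Detection` and `apply_threshold` are hypothetical names, and a score exactly equal to the threshold is kept because only scores below it are ignored.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Detection:
    """Hypothetical, simplified detection: a label and a confidence score."""

    label: str
    score: float


def apply_threshold(
    detections: List[Detection], threshold: float = 0.2
) -> List[Detection]:
    """Drop detections whose score is below the threshold."""
    return [item for item in detections if item.score >= threshold]


raw = [
    Detection("bat", 0.85),
    Detection("bird", 0.2),
    Detection("insect", 0.05),
]
kept = apply_threshold(raw)
kept_labels = [item.label for item in kept]
```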

MessagingConfig #

Bases: BaseModel

Messaging configuration schema.

This schema defines the configuration for messaging components, including the message store, message sending interval, heartbeat interval, and messenger configurations (HTTP or MQTT).

Attributes#
heartbeat_interval: int = 60 * 60 class-attribute instance-attribute #

Interval between sending heartbeats in seconds.

http: Optional[messengers.HTTPConfig] = None class-attribute instance-attribute #

HTTP messenger configuration.

message_send_interval: int = 120 class-attribute instance-attribute #

Interval between sending messages in seconds.

messages_db: Path = Field(default=Path.home() / 'storages' / 'messages.db') class-attribute instance-attribute #

Path to the message database.

mqtt: Optional[messengers.MQTTConfig] = None class-attribute instance-attribute #

MQTT messenger configuration.

MessagingProgram(program_config, app) #

Bases: BasicProgram[ProgramConfig]

Messaging Acoupi Program.

This class extends the BasicProgram to provide functionality for sending messages and heartbeats with a configured messenger (HTTP or MQTT).

Components:

  • Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
  • Message Store: A database for storing messages before they are sent.

Tasks:

Using the components above, this class creates and manages the following tasks:

  • Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
  • Send Messages Task: Periodically sends messages from the message store to the configured messenger.

This program includes all the functionality of BasicProgram, inheriting its components and tasks for audio recording, metadata storage, and file management.

Attributes#
message_store: types.MessageStore instance-attribute #

The configured message store instance.

messenger: Optional[types.Messenger] instance-attribute #

The configured messenger instance.

Functions#
check(config) #

Check the messenger connection.

This method checks the connection to the configured messenger (HTTP or MQTT).

configure_message_store(config) #

Configure the message store.

This method creates and configures an instance of the SqliteMessageStore based on the provided configuration.

Returns:

Type          Description
MessageStore  The configured message store instance.

configure_messenger(config) #

Configure the messenger.

This method creates and configures an instance of the HTTPMessenger or MQTTMessenger based on the provided configuration.

Returns:

Type       Description
Messenger  The configured messenger instance.

Raises:

Type        Description
ValueError  If no messenger configuration is provided.
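The selection logic implied here (use whichever messenger is configured, fail if neither is) can be sketched as below. This is an assumption-based illustration: `choose_messenger_kind` is a hypothetical helper, plain dictionaries stand in for the HTTP and MQTT configuration objects, the URL and host are placeholders, and preferring HTTP when both are set is an arbitrary choice made for this sketch.

```python
from typing import Optional


def choose_messenger_kind(http: Optional[dict], mqtt: Optional[dict]) -> str:
    """Return which messenger to build, or raise if none is configured."""
    # Assumption: HTTP takes precedence when both configurations are present.
    if http is not None:
        return "http"
    if mqtt is not None:
        return "mqtt"
    raise ValueError("No messenger configuration provided.")


kind = choose_messenger_kind(http=None, mqtt={"host": "broker.example"})
```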

create_heartbeat_task(config) #

Create the heartbeat task.

This method creates the task responsible for sending heartbeats.

Returns:

Type      Description
Callable  The heartbeat task.

create_messaging_task(config) #

Create the messaging task.

This method creates the task responsible for sending messages.

Returns:

Type      Description
Callable  The messaging task.

on_end(deployment) #

End a deployment.

This method is called when the program is stopped. It updates the deployment information in the metadata store and performs any necessary cleanup tasks (i.e., file_management_task, messaging_task).

register_heartbeat_task(config) #

Register the heartbeat task.

This method registers the heartbeat task with the program's scheduler.

register_messaging_task(config) #

Register the messaging task.

This method registers the messaging task with the program's scheduler.

setup(config) #

Set up the Messaging Program.

This method initialises the message store and messenger, registers the messaging and heartbeat tasks, and performs any necessary setup.

validate_dirs(config) #

Validate the directories used by the program.

This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.

MessagingProgramConfiguration #

Bases: BasicProgramConfiguration

Messaging Program Configuration schema.

This schema extends the BasicProgramConfiguration to include settings for messaging functionality.

Attributes#
messaging: MessagingConfig instance-attribute #

PathsConfiguration #

Bases: BaseModel

Data configuration schema.

Attributes#
db_metadata: Path = Field(default_factory=lambda: Path.home() / 'storages' / 'metadata.db') class-attribute instance-attribute #

Path to the metadata database.

recordings: Path = Field(default_factory=lambda: Path.home() / 'storages' / 'recordings') class-attribute instance-attribute #

Directory for storing audio files permanently.

tmp_audio: Path = Field(default_factory=get_temp_dir) class-attribute instance-attribute #

Temporary directory for storing audio files.