Programs#
acoupi.programs.core.workers
#
Acoupi Workers Module.
This module provides the functionality for managing and configuring AcoupiWorkers, which are essential components of the Acoupi framework for building smart acoustic sensors. AcoupiWorkers are responsible for executing tasks related to acoustic sensing and analysis. Each Acoupi program relies on a set of AcoupiWorkers, each with a unique name and configuration.
The AcoupiWorker is implemented as a Celery worker, which listens to designated queues for task execution and coordination.
Attributes#
DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])])
module-attribute
#
Classes#
AcoupiWorker
#
Bases: BaseModel
AcoupiWorker Class.
Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.
Attributes#
concurrency: Optional[int] = None
class-attribute
instance-attribute
#
Number of concurrent tasks the worker should run.
If None, the worker will run as many tasks as possible.
Set this to 1 if at most one task can run at a time. This is useful for tasks that are not thread-safe or that require a resource that can only be used by one task at a time.
name: str
instance-attribute
#
Name of the worker. Should be unique among different workers.
queues: List[str] = Field(default_factory=list)
class-attribute
instance-attribute
#
Queues the worker should listen to.
If empty, the worker will listen to all queues.
WorkerConfig
#
Bases: BaseModel
WorkerConfig Class.
A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.
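As a rough illustration of how such a configuration could map onto Celery worker processes, the sketch below uses plain dataclasses as stand-ins for AcoupiWorker and WorkerConfig (they are not the pydantic models from acoupi) and a hypothetical helper, worker_command, that builds a celery worker command line. The -A, -n, -Q and -c options are standard Celery flags; the application name "app" and the idea that workers are launched this way are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class WorkerSketch:
    """Stand-in mirroring the documented AcoupiWorker fields."""

    name: str
    queues: List[str] = field(default_factory=list)
    concurrency: Optional[int] = None


@dataclass
class WorkerConfigSketch:
    """Stand-in mirroring WorkerConfig: a list of workers."""

    workers: List[WorkerSketch] = field(default_factory=list)


def worker_command(worker: WorkerSketch, app: str = "app") -> List[str]:
    """Build a hypothetical `celery worker` command for one worker."""
    cmd = ["celery", "-A", app, "worker", "-n", worker.name]
    if worker.queues:
        # An empty list is documented as "all queues"; here -Q is simply omitted.
        cmd += ["-Q", ",".join(worker.queues)]
    if worker.concurrency is not None:
        # None leaves the concurrency level to Celery's default.
        cmd += ["-c", str(worker.concurrency)]
    return cmd


# Same layout as the documented DEFAULT_WORKER_CONFIG.
config = WorkerConfigSketch(
    workers=[
        WorkerSketch(name="recording", queues=["recording"], concurrency=1),
        WorkerSketch(name="default", queues=["celery"]),
    ]
)
commands = [worker_command(w) for w in config.workers]
```

In this layout the recording worker is limited to one task at a time, which matches the guidance for resources, such as a microphone, that only one task can use at once.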
acoupi.programs.core.base
#
Definition of what a program is.
Attributes#
B = TypeVar('B')
module-attribute
#
C = TypeVar('C', bound=BaseModel, covariant=False, contravariant=True)
module-attribute
#
ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel)
module-attribute
#
Classes#
AcoupiProgram(program_config, app)
#
Bases: ABC, Generic[ProgramConfig]
A program is a collection of tasks.
Attributes#
app: Celery = app
instance-attribute
#
config: ProgramConfig = program_config
instance-attribute
#
config_schema: Type[ProgramConfig]
instance-attribute
#
logger: logging.Logger = get_task_logger(self.__class__.__name__)
instance-attribute
#
tasks = {}
instance-attribute
#
worker_config: Optional[WorkerConfig] = None
class-attribute
instance-attribute
#
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None)
#
Add a task to the program.
add_task_to_queue(task_name, queue)
#
Add a task to a queue.
check(config)
#
Check the configurations.
This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.
User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.
Ideally this method should be called before a deployment is made.
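A minimal sketch of the pattern described above, using only the standard library. HealthCheckError is defined locally as a stand-in for the acoupi exception, ProgramSketch is a hypothetical class rather than a real AcoupiProgram subclass, and the dictionary-based config with duration and interval keys is an assumption made to keep the example self-contained.

```python
class HealthCheckError(Exception):
    """Local stand-in for acoupi's HealthCheckError."""


class ProgramSketch:
    """Hypothetical program exposing a `check` method like the one described."""

    def check(self, config: dict) -> None:
        # Fail loudly before deployment if the settings are inconsistent.
        if config["duration"] > config["interval"]:
            raise HealthCheckError(
                "Recording duration must not exceed the recording interval."
            )


def is_valid(config: dict) -> bool:
    """Run the check and report whether the configuration passed."""
    try:
        ProgramSketch().check(config)
    except HealthCheckError:
        return False
    return True
```

Raising a dedicated exception type lets the caller distinguish a failed health check from an unrelated programming error.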
get_config_schema()
classmethod
#
Get the config class.
get_queue_names()
classmethod
#
Get the queue names.
get_worker_config()
classmethod
#
Get the worker config class.
on_end(deployment)
#
End a deployment.
Called when the user ends a deployment.
This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.
on_start(deployment)
#
Start a deployment.
Called when the user starts a deployment.
This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.
setup(config)
#
Set up the program.
InvalidAcoupiConfiguration
#
Bases: ValueError
Raised when a configuration is invalid.
NoUserPrompt
#
No user prompt annotation.
Use this class to annotate fields that should not be prompted to the user.
acoupi.programs
#
Attributes#
DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])])
module-attribute
#
ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel)
module-attribute
#
Classes#
AcoupiProgram(program_config, app)
#
Bases: ABC, Generic[ProgramConfig]
A program is a collection of tasks.
Attributes#
app: Celery = app
instance-attribute
#
config: ProgramConfig = program_config
instance-attribute
#
config_schema: Type[ProgramConfig]
instance-attribute
#
logger: logging.Logger = get_task_logger(self.__class__.__name__)
instance-attribute
#
tasks = {}
instance-attribute
#
worker_config: Optional[WorkerConfig] = None
class-attribute
instance-attribute
#
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None)
#
Add a task to the program.
add_task_to_queue(task_name, queue)
#
Add a task to a queue.
check(config)
#
Check the configurations.
This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.
User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.
Ideally this method should be called before a deployment is made.
get_config_schema()
classmethod
#
Get the config class.
get_queue_names()
classmethod
#
Get the queue names.
get_worker_config()
classmethod
#
Get the worker config class.
on_end(deployment)
#
End a deployment.
Called when the user ends a deployment.
This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.
on_start(deployment)
#
Start a deployment.
Called when the user starts a deployment.
This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.
setup(config)
#
Set up the program.
AcoupiWorker
#
Bases: BaseModel
AcoupiWorker Class.
Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.
Attributes#
concurrency: Optional[int] = None
class-attribute
instance-attribute
#
Number of concurrent tasks the worker should run.
If None, the worker will run as many tasks as possible.
Set this to 1 if at most one task can run at a time. This is useful for tasks that are not thread-safe or that require a resource that can only be used by one task at a time.
name: str
instance-attribute
#
Name of the worker. Should be unique among different workers.
queues: List[str] = Field(default_factory=list)
class-attribute
instance-attribute
#
Queues the worker should listen to.
If empty, the worker will listen to all queues.
NoUserPrompt
#
No user prompt annotation.
Use this class to annotate fields that should not be prompted to the user.
WorkerConfig
#
Bases: BaseModel
WorkerConfig Class.
A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.
acoupi.programs.templates
#
Acoupi program templates.
This module provides base classes and configuration schemas to simplify the creation of Acoupi programs.
Available Templates:
- Basic Program: Provides a foundation for building Acoupi programs, including features for audio recording, metadata storage, and file management.
  - Base class: BasicProgram
  - Configuration schema: BasicProgramConfiguration
- Messaging Program: Extends the BasicProgram with messaging capabilities, enabling programs to send messages and heartbeats via HTTP or MQTT.
  - Base class: MessagingProgram
  - Configuration schema: MessagingProgramConfiguration
- Detection Program: Extends the MessagingProgram with audio detection capabilities, allowing programs to run detection models on recordings and generate messages based on the results.
  - Base class: DetectionProgram
  - Configuration schema: DetectionProgramConfiguration
Each template includes a base class that provides core functionality and a configuration schema to define the program's settings.
For detailed usage instructions, customization options, and examples, refer to the individual template documentation.
Classes#
AudioConfiguration
#
Bases: BaseModel
Audio configuration schema.
Attributes#
chunksize: Annotated[int, NoUserPrompt] = 8192
class-attribute
instance-attribute
#
Chunksize of audio recording.
duration: int = 3
class-attribute
instance-attribute
#
Duration of each audio recording in seconds.
interval: int = 10
class-attribute
instance-attribute
#
Interval between each audio recording in seconds.
schedule_end: datetime.time = Field(default=datetime.time(hour=22, minute=30, second=0))
class-attribute
instance-attribute
#
End time for recording schedule.
schedule_start: datetime.time = Field(default=datetime.time(hour=6, minute=0, second=0))
class-attribute
instance-attribute
#
Start time for recording schedule.
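To make the defaults concrete, the following sketch works through the arithmetic they imply. It assumes that interval is the time between the starts of consecutive recordings and that the schedule is a same-day window with inclusive bounds; neither detail is stated above, and the helper names are hypothetical.

```python
import datetime

# Defaults listed above for AudioConfiguration.
DURATION = 3  # seconds per recording
INTERVAL = 10  # seconds between recording starts (assumed meaning)
SCHEDULE_START = datetime.time(hour=6, minute=0, second=0)
SCHEDULE_END = datetime.time(hour=22, minute=30, second=0)


def seconds_since_midnight(t: datetime.time) -> int:
    """Convert a time of day to seconds elapsed since 00:00:00."""
    return t.hour * 3600 + t.minute * 60 + t.second


def in_schedule(now: datetime.time) -> bool:
    """Return True if `now` lies inside the daily recording window."""
    return SCHEDULE_START <= now <= SCHEDULE_END


# Length of the daily window in seconds (16.5 hours with the defaults).
window = seconds_since_midnight(SCHEDULE_END) - seconds_since_midnight(SCHEDULE_START)
# Number of recording slots that fit in the window.
recordings_per_day = window // INTERVAL
# Fraction of each interval spent recording.
duty_cycle = DURATION / INTERVAL
```

With the defaults, the window is 59,400 seconds long, which gives 5,940 recording slots per day and a duty cycle of 0.3.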
BasicProgram(program_config, app)
#
Bases: AcoupiProgram[ProgramConfig]
Basic Acoupi Program.
This class provides a base for creating basic Acoupi programs. It offers essential features for audio recording, metadata storage, and file management.
Components:
- Audio Recorder: Records audio clips according to the program's configuration.
- Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.
Tasks:
Using the components above, this class creates and manages the following tasks:
- Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
- File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.
Customization:
You can customise the program's behavior by overriding these methods:
- get_recording_conditions: Define the specific conditions that must be met for audio recording to continue when the recording task is triggered by the scheduler.
- get_recording_filters: Add filters to determine which recordings to save.
- get_recording_callbacks: Define actions to perform after a recording is made.
Examples:
import datetime

from acoupi import components, data
from acoupi.programs.templates import (
    BasicProgram,
    BasicProgramConfiguration,
)


class Config(BasicProgramConfiguration):
    pass


class Program(BasicProgram):
    # Attribute name follows the config_schema attribute documented for AcoupiProgram.
    config_schema = Config

    def get_recording_conditions(self, config: Config):
        # Get the default recording conditions
        conditions = super().get_recording_conditions(config)
        # Additionally require recordings to fall between 03:00 and 06:00
        return [
            components.IsInInterval(
                data.TimeInterval(
                    start=datetime.time(hour=3),
                    end=datetime.time(hour=6),
                )
            ),
            *conditions,
        ]
Attributes#
recorder: types.AudioRecorder
instance-attribute
#
store: types.Store
instance-attribute
#
worker_config = DEFAULT_WORKER_CONFIG
class-attribute
instance-attribute
#
Functions#
check(config)
#
Check the program's components.
This method performs checks on the program's components to ensure they are functioning correctly. Currently, it only checks the PyAudio recorder if it is being used.
configure_recorder(config)
#
Configure the audio recorder.
This method creates and configures an instance of the PyAudioRecorder based on the provided configuration.
Returns:
Type | Description |
---|---|
AudioRecorder | The configured audio recorder instance. |
configure_store(config)
#
Configure the metadata store.
This method creates and configures an instance of the SqliteStore based on the provided configuration.
Returns:
Type | Description |
---|---|
Store | The configured metadata store instance. |
create_file_management_task(config)
#
Create the file management task.
This method creates the task responsible for managing audio files.
Returns:
Type | Description |
---|---|
Callable[[], None] | The file management task. |
create_recording_task(config)
#
get_file_managers(config)
#
Get the file managers.
This method defines how audio recordings should be saved and managed. It returns a list of file managers that are responsible for determining the final storage location of each recording.
When a recording is marked for saving, the program iterates through the list of file managers in order. Each manager can either:
- Return a path where the recording should be saved.
- Return None to indicate that it cannot handle the recording, allowing the next manager in the list to be used.
By default, this method returns a list containing a single DateFileManager, which saves recordings in a structured folder hierarchy based on the recording date.
Returns:
Type | Description |
---|---|
list[RecordingSavingManager] | A list of file manager instances. |
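The fall-through behaviour described above can be sketched with plain functions. Everything here is hypothetical: the Manager signature, the two example managers, and the folder layouts are stand-ins chosen for illustration and do not reproduce RecordingSavingManager or DateFileManager.

```python
import datetime
from pathlib import Path
from typing import Callable, List, Optional

# Hypothetical manager signature: recording time in, relative path (or None) out.
Manager = Callable[[datetime.datetime], Optional[Path]]


def night_manager(recorded_on: datetime.datetime) -> Optional[Path]:
    """Only handle recordings made before 06:00; decline the rest."""
    if recorded_on.hour < 6:
        return Path("night") / recorded_on.strftime("%Y%m%d_%H%M%S.wav")
    return None


def date_manager(recorded_on: datetime.datetime) -> Optional[Path]:
    """Place every recording in a year/month/day folder hierarchy."""
    folder = Path(recorded_on.strftime("%Y/%m/%d"))
    return folder / recorded_on.strftime("%H%M%S.wav")


def resolve_path(
    managers: List[Manager], recorded_on: datetime.datetime
) -> Optional[Path]:
    """Return the path from the first manager that accepts the recording."""
    for manager in managers:
        path = manager(recorded_on)
        if path is not None:
            return path
    # No manager accepted the recording.
    return None
```

Because the first non-None result wins, more specific managers belong earlier in the list and a catch-all manager belongs last.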
get_recording_callbacks(config)
#
get_recording_conditions(config)
#
Get the recording conditions.
This method defines the conditions under which audio recording should be performed. By default, it uses the schedule defined in the configuration.
Returns:
Type | Description |
---|---|
RecordingCondition | A recording condition. |
get_recording_filters(config)
#
Get the recording saving filters.
This method defines filters that determine which recordings should be saved permanently. By default, it returns an empty list, meaning all recordings are saved.
Returns:
Type | Description |
---|---|
list[RecordingSavingFilter] | A list of recording saving filters. |
get_required_models(config)
#
Get the required models for a recording to be considered ready.
This method specifies which bioacoustic models must process a recording before it is considered "ready" to be moved from temporary storage.
By default, no models are required, meaning recordings are immediately considered ready. However, you can override this method to define specific models that must process the recordings based on the program's configuration.
Returns:
Type | Description |
---|---|
list[str] | A list of model names that are required to process a recording before it is considered ready. |
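The readiness rule can be expressed as a subset test. This is a sketch of the idea only: is_ready is a hypothetical helper, and the model names in the usage lines are made up.

```python
from typing import Iterable, List


def is_ready(required_models: List[str], processed_by: Iterable[str]) -> bool:
    """A recording is ready once every required model has processed it."""
    return set(required_models) <= set(processed_by)


# With no required models (the documented default), any recording is ready.
ready_by_default = is_ready([], [])
# A recording not yet processed by a required model stays in temporary storage.
still_waiting = not is_ready(["bird_model"], [])
```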
on_end(deployment)
#
Handle program end event.
This method is called when the program ends. It updates the deployment information in the metadata store and ensures that remaining tasks are completed before the program is stopped.
Tasks to check are:
- file_management_task (if implemented): Check if there are remaining files in the temporary directory and move them to the correct directory.
on_start(deployment)
#
Handle program start event.
This method is called when the program starts and stores the deployment information in the metadata store.
register_file_management_task(config)
#
Register the file management task.
This method registers the file management task with the program's scheduler.
register_recording_task(config)
#
Register the recording task.
This method registers the recording task with the program's scheduler.
setup(config)
#
Set up the basic program.
This method initialises the program's components (audio recorder, store, and file manager), registers the recording and file management tasks, and performs necessary setup operations.
validate_dirs(config)
#
Validate the directories used by the program.
This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.
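A minimal sketch of this kind of directory validation using pathlib. The ensure_dirs helper is hypothetical, and the folder names only echo the defaults of PathsConfiguration; a temporary root is used so the example leaves nothing behind.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def ensure_dirs(paths: Iterable[Path]) -> None:
    """Create each directory (and any missing parents) if it does not exist."""
    for path in paths:
        path.mkdir(parents=True, exist_ok=True)


# Usage with a throwaway root instead of the real home directory.
with tempfile.TemporaryDirectory() as root:
    storages = Path(root) / "storages"
    recordings = storages / "recordings"
    db_metadata = storages / "metadata.db"
    # For a database file, it is the parent directory that must exist.
    ensure_dirs([recordings, db_metadata.parent])
    # Calling it again is harmless because exist_ok=True.
    ensure_dirs([recordings])
    created = recordings.is_dir()
```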
BasicProgramConfiguration
#
Bases: BaseModel
Configuration schema for a basic program.
Attributes#
microphone: MicrophoneConfig
instance-attribute
#
Microphone configuration.
paths: PathsConfiguration = Field(default_factory=PathsConfiguration)
class-attribute
instance-attribute
#
Data configuration.
recording: AudioConfiguration = Field(default_factory=AudioConfiguration)
class-attribute
instance-attribute
#
Audio configuration.
timezone: TimeZoneName = Field(default=TimeZoneName('Europe/London'))
class-attribute
instance-attribute
#
Time zone where the device will be deployed.
DetectionProgram(program_config, app)
#
Bases: MessagingProgram[C], ABC
Detection Program.
This abstract class extends the MessagingProgram to provide a foundation for detection programs. It includes functionality for configuring and running a detection model, processing model outputs, and generating messages based on detection results.
Components:
- Detection Model: An audio detection model that processes recordings to identify specific sounds or events.
Tasks:
- Detection Task: Runs the detection model on audio recordings and processes the results, including:
  - Cleaning the model output using get_output_cleaners. By default this includes a threshold cleaner that filters out detections below a specified threshold.
  - Filtering the processed output using get_processing_filters.
  - Generating messages based on the results using get_message_factories.
Inherited Components:
This class inherits the following components from MessagingProgram:
- Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
- Message Store: A database for storing messages before they are sent.
This class also inherits the following components from BasicProgram:
- Audio Recorder: Records audio clips according to the program's configuration.
- File Manager: Manages the storage of audio recordings, including saving them to permanent storage and handling temporary files.
- Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.
Inherited Tasks:
This program inherits the following tasks from MessagingProgram:
- Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
- Send Messages Task: Periodically sends messages from the message store to the configured messenger.
This program also inherits the following tasks from BasicProgram:
- Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
- File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.
Customization:
You can customise the detection process by overriding the following methods:
- configure_model: Required. Implement this method to configure and return an instance of your detection model.
- get_output_cleaners: To clean up the model's raw output. Either override this method completely or call super().get_output_cleaners() to include the default threshold cleaner.
- get_processing_filters: To filter recordings before processing.
- get_message_factories: To customise the messages generated based on detection results.
Examples:
from acoupi.components import types
from acoupi.programs.templates import (
    DetectionProgram,
    DetectionProgramConfiguration,
)

# This import should be replaced with your actual model import.
# This model does not exist in the acoupi package.
from acoupi.models import SimpleBirdModel


class MyBirdDetectionConfiguration(DetectionProgramConfiguration):
    # Add any configuration specific to bird detection
    pass


class MyBirdDetectionProgram(
    DetectionProgram[MyBirdDetectionConfiguration]
):
    # Attribute name follows the config_schema attribute documented for AcoupiProgram.
    config_schema = MyBirdDetectionConfiguration

    def configure_model(
        self, config: MyBirdDetectionConfiguration
    ) -> types.Model:
        # Replace with your actual model
        return SimpleBirdModel()
Attributes#
model: types.Model
instance-attribute
#
The configured detection model instance.
Functions#
check(config)
#
Check the program's components.
This method performs checks on the program's components, including the detection model, to ensure they are functioning correctly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
configure_model(config)
abstractmethod
#
Configure the detection model.
This method must be implemented by subclasses to configure and return an instance of the detection model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
Model | The configured detection model instance. |
create_detection_task(config)
#
Create the detection task.
This method creates the task responsible for running the detection model and processing its output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
Callable[[Recording], None] | The detection task. |
Notes
This method uses the generate_detection_task function to create the detection task. You can override this method to customise the task creation process.
get_message_factories(config)
#
Get the message factories.
This method can be overridden to define a list of message factories that will be used to generate messages based on the processed detection results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
List[MessageBuilder] | A list of message factories. |
get_output_cleaners(config)
#
Get the model output cleaners.
This method can be overridden to define a list of output cleaners that will be applied to the model's raw output to clean it up or extract relevant information. By default, it includes a threshold cleaner that filters out detections below a specified threshold.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
List[ModelOutputCleaner] | A list of model output cleaners. |
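To illustrate what a threshold cleaner does in principle, here is a small standalone sketch. DetectionSketch and clean_by_threshold are hypothetical stand-ins, not acoupi's data classes or its ModelOutputCleaner interface, and treating a score equal to the threshold as passing is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DetectionSketch:
    """Hypothetical detection: a label and a confidence score in [0, 1]."""

    label: str
    score: float


def clean_by_threshold(
    detections: List[DetectionSketch], threshold: float
) -> List[DetectionSketch]:
    """Keep only detections whose score reaches the threshold."""
    return [d for d in detections if d.score >= threshold]


raw_output = [
    DetectionSketch(label="robin", score=0.91),
    DetectionSketch(label="wren", score=0.12),
    DetectionSketch(label="blackbird", score=0.55),
]
cleaned = clean_by_threshold(raw_output, threshold=0.5)
kept_labels = [d.label for d in cleaned]
```

Here only the robin and blackbird detections survive, so downstream filters and message factories never see the low-confidence wren detection.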
get_processing_filters(config)
#
Get the processing filters.
This method can be overridden to define a list of processing filters that will be applied to each recording before it is processed by the model. These filters determine whether a recording should be processed at all.
This can be useful to avoid unnecessary model processing when it is not required by the context or based on simple heuristics on the recording content. Model processing can be computationally expensive, so it is beneficial to avoid it if possible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
List[ProcessingFilter] | A list of processing filters. |
get_recording_callbacks(config)
#
Get the recording callbacks.
This method adds the detection task as a callback to be executed after each recording is completed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | C | The program's configuration. | required |
Returns:
Type | Description |
---|---|
List[Callable] | A list of recording callbacks, including the detection task. |
get_required_models(config)
#
on_end(deployment)
#
Handle program end event.
This method is called when the program ends. It updates the deployment information in the metadata store and ensures that remaining tasks are completed before the program is stopped.
Tasks to check are:
- file_management_task (if implemented): Check if there are remaining files in the temporary directory and move them to the correct directory.
- detection_task (if implemented): Check if there are remaining files to be processed and process them.
setup(config)
#
Set up the Detection Program.
This method initialises the detection model and performs any necessary setup.
DetectionProgramConfiguration
#
Bases: MessagingProgramConfiguration
Detection Program Configuration schema.
This schema extends the MessagingProgramConfiguration to include any additional settings required for detection programs.
DetectionsConfiguration
#
MessagingConfig
#
Bases: BaseModel
Messaging configuration schema.
This schema defines the configuration for messaging components, including the message store, message sending interval, heartbeat interval, and messenger configurations (HTTP or MQTT).
Attributes#
heartbeat_interval: int = 60 * 60
class-attribute
instance-attribute
#
Interval between sending heartbeats in seconds.
http: Optional[messengers.HTTPConfig] = None
class-attribute
instance-attribute
#
HTTP messenger configuration.
message_send_interval: int = 120
class-attribute
instance-attribute
#
Interval between sending messages in seconds.
messages_db: Path = Field(default=Path.home() / 'storages' / 'messages.db')
class-attribute
instance-attribute
#
Path to the message database.
mqtt: Optional[messengers.MQTTConfig] = None
class-attribute
instance-attribute
#
MQTT messenger configuration.
MessagingProgram(program_config, app)
#
Bases: BasicProgram[ProgramConfig]
Messaging Acoupi Program.
This class extends the BasicProgram to provide functionality for sending messages and heartbeats with a configured messenger (HTTP or MQTT).
Components:
- Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
- Message Store: A database for storing messages before they are sent.
Tasks:
Using the components above, this class creates and manages the following tasks:
- Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
- Send Messages Task: Periodically sends messages from the message store to the configured messenger.
This program includes all the functionality of BasicProgram, inheriting its components and tasks for audio recording, metadata storage, and file management.
Attributes#
message_store: types.MessageStore
instance-attribute
#
The configured message store instance.
messenger: Optional[types.Messenger]
instance-attribute
#
The configured messenger instance.
Functions#
check(config)
#
Check the messenger connection.
This method checks the connection to the configured messenger (HTTP or MQTT).
configure_message_store(config)
#
Configure the message store.
This method creates and configures an instance of the SqliteMessageStore based on the provided configuration.
Returns:
Type | Description |
---|---|
MessageStore | The configured message store instance. |
configure_messenger(config)
#
Configure the messenger.
This method creates and configures an instance of the HTTPMessenger or MQTTMessenger based on the provided configuration.
Returns:
Type | Description |
---|---|
Messenger | The configured messenger instance. |
Raises:
Type | Description |
---|---|
ValueError | If no messenger configuration is provided. |
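The selection logic implied by this description can be sketched as follows. pick_messenger is a hypothetical helper that takes the http and mqtt settings as optional dictionaries and returns only the kind of messenger that would be built; checking HTTP before MQTT when both are present is an assumption, since the precedence is not stated here.

```python
from typing import Optional


def pick_messenger(http: Optional[dict], mqtt: Optional[dict]) -> str:
    """Return which messenger kind would be built from the given settings."""
    if http is not None:
        # Assumed precedence: HTTP is checked first.
        return "http"
    if mqtt is not None:
        return "mqtt"
    # Mirrors the documented ValueError when neither is configured.
    raise ValueError("No messenger configuration provided.")


def describe(http: Optional[dict], mqtt: Optional[dict]) -> str:
    """Report the chosen messenger, or the error message if none is set."""
    try:
        return pick_messenger(http, mqtt)
    except ValueError as error:
        return str(error)
```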
create_heartbeat_task(config)
#
Create the heartbeat task.
This method creates the task responsible for sending heartbeats.
Returns:
Type | Description |
---|---|
Callable | The heartbeat task. |
create_messaging_task(config)
#
Create the messaging task.
This method creates the task responsible for sending messages.
Returns:
Type | Description |
---|---|
Callable | The messaging task. |
on_end(deployment)
#
End a deployment.
This method is called when the program is stopped. It updates the deployment information in the metadata store and performs any necessary cleanup tasks (i.e., file_management_task, messaging_task).
register_heartbeat_task(config)
#
Register the heartbeat task.
This method registers the heartbeat task with the program's scheduler.
register_messaging_task(config)
#
Register the messaging task.
This method registers the messaging task with the program's scheduler.
setup(config)
#
Set up the Messaging Program.
This method initialises the message store and messenger, registers the messaging and heartbeat tasks, and performs any necessary setup.
validate_dirs(config)
#
Validate the directories used by the program.
This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.
MessagingProgramConfiguration
#
Bases: BasicProgramConfiguration
Messaging Program Configuration schema.
This schema extends the BasicProgramConfiguration to include settings for messaging functionality.
PathsConfiguration
#
Bases: BaseModel
Data configuration schema.
Attributes#
db_metadata: Path = Field(default_factory=lambda: Path.home() / 'storages' / 'metadata.db')
class-attribute
instance-attribute
#
Path to the metadata database.
recordings: Path = Field(default_factory=lambda: Path.home() / 'storages' / 'recordings')
class-attribute
instance-attribute
#
Directory for storing audio files permanently.
tmp_audio: Path = Field(default_factory=get_temp_dir)
class-attribute
instance-attribute
#
Temporary directory for storing audio files.