Skip to content

Programs#

acoupi.programs.core.workers #

Acoupi Workers Module.

This module provides the functionality for managing and configuring AcoupiWorkers, which are essential components of the Acoupi framework for building smart acoustic sensors. AcoupiWorkers are responsible for executing tasks related to acoustic sensing and analysis. Each Acoupi program relies on a set of AcoupiWorkers, each with a unique name and configuration.

The AcoupiWorker is implemented as a Celery worker, which listens to designated queues for task execution and coordination.

Classes:

Name Description
AcoupiWorker

AcoupiWorker Class.

WorkerConfig

WorkerConfig Class.

Attributes:

Name Type Description
DEFAULT_WORKER_CONFIG

Attributes#

DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])]) module-attribute #

Classes#

AcoupiWorker #

Bases: BaseModel

AcoupiWorker Class.

Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.

Attributes:

Name Type Description
concurrency Optional[int]

Number of concurrent tasks the worker should run.

name str

Name of the worker. Should be unique among different workers.

queues List[str]

Queues the worker should listen to.

Attributes#
concurrency = None class-attribute instance-attribute #

Number of concurrent tasks the worker should run.

If None, the worker will run as many tasks as possible.

This setting should be set to 1 if the worker if at most one task can be run at a time. This is useful for tasks that are not thread safe or that require use of a resource that can only be used by one task at a time.

name instance-attribute #

Name of the worker. Should be unique among different workers.

queues = Field(default_factory=list) class-attribute instance-attribute #

Queues the worker should listen to.

If empty, the worker will listen to all queues.

WorkerConfig #

Bases: BaseModel

WorkerConfig Class.

A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.

Methods:

Name Description
unique_worker_names

Validate that all workers have unique names.

Attributes:

Name Type Description
workers List[AcoupiWorker]
Attributes#
workers = Field(default_factory=list) class-attribute instance-attribute #
Functions#
unique_worker_names(workers) #

Validate that all workers have unique names.

acoupi.programs.core.base #

Definition of what a program is.

Classes:

Name Description
AcoupiProgram

A program is a collection of tasks.

NoUserPrompt

No user prompt annotation.

Attributes:

Name Type Description
ProgramConfig

Attributes#

B = TypeVar('B') module-attribute #

C = TypeVar('C', bound=BaseModel, covariant=False, contravariant=True) module-attribute #

ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel) module-attribute #

Classes#

AcoupiProgram(program_config, app) #

Bases: ABC, Generic[ProgramConfig]

A program is a collection of tasks.

Methods:

Name Description
add_task

Add a task to the program.

add_task_to_queue

Add a task to a queue.

check

Check the configurations.

get_config_schema

Get the config class.

get_queue_names

Get the queue names.

get_worker_config

Get the worker config class.

on_end

End a deployment.

on_start

Start a deployment.

setup

Set up the program.

Attributes:

Name Type Description
app Celery
config ProgramConfig
config_schema Type[ProgramConfig]
logger Logger
tasks
worker_config Optional[WorkerConfig]
Attributes#
app = app instance-attribute #
config = program_config instance-attribute #
config_schema instance-attribute #
logger = get_task_logger(self.__class__.__name__) instance-attribute #
tasks = {} instance-attribute #
worker_config = None class-attribute instance-attribute #
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None) #

Add a task to the program.

add_task_to_queue(task_name, queue) #

Add a task to a queue.

check(config) #

Check the configurations.

This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.

User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.

Ideally this method should be called before a deployment is made.

get_config_schema() classmethod #

Get the config class.

get_queue_names() classmethod #

Get the queue names.

get_worker_config() classmethod #

Get the worker config class.

on_end(deployment) #

End a deployment.

Called when the user ends a deployment.

This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.

on_start(deployment) #

Start a deployment.

Called when the user starts a deployment.

This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.

setup(config) #

Set up the program.

InvalidAcoupiConfiguration #

Bases: ValueError

Raised when a configuration is invalid.

NoUserPrompt #

No user prompt annotation.

Use this class to annotate fields that should not be prompted to the user.

acoupi.programs #

Modules:

Name Description
connected

Acoupi Connected Program.

core
default

Acoupi Default Program.

templates

Acoupi program templates.

test

Test Acoupi Program.

Classes:

Name Description
AcoupiProgram

A program is a collection of tasks.

AcoupiWorker

AcoupiWorker Class.

NoUserPrompt

No user prompt annotation.

WorkerConfig

WorkerConfig Class.

Attributes:

Name Type Description
DEFAULT_WORKER_CONFIG
ProgramConfig

Attributes#

DEFAULT_WORKER_CONFIG = WorkerConfig(workers=[AcoupiWorker(name='recording', queues=['recording'], concurrency=1), AcoupiWorker(name='default', queues=['celery'])]) module-attribute #

ProgramConfig = TypeVar('ProgramConfig', bound=BaseModel) module-attribute #

Classes#

AcoupiProgram(program_config, app) #

Bases: ABC, Generic[ProgramConfig]

A program is a collection of tasks.

Methods:

Name Description
add_task

Add a task to the program.

add_task_to_queue

Add a task to a queue.

check

Check the configurations.

get_config_schema

Get the config class.

get_queue_names

Get the queue names.

get_worker_config

Get the worker config class.

on_end

End a deployment.

on_start

Start a deployment.

setup

Set up the program.

Attributes:

Name Type Description
app Celery
config ProgramConfig
config_schema Type[ProgramConfig]
logger Logger
tasks
worker_config Optional[WorkerConfig]
Attributes#
app = app instance-attribute #
config = program_config instance-attribute #
config_schema instance-attribute #
logger = get_task_logger(self.__class__.__name__) instance-attribute #
tasks = {} instance-attribute #
worker_config = None class-attribute instance-attribute #
Functions#
add_task(function, callbacks=None, schedule=None, queue=None, name=None) #

Add a task to the program.

add_task_to_queue(task_name, queue) #

Add a task to a queue.

check(config) #

Check the configurations.

This method should raise an exception if the configurations are invalid. The exception should be an instance of HealthCheckError.

User defined programs should override this method if they want to validate their configurations. The default implementation does nothing.

Ideally this method should be called before a deployment is made.

get_config_schema() classmethod #

Get the config class.

get_queue_names() classmethod #

Get the queue names.

get_worker_config() classmethod #

Get the worker config class.

on_end(deployment) #

End a deployment.

Called when the user ends a deployment.

This method should be overridden by user defined programs if they want to do something when the program ends. The default implementation does nothing.

on_start(deployment) #

Start a deployment.

Called when the user starts a deployment.

This method should be overridden by user defined programs if they want to do something when the program starts. The default implementation does nothing.

setup(config) #

Set up the program.

AcoupiWorker #

Bases: BaseModel

AcoupiWorker Class.

Represents an individual worker instance within the Acoupi framework. It is responsible for executing tasks related to acoustic sensing and analysis. Each worker is implemented as a Celery worker and listens to designated queues for task execution and coordination.

Attributes:

Name Type Description
concurrency Optional[int]

Number of concurrent tasks the worker should run.

name str

Name of the worker. Should be unique among different workers.

queues List[str]

Queues the worker should listen to.

Attributes#
concurrency = None class-attribute instance-attribute #

Number of concurrent tasks the worker should run.

If None, the worker will run as many tasks as possible.

This setting should be set to 1 if the worker if at most one task can be run at a time. This is useful for tasks that are not thread safe or that require use of a resource that can only be used by one task at a time.

name instance-attribute #

Name of the worker. Should be unique among different workers.

queues = Field(default_factory=list) class-attribute instance-attribute #

Queues the worker should listen to.

If empty, the worker will listen to all queues.

NoUserPrompt #

No user prompt annotation.

Use this class to annotate fields that should not be prompted to the user.

WorkerConfig #

Bases: BaseModel

WorkerConfig Class.

A configuration class used to define the set of workers to be used for an Acoupi program. It allows you to specify the details of each worker, such as its name, concurrency level, and the queues it listens to.

Methods:

Name Description
unique_worker_names

Validate that all workers have unique names.

Attributes:

Name Type Description
workers List[AcoupiWorker]
Attributes#
workers = Field(default_factory=list) class-attribute instance-attribute #
Functions#
unique_worker_names(workers) #

Validate that all workers have unique names.

acoupi.programs.templates #

Acoupi program templates.

This module provides base classes and configuration schemas to simplify the creation of Acoupi programs.

Available Templates:

  • Basic Program: Provides a foundation for building Acoupi programs, including features for audio recording, metadata storage, and file management.
  • Base class: BasicProgram
  • Configuration schema: BasicProgramConfiguration
  • Messaging Program: Extends the BasicProgram with messaging capabilities, enabling programs to send messages and heartbeats via HTTP or MQTT.
  • Base class: MessagingProgram
  • Configuration schema: MessagingProgramConfiguration
  • Detection Program: Extends the MessagingProgram with audio detection capabilities, allowing programs to run detection models on recordings and generate messages based on the results.
  • Base class: DetectionProgram
  • Configuration schema: DetectionProgramConfiguration

Each template includes a base class that provides core functionality and a configuration schema to define the program's settings.

For detailed usage instructions, customization options, and examples, refer to the individual template documentation.

Modules:

Name Description
basic

Basic Program template module.

detection

Detection Program template module.

messaging

Messaging Program template module.

Classes:

Name Description
AudioConfiguration

Audio configuration schema.

BasicProgram

Basic Acoupi Program.

BasicProgramConfiguration

Configuration schema for a basic program.

DetectionProgram

Detection Program.

DetectionProgramConfiguration

Detection Program Configuration schema.

DetectionsConfiguration

Detection settings schema.

MessagingConfig

Messaging configuration schema.

MessagingProgram

Messaging Acoupi Program.

MessagingProgramConfiguration

Messaging Program Configuration schema.

PathsConfiguration

Data configuration schema.

Classes#

AudioConfiguration #

Bases: BaseModel

Audio configuration schema.

Attributes:

Name Type Description
chunksize Annotated[int, NoUserPrompt]

Chunksize of audio recording.

duration int

Duration of each audio recording in seconds.

interval int

Interval between each audio recording in seconds.

schedule_end time

End time for recording schedule.

schedule_start time

Start time for recording schedule.

Attributes#
chunksize = 8192 class-attribute instance-attribute #

Chunksize of audio recording.

duration = 3 class-attribute instance-attribute #

Duration of each audio recording in seconds.

interval = 10 class-attribute instance-attribute #

Interval between each audio recording in seconds.

schedule_end = Field(default=(datetime.time(hour=22, minute=30, second=0))) class-attribute instance-attribute #

End time for recording schedule.

schedule_start = Field(default=(datetime.time(hour=6, minute=0, second=0))) class-attribute instance-attribute #

Start time for recording schedule.

BasicProgram(program_config, app) #

Bases: AcoupiProgram[ProgramConfig]

Basic Acoupi Program.

This class provides a base for creating basic Acoupi programs. It offers essential features for audio recording, metadata storage, and file management.

Components:

  • Audio Recorder: Records audio clips according to the program's configuration.
  • Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.

Tasks:

Using the components above, this class creates and manages the following tasks:

  • Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
  • File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.

Customization:

customise the program's behavior by overriding these methods:

  • get_recording_conditions: Define the specific conditions that must be met for audio recording to continue when the recording task is triggered by the scheduler.
  • get_recording_filters: Add filters to determine which recordings to save.
  • get_recording_callbacks: Define actions to perform after a recording is made.

Examples:

import datetime
from acoupi import components, data
from acoupi.programs.templates import (
    BasicProgram,
    BasicProgramConfiguration,
)


class Config(BasicProgramConfiguration):
    pass


class Program(BasicProgram):
    configuration_schema = Config

    def get_recording_conditions(self, config: Config):
        # Get the default recording conditions
        conditions = super().get_recording_conditions(
            config
        )
        return [
            components.IsInInterval(
                data.TimeInterval(
                    start=datetime.time(hour=3),
                    end=datetime.time(hour=6),
                )
            ),
            *conditions,
        ]

Methods:

Name Description
check

Check the program's components.

configure_recorder

Configure the audio recorder.

configure_store

Configure the metadata store.

create_file_management_task

Create the file management task.

create_recording_task

Create the recording task.

get_file_managers

Get the file managers.

get_recording_callbacks

Get the recording callbacks.

get_recording_conditions

Get the recording conditions.

get_recording_filters

Get the recording saving filters.

get_required_models

Get the required models for a recording to be considered ready.

on_end

Handle program end event.

on_start

Handle program start event.

register_file_management_task

Register the file management task.

register_recording_task

Register the recording task.

setup

Set up the basic program.

validate_dirs

Validate the directories used by the program.

Attributes:

Name Type Description
recorder AudioRecorder
store Store
worker_config
Attributes#
recorder instance-attribute #
store instance-attribute #
worker_config = DEFAULT_WORKER_CONFIG class-attribute instance-attribute #
Functions#
check(config) #

Check the program's components.

This method performs checks on the program's components to ensure they are functioning correctly. Currently, it only checks the PyAudio recorder if it is being used.

configure_recorder(config) #

Configure the audio recorder.

This method creates and configures an instance of the PyAudioRecorder based on the provided configuration.

Returns:

Type Description
AudioRecorder

The configured audio recorder instance.

configure_store(config) #

Configure the metadata store.

This method creates and configures an instance of the SqliteStore based on the provided configuration.

Returns:

Type Description
Store

The configured metadata store instance.

create_file_management_task(config) #

Create the file management task.

This method creates the task responsible for managing audio files.

Returns:

Type Description
Callable[[], None]

The file management task.

create_recording_task(config) #

Create the recording task.

This method creates the task responsible for recording audio.

Returns:

Type Description
Callable[[], Optional[Recording]]

The recording task.

get_file_managers(config) #

Get the file managers.

This method defines how audio recordings should be saved and managed. It returns a list of file managers that are responsible for determining the final storage location of each recording.

When a recording is marked for saving, the program iterates through the list of file managers in order. Each manager can either:

  • Return a path where the recording should be saved.
  • Return None to indicate that it cannot handle the recording, allowing the next manager in the list to be used.

By default, this method returns a list containing a single DateFileManager, which saves recordings in a structured folder hierarchy based on the recording date.

Returns:

Type Description
list[RecordingSavingManager]

A list of file manager instances.

get_recording_callbacks(config) #

Get the recording callbacks.

This method defines callbacks to be executed after a recording is completed. By default, it returns an empty list.

Returns:

Type Description
list[Callable[[Optional[Recording]], None]]

A list of recording callbacks.

get_recording_conditions(config) #

Get the recording conditions.

This method defines the conditions under which audio recording should be performed. By default, it uses the schedule defined in the configuration.

Returns:

Type Description
RecordingCondition

A recording condition.

get_recording_filters(config) #

Get the recording saving filters.

This method defines filters that determine which recordings should be saved permanently. By default, it returns an empty list, meaning all recordings are saved.

Returns:

Type Description
list[RecordingSavingFilter]

A list of recording saving filters.

get_required_models(config) #

Get the required models for a recording to be considered ready.

This method specifies which bioacoustic models must process a recording before it is considered "ready" to be moved from temporary storage.

By default, no models are required, meaning recordings are immediately considered ready. However, you can override this method to define specific models that must process the recordings based on the program's configuration.

Returns:

Type Description
list[str]

A list of model names that are required to process a recording before it is considered ready.

on_end(deployment) #

Handle program end event.

This method is called when the program ends. It updates the deployment information in the metadata store, and ensure that remaining tasks are completed before the program is stopped.

Tasks to check are: - file_management_task (if implemented). Check if there are remaining files in the temporary directory and move them to the correct directory.

on_start(deployment) #

Handle program start event.

This method is called when the program starts and stores the deployment information in the metadata store.

register_file_management_task(config) #

Register the file management task.

This method registers the file management task with the program's scheduler.

register_recording_task(config) #

Register the recording task.

This method registers the recording task with the program's scheduler.

setup(config) #

Set up the basic program.

This method initialises the program's components (audio recorder, store, and file manager), registers the recording and file management tasks, and performs necessary setup operations.

validate_dirs(config) #

Validate the directories used by the program.

This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.

BasicProgramConfiguration #

Bases: BaseModel

Configuration schema for a basic program.

Attributes:

Name Type Description
microphone MicrophoneConfig

Microphone configuration.

paths PathsConfiguration

Data configuration.

recording AudioConfiguration

Audio configuration.

timezone TimeZoneName

Time zone where the device will be deployed.

Attributes#
microphone instance-attribute #

Microphone configuration.

paths = Field(default_factory=PathsConfiguration) class-attribute instance-attribute #

Data configuration.

recording = Field(default_factory=AudioConfiguration) class-attribute instance-attribute #

Audio configuration.

timezone = Field(default=(TimeZoneName('Europe/London'))) class-attribute instance-attribute #

Time zone where the device will be deployed.

DetectionProgram(program_config, app) #

Bases: MessagingProgram[C], ABC

Detection Program.

This abstract class extends the MessagingProgram to provide a foundation for detection programs. It includes functionality for configuring and running a detection model, processing model outputs, and generating messages based on detection results.

Components:

  • Detection Model: An audio detection model that processes recordings to identify specific sounds or events.

Tasks:

  • Detection Task: Runs the detection model on audio recordings and processes the results, including:
    • Cleaning the model output using get_output_cleaners. By default this includes a threshold cleaner that filters out detections below a specified threshold.
    • Filtering the processed output using get_processing_filters.
    • Generating messages based on the results using get_message_factories.

Inherited Components:

This class inherits the following components from MessagingProgram:

  • Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
  • Message Store: A database for storing messages before they are sent.

This class also inherits the following components from BasicProgram:

  • Audio Recorder: Records audio clips according to the program's configuration.
  • File Manager: Manages the storage of audio recordings, including saving them to permanent storage and handling temporary files.
  • Store: Provides an interface for storing and retrieving metadata associated with the program and its recordings.

Inherited Tasks:

This program inherits the following tasks from MessagingProgram:

  • Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
  • Send Messages Task: Periodically sends messages from the message store to the configured messenger.

This program also inherits the following tasks from BasicProgram:

  • Audio Recording: Records audio at regular intervals, configurable through the audio settings in the BasicProgramConfiguration schema.
  • File Management: Periodically performs file management operations, such as moving recordings from temporary to permanent storage.

Customization:

You can customise the detection process by overriding the following methods:

  • configure_model: Required. Implement this method to configure and return an instance of your detection model.
  • get_output_cleaners: To clean up the model's raw output. Either override this method completely or call super().get_output_cleaners() to include the default threshold cleaner.
  • get_processing_filters: To filter recordings before processing.
  • get_message_factories: To customise the messages generated based on detection results.

Examples:

from acoupi.programs.templates import (
    DetectionProgram,
    DetectionProgramConfiguration,
)
from acoupi.components import types

# This import should be replaced with your actual model import
# This model does not exist in the acoupi package
from acoupi.models import SimpleBirdModel


class MyBirdDetectionConfiguration(
    DetectionProgramConfiguration
):
    # Add any configuration specific to bird detection
    pass


class MyBirdDetectionProgram(
    DetectionProgram[MyBirdDetectionConfiguration]
):
    configuration_schema = MyBirdDetectionConfiguration

    def configure_model(
        self, config: MyBirdDetectionConfiguration
    ) -> types.Model:
        return (
            SimpleBirdModel()
        )  # Replace with your actual model

Methods:

Name Description
check

Check the program's components.

configure_model

Configure the detection model.

create_detection_task

Create the detection task.

get_message_factories

Get the message factories.

get_output_cleaners

Get the model output cleaners.

get_processing_filters

Get the processing filters.

get_recording_callbacks

Get the recording callbacks.

get_required_models
on_end

Handle program end event.

setup

Set up the Detection Program.

Attributes:

Name Type Description
model Model

The configured detection model instance.

Attributes#
model instance-attribute #

The configured detection model instance.

Functions#
check(config) #

Check the program's components.

This method performs checks on the program's components, including the detection model, to ensure they are functioning correctly.

Parameters:

Name Type Description Default
config C

The program's configuration.

required
configure_model(config) abstractmethod #

Configure the detection model.

This method must be implemented by subclasses to configure and return an instance of the detection model.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
Model

The configured detection model instance.

create_detection_task(config) #

Create the detection task.

This method creates the task responsible for running the detection model and processing its output.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
Callable[[Recording], None]

The detection task.

Notes

This method uses the generate_detection_task function to create the detection task. You can override this method to customise the task creation process.

get_message_factories(config) #

Get the message factories.

This method can be overridden to define a list of message factories that will be used to generate messages based on the processed detection results.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
List[MessageBuilder]

A list of message factories.

get_output_cleaners(config) #

Get the model output cleaners.

This method can be overridden to define a list of output cleaners that will be applied to the model's raw output to clean it up or extract relevant information. By default, it includes a threshold cleaner that filters out detections below a specified threshold.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
List[ModelOutputCleaner]

A list of model output cleaners.

get_processing_filters(config) #

Get the processing filters.

This method can be overridden to define a list of processing filters that will be applied to each recording before it is processed by the model. These filters determine whether a recording should be processed at all.

This can be useful to avoid unnecessary model processing when it is not required by the context or based on simple heuristics on the recording content. Model processing can be computationally expensive, so it is beneficial to avoid it if possible.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
List[ProcessingFilter]

A list of processing filters.

get_recording_callbacks(config) #

Get the recording callbacks.

This method adds the detection task as a callback to be executed after each recording is completed.

Parameters:

Name Type Description Default
config C

The program's configuration.

required

Returns:

Type Description
List[Callable]

A list of recording callbacks, including the detection task.

get_required_models(config) #
on_end(deployment) #

Handle program end event.

This method is called when the program ends. It updates the deployment information in the metadata store, and ensure that remaining tasks are completed before the program is stopped.

Tasks to check are: - file_management_task (if implemented). Check if there are remaining files in the temporary directory and move them to the correct directory. - detection_task (if implemented). Check if there are remaining files to be processed and process them.

setup(config) #

Set up the Detection Program.

This method initialises the detection model and performs any necessary setup.

DetectionProgramConfiguration #

Bases: MessagingProgramConfiguration

Detection Program Configuration schema.

This schema extends the MessagingProgramConfiguration to include any additional settings required for detection programs.

Attributes:

Name Type Description
detections DetectionsConfiguration

Detection settings.

Attributes#
detections = Field(default_factory=DetectionsConfiguration) class-attribute instance-attribute #

Detection settings.

DetectionsConfiguration #

Bases: BaseModel

Detection settings schema.

Attributes:

Name Type Description
threshold float

Detections with a score below this threshold will be ignored.

Attributes#
threshold = 0.2 class-attribute instance-attribute #

Detections with a score below this threshold will be ignored.

MessagingConfig #

Bases: BaseModel

Messaging configuration schema.

This schema defines the configuration for messaging components, including the message store, message sending interval, heartbeat interval, and messenger configurations (HTTP or MQTT).

Attributes:

Name Type Description
heartbeat_interval int

Interval between sending heartbeats in seconds.

http Optional[HTTPConfig]

HTTP messenger configuration.

message_send_interval int

Interval between sending messages in seconds.

messages_db Path

Path to the message database.

mqtt Optional[MQTTConfig]

MQTT messenger configuration.

Attributes#
heartbeat_interval = 60 * 60 class-attribute instance-attribute #

Interval between sending heartbeats in seconds.

http = None class-attribute instance-attribute #

HTTP messenger configuration.

message_send_interval = 120 class-attribute instance-attribute #

Interval between sending messages in seconds.

messages_db = Field(default=(Path.home() / 'storages' / 'messages.db')) class-attribute instance-attribute #

Path to the message database.

mqtt = None class-attribute instance-attribute #

MQTT messenger configuration.

MessagingProgram(program_config, app) #

Bases: BasicProgram[ProgramConfig]

Messaging Acoupi Program.

This class extends the BasicProgram to provide functionality for sending messages and heartbeats with a configured messenger (HTTP or MQTT).

Components:

  • Messenger: A component responsible for sending messages and heartbeats via a configured communication protocol (HTTP or MQTT).
  • Message Store: A database for storing messages before they are sent.

Tasks:

Using the components above, this class creates and manages the following tasks:

  • Heartbeat Task: Periodically sends heartbeat messages to indicate that the program is running.
  • Send Messages Task: Periodically sends messages from the message store to the configured messenger.

This program includes all the functionality of BasicProgram, inheriting its components and tasks for audio recording, metadata storage, and file management.

Methods:

Name Description
check

Check the messenger connection.

configure_message_store

Configure the message store.

configure_messenger

Configure the messenger.

create_heartbeat_task

Create the heartbeat task.

create_messaging_task

Create the messaging task.

on_end

End a deployment.

register_heartbeat_task

Register the heartbeat task.

register_messaging_task

Register the messaging task.

setup

Set up the Messaging Program.

validate_dirs

Validate the directories used by the program.

Attributes:

Name Type Description
message_store MessageStore

The configured message store instance.

messenger Optional[Messenger]

The configured messenger instance.

Attributes#
message_store instance-attribute #

The configured message store instance.

messenger instance-attribute #

The configured messenger instance.

Functions#
check(config) #

Check the messenger connection.

This method checks the connection to the configured messenger (HTTP or MQTT).

configure_message_store(config) #

Configure the message store.

This method creates and configures an instance of the SqliteMessageStore based on the provided configuration.

Returns:

Type Description
MessageStore

The configured message store instance.

configure_messenger(config) #

Configure the messenger.

This method creates and configures an instance of the HTTPMessenger or MQTTMessenger based on the provided configuration.

Returns:

Type Description
Messenger

The configured messenger instance.

Raises:

Type Description
ValueError

If no messenger configuration is provided.

create_heartbeat_task(config) #

Create the heartbeat task.

This method creates the task responsible for sending heartbeats.

Returns:

Type Description
Callable

The heartbeat task.

create_messaging_task(config) #

Create the messaging task.

This method creates the task responsible for sending messages.

Returns:

Type Description
Callable

The messaging task.

on_end(deployment) #

End a deployment.

This method is called when the program is stopped. It updates the deployment information in the metadata store, and ensure and performs any necessary cleanup tasks (i.e., file_management_task, messaging_task).

register_heartbeat_task(config) #

Register the heartbeat task.

This method registers the heartbeat task with the program's scheduler.

register_messaging_task(config) #

Register the messaging task.

This method registers the messaging task with the program's scheduler.

setup(config) #

Set up the Messaging Program.

This method initialises the message store and messenger, registers the messaging and heartbeat tasks, and performs any necessary setup.

validate_dirs(config) #

Validate the directories used by the program.

This method ensures that the necessary directories for storing audio and metadata exist. If they don't, it creates them.

MessagingProgramConfiguration #

Bases: BasicProgramConfiguration

Messaging Program Configuration schema.

This schema extends the BasicProgramConfiguration to include settings for messaging functionality.

Attributes:

Name Type Description
messaging MessagingConfig
Attributes#
messaging instance-attribute #

PathsConfiguration #

Bases: BaseModel

Data configuration schema.

Attributes:

Name Type Description
db_metadata Path

Path to the metadata database.

recordings Path

Directory for storing audio files permanently.

tmp_audio Path

Temporary directory for storing audio files.

Attributes#
db_metadata = Field(default_factory=(lambda: Path.home() / 'storages' / 'metadata.db')) class-attribute instance-attribute #

Path to the metadata database.

recordings = Field(default_factory=(lambda: Path.home() / 'storages' / 'recordings')) class-attribute instance-attribute #

Directory for storing audio files permanently.

tmp_audio = Field(default_factory=get_temp_dir) class-attribute instance-attribute #

Temporary directory for storing audio files.