Skip to content

Tasks#

acoupi.tasks #

Process templates for Acoupi.

Acoupi offers a collection of process templates to assist in the creation of recording, detecting, and data sending processes. While Acoupi includes a variety of components to construct these processes, users may prefer to use their own components. By utilizing the provided templates, users can ensure that their custom processes integrate with Acoupi and adhere to standardized building practices. The use of templates also allows for effortless customization of processes.

The templates provided take the form of functions that return a function that can be used to start a process. Each template takes a set of arguments that are used to construct the process. The arguments are Acoupi components of the appropriate type, such as a message store, messenger, model, etc. Any object that implements the appropriate interface can be used as an argument. This allows users to use out-of-the-box components or components that they have created themselves.

Modules:

Name Description
detection

Detection Task.

heartbeat
management

Management Task for recordings.

messaging

Messaging Task.

recording

Recording Task.

schedules

Custom Celery schedules used by Acoupi tasks.

summary

Summary Task.

Classes:

Name Description
aligned_schedule

Run tasks on absolute clock boundaries at a fixed interval.

Functions:

Name Description
generate_detection_task

Generate a detection task.

generate_file_management_task

Generate a file management task.

generate_heartbeat_task

Generate a heartbeat task.

generate_recording_task

Generate a recording task.

generate_send_messages_task

Generate a send data task.

generate_summariser_task

Generate a summariser task.

Classes#

aligned_schedule(run_every, offset_seconds=0, nowfun=None, app=None) #

Bases: BaseSchedule

Run tasks on absolute clock boundaries at a fixed interval.

This behaves like an interval schedule, but aligns executions to wall clock boundaries instead of anchoring them to the previous run time. For example, a run_every of 10 seconds with offset_seconds=0 will run at seconds 0, 10, 20, 30, 40, 50 of every minute.

Parameters:

Name Type Description Default
run_every timedelta

Interval between valid schedule slots.

required
offset_seconds int

Offset applied within each interval. With run_every=10 seconds and offset_seconds=5, the schedule runs at :05, :15, :25 and so on. Must be non-negative and smaller than run_every.

0
nowfun callable

Override used by Celery to obtain the current time. Primarily useful for testing.

None
app Celery

Celery application instance passed through to BaseSchedule.

None

Methods:

Name Description
is_due

Attributes:

Name Type Description
offset_seconds
run_every
seconds float
Attributes#
offset_seconds = offset_seconds instance-attribute #
run_every = run_every instance-attribute #
seconds property #
Methods:#
is_due(last_run_at) #

Functions:#

generate_detection_task(store, model, message_store, logger=logger, output_cleaners=None, processing_filters=None, message_factories=None) #

Generate a detection task.

Parameters:

Name Type Description Default
store Store

The store to store the model output.

required
model Model

The model to run on the recording.

required
message_store MessageStore

The message store to store the messages.

required
logger Logger

The logger to log messages, by default logger.

logger
output_cleaners Optional[List[ModelOutputCleaner]]

The output cleaners to clean the model output, by default None.

None
processing_filters Optional[List[ProcessingFilter]]

The processing filters to check if the recording should be processed, by default None.

None
message_factories Optional[List[MessageBuilder]]

The message factories to create messages, by default None.

None
Note

The detection task calls the following methods:

  1. filter.should_process_recording(recording) -> bool
  2. model.run(recording) -> data.ModelOutput
    • Run the model on the recording and return the output.
    • See types.Model.
  3. cleaner.clean(model_output) -> data.ModelOutput
  4. store.store_model_output(model_output) -> None
  5. message_factory.build_message(model_output) -> data.Message
  6. message_store.store_message(message) -> None

generate_file_management_task(store, file_managers, logger=logger, file_filters=None, required_models=None, management_conditions=None, tmp_path=TEMP_PATH) #

Generate a file management task.

Parameters:

Name Type Description Default
store Store

The store to get and update recordings.

required
file_managers List[RecordingSavingManager]

The file managers to save recordings.

required
logger Logger

The logger to log messages, by default logger.

logger
file_filters Optional[List[RecordingSavingFilter]]

The file filters to determine if recordings should be saved, by default None.

None
management_conditions Optional[List[RecordingManagementCondition]]

Conditions that must return True before a recording is managed. If one returns False, the recording stays in temporary storage. If no conditions are provided, all recordings are managed.

None
required_models Optional[List[str]]

Model names that must process a recording before it can be managed, by default None.

None
tmp_path Path

The path where recordings are saved temporarily, by default TEMP_PATH.

TEMP_PATH
Notes

The file management task calls the following methods:

  1. store.get_recordings_by_path(paths) -> List[Tuple[data.Recording, List[data.ModelOutput]]]
  2. filter.should_save_recording(recording, model_outputs) -> bool
  3. manager.save_recording(recording, model_outputs) -> Path
  4. store.update_recording_path(recording, new_path) -> None

generate_heartbeat_task(messengers, metrics=None, serializer=_to_json, logger=logger) #

Generate a heartbeat task.

The heartbeat task creates a heartbeat payload, optionally augments it with runtime metrics, serializes it, and emits it using the provided messengers.

Metrics are supplied as callables so values are captured at send time. Each callable must return an acoupi.data.Metric.

The serializer receives the Heartbeat model and must return the message content as str or bytes.

Parameters:

Name Type Description Default
messengers list[Messenger]

A list of messenger instances to pass the heartbeat message to.

required
metrics list[MetricFn] | None

Optional metric collector functions evaluated when the task runs. See acoupi.devices.metrics for built-in heartbeat metrics.

None
serializer HeartbeatSerializer

Function used to serialize the heartbeat payload before sending.

_to_json
logger Logger

Logger instance for logging heartbeat status.

logger

Returns:

Type Description
Callable[[], None]

A function that sends a heartbeat message when called.

Notes

When a metric function needs arguments bound in advance, prefer functools.partial over lambdas so the task configuration is more serialization-friendly in Celery environments.

Examples:

>>> from functools import partial
>>> from pathlib import Path
>>> from acoupi.devices.metrics import (
...     get_free_memory,
...     get_remaining_storage,
... )
>>> task = generate_heartbeat_task(
...     messengers=[messenger],
...     metrics=[
...         get_free_memory,
...         partial(get_remaining_storage, Path("/tmp")),
...     ],
... )
>>> task()

generate_recording_task(recorder, store, logger=logger, recording_conditions=None) #

Generate a recording task.

Parameters:

Name Type Description Default
recorder AudioRecorder

The audio recorder to record audio.

required
store Store

The store to store the recording metadata.

required
logger Logger

The logger to log messages, by default logger.

logger
recording_conditions Optional[List[T]]

The recording conditions to check if audio should be recorded, by default None.

None

Returns:

Type Description
Callable[[], Optional[Recording]]

The recording metadata if the recording was successful, otherwise None.

Notes

The recording task calls the following methods:

  1. condition.should_record(now) -> bool
  2. store.get_current_deployment() -> data.Deployment
  3. recorder.record(deployment) -> data.Recording
  4. store.store_recording(recording) -> None

generate_send_messages_task(message_store, messengers=None, logger=logger) #

Generate a send data task.

Parameters:

Name Type Description Default
message_store MessageStore

The message store to get and store messages.

required
messengers Optional[List[Messenger]]

The messengers to send messages, by default None.

None
logger Logger

The logger to log messages, by default logger.

logger
Notes

The send data task calls the following methods:

  1. message_store.get_unsent_messages() -> List[data.Message]
  2. messenger.send_message(message) -> data.Response
  3. message_store.store_response(response) -> None

generate_summariser_task(summarisers, message_store, logger=logger) #

Generate a summariser task.

Parameters:

Name Type Description Default
summarisers List[Summariser]

The summarisers to generate the summary message.

required
message_store MessageStore

The message store to store the summary message.

required
logger Logger

The logger to log messages, by default logger.

logger
Notes

The summary process calls the following methods:

  1. summariser.build_summary(now) -> data.Message
  2. message_store.store_message(message) -> None