Skip to content

Tasks#

acoupi.tasks #

Process templates for Acoupi.

Acoupi offers a collection of process templates to assist in the creation of recording, detecting, and data sending processes. While Acoupi includes a variety of components to construct these processes, users may prefer to use their own components. By utilizing the provided templates, users can ensure that their custom processes integrate with Acoupi and adhere to standardized building practices. The use of templates also allows for effortless customization of processes.

The templates provided take the form of functions that return a function that can be used to start a process. Each template takes a set of arguments that are used to construct the process. The arguments are Acoupi components of the appropriate type, such as a message store, messenger, model, etc. Any object that implements the appropriate interface can be used as an argument. This allows users to use out-of-the-box components or components that they have created themselves.

Functions#

generate_detection_task(store, model, message_store, logger=logger, output_cleaners=None, processing_filters=None, message_factories=None) #

Generate a detection task.

Parameters:

Name Type Description Default
store Store

The store to store the model output.

required
model Model

The model to run on the recording.

required
message_store MessageStore

The message store to store the messages.

required
logger Logger

The logger to log messages, by default logger.

logger
output_cleaners Optional[List[ModelOutputCleaner]]

The output cleaners to clean the model output, by default None.

None
processing_filters Optional[List[ProcessingFilter]]

The processing filters to check if the recording should be processed, by default None.

None
message_factories Optional[List[MessageBuilder]]

The message factories to create messages, by default None.

None
Note

The detection task calls the following methods:

  1. filter.should_process_recording(recording) -> bool
  2. model.run(recording) -> data.ModelOutput
    • Run the model on the recording and return the output.
    • See types.Model.
  3. cleaner.clean(model_output) -> data.ModelOutput
  4. store.store_model_output(model_output) -> None
  5. message_factory.build_message(model_output) -> data.Message
  6. message_store.store_message(message) -> None

generate_file_management_task(store, file_managers, logger=logger, file_filters=None, required_models=None, tmp_path=TEMP_PATH) #

Generate a file management task.

Parameters:

Name Type Description Default
store Store

The store to get and update recordings.

required
file_managers List[RecordingSavingManager]

The file managers to save recordings.

required
logger Logger

The logger to log messages, by default logger.

logger
file_filters Optional[List[RecordingSavingFilter]]

The file filters to determine if recordings should be saved, by default None.

None
required_models Optional[List[str]]

The required models that need to be saved, by default None.

None
tmp_path Path

The path where recordings are saved temporarily, by default TEMP_PATH.

TEMP_PATH
Notes

The file management task calls the following methods:

  1. store.get_recordings_by_path(paths) -> List[Tuple[data.Recording, List[data.ModelOutput]]]
  2. filter.should_save_recording(recording, model_outputs) -> bool
  3. manager.save_recording(recording, model_outputs) -> Path
  4. store.update_recording_path(recording, new_path) -> None

generate_heartbeat_task(messengers, logger=logger) #

Generate a heartbeat task.

The heartbeat task creates a heartbeat message and emits them using the provided messengers.

Parameters:

Name Type Description Default
messengers Optional[List[Messenger]]

A list of messenger instances to pass the heartbeat message to.

required
logger Logger

Logger instance for logging heartbeat status.

logger

Returns:

Type Description
Callable[[], None]

A function that sends a heartbeat message when called.

generate_recording_task(recorder, store, logger=logger, recording_conditions=None) #

Generate a recording task.

Parameters:

Name Type Description Default
recorder AudioRecorder

The audio recorder to record audio.

required
store Store

The store to store the recording metadata.

required
logger Logger

The logger to log messages, by default logger.

logger
recording_conditions Optional[List[T]]

The recording conditions to check if audio should be recorded, by default None.

None

Returns:

Type Description
Callable[[], Optional[Recording]]

The recording metadata if the recording was successful, otherwise None.

Notes

The recording task calls the following methods:

  1. condition.should_record(now) -> bool
  2. store.get_current_deployment() -> data.Deployment
  3. recorder.record(deployment) -> data.Recording
  4. store.store_recording(recording) -> None

generate_send_messages_task(message_store, messengers=None, logger=logger) #

Generate a send data task.

Parameters:

Name Type Description Default
message_store MessageStore

The message store to get and store messages.

required
messengers Optional[List[Messenger]]

The messengers to send messages, by default None.

None
logger Logger

The logger to log messages, by default logger.

logger
Notes

The send data task calls the following methods:

  1. message_store.get_unsent_messages() -> List[data.Message]
  2. messenger.send_message(message) -> data.Response
  3. message_store.store_response(response) -> None

generate_summariser_task(summarisers, message_store, logger=logger) #

Generate a summariser task.

Parameters:

Name Type Description Default
summarisers List[Summariser]

The summarisers to generate the summary message.

required
message_store MessageStore

The message store to store the summary message.

required
logger Logger

The logger to log messages, by default logger.

logger
Notes

The summary process calls the following methods:

  1. summariser.build_summary(now) -> data.Message
  2. message_store.store_message(message) -> None