
data_transfer.aind_watchdog

WatchdogSettings

Bases: ServiceSettings

Settings for the WatchdogDataTransferService.

Configures data transfer operations including destination paths, scheduling, and integration with the AIND watchdog service.

settings_customise_sources classmethod

settings_customise_sources(
    settings_cls: Type[BaseSettings],
    init_settings: PydanticBaseSettingsSource,
    env_settings: PydanticBaseSettingsSource,
    dotenv_settings: PydanticBaseSettingsSource,
    file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]

Customizes the settings sources to include the safe YAML settings source.

Parameters:

Name Type Description Default
settings_cls Type[BaseSettings]

The settings class

required
init_settings PydanticBaseSettingsSource

The initial settings source

required
env_settings PydanticBaseSettingsSource

The environment settings source

required
dotenv_settings PydanticBaseSettingsSource

The dotenv settings source

required
file_secret_settings PydanticBaseSettingsSource

The file secret settings source

required

Returns:

Type Description
Tuple[PydanticBaseSettingsSource, ...]

A tuple of settings sources, ordered from highest to lowest priority

Source code in src/clabe/services.py
@classmethod
def settings_customise_sources(
    cls,
    settings_cls: t.Type[ps.BaseSettings],
    init_settings: ps.PydanticBaseSettingsSource,
    env_settings: ps.PydanticBaseSettingsSource,
    dotenv_settings: ps.PydanticBaseSettingsSource,
    file_secret_settings: ps.PydanticBaseSettingsSource,
) -> t.Tuple[ps.PydanticBaseSettingsSource, ...]:
    """
    Customizes the settings sources to include the safe YAML settings source.

    Args:
        settings_cls: The settings class
        init_settings: The initial settings source
        env_settings: The environment settings source
        dotenv_settings: The dotenv settings source
        file_secret_settings: The file secret settings source

    Returns:
        Tuple[PydanticBaseSettingsSource, ...]: A tuple of settings sources
    """
    return (
        init_settings,
        *(
            _SafeYamlSettingsSource(settings_cls, yaml_file=p, yaml_config_section=cls.__yml_section__)
            for p in KNOWN_CONFIG_FILES
        ),
        env_settings,
        dotenv_settings,
        file_secret_settings,
    )
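
The order of the returned tuple matters: pydantic-settings gives earlier sources priority over later ones, so init arguments override the YAML files, which in turn override environment variables, dotenv files, and secrets. The sketch below models that rule for flat keys with plain dictionaries. It is not the pydantic-settings implementation, and `resolve_by_priority`, the field names, and the values are hypothetical.

```python
from typing import Any, Dict, Mapping, Sequence


def resolve_by_priority(sources: Sequence[Mapping[str, Any]]) -> Dict[str, Any]:
    """Merge flat settings; the first source that defines a key wins."""
    resolved: Dict[str, Any] = {}
    for source in sources:  # highest priority first
        for key, value in source.items():
            # setdefault keeps the value from the earlier (higher-priority) source.
            resolved.setdefault(key, value)
    return resolved


# Hypothetical values standing in for each source, in the returned order.
init_values = {"project_name": "from-init"}
yaml_values = {"project_name": "from-yaml", "destination": "from-yaml"}
env_values = {"destination": "from-env", "schedule_time": "from-env"}

resolved = resolve_by_priority([init_values, yaml_values, env_values])
print(resolved)
```

In this model a value passed to the constructor always wins, and a YAML entry shadows an environment variable of the same name.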

WatchdogDataTransferService

WatchdogDataTransferService(
    source: PathLike | list[PathLike],
    settings: WatchdogSettings,
    session: Session,
    *,
    validate: bool = True,
    email_from_experimenter_builder: Optional[
        Callable[[str], str]
    ] = lambda user_name: f"{user_name}@alleninstitute.org",
)

Bases: DataTransfer[WatchdogSettings]

A data transfer service that uses the aind-watchdog-service to monitor and transfer data.

Integrates with the AIND data transfer infrastructure to automatically monitor directories for new data and transfer it to specified destinations with proper metadata handling and validation.

Methods:

Name Description
transfer

Executes the data transfer by generating a manifest configuration

validate

Validates the Watchdog service and its configuration

is_valid_project_name

Checks if the project name is valid

is_running

Checks if the Watchdog service is currently running

force_restart

Attempts to restart the Watchdog application

dump_manifest_config

Dumps the manifest configuration to a YAML file

prompt_input

Prompts the user to confirm manifest generation

Initializes the WatchdogDataTransferService.

Parameters:

Name Type Description Default
source PathLike | list[PathLike]

The source directory or file to monitor, or a list of them

required
settings WatchdogSettings

Configuration for the watchdog service

required
session Session

The session data from aind-behavior-services

required
validate bool

Whether to run validate() during initialization

True
email_from_experimenter_builder Optional[Callable[[str], str]]

Function to build email from experimenter name

lambda user_name: f'{user_name}@alleninstitute.org'
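
email_from_experimenter_builder is any callable that maps an experimenter name to an email address. A minimal sketch of the default next to a custom builder follows. `dotted_name_builder` and the `example.org` domain are hypothetical, and how the service applies the builder is not shown in this section.

```python
def default_builder(user_name: str) -> str:
    # Mirrors the default lambda in the constructor signature.
    return f"{user_name}@alleninstitute.org"


def dotted_name_builder(full_name: str) -> str:
    # Hypothetical alternative: lower-case the name and join its words with dots.
    local_part = ".".join(full_name.lower().split())
    return f"{local_part}@example.org"


print(default_builder("jane.doe"))
print(dotted_name_builder("Jane Doe"))
```

Any function with the same `str -> str` shape could be passed as the keyword argument in place of the default.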
Source code in src/clabe/data_transfer/aind_watchdog.py
def __init__(
    self,
    source: PathLike | list[PathLike],
    settings: WatchdogSettings,
    session: Session,
    *,
    validate: bool = True,
    email_from_experimenter_builder: Optional[Callable[[str], str]] = lambda user_name: (
        f"{user_name}@alleninstitute.org"
    ),
) -> None:
    """
    Initializes the WatchdogDataTransferService.

    Args:
        source: The source directory or file to monitor
        settings: Configuration for the watchdog service
        session: The session data from aind-behavior-services
        validate: Whether to run validate() during initialization
        email_from_experimenter_builder: Function to build email from experimenter name
    """
    self._settings = settings
    self._sources = source if isinstance(source, list) else [source]

    self._session = session

    _default_exe = os.environ.get("WATCHDOG_EXE", None)
    _default_config = os.environ.get("WATCHDOG_CONFIG", None)

    if _default_exe is None or _default_config is None:
        raise ValueError("WATCHDOG_EXE and WATCHDOG_CONFIG environment variables must be defined.")

    self.executable_path = Path(_default_exe)
    self.config_path = Path(_default_config)

    self._watch_config: Optional[WatchConfig] = None
    self._manifest_config: Optional[ManifestConfig] = None

    self._validate_project_name = validate

    if validate:
        self.validate()

    self._watch_config = WatchConfig.model_validate(self._read_yaml(self.config_path))

    self._email_from_experimenter_builder = email_from_experimenter_builder
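
The constructor reads the executable and config locations from the `WATCHDOG_EXE` and `WATCHDOG_CONFIG` environment variables and raises `ValueError` when either is missing. A minimal sketch of that lookup as a standalone function, using only the standard library; `resolve_watchdog_paths` and the example file names are hypothetical.

```python
import os
from pathlib import Path
from typing import Mapping, Optional, Tuple


def resolve_watchdog_paths(environ: Optional[Mapping[str, str]] = None) -> Tuple[Path, Path]:
    """Return (executable_path, config_path), or raise if either variable is unset."""
    env = os.environ if environ is None else environ
    exe = env.get("WATCHDOG_EXE")
    config = env.get("WATCHDOG_CONFIG")
    if exe is None or config is None:
        raise ValueError("WATCHDOG_EXE and WATCHDOG_CONFIG environment variables must be defined.")
    return Path(exe), Path(config)


# Hypothetical locations; a mapping is passed so the real environment is untouched.
exe_path, config_path = resolve_watchdog_paths(
    {"WATCHDOG_EXE": "watchdog.exe", "WATCHDOG_CONFIG": "watch_config.yml"}
)
print(exe_path.name, config_path.name)
```

Because the check runs before anything else touches the paths, both variables need to be set in the process environment before the service is constructed, regardless of the `validate` flag.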

settings property

settings: TSettings

Returns the settings for the data transfer service.

Returns:

Type Description
TSettings

The service settings

transfer

transfer() -> None

Executes the data transfer by generating a Watchdog manifest configuration.

Creates and deploys a manifest configuration file that the watchdog service will use to monitor and transfer data.

Source code in src/clabe/data_transfer/aind_watchdog.py
def transfer(self) -> None:
    """
    Executes the data transfer by generating a Watchdog manifest configuration.

    Creates and deploys a manifest configuration file that the watchdog service
    will use to monitor and transfer data.
    """
    try:
        if not self.is_running():
            logger.warning("Watchdog service is not running. Attempting to start it.")
            try:
                self.force_restart(kill_if_running=False)
            except subprocess.CalledProcessError as e:
                logger.error("Failed to start watchdog service. %s", e)
                raise RuntimeError("Failed to start watchdog service.") from e
            else:
                if not self.is_running():
                    logger.error("Failed to start watchdog service.")
                    raise RuntimeError("Failed to start watchdog service.")
                else:
                    logger.info("Watchdog service restarted successfully.")

        logger.debug("Creating watchdog manifest config.")

        if self._watch_config is None:
            raise ValueError("Watchdog config is not set.")

        self._manifest_config = self._create_manifest_from_session(session=self._session)
        assert self._manifest_config.name is not None, "Manifest config name must be set."
        _manifest_path = self.dump_manifest_config(
            path=Path(self._watch_config.flag_dir) / self._manifest_config.name
        )
        logger.info("Watchdog manifest config created successfully at %s.", _manifest_path)

    except (pydantic.ValidationError, ValueError, IOError) as e:
        logger.error("Failed to create watchdog manifest config. %s", e)
        raise
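
The start-up guard at the top of `transfer` follows a check, start, re-check pattern. The sketch below isolates that control flow with injected callables so it runs without the watchdog executable. `ensure_running` and `FakeService` are hypothetical names used for illustration, not part of clabe.

```python
from typing import Callable


def ensure_running(is_running: Callable[[], bool], start: Callable[[], None]) -> bool:
    """Start the service if needed. Returns True if a start was attempted."""
    if is_running():
        return False
    start()
    if not is_running():
        # Same failure mode as transfer(): the process did not come up.
        raise RuntimeError("Failed to start watchdog service.")
    return True


class FakeService:
    """Stand-in for the real process, used only for this illustration."""

    def __init__(self, starts_ok: bool) -> None:
        self.running = False
        self.starts_ok = starts_ok

    def is_running(self) -> bool:
        return self.running

    def start(self) -> None:
        # A healthy service comes up; a broken one stays down.
        self.running = self.starts_ok


healthy = FakeService(starts_ok=True)
started = ensure_running(healthy.is_running, healthy.start)
print(started, healthy.running)
```

Re-checking after the start attempt is what lets `transfer` fail early with a `RuntimeError` instead of writing a manifest that no running service would pick up.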

validate

validate() -> bool

Validates the Watchdog service and its configuration.

Checks for required executables, configuration files, service status, and project name validity.

Returns:

Type Description
bool

True if the service is valid, False otherwise

Raises:

Type Description
FileNotFoundError

If required files are missing

HTTPError

If fetching the list of project names from the endpoint fails

Source code in src/clabe/data_transfer/aind_watchdog.py
def validate(self) -> bool:
    """
    Validates the Watchdog service and its configuration.

    Checks for required executables, configuration files, service status,
    and project name validity.

    Returns:
        True if the service is valid, False otherwise

    Raises:
        FileNotFoundError: If required files are missing
        HTTPError: If fetching the list of project names from the endpoint fails
    """
    logger.debug("Attempting to validate Watchdog service.")
    if not self.executable_path.exists():
        raise FileNotFoundError(f"Executable not found at {self.executable_path}")
    if not self.config_path.exists():
        raise FileNotFoundError(f"Config file not found at {self.config_path}")

    if not self.is_running():
        logger.warning(
            "Watchdog service is not running. "
            "After the session is over, "
            "the launcher will attempt to forcefully restart it"
        )
        return False

    if self.settings.project_name is None:
        logger.warning("Watchdog project name is not set. Skipping validation.")
    else:
        try:
            _valid_proj = self.is_valid_project_name()
            if not _valid_proj:
                logger.warning("Watchdog project name is not valid.")
        except HTTPError as e:
            logger.error("Failed to fetch project names from endpoint. %s", e)
            raise
        return _valid_proj

    return True

is_valid_project_name

is_valid_project_name() -> bool

Checks if the project name is valid by querying the metadata service.

Returns:

Type Description
bool

True if the project name is valid, False otherwise

Source code in src/clabe/data_transfer/aind_watchdog.py
def is_valid_project_name(self) -> bool:
    """
    Checks if the project name is valid by querying the metadata service.

    Returns:
        True if the project name is valid, False otherwise
    """
    project_names = self._get_project_names()
    return self._settings.project_name in project_names

is_running

is_running() -> bool

Checks if the Watchdog service is currently running.

Returns:

Type Description
bool

True if the service is running, False otherwise

Source code in src/clabe/data_transfer/aind_watchdog.py
def is_running(self) -> bool:
    """
    Checks if the Watchdog service is currently running.

    Returns:
        True if the service is running, False otherwise
    """
    output = subprocess.check_output(
        ["tasklist", "/FI", f"IMAGENAME eq {self.executable_path.name}"], shell=True, encoding="utf-8"
    )
    processes = [line.split()[0] for line in output.splitlines()[2:]]
    return len(processes) > 0

force_restart

force_restart(kill_if_running: bool = True) -> Popen[bytes]

Attempts to restart the Watchdog application.

Parameters:

Name Type Description Default
kill_if_running bool

Whether to terminate the service if it's already running

True

Returns:

Type Description
Popen[bytes]

A subprocess.Popen object representing the restarted service

Source code in src/clabe/data_transfer/aind_watchdog.py
def force_restart(self, kill_if_running: bool = True) -> subprocess.Popen[bytes]:
    """
    Attempts to restart the Watchdog application.

    Args:
        kill_if_running: Whether to terminate the service if it's already running

    Returns:
        A subprocess.Popen object representing the restarted service
    """
    if kill_if_running is True:
        while self.is_running():
            subprocess.run(["taskkill", "/IM", self.executable_path.name, "/F"], shell=True, check=True)

    cmd_factory = "{exe} -c {config}".format(exe=self.executable_path, config=self.config_path)

    return subprocess.Popen(cmd_factory, start_new_session=True, shell=True)

dump_manifest_config

dump_manifest_config(
    path: Optional[PathLike] = None, make_dir: bool = True
) -> Path

Dumps the manifest configuration to a YAML file.

Parameters:

Name Type Description Default
path Optional[PathLike]

The file path to save the manifest

None
make_dir bool

Whether to create the directory if it doesn't exist

True

Returns:

Type Description
Path

The path to the saved manifest file

Raises:

Type Description
ValueError

If the manifest or watch configuration is not set

Source code in src/clabe/data_transfer/aind_watchdog.py
def dump_manifest_config(self, path: Optional[os.PathLike] = None, make_dir: bool = True) -> Path:
    """
    Dumps the manifest configuration to a YAML file.

    Args:
        path: The file path to save the manifest
        make_dir: Whether to create the directory if it doesn't exist

    Returns:
        The path to the saved manifest file

    Raises:
        ValueError: If the manifest or watch configuration is not set
    """
    manifest_config = self._manifest_config
    watch_config = self._watch_config

    if manifest_config is None or watch_config is None:
        raise ValueError("ManifestConfig or WatchConfig config is not set.")

    path = (Path(path) if path else Path(watch_config.flag_dir) / f"manifest_{manifest_config.name}.yaml").resolve()

    if path.suffix not in [".yml", ".yaml"]:
        path = path.with_suffix(".yaml")

    if not path.name.startswith("manifest_"):
        logger.debug("Prefix manifest_ not found in file name. Prepending it.")
        path = path.with_name(f"manifest_{path.stem}{path.suffix}")

    if make_dir and not path.parent.exists():
        path.parent.mkdir(parents=True, exist_ok=True)

    manifest_config.destination = str(Path(manifest_config.destination))
    manifest_config.schemas = [str(Path(schema)) for schema in manifest_config.schemas]
    for modality in manifest_config.modalities:
        manifest_config.modalities[modality] = [str(Path(_path)) for _path in manifest_config.modalities[modality]]

    self._write_yaml(manifest_config, path)
    return path
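
The naming rules applied to `path` can be isolated into a pure function. This sketch mirrors the suffix and prefix checks in `dump_manifest_config` and omits `resolve()` and directory creation. `normalize_manifest_name` is a hypothetical helper, and `PurePosixPath` is used only so the result does not depend on the host platform.

```python
from pathlib import PurePosixPath


def normalize_manifest_name(path: PurePosixPath) -> PurePosixPath:
    """Force a YAML suffix and a manifest_ prefix on the file name."""
    if path.suffix not in (".yml", ".yaml"):
        # with_suffix replaces whatever follows the last dot in the name.
        path = path.with_suffix(".yaml")
    if not path.name.startswith("manifest_"):
        path = path.with_name(f"manifest_{path.stem}{path.suffix}")
    return path


plain = normalize_manifest_name(PurePosixPath("flags/session_01"))
kept = normalize_manifest_name(PurePosixPath("flags/manifest_a.yml"))
dotted = normalize_manifest_name(PurePosixPath("flags/run_10.30"))
print(plain, kept, dotted)
```

One consequence worth noting: because `with_suffix` treats the text after the last dot as the suffix, a name that contains dots (such as `run_10.30` here) loses its final dotted component when the `.yaml` suffix is applied.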