Contract Examples¶
This page demonstrates how to use the Contract module.
Example Code¶
The following example shows how to work with data streams and contracts:
from pathlib import Path
from aind_behavior_services.rig import AindBehaviorRigModel
from aind_behavior_services.session import AindBehaviorSessionModel
from aind_behavior_services.task_logic import AindBehaviorTaskLogicModel
from contraqctor.contract import Dataset, DataStreamCollection
from contraqctor.contract.camera import Camera
from contraqctor.contract.csv import Csv
from contraqctor.contract.harp import (
DeviceYmlByFile,
HarpDevice,
)
from contraqctor.contract.json import PydanticModel, SoftwareEvents
from contraqctor.contract.mux import MapFromPaths
from contraqctor.contract.text import Text
from contraqctor.contract.utils import print_data_stream_tree
dataset_root = Path(r"path_to_data")
my_dataset = Dataset(
name="my_dataset",
version="1.0.0",
description="My dataset",
data_streams=[
MapFromPaths(
name="BehaviorVideos",
description="Data from BehaviorVideos modality",
reader_params=MapFromPaths.make_params(
paths=dataset_root / "behavior-videos",
include_glob_pattern=["*"],
inner_data_stream=Camera,
inner_param_factory=lambda x: Camera.make_params(path=dataset_root / "behavior-videos" / x),
),
),
DataStreamCollection(
name="Behavior",
description="Data from the Behavior modality",
data_streams=[
HarpDevice(
name="HarpBehavior",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/Behavior.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpManipulator",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/StepperDriver.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpTreadmill",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/Treadmill.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpOlfactometer",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/Olfactometer.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpSniffDetector",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/SniffDetector.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpLickometer",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/Lickometer.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpClockGenerator",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/ClockGenerator.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpEnvironmentSensor",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/EnvironmentSensor.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
DataStreamCollection(
name="HarpCommands",
description="Commands sent to Harp devices",
data_streams=[
HarpDevice(
name="HarpBehavior",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/Behavior.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpManipulator",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/StepperDriver.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpTreadmill",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/Treadmill.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpOlfactometer",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/Olfactometer.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpSniffDetector",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/SniffDetector.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpLickometer",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/Lickometer.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpClockGenerator",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/ClockGenerator.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
HarpDevice(
name="HarpEnvironmentSensor",
reader_params=HarpDevice.make_params(
path=dataset_root / "behavior/HarpCommands/EnvironmentSensor.harp",
device_yml_hint=DeviceYmlByFile(),
),
),
],
),
DataStreamCollection(
name="SoftwareEvents",
description="Software events generated by the workflow. The timestamps of these events are low precision and should not be used to align to physiology data.",
data_streams=[
SoftwareEvents(
name="ActivePatch",
description="An event emitted when a patch threshold is crossed.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/ActivePatch.json"
),
),
SoftwareEvents(
name="ActiveSite",
description="An event emitted when a site becomes active.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/ActiveSite.json"
),
),
SoftwareEvents(
name="ArmOdor",
description="An event sent each time an Odor mixture messaged is sent to arm at the olfactometer.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/ArmOdor.json"
),
),
SoftwareEvents(
name="Block",
description="An event signaling block transitions.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/Block.json"
),
),
SoftwareEvents(
name="ChoiceFeedback",
description="A unit event that is emitted when the subject receives feedback about their choice.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/ChoiceFeedback.json"
),
),
SoftwareEvents(
name="DepletionVariable",
description="The value of the variable used to determine the depletion state of the current patch.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/DepletionVariable.json"
),
),
SoftwareEvents(
name="GiveReward",
description="The amount of rward given to a subject. The value can be null if no reward was given (P=0) or 0.0 if the reward was delivered but calculated to be 0.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/GiveReward.json"
),
),
SoftwareEvents(
name="PatchRewardAmount",
description="Amount of reward available to be collected in the upcoming site.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/PatchRewardAmount.json"
),
),
SoftwareEvents(
name="PatchRewardAvailable",
description="Amount of reward left in the patch.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/PatchRewardAvailable.json"
),
),
SoftwareEvents(
name="PatchRewardProbability",
description="Probability of reward being available to be collected in the upcoming site.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/PatchRewardProbability.json"
),
),
SoftwareEvents(
name="RngSeed",
description="The value of the random number generator seed.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/RngSeed.json"
),
),
SoftwareEvents(
name="StopVelocityThreshold",
description="The velocity threshold used to determine if the subject is stopped or not. In cm/s.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/StopVelocityTreshold.json"
),
),
SoftwareEvents(
name="VisualCorridorSpecs",
description="Specification of the visual corridor instantiated to be rendered.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/VisualCorridorSpecs.json"
),
),
SoftwareEvents(
name="WaitRewardOutcome",
description="The outcome of the period between choice and reward delivery.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/WaitRewardOutcome.json"
),
),
SoftwareEvents(
name="WaitLickOutcome",
description="The outcome of the period between reward availability and lick detection.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/WaitLickOutcome.json"
),
),
SoftwareEvents(
name="UpdaterStopDurationOffset",
description="Metadata for the updater of the StopDurationOffset parameter.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/UpdaterStopDurationOffset.json"
),
),
SoftwareEvents(
name="UpdaterStopVelocityThreshold",
description="Metadata for the updater of the StopVelocityThreshold parameter.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/UpdaterStopVelocityThreshold.json"
),
),
SoftwareEvents(
name="UpdaterRewardDelayOffset",
description="Metadata for the updater of the RewardDelayOffset parameter.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/UpdaterRewardDelayOffset.json"
),
),
SoftwareEvents(
name="HabituationRewardAvailable",
description="In the habituation task mode, this event will be emitted whenever a reward is available to be collected.",
reader_params=SoftwareEvents.make_params(
dataset_root / "behavior/SoftwareEvents/HabituationRewardAvailable.json"
),
),
],
),
Csv(
"CurrentPosition",
description="The position of the animal in VR coordinates (cm). The timestamp is derived from the encoder reading that gave rise to the position change.",
reader_params=Csv.make_params(
path=dataset_root / "behavior/OperationControl/CurrentPosition.csv",
),
),
Csv(
"IsStopped",
description="The result of the ongoing stop detection algorithm. The timestamp is derived from the encoder reading that gave rise to the position change.",
reader_params=Csv.make_params(
path=dataset_root / "behavior/OperationControl/IsStopped.csv",
),
),
Csv(
"Torque",
description="The torque instructed to be applied to the treadmill. Timestamps are software-derived, use the Harp device events for hardware timestamps.",
reader_params=Csv.make_params(
path=dataset_root / "behavior/OperationControl/CurrentPosition.csv",
),
),
Csv(
name="RendererSynchState",
description="Contains information that allows the post-hoc alignment of visual stimuli to the behavior data.",
reader_params=Csv.make_params(path=dataset_root / "behavior/Renderer/RendererSynchState.csv"),
),
DataStreamCollection(
name="Logs",
data_streams=[
Text(
name="Launcher",
description="Contains the console log of the launcher process.",
reader_params=Text.make_params(
path=dataset_root / "behavior/Logs/launcher.log",
),
),
SoftwareEvents(
name="EndSession",
description="A file that determines the end of the session. If the file is empty, the session is still running or it was not closed properly.",
reader_params=SoftwareEvents.make_params(
path=dataset_root / "behavior/Logs/EndSession.json",
),
),
],
),
DataStreamCollection(
name="InputSchemas",
description="Configuration files for the behavior rig, task_logic and session.",
data_streams=[
PydanticModel(
name="Rig",
reader_params=PydanticModel.make_params(
model=AindBehaviorRigModel,
path=dataset_root / "behavior/Logs/rig_input.json",
),
),
PydanticModel(
name="TaskLogic",
reader_params=PydanticModel.make_params(
model=AindBehaviorTaskLogicModel,
path=dataset_root / "behavior/Logs/tasklogic_input.json",
),
),
PydanticModel(
name="Session",
reader_params=PydanticModel.make_params(
model=AindBehaviorSessionModel,
path=dataset_root / "behavior/Logs/session_input.json",
),
),
],
),
],
),
],
)
if __name__ == "__main__":
print(my_dataset.at("Behavior").at("HarpManipulator").load().at("WhoAmI").load().data)
len([x for x in my_dataset if ((not x.is_collection) and isinstance(x, SoftwareEvents))])
exc = my_dataset.load_all()
for e in exc if exc is not None else []:
print(f"Stream: {e[0]}")
print(f"Exception: {e[1]}")
print()
print(my_dataset.at("Behavior").at("HarpBehavior").at("WhoAmI").read())
print(my_dataset.at("Behavior").at("HarpCommands").at("HarpBehavior").at("OutputSet").read())
print(my_dataset.at("Behavior").at("SoftwareEvents"))
print(my_dataset.at("Behavior").at("SoftwareEvents").at("DepletionVariable").read())
print(my_dataset.at("Behavior").at("SoftwareEvents").at("DepletionVariable"))
print(my_dataset.at("Behavior").at("IsStopped").data)
print(my_dataset.at("Behavior").at("RendererSynchState").data)
print(my_dataset["Behavior"]["InputSchemas"]["Session"].data)
path = ""
child = my_dataset.at("Behavior").at("SoftwareEvents").at("DepletionVariable")
while child.parent is not None:
path = f"{child.name}:{path}"
child = child.parent
print(path)
print(my_dataset.at("Behavior").at("HarpBehavior").device_reader)
with open("my_dataset.md", "w", encoding="UTF-8") as f:
f.write(print_data_stream_tree(my_dataset))
print(my_dataset.at("Behavior").at("HarpBehavior").resolved_name)