Skip to main content

Agent SDK

Agent-triggered flows

Objects for triggering a Flow from an Agent can be found in agent_sdk for Agents v5.0+.

note

The agent_sdk is only available for Agents v5.0+. Prior to v5.0, these functions were included in ganymede_sdk.agent.

class FileWatcherResult

FileWatcherResult is a dictionary of FileParam objects indexed by node name.param name.

  • param files: dict[str, fileParam | list[FileParam]] - Dictionary of FileParam objects indexed by node name.param name
  • param tags: list[UnappliedFileTag] | None - List of tags to be applied to all files

class TriggerFlowParams

TriggerFlowParams specifies the inputs for the Flow executed when all files are observed. It includes the following parameters:

  • param single_file_params: dict[str, FileParam] | None - Dict of FileParam objects indexed by node name.param name. These parameters are used for Nodes that accept a single file as input.
  • param multi_file_params: dict[str, MultiFileParam] | None - Dict of MultiFileParam objects indexed by node name.param name. These parameters are used for Nodes that accept multiple files as input.
  • param benchling_tag: Tag | None - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Benchling node.
  • param additional_params: dict[str, str] | None - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Param node; the key is the name if the Node name for the input parameter, and the value is the string to pass into the Node.

class FileParam

FileParam specifies files to be uploaded to Ganymede Cloud and their corresponding Flow parameters. These parameters are provided to the execute function once all files are detected.

  • param filename: str - Name of the file, e.g. "my_file.txt"
  • param content_type: str - Content type of the file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param body: bytes - File contents in bytes
  • param param: str - Name of parameter to be used in Flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within the Agent watch directory containing the file. For example. if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to the Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_path: str | None - Path in Ganymede storage where file will be uploaded
  • param tags: list[FileTag] | None - List of tags to be applied to the file
  • param bucket_name: str - Bucket associated with file
  • param files: str - Alternative method for specifying file contents, where the key is the filename and the value is the file body.

class MultiFileParam

MultiFileParam is used for submitting multiple files to a single node. It includes the following parameters:

  • param files: str - Alternative method for specifying file contents, where the key is the filename and the value is the file body.
  • param content_type: str - Content type of file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param param: str - Name of parameter to be used in flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within Agent watch directory that contains file. For example. if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_paths: list[str] | None - Path in Ganymede storage where file will be uploaded
  • param tags: list[FileTag] | None - List of tags to be applied to file
  • param bucket_name: str - Bucket associated with file

The MultiFileParam object contains a method for initiation from a list of FileParam objects as shown below. The content type of the object is assumed to take on the content type of the first item in the list.

# assume fp1 and fp2 are FileParam objects
m = agent_sdk.file_params_list_to_multi([fp1, fp2])

class NoOpFileTagParams

NoOpFileTagParams is a used to specify that tags should be applied to a file, but that no Flow should be triggered upon file upload to Ganymede.

  • param files: list[FileParam | list[FileParam]] - List of FileParam objects to apply tags to

function fp

fp returns a function that performs pattern matching against a file path. Specifically, the function returns callable[[str], bool] - a function that takes a file path and returns True if the file path matches the pattern, and False otherwise.

This function can be useful as a template for creating your own pattern matching functions.

  • param watch_dir: str - Directory to watch for files
  • param pattern: str - Glob pattern to match against the file path
  • param seconds_since_modification: int | None - if set, filters for files last modified within the number of seconds specified, by default None
  • param seconds_since_access: int | NOne - if set, filters for files last accessed within the number of seconds specified, by default None

function file_params_list_to_multi

file_params_list_to_multi converts a list of FileParam objects to a MultiFileParam object.

  • param file_params: list[FileParam] - List of FileParam objects to convert to MultiFileParam

class FlowTag

The FlowTag class is used to represent a tag that can be applied to a file. This class is not used for applying tags, but rather for interacting with tags already applied to files.

  • param tag_id: str - Name of the tag type applied to a file.
  • param display_tag: str - Value of the tag applied to a file.
  • param upload_ts: datetime - Timestamp of when tag was applied

function add_file_tag_to_fileparam

add_file_tag_to_fileparam adds a Tag to a FileParam object, returning a FileParam | MultiFileParam object with the tag applied.

param file_param: FileParam | MultiFileParam - FileParam object to add Tag to param tag_type_id: str - Tag type of Tag to add param display_value: str - Value of Tag to add param tag_id: str | None - Optional Tag ID which can be used to reference the Tag in code param url: str | None - Optional URL to associate with the Tag

Checksum functions

Agent utility functions are provided in agent_sdk for validating data integrity and interacting with file systems.

Computing file checksums

Ganymede provides functions to validate file integrity; these values can be used to verify the integrity of a file uploaded to cloud storage

function calculate_crc32c

The function returns the CRC32C checksum of a file as a string encoded in utf-8.

  • param file_path: str - Path to file to generate checksum for
  • param blocksize: int | None - Block size to use for the checksum calculation. If not specified, the default block size is 2**20.

function calculate_md5

The function returns the MD5 hash of a file as a string encoded in utf-8.

  • param file_path: str - Path to file to generate MD5 hash for

Examples

# Before Agent v5.0
# from ganymede_sdk.agent.utils import calculate_md5, calculate_crc32c

from agent_sdk import calculate_md5, calculate_crc32c

file_path = "path/to/local/file"

# either md5 or crc32c can be used to validate the integrity of a file
md5 = calculate_md5(file_path)
crc32c = calculate_crc32c(file_path)

You can also calculate the checksum of a file uploaded to Ganymede Cloud by creating a a tempfile.TemporaryFile object, writing the file contents to it, and then calculating the checksum:

from agent_sdk import calculate_md5, calculate_crc32c
import os
import tempfile

data = b"Example data to calculate checksum"

with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(data)
tmp_file_name = tmp_file.name

md5 = calculate_md5(tmp_file_name)
crc32c = calculate_crc32c(tmp_file_name)

os.remove(tmp_file_name)

File system utilities

agent_sdk provides a number of convenience functions, which can be helpful to use with cron Agents that involve more complex logic prior to invoking a flow. Some examples of this are when a file is written to multiple times before being processed, or if there is a variable number of files being processed, such that the trigger for invoking a flow requires more than just the presence of a file.

class ScanResult

ScanResult is a frozen dataclass stores file paths for files of interest. Two files are considered to be the same if they have the same relative_path amd modified_time.

  • param relative_path: str - Path to file
  • param modified_time: datetime - Datetime of when file was last modified

function list_files_recursive

list_files_recursive returns a list of all filepaths in a directory and its subdirectories as a list[str].

  • param file_path: str - Path to directory to list files from

function matches_pattern

matches_pattern returns True if a file path matches at least one of the specified regex patterns specified and False otherwise.

  • param filename: str - Name of file
  • param pattern: str | re.Pattern | list[re.Pattern] - Regex pattern or list of regex patterns to match against

function is_file_ready

is_file_ready returns True if a file has the modified time is within the last interval_in_seconds seconds, or if the size of the file has changed in that same timespan; otherwise, it returns False.

  • param file_path: str - Path to file to watch
  • param threshold_seconds: float - Number of seconds to wait between checks, by default 0.1

function get_most_recent_modified_result

get_most_recent_modified_result returns a ScanResult object referencing the most recently modified file in a directory, or None if no files are found.

  • param directory: Path - Path to directory to watch

function filter_by_age

filter_by_age returns a list[str] of file paths that have not been modified within the last age_in_minutes minutes.

  • param scan_results: Iterable[ScanResult] - List of ScanResult objects
  • param age_in_minutes: int - Minimum age in minutes

function zip_directory

zip_directory creates a zip file of a directory and its contents.

  • param directory: str - Path to directory to zip
  • param zip_file: str - Path to zip file to create

function scan_for_finished_files

scan_for_finished_files scans a directory, returning paths to files with a modified date older than the specified number of minutes as a list[str].

  • param directory: str - Path to directory to scan
  • param age_in_minutes: int - Minimum age in minutes for files to be included in the results
  • param pattern: re.Pattern | list[re.Pattern] - Regex pattern to match files against; only files that match against at least one of the specified patterns will be included in results

Example

You can use scan_for_finished_files to continuously scan a directory for files, uploading them to Ganymede Cloud for processing when they are older than a specified number of minutes. The Flow could query previously uploaded files using the list_files_recursive method to avoid uploading the same file multiple times.

Accessing Ganymede Cloud

function read_sql_query

read_sql_query returns a pandas DataFrame object containing the results of a SQL query run against the Ganymede DB.

  • param sql_query: str - SQL query to run

Example

from agent_sdk.query import read_sql_query

df = read_sql_query('SELECT * FROM instrument_methods')

function get_secret

get_secret returns the value of a secret stored in Ganymede.

  • param secret_name: str - Name of the secret to retrieve

Logging Methods

Ganymede Agents (v5.0+) support user-defined logging messages in the agent_sdk, aligning with logging level for Agent messages. Each level corresponds with a separate method in agent_sdk.

from agent_sdk import internal, debug, info, activity, error

# log internal
internal('Display internal message')

# log debug
debug('Display debug message')

# log info
info('Display info message')

# log activity
activity('Display activity message')

# log error
error('Display error message')

In the UI, these log messages are viewable and filterable on the corresponding Connections page.