SFTP Monitor
The SFTP Monitor provides a set of API endpoints to manage and track files on a remote SFTP server. It's designed to monitor a specific directory structure on the server (/new, /processing, /completed, /error), track file states in a database registry, and maintain a local mirror of the files.
Overview
The workflow is as follows:
- Scan for new files: Periodically scan the /new directory on the SFTP server.
- Register and Mirror: When new files are found, they are added to a database registry and downloaded to a local mirror.
- Process Files: An external process can fetch the list of new files and, for each one, mark it as 'processing'. This moves the file to the /processing directory on the SFTP server (and locally) and updates its status in the registry.
- Mark as Completed or Error: Once processing is finished, the file can be marked as 'completed' or 'error', which moves it to the corresponding directory (/completed or /error) and updates its state in the registry.
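A client such as a DAG could drive one pass of this workflow as follows. This is a minimal sketch: `call(method, path, config)` is a hypothetical helper that sends the SFTPConfig as the JSON body and returns the decoded JSON response, and `handle_file` stands in for your own processing logic. Only the endpoint paths and response fields are taken from this document.

```python
from typing import Callable
from urllib.parse import quote

# Hypothetical transport: call(method, path, config) -> decoded JSON dict.
Call = Callable[[str, str, dict], dict]


def run_once(call: Call, config: dict, handle_file: Callable[[str], None]) -> dict:
    """Scan, then move each new file through processing to completed or error."""
    call("POST", "/sftp/scan", config)  # register and mirror anything in /new
    outcomes = {}
    for entry in call("GET", "/sftp/new-files", config)["new_files"]:
        moved = call("POST", "/sftp/mark-processing/" + quote(entry["path"], safe=""), config)
        # From here on the file is tracked under its timestamped name.
        name = moved["new_filename"]
        try:
            handle_file(name)
        except Exception:
            call("POST", "/sftp/mark-error/" + quote(name, safe=""), config)
            outcomes[name] = "error"
        else:
            call("POST", "/sftp/mark-completed/" + quote(name, safe=""), config)
            outcomes[name] = "completed"
    return outcomes
```

Failures in `handle_file` are routed to /mark-error so that a single bad file does not leave the rest of the batch stuck in processing.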
API Endpoints
All endpoints are available under the /sftp/ prefix.
Configuration Model
All endpoints that interact with an SFTP server require an SFTPConfig object in the request body. Store this config in the monarch_config table under sftp_monitoring, along with the source the SFTP server supplies data for and the JSON config itself. The SFTP monitoring UI reads these entries for the selected source and displays that source's registry information. When writing a custom DAG that interacts with the SFTP endpoints, query the table through the /generic/sftp-config endpoint: pass a source as the category parameter to get that source's config, or omit it to retrieve all SFTP configs.
{
"host": "sftp.example.com",
"username": "user",
"password": "password",
"key_path": "/path/to/private.key",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\\n...",
"port": 22,
"remote_path": "/remote/base/path"
}
- host (string): The SFTP server hostname or IP address.
- username (string): The username for authentication.
- password (string, optional): The password for authentication.
- key_path (string, optional): The local path to the private key file for key-based authentication.
- private_key (string, optional): The private key content as a string.
- port (integer, optional): The SFTP server port. Defaults to 22.
- remote_path (string, optional): The base directory on the remote server to monitor. Defaults to /. The monitor looks for new, processing, completed, and error subdirectories under this path.
Note: You must provide one authentication method: password, key_path, or private_key.
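A DAG might wrap the configuration in a small client-side class and build the /generic/sftp-config lookup URL from a source name. The sketch below is hypothetical: the `SFTPConfig` dataclass and `sftp_config_url` helper are illustrative, not part of the service's code, and the sketch assumes the source is sent as a `category` query parameter. Field names and defaults come from the list above; the validation only checks that at least one authentication method is set.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional
from urllib.parse import urlencode


@dataclass
class SFTPConfig:
    """Hypothetical client-side mirror of the SFTPConfig request body."""
    host: str
    username: str
    password: Optional[str] = None
    key_path: Optional[str] = None
    private_key: Optional[str] = None
    port: int = 22
    remote_path: str = "/"

    def __post_init__(self) -> None:
        # The API needs one of the three authentication methods.
        if not (self.password or self.key_path or self.private_key):
            raise ValueError("provide password, key_path, or private_key")

    def to_json(self) -> str:
        # Drop unset optional fields rather than sending explicit nulls.
        return json.dumps({k: v for k, v in asdict(self).items() if v is not None})


def sftp_config_url(base_url: str, source: Optional[str] = None) -> str:
    """Build the /generic/sftp-config lookup URL (category param is an assumption)."""
    url = base_url.rstrip("/") + "/generic/sftp-config"
    if source is not None:
        url += "?" + urlencode({"category": source})
    return url
```

For example, `sftp_config_url("https://monarch.example.com", "vendor_a")` yields `https://monarch.example.com/generic/sftp-config?category=vendor_a`; the host and source name are placeholders.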
POST /connection-key
Returns a unique connection key for a given SFTP configuration. This key is used internally to manage state and local mirroring paths.
Request Body: SFTPConfig
Response:
{
"connection_key": "sftp_connection_sftp.example.com_/remote/base/path"
}
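The calls in this section share one pattern: an HTTP request to an /sftp/ path with the SFTPConfig as a JSON body. Below is a minimal standard-library sketch, assuming a hypothetical base URL for the service and no extra authentication headers (none are documented here).

```python
import json
import urllib.request


def build_request(base_url: str, method: str, path: str, config: dict) -> urllib.request.Request:
    """Prepare a call to an /sftp/ endpoint with the SFTPConfig as the JSON body."""
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


def send(req: urllib.request.Request) -> dict:
    # Network call; urlopen raises urllib.error.HTTPError on error status codes.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `send(build_request(base_url, "POST", "/sftp/connection-key", config))["connection_key"]` returns the key. Passing `method="GET"` gives the same request shape for /status and /new-files, which also read the config from the request body.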
POST /scan
Scans the /new directory on the SFTP server. It finds new files that are not yet in the registry, downloads them to the local mirror, and adds them to the registry with a "new" status.
Request Body: SFTPConfig
Response: ScanResult
{
"new_files": [
{
"path": "file1.csv",
"size": 1024,
"modified": "2023-10-27T10:00:00Z",
"status": "new"
}
],
"total_files": 1,
"scan_timestamp": "2023-10-27T10:30:00Z",
"scan_duration": 5.2
}
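When a DAG logs or compares scan results, the timestamps need parsing. This sketch assumes the response shape shown above. `datetime.fromisoformat` only accepts a trailing `Z` from Python 3.11 onward, so the helper rewrites it as `+00:00`. The unit of `scan_duration` is not stated here, so it is passed through unchanged.

```python
from datetime import datetime


def parse_ts(value: str) -> datetime:
    # Normalize "Z" to an explicit UTC offset for Python versions before 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def summarize_scan(result: dict) -> dict:
    """Pull the fields a caller usually logs out of a ScanResult."""
    return {
        "paths": [f["path"] for f in result["new_files"]],
        "scanned_at": parse_ts(result["scan_timestamp"]),
        "duration": result["scan_duration"],
    }
```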
GET /status
Retrieves the current state of the file registry for a given SFTP connection, including the total number of tracked files, the last scan time, and the full registry.
Request Body: SFTPConfig
Response:
{
"total_tracked_files": 3,
"last_scan_time": "2023-10-27T10:30:00Z",
"file_registry": {
"new/file1.csv": {
"path": "file1.csv",
"size": 1024,
"modified": "2023-10-27T10:00:00Z",
"status": "new"
},
"processing/file2_20231027100500.csv": {
"path": "file2_20231027100500.csv",
"size": 2048,
"modified": "2023-10-27T10:05:00Z",
"status": "processing"
}
}
}
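The registry can be summarized by status, for example to feed a dashboard or a DAG health check. This sketch counts the entries actually present in `file_registry` rather than relying on `total_tracked_files`, and assumes each entry carries a `status` field as in the example.

```python
from collections import Counter


def count_by_status(status: dict) -> Counter:
    """Tally registry entries from a GET /status response by their status field."""
    return Counter(entry["status"] for entry in status["file_registry"].values())
```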
GET /new-files
Returns a list of all files currently marked with the "new" status in the registry.
Request Body: SFTPConfig
Response:
{
"new_files": [
{
"path": "file1.csv",
"size": 1024,
"modified": "2023-10-27T10:00:00Z",
"status": "new"
}
]
}
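If files should be handled in arrival order, a client could sort the /new-files response by `modified` before marking each one as processing. This ordering is a hypothetical client-side choice, not something the endpoint is documented to guarantee, and the sketch assumes `modified` is always an ISO 8601 timestamp like the examples.

```python
from datetime import datetime


def oldest_first(new_files_response: dict) -> list:
    """Order GET /new-files entries by their modified timestamp, oldest first."""
    def modified(entry: dict) -> datetime:
        # Normalize the trailing "Z" for datetime.fromisoformat on Python < 3.11.
        return datetime.fromisoformat(entry["modified"].replace("Z", "+00:00"))

    return [e["path"] for e in sorted(new_files_response["new_files"], key=modified)]
```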
POST /mark-processing/{filename}
Moves a file from the "new" state to "processing". The file is moved from /new to /processing on the SFTP server and in the local mirror. To avoid name collisions, a timestamp is added to the filename before the extension (for example, file1.csv becomes file1_20231027100000.csv).
URL Parameter:
filename (string): The name of the file to mark as processing.
Request Body: SFTPConfig
Response:
{
"message": "File file1.csv moved to processing.",
"new_filename": "file1_20231027100000.csv"
}
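Two details matter when calling this endpoint: the filename travels in the URL path, and every later call must use the returned `new_filename`. A sketch of both with hypothetical helper names, assuming the service decodes percent-encoded path segments (worth verifying against your deployment):

```python
from urllib.parse import quote


def mark_processing_path(filename: str) -> str:
    # Percent-encode the name so characters such as spaces or '#' cannot break the URL.
    return "/sftp/mark-processing/" + quote(filename, safe="")


def renamed(response: dict) -> str:
    """Return the timestamped name to use for /mark-completed or /mark-error."""
    return response["new_filename"]
```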
POST /mark-completed/{filename}
Moves a file from the "processing" state to "completed". The file is moved from /processing to /completed on the SFTP server and locally.
URL Parameter:
filename (string): The file's current name in the processing directory, i.e. the new_filename returned by /mark-processing (e.g., file1_20231027100000.csv).
Request Body: SFTPConfig
Response:
{
"message": "File file1_20231027100000.csv moved to completed."
}
POST /mark-error/{filename}
Moves a file from the "processing" state to "error". The file is moved from /processing to /error on the SFTP server and locally.
URL Parameter:
filename (string): The file's current name in the processing directory, i.e. the new_filename returned by /mark-processing (e.g., file1_20231027100000.csv).
Request Body: SFTPConfig
Response:
{
"message": "File file1_20231027100000.csv moved to error."
}
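Before moving a file out of processing, a client could confirm through GET /status that the file is actually there, then choose /mark-completed or /mark-error. A hypothetical sketch, assuming the registry keys follow the `<directory>/<filename>` pattern shown in the /status example:

```python
from urllib.parse import quote


def in_state(status: dict, filename: str, state: str) -> bool:
    """Check a GET /status response for a `state/filename` registry key."""
    return f"{state}/{filename}" in status["file_registry"]


def finish_path(status: dict, filename: str, succeeded: bool) -> str:
    """Pick the completion endpoint, refusing files not currently in processing."""
    if not in_state(status, filename, "processing"):
        raise ValueError(f"{filename} is not in processing")
    action = "mark-completed" if succeeded else "mark-error"
    return f"/sftp/{action}/" + quote(filename, safe="")
```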
DELETE /clear-registry
Deletes the entire file registry for a given SFTP connection from the database. This is a destructive operation and should be used with caution.
Request Body: SFTPConfig
Response:
{
"message": "Registry cleared"
}
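Because this call wipes the registry, a DAG or script might require an explicit confirmation before building the request. The confirmation scheme (typing the host back) is a hypothetical safeguard, not part of the API; the request itself uses the method, path, and body documented above.

```python
import json
import urllib.request


def clear_registry_request(base_url: str, config: dict, confirm: str) -> urllib.request.Request:
    """Build DELETE /sftp/clear-registry only if `confirm` matches the config host."""
    # Guard: make the caller name the target host, since this deletes the registry.
    if confirm != config.get("host"):
        raise PermissionError("confirmation does not match config host")
    return urllib.request.Request(
        base_url.rstrip("/") + "/sftp/clear-registry",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="DELETE",
    )
```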