Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changes/368.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Improved EOS remote file copy to validate scheme and query strings before connecting, use `clean_url` to prevent credential leakage, and simplify credential routing.
Changed copy command builders to include the source file path in the URL and use `flash:` as the destination, matching EOS CLI conventions.
Fixed `_uptime_to_string` to use integer division, preventing `ValueError` on format specifiers.
Fixed `check_file_exists` and `get_remote_checksum` to open the SSH connection before use, preventing `AttributeError` when called standalone.
Fixed password-prompt handling in `remote_file_copy` to wait for the transfer to complete before proceeding to verification.
Simplified checksum parsing in `get_remote_checksum` to use string splitting instead of regex.
Changed `verify_file` to return early when file does not exist and use case-insensitive checksum comparison.
Removed `include_username` parameter from `remote_file_copy` in favor of automatic credential routing based on scheme and username presence.
3 changes: 3 additions & 0 deletions changes/368.housekeeping
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Converted EOS remote file copy tests from hypothesis/pytest standalone functions to unittest TestCase with `self.assertRaises` and `subTest` for consistency with the rest of the codebase.
Removed duplicate test class `TestRemoteFileCopyCommandExecution` and consolidated into `TestRemoteFileCopy`.
Added integration tests for EOS device connectivity and remote file copy across FTP, TFTP, SCP, HTTP, HTTPS, and SFTP protocols.
147 changes: 66 additions & 81 deletions pyntc/devices/eos_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import re
import time
from urllib.parse import urlparse

from netmiko import ConnectHandler, FileTransfer
from pyeapi import connect as eos_connect
Expand All @@ -26,6 +25,8 @@
from pyntc.utils import convert_list_by_key
from pyntc.utils.models import FileCopyModel

EOS_SUPPORTED_HASHING_ALGORITHMS = {"md5", "sha1", "sha256", "sha512"} # Subset of HASHING_ALGORITHMS for EOS verify
EOS_SUPPORTED_SCHEMES = {"http", "https", "scp", "ftp", "sftp", "tftp"}
BASIC_FACTS_KM = {"model": "modelName", "os_version": "internalVersion", "serial_number": "serialNumber"}
INTERFACES_KM = {
"speed": "bandwidth",
Expand Down Expand Up @@ -135,13 +136,13 @@ def _parse_response(self, response, raw_text):
return list(x["result"] for x in response)

def _uptime_to_string(self, uptime):
days = uptime / (24 * 60 * 60)
days = uptime // (24 * 60 * 60)
uptime = uptime % (24 * 60 * 60)

hours = uptime / (60 * 60)
hours = uptime // (60 * 60)
uptime = uptime % (60 * 60)

mins = uptime / 60
mins = uptime // 60
uptime = uptime % 60

seconds = uptime
Expand Down Expand Up @@ -440,6 +441,7 @@ def check_file_exists(self, filename, file_system=None):
"""
exists = False

self.open()
file_system = file_system or self._get_file_system()
command = f"dir {file_system}/{filename}"
result = self.native_ssh.send_command(command, read_timeout=30)
Expand Down Expand Up @@ -481,6 +483,13 @@ def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs):
Raises:
CommandError: If the verify command fails (but not if file doesn't exist).
"""
if hashing_algorithm.lower() not in EOS_SUPPORTED_HASHING_ALGORITHMS:
raise ValueError(
f"Unsupported hashing algorithm '{hashing_algorithm}' for EOS. "
f"Supported algorithms: {sorted(EOS_SUPPORTED_HASHING_ALGORITHMS)}"
)

self.open()
file_system = kwargs.get("file_system")
if file_system is None:
file_system = self._get_file_system()
Expand Down Expand Up @@ -511,11 +520,11 @@ def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs):

# Parse the checksum from the output
# Expected format: verify /sha512 (flash:nautobot.png) = <checksum>
match = re.search(r"=\s*([a-fA-F0-9]+)", result)
if match:
remote_checksum = match.group(1).lower()
log.debug("Host %s: Remote checksum for %s: %s", self.host, filename, remote_checksum)
return remote_checksum
if "=" in result:
remote_checksum = result.split("=")[-1].strip().lower()
if remote_checksum:
log.debug("Host %s: Remote checksum for %s: %s", self.host, filename, remote_checksum)
return remote_checksum

log.error("Host %s: Could not parse checksum from verify output: %s", self.host, result)
raise CommandError(command, f"Could not parse checksum from verify output: {result}")
Expand All @@ -524,123 +533,98 @@ def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs):
log.error("Host %s: Error getting remote checksum: %s", self.host, str(e))
raise CommandError(command, f"Error getting remote checksum: {str(e)}")

def _build_url_copy_command_simple(self, src, file_system):
@staticmethod
def _netloc(src: FileCopyModel) -> str:
"""Return host:port or just host from a FileCopyModel."""
return f"{src.hostname}:{src.port}" if src.port else src.hostname

@staticmethod
def _source_path(src: FileCopyModel, dest: str) -> str:
"""Return the file path from the URL, falling back to dest if empty."""
return src.path if src.path and src.path != "/" else f"/{dest}"

def _build_url_copy_command_simple(self, src, file_system, dest):
"""Build copy command for simple URL-based transfers (TFTP, HTTP, HTTPS without credentials)."""
return f"copy {src.download_url} {file_system}", False
netloc = self._netloc(src)
path = self._source_path(src, dest)
return f"copy {src.scheme}://{netloc}{path} {file_system}", False

def _build_url_copy_command_with_creds(self, src, file_system):
def _build_url_copy_command_with_creds(self, src, file_system, dest):
"""Build copy command for URL-based transfers with credentials (HTTP/HTTPS/SCP/FTP/SFTP)."""
parsed = urlparse(src.download_url)
hostname = parsed.hostname
path = parsed.path

# Determine port based on scheme
if parsed.port:
port = parsed.port
elif src.scheme == "https":
port = "443"
elif src.scheme in ["http"]:
port = "80"
else:
port = ""

port_str = f":{port}" if port else ""
netloc = self._netloc(src)
path = self._source_path(src, dest)

# For HTTP/HTTPS, include both username and token
if src.scheme in ["http", "https"]:
command = f"copy {src.scheme}://{src.username}:{src.token}@{hostname}{port_str}{path} {file_system}"
if src.scheme in ("http", "https"):
command = f"copy {src.scheme}://{src.username}:{src.token}@{netloc}{path} {file_system}"
detect_prompt = False
# For SCP/FTP/SFTP, include only username (password via prompt)
else:
command = f"copy {src.scheme}://{src.username}@{hostname}{port_str}{path} {file_system}"
# SCP/FTP/SFTP — password provided at the interactive prompt
command = f"copy {src.scheme}://{src.username}@{netloc}{path} {file_system}"
detect_prompt = True

return command, detect_prompt

def remote_file_copy(self, src: FileCopyModel, dest=None, file_system=None, include_username=False, **kwargs):
def _check_copy_output_for_errors(self, output):
"""Raise FileTransferError if copy command output contains error indicators."""
if any(error in output.lower() for error in ["error", "invalid", "failed"]):
Comment thread
jtdub marked this conversation as resolved.
log.error("Host %s: Error detected in copy command output: %s", self.host, output)
raise FileTransferError(f"Error detected in copy command output: {output}")

def remote_file_copy(self, src: FileCopyModel, dest: str | None = None, file_system: str | None = None, **kwargs):
"""Copy a file from remote source to device.

Args:
src (FileCopyModel): The source file model with transfer parameters.
dest (str): Destination filename (defaults to src.file_name).
file_system (str): Device filesystem (auto-detected if not provided).
include_username (bool): Whether to include username in the copy command. Defaults to False.
**kwargs (Any): Passible parameters such as file_system.

Raises:
TypeError: If src is not a FileCopyModel.
ValueError: If the URL scheme is unsupported or URL contains query strings.
FileTransferError: If transfer or verification fails.
FileSystemNotFoundError: If filesystem cannot be determined.
"""
# Validate input
if not isinstance(src, FileCopyModel):
raise TypeError("src must be an instance of FileCopyModel")

# Determine file system
if src.scheme not in EOS_SUPPORTED_SCHEMES:
raise ValueError(f"Unsupported scheme: {src.scheme}")

# EOS CLI cannot handle '?' in URLs
if "?" in src.clean_url:
raise ValueError(f"URLs with query strings are not supported on EOS: {src.download_url}")

if file_system is None:
file_system = self._get_file_system()

# Determine destination
if dest is None:
dest = src.file_name

log.debug("Host %s: Starting remote file copy for %s to %s/%s", self.host, src.file_name, file_system, dest)

# Open SSH connection and enable
self.open()
self.enable()

# Validate scheme
supported_schemes = ["http", "https", "scp", "ftp", "sftp", "tftp"]
if src.scheme not in supported_schemes:
raise ValueError(f"Unsupported scheme: {src.scheme}")

# Build command based on scheme and credentials
command_builders = {
("tftp", False): lambda: self._build_url_copy_command_simple(src, file_system),
("http", False): lambda: self._build_url_copy_command_simple(src, file_system),
("https", False): lambda: self._build_url_copy_command_simple(src, file_system),
("http", True): lambda: self._build_url_copy_command_with_creds(src, file_system),
("https", True): lambda: self._build_url_copy_command_with_creds(src, file_system),
("scp", False): lambda: self._build_url_copy_command_with_creds(src, file_system),
("scp", True): lambda: self._build_url_copy_command_with_creds(src, file_system),
("ftp", False): lambda: self._build_url_copy_command_with_creds(src, file_system),
("ftp", True): lambda: self._build_url_copy_command_with_creds(src, file_system),
("sftp", False): lambda: self._build_url_copy_command_with_creds(src, file_system),
("sftp", True): lambda: self._build_url_copy_command_with_creds(src, file_system),
}

builder_key = (src.scheme, include_username and src.username is not None)
if builder_key not in command_builders:
raise ValueError(f"Unable to construct copy command for scheme {src.scheme} with provided credentials")

command, detect_prompt = command_builders[builder_key]()
if src.scheme == "tftp" or src.username is None:
command, detect_prompt = self._build_url_copy_command_simple(src, file_system, dest)
else:
command, detect_prompt = self._build_url_copy_command_with_creds(src, file_system, dest)
log.debug("Host %s: Preparing copy command for %s", self.host, src.scheme)

# Execute copy command
if detect_prompt and src.token:
# Use send_command_timing for interactive password prompt
output = self.native_ssh.send_command_timing(command, read_timeout=src.timeout, cmd_verify=False)
log.debug("Host %s: Copy command (with timing) output: %s", self.host, output)

if "password:" in output.lower():
self.native_ssh.write_channel(src.token + "\n")
# Read the response after sending password
output += self.native_ssh.read_channel()
output = self.native_ssh.send_command_timing(src.token, read_timeout=src.timeout, cmd_verify=False)
log.debug("Host %s: Output after password entry: %s", self.host, output)
elif any(error in output.lower() for error in ["error", "invalid", "failed"]):
log.error("Host %s: Error detected in copy command output: %s", self.host, output)
raise FileTransferError(f"Error detected in copy command output: {output}")
else:
# Use regular send_command for non-interactive transfers
output = self.native_ssh.send_command(command, read_timeout=src.timeout)
log.debug("Host %s: Copy command output: %s", self.host, output)

if any(error in output.lower() for error in ["error", "invalid", "failed"]):
log.error("Host %s: Error detected in copy command output: %s", self.host, output)
raise FileTransferError(f"Error detected in copy command output: {output}")
self._check_copy_output_for_errors(output)

# Verify transfer success
verification_result = self.verify_file(
src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system
)
Expand Down Expand Up @@ -676,11 +660,12 @@ def verify_file(self, checksum, filename, hashing_algorithm="md5", **kwargs):
Returns:
(bool): True if the file is verified successfully, False otherwise.
"""
exists = self.check_file_exists(filename, **kwargs)
device_checksum = (
self.get_remote_checksum(filename, hashing_algorithm=hashing_algorithm, **kwargs) if exists else None
)
if checksum == device_checksum:
if not self.check_file_exists(filename, **kwargs):
log.debug("Host %s: File %s not found on device", self.host, filename)
return False

device_checksum = self.get_remote_checksum(filename, hashing_algorithm=hashing_algorithm, **kwargs)
if checksum.lower() == device_checksum.lower():
log.debug("Host %s: Checksum verification successful for file %s", self.host, filename)
return True

Expand Down
20 changes: 12 additions & 8 deletions pyntc/utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ class FileCopyModel:
vrf: Optional[str] = None
ftp_passive: bool = True

# This field is calculated, so we don't pass it in the constructor
# Computed fields derived from download_url — not passed to the constructor
clean_url: str = field(init=False)
scheme: str = field(init=False)
hostname: str = field(init=False)
port: Optional[int] = field(init=False)
path: str = field(init=False)

def __post_init__(self):
"""Validate the input and prepare the clean URL after initialization."""
# 1. Validate the hashing algorithm choice
if self.hashing_algorithm.lower() not in HASHING_ALGORITHMS:
raise ValueError(f"Unsupported algorithm. Choose from: {HASHING_ALGORITHMS}")

# Parse the url to extract components
parsed = urlparse(self.download_url)

# Extract username/password from URL if not already provided as arguments
Expand All @@ -55,13 +56,16 @@ def __post_init__(self):
if parsed.password and not self.token:
self.token = parsed.password

# 3. Create the 'clean_url' (URL without the credentials)
# This is what you actually send to the device if using ip http client
port = f":{parsed.port}" if parsed.port else ""
self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port}{parsed.path}"
# Store parsed URL components
self.scheme = parsed.scheme
self.hostname = parsed.hostname
self.port = parsed.port
self.path = parsed.path

# Create the 'clean_url' (URL without credentials)
port_str = f":{parsed.port}" if parsed.port else ""
self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port_str}{parsed.path}"

# Handle query params if they exist (though we're avoiding '?' for Cisco)
if parsed.query:
self.clean_url += f"?{parsed.query}"

Expand Down
Loading
Loading