diff --git a/changes/367.added b/changes/367.added new file mode 100644 index 00000000..bbc5ba06 --- /dev/null +++ b/changes/367.added @@ -0,0 +1,2 @@ +Added remote file copy feature to Cisco NXOS devices. +Added unittests for remote file copy for Cisco NXOS devices. \ No newline at end of file diff --git a/pyntc/devices/nxos_device.py b/pyntc/devices/nxos_device.py index 1517c28d..adc8bfc5 100644 --- a/pyntc/devices/nxos_device.py +++ b/pyntc/devices/nxos_device.py @@ -4,6 +4,7 @@ import re import time +from netmiko import ConnectHandler from pynxos.device import Device as NXOSNative from pynxos.errors import CLIError from pynxos.features.file_copy import FileTransferError as NXOSFileTransferError @@ -14,11 +15,16 @@ from pyntc.errors import ( CommandError, CommandListError, + FileSystemNotFoundError, FileTransferError, NTCFileNotFoundError, OSInstallError, RebootTimeoutError, ) +from pyntc.utils.models import FileCopyModel + +NXOS_SUPPORTED_HASHING_ALGORITHMS = {"md5", "sha256", "sha512", "chk"} +NXOS_SUPPORTED_SCHEMES = {"http", "https", "scp", "ftp", "sftp", "tftp"} @fix_docs @@ -45,9 +51,18 @@ def __init__(self, host, username, password, transport="http", timeout=30, port= super().__init__(host, username, password, device_type="cisco_nxos_nxapi") self.transport = transport self.timeout = timeout + self.port = port + self.verify = verify + # Use self.native for NXAPI self.native = NXOSNative( host, username, password, transport=transport, timeout=timeout, port=port, verify=verify ) + # Use self.native_ssh for Netmiko SSH + self.native_ssh = None + self._connected = False + self._redundancy_state = None + self._active_redundancy_states = None + self.open() log.init(host=host) def _image_booted(self, image_name, **vendor_specifics): @@ -104,9 +119,12 @@ def checkpoint(self, filename): log.debug("Host %s: checkpoint is %s.", self.host, filename) return self.native.checkpoint(filename) - def close(self): # noqa: D401 - """Implements ``pass``.""" - pass # pylint: disable=unnecessary-pass + def close(self): + """Disconnect from device.""" + if self.connected: + self.native_ssh.disconnect() + self._connected = False + log.debug("Host %s: Connection closed.", self.host) def config(self, command): """Send configuration command. @@ -300,6 +318,288 @@ def file_copy_remote_exists(self, src, dest=None, file_system="bootflash:"): ) return self.native.file_copy_remote_exists(src, dest, file_system=file_system) + def _get_file_system(self): + """Determine the default file system or directory for device. + + Returns: + (str): The name of the default file system or directory for the device. + + Raises: + FileSystemNotFoundError: When the module is unable to determine the default file system. + """ + raw_data = self.show("dir", raw_text=True) + try: + file_system = re.search(r"bootflash:", raw_data).group(0) + except AttributeError: + log.error("Host %s: File system not found with command 'dir'.", self.host) + raise FileSystemNotFoundError(hostname=self.host, command="dir") + + log.debug("Host %s: File system %s.", self.host, file_system) + return file_system + + @staticmethod + def _netloc(src: FileCopyModel) -> str: + """Return host:port or just host from a FileCopyModel.""" + return f"{src.hostname}:{src.port}" if src.port else src.hostname + + @staticmethod + def _source_path(src: FileCopyModel, dest: str) -> str: + """Return the file path from the URL, falling back to dest if empty.""" + return src.path if src.path and src.path != "/" else f"/{dest}" + + def _build_url_copy_command_simple(self, src, file_system, dest): + """Build copy command for simple URL-based transfers (TFTP, HTTP, HTTPS without credentials).""" + netloc = self._netloc(src) + path = self._source_path(src, dest) + return f"copy {src.scheme}://{netloc}{path} {file_system}", False + + def _build_url_copy_command_with_creds(self, src, file_system, dest): + """Build copy command for URL-based transfers with credentials (HTTP/HTTPS/SCP/FTP/SFTP).""" + netloc = self._netloc(src) + path = self._source_path(src, dest) + + if src.scheme in ("http", "https"): + command = f"copy {src.scheme}://{src.username}:{src.token}@{netloc}{path} {file_system}" + else: + # SCP/FTP/SFTP — password provided at the interactive prompt + command = f"copy {src.scheme}://{src.username}@{netloc}{path} {file_system}" + + return command + + def check_file_exists(self, filename, file_system=None): + """Check if a remote file exists by filename. + + Args: + filename (str): The name of the file to check for on the remote device. + file_system (str): Supported only for Arista. The file system for the + remote file. If no file_system is provided, then the `get_file_system` + method is used to determine the correct file system to use. + + Returns: + (bool): True if the remote file exists, False if it doesn't. + + Raises: + CommandError: If there is an error in executing the command to check if the file exists. + """ + exists = False + + self.open() + file_system = file_system or self._get_file_system() + command = f"dir {file_system}/{filename}" + result = self.native_ssh.send_command(command, read_timeout=30) + + log.debug( + "Host %s: Checking if file %s exists on remote with command '%s' and result: %s", + self.host, + filename, + command, + result, + ) + + # Check for error patterns + if re.search(r"% Error listing directory|No such file|No files found|Path does not exist", result): + log.debug("Host %s: File %s does not exist on remote.", self.host, filename) + exists = False + elif filename in result: + # NXOS shows file details directly, just check if filename appears in output + log.debug("Host %s: File %s exists on remote.", self.host, filename) + exists = True + else: + raise CommandError(command, f"Unable to determine if file {filename} exists on remote: {result}") + + return exists + + def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs): + """Get the checksum of a remote file on Cisco NXOS device using netmiko SSH. + + Uses NXOS's 'show file' command via SSH to compute file checksums. + + Args: + filename (str): The name of the file to check for on the remote device. + hashing_algorithm (str): The hashing algorithm to use (default: "md5"). + **kwargs (Any): Passible parameters such as file_system. + + Returns: + (str): The checksum of the remote file. + + Raises: + CommandError: If the verify command fails (but not if file doesn't exist). + """ + if hashing_algorithm not in NXOS_SUPPORTED_HASHING_ALGORITHMS: + raise ValueError( + f"Unsupported hashing algorithm '{hashing_algorithm}' for NXOS. " + f"Supported algorithms: {sorted(NXOS_SUPPORTED_HASHING_ALGORITHMS)}" + ) + + self.open() + file_system = kwargs.get("file_system") + if file_system is None: + file_system = self._get_file_system() + + # Normalize file_system + if not file_system.startswith("/") and not file_system.endswith(":"): + file_system = f"{file_system}:" + + # Use NXOS verify command to get the checksum + # Example: show file bootflash:nautobot.png sha512sum + command = f"show file {file_system}/{filename} {hashing_algorithm}sum" + + try: + result = self.native_ssh.send_command(command, read_timeout=30) + log.debug( + "Host %s: Getting remote checksum for file %s with command '%s' and result: %s", + self.host, + filename, + command, + result, + ) + print(f"result: {result}") + remote_checksum = result + return remote_checksum + + except Exception as e: + log.error("Host %s: Error getting remote checksum: %s", self.host, str(e)) + raise CommandError(command, f"Error getting remote checksum: {str(e)}") + + def remote_file_copy(self, src: FileCopyModel, dest=None, file_system=None, **kwargs): # noqa: R0912 pylint: disable=too-many-branches + """Copy a file from remote source to device. Skips if file already exists and is verified on remote device. + + Args: + src (FileCopyModel): The source file model with transfer parameters. + dest (str): Destination filename (defaults to src.file_name). + file_system (str): Device filesystem (auto-detected if not provided). + **kwargs (Any): Passible parameters such as file_system. + + Raises: + TypeError: If src is not a FileCopyModel. + FileTransferError: If transfer or verification fails. + FileSystemNotFoundError: If filesystem cannot be determined. + """ + timeout = src.timeout or 30 + + if not isinstance(src, FileCopyModel): + raise TypeError("src must be an instance of FileCopyModel") + + if src.scheme not in NXOS_SUPPORTED_SCHEMES: + raise ValueError( + f"Unsupported URL scheme '{src.scheme}' in src. Supported schemes: {sorted(NXOS_SUPPORTED_SCHEMES)}" + ) + + if "?" in src.clean_url: + raise ValueError(f"URLs with query strings are not supported on NXOS: {src.download_url}") + + if file_system is None: + file_system = self._get_file_system() + + if dest is None: + dest = src.file_name + + if src.scheme == "tftp" or src.username is None: + command = self._build_url_copy_command_simple(src, file_system, dest) + else: + command = self._build_url_copy_command_with_creds(src, file_system, dest) + log.debug("Host %s: Preparing copy command for %s", self.host, src.scheme) + + # Add VRF if specified + if src.vrf: + command += f" vrf {src.vrf}" + + log.debug( + "Host %s: Verifying file %s exists on filesystem %s before attempting a copy", + self.host, + dest, + file_system, + ) + if not self.verify_file(src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system): + current_prompt = self.native_ssh.find_prompt() + + # Define prompt mapping for expected prompts during file copy + prompt_answers = { + r"Password": src.token or "", + r"Source username": src.username or "", + r"yes/no|Are you sure you want to continue connecting": "yes", + r"(confirm|Address or name of remote host|Source filename|Destination filename)": "", + } + keys = list(prompt_answers.keys()) + [current_prompt] + expect_regex = f"({'|'.join(keys)})" + + log.debug("Host %s: Starting remote file copy for %s to %s/%s", self.host, src.file_name, file_system, dest) + output = self.native_ssh.send_command(command, expect_string=expect_regex, read_timeout=timeout) + + while current_prompt not in output: + # Check for success message in output to break loop and avoid waiting for next prompt + if re.search(r"Copy complete|bytes copied in|File transfer successful", output, re.IGNORECASE): + log.info( + "Host %s: File %s transferred successfully with output: %s", self.host, src.file_name, output + ) + break + # Check for errors explicitly to avoid infinite loops on failure + if re.search(r"(Error|Invalid|Failed|Aborted|denied)", output, re.IGNORECASE): + log.error("Host %s: File transfer error %s", self.host, FileTransferError.default_message) + raise FileTransferError + for prompt, answer in prompt_answers.items(): + if re.search(prompt, output, re.IGNORECASE): + is_password = "Password" in prompt + output = self.native_ssh.send_command( + answer, expect_string=expect_regex, read_timeout=timeout, cmd_verify=not is_password + ) + break # Exit the for loop and check the new output for the next prompt + + # Verify file after transfer + if not self.verify_file( + src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system + ): + log.error( + "Host %s: File verification failed after transfer for file %s", + self.host, + dest, + ) + raise FileTransferError("File verification failed after transfer") + + log.info( + "Host %s: File %s transferred successfully.", + self.host, + dest, + ) + else: + log.info( + "Host %s: File %s already exists on remote and passed verification. File copy not performed.", + self.host, + dest, + ) + + def verify_file(self, checksum, filename, hashing_algorithm="md5", file_system=None, **kwargs): + """Verify a file on the device by comparing checksums. + + Args: + checksum (str): The expected checksum of the file. + filename (str): The name of the file on the device. + hashing_algorithm (str): The hashing algorithm to use (default: "md5"). + file_system (str): The file system where the file is located. + **kwargs (Any): Passible parameters such as file_system. + + Returns: + (bool): True if the file is verified successfully, False otherwise. + """ + exists = self.check_file_exists(filename, file_system=file_system, **kwargs) + device_checksum = ( + self.get_remote_checksum(filename, hashing_algorithm=hashing_algorithm, file_system=file_system, **kwargs) + if exists + else None + ) + if checksum == device_checksum: + log.debug("Host %s: Checksum verification successful for file %s", self.host, filename) + return True + + log.debug( + "Host %s: Checksum verification failed for file %s - Expected: %s, Actual: %s", + self.host, + filename, + checksum, + device_checksum, + ) + return False + def install_os(self, image_name, **vendor_specifics): """Upgrade device with provided image. @@ -329,9 +629,83 @@ def install_os(self, image_name, **vendor_specifics): log.info("Host %s: Image %s is already running on the device.", self.host, image_name) return False - def open(self): # noqa: D401 - """Implements ``pass``.""" - pass # pylint: disable=unnecessary-pass + @property + def connected(self): # noqa: D401 + """ + Get connection status of the device. + + Returns: + (bool): True if the device is connected, else False. + """ + return self._connected + + @connected.setter + def connected(self, value): + self._connected = value + + @property + def redundancy_state(self): + """Get redundancy state of the device. + + Returns: + (str): Redundancy state of the device (e.g., "active", "standby", "init"). + """ + if self._redundancy_state is None: + try: + output = self.native.show("show redundancy state", raw_text=True) + # Parse the redundancy state from output + # Example output: "Redundancy state = active" + match = re.search(r"Redundancy\s+state\s*=\s*(\w+)", output, re.IGNORECASE) + if match: + self._redundancy_state = match.group(1).lower() + else: + # If no redundancy info, device may not support HA + self._redundancy_state = "active" + except CLIError: + # If command fails, assume active (non-HA or error condition) + self._redundancy_state = "active" + + return self._redundancy_state + + @property + def active_redundancy_states(self): + """Get list of states that indicate the device is active. + + Returns: + (list): List of active redundancy states. + """ + if self._active_redundancy_states is None: + self._active_redundancy_states = ["active", "master"] + return self._active_redundancy_states + + def is_active(self): + """ + Determine if the current processor is the active processor. + + Returns: + (bool): True if the processor is active or does not support HA, else False. + """ + return self.redundancy_state in self.active_redundancy_states + + def open(self): + """Open a connection to the network device.""" + if self.connected: + try: + self.native_ssh.find_prompt() + except: # noqa E722 # pylint: disable=bare-except + self._connected = False + + if not self.connected: + self.native_ssh = ConnectHandler( + device_type="cisco_nxos", + host=self.host, + username=self.username, + password=self.password, + timeout=self.timeout, + ) + self._connected = True + + log.debug("Host %s: SSH connection opened successfully.", self.host) def reboot(self, wait_for_reload=False, **kwargs): """ diff --git a/pyntc/log.py b/pyntc/log.py index 79b2f423..13c57744 100644 --- a/pyntc/log.py +++ b/pyntc/log.py @@ -51,10 +51,11 @@ def init(**kwargs): kwargs.setdefault("format", log_format) kwargs.setdefault("level", log_level) + host = kwargs.pop("host", None) logging.basicConfig(**kwargs) # info is defined at the end of the file - info("Logging initialized for host %s.", kwargs.pop("host", None)) + info("Logging initialized for host %s.", host) def logger(level): diff --git a/pyntc/utils/models.py b/pyntc/utils/models.py index 3d047a81..18c55e31 100644 --- a/pyntc/utils/models.py +++ b/pyntc/utils/models.py @@ -36,17 +36,18 @@ class FileCopyModel: vrf: Optional[str] = None ftp_passive: bool = True - # This field is calculated, so we don't pass it in the constructor + # Computed fields derived from download_url — not passed to the constructor clean_url: str = field(init=False) scheme: str = field(init=False) + hostname: str = field(init=False) + port: Optional[int] = field(init=False) + path: str = field(init=False) def __post_init__(self): """Validate the input and prepare the clean URL after initialization.""" - # 1. Validate the hashing algorithm choice if self.hashing_algorithm.lower() not in HASHING_ALGORITHMS: raise ValueError(f"Unsupported algorithm. Choose from: {HASHING_ALGORITHMS}") - # Parse the url to extract components parsed = urlparse(self.download_url) # Extract username/password from URL if not already provided as arguments @@ -55,13 +56,16 @@ def __post_init__(self): if parsed.password and not self.token: self.token = parsed.password - # 3. Create the 'clean_url' (URL without the credentials) - # This is what you actually send to the device if using ip http client - port = f":{parsed.port}" if parsed.port else "" - self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port}{parsed.path}" + # Store parsed URL components self.scheme = parsed.scheme + self.hostname = parsed.hostname + self.port = parsed.port + self.path = parsed.path + + # Create the 'clean_url' (URL without credentials) + port_str = f":{parsed.port}" if parsed.port else "" + self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port_str}{parsed.path}" - # Handle query params if they exist (though we're avoiding '?' for Cisco) if parsed.query: self.clean_url += f"?{parsed.query}" diff --git a/tests/unit/test_devices/test_nxos_device.py b/tests/unit/test_devices/test_nxos_device.py index cd45730c..8109a195 100644 --- a/tests/unit/test_devices/test_nxos_device.py +++ b/tests/unit/test_devices/test_nxos_device.py @@ -5,7 +5,14 @@ from pyntc.devices.base_device import RollbackError from pyntc.devices.nxos_device import NXOSDevice -from pyntc.errors import CommandError, CommandListError, FileTransferError, NTCFileNotFoundError +from pyntc.errors import ( + CommandError, + CommandListError, + FileSystemNotFoundError, + FileTransferError, + NTCFileNotFoundError, +) +from pyntc.utils.models import FileCopyModel from .device_mocks.nxos import show, show_list @@ -26,15 +33,19 @@ class TestNXOSDevice(unittest.TestCase): + @mock.patch("pyntc.devices.nxos_device.ConnectHandler", create=True) @mock.patch("pyntc.devices.nxos_device.NXOSNative", autospec=True) @mock.patch("pynxos.device.Device.facts", new_callable=mock.PropertyMock) - def setUp(self, mock_device, mock_facts): + def setUp(self, mock_facts, mock_device, mock_connect_handler): + self.mock_native_ssh = mock_connect_handler.return_value self.device = NXOSDevice("host", "user", "pass") mock_device.show.side_effect = show mock_device.show_list.side_effect = show_list mock_facts.return_value = DEVICE_FACTS self.device.native = mock_device + self.device.native_ssh = self.mock_native_ssh + self.device.native._facts = {} type(self.device.native).facts = mock_facts.return_value def test_config(self): @@ -256,6 +267,123 @@ def test_refresh(self): self.assertIsNone(self.device._uptime) self.assertFalse(hasattr(self.device.native, "_facts")) + @mock.patch.object(NXOSDevice, "show", return_value="bootflash:") + def test_get_file_system(self, mock_show): + self.assertEqual(self.device._get_file_system(), "bootflash:") + mock_show.assert_called_with("dir", raw_text=True) + + @mock.patch.object(NXOSDevice, "show", return_value="no filesystems here") + def test_get_file_system_not_found(self, mock_show): + with self.assertRaises(FileSystemNotFoundError): + self.device._get_file_system() + mock_show.assert_called_with("dir", raw_text=True) + + def test_check_file_exists_true(self): + self.device.native_ssh.send_command.return_value = "12345 bootflash:/nxos.bin" + result = self.device.check_file_exists("nxos.bin", file_system="bootflash:") + self.assertTrue(result) + self.device.native_ssh.send_command.assert_called_with("dir bootflash:/nxos.bin", read_timeout=30) + + def test_check_file_exists_false(self): + self.device.native_ssh.send_command.return_value = "No such file or directory" + result = self.device.check_file_exists("nxos.bin", file_system="bootflash:") + self.assertFalse(result) + self.device.native_ssh.send_command.assert_called_with("dir bootflash:/nxos.bin", read_timeout=30) + + def test_check_file_exists_command_error(self): + self.device.native_ssh.send_command.return_value = "some ambiguous output" + with self.assertRaises(CommandError): + self.device.check_file_exists("nxos.bin", file_system="bootflash:") + + def test_get_remote_checksum(self): + self.device.native_ssh.send_command.return_value = "abc123" + result = self.device.get_remote_checksum("nxos.bin", hashing_algorithm="md5", file_system="bootflash:") + self.assertEqual(result, "abc123") + self.device.native_ssh.send_command.assert_called_with("show file bootflash:/nxos.bin md5sum", read_timeout=30) + + def test_get_remote_checksum_invalid_algorithm(self): + with self.assertRaises(ValueError): + self.device.get_remote_checksum("nxos.bin", hashing_algorithm="sha1", file_system="bootflash:") + + def test_verify_file_true(self): + with ( + mock.patch.object(NXOSDevice, "check_file_exists", return_value=True), + mock.patch.object(NXOSDevice, "get_remote_checksum", return_value="abc123"), + ): + result = self.device.verify_file("abc123", "nxos.bin", file_system="bootflash:") + self.assertTrue(result) + + def test_verify_file_false(self): + with ( + mock.patch.object(NXOSDevice, "check_file_exists", return_value=True), + mock.patch.object(NXOSDevice, "get_remote_checksum", return_value="different"), + ): + result = self.device.verify_file("abc123", "nxos.bin", file_system="bootflash:") + self.assertFalse(result) + + def test_remote_file_copy_existing_verified_file(self): + src = FileCopyModel( + download_url="http://example.com/nxos.bin", + checksum="abc123", + file_name="nxos.bin", + hashing_algorithm="md5", + timeout=30, + ) + with mock.patch.object(NXOSDevice, "verify_file", return_value=True) as verify_mock: + self.device.remote_file_copy(src, file_system="bootflash:") + verify_mock.assert_called_once_with("abc123", "nxos.bin", hashing_algorithm="md5", file_system="bootflash:") + self.device.native_ssh.send_command.assert_not_called() + + def test_remote_file_copy_transfer_success(self): + src = FileCopyModel( + download_url="http://example.com/nxos.bin", + checksum="abc123", + file_name="nxos.bin", + hashing_algorithm="md5", + timeout=30, + ) + self.device.native_ssh.find_prompt.return_value = "host#" + self.device.native_ssh.send_command.return_value = "Copy complete" + with mock.patch.object(NXOSDevice, "verify_file", side_effect=[False, True]): + self.device.remote_file_copy(src, file_system="bootflash:") + self.device.native_ssh.send_command.assert_called_once() + + def test_remote_file_copy_transfer_fails_verification(self): + src = FileCopyModel( + download_url="http://example.com/nxos.bin", + checksum="abc123", + file_name="nxos.bin", + hashing_algorithm="md5", + timeout=30, + ) + self.device.native_ssh.find_prompt.return_value = "host#" + self.device.native_ssh.send_command.return_value = "Copy complete" + with mock.patch.object(NXOSDevice, "verify_file", side_effect=[False, False]): + with self.assertRaises(FileTransferError): + self.device.remote_file_copy(src, file_system="bootflash:") + + def test_remote_file_copy_invalid_scheme(self): + src = FileCopyModel( + download_url="smtp://example.com/nxos.bin", + checksum="abc123", + file_name="nxos.bin", + hashing_algorithm="md5", + timeout=30, + ) + with self.assertRaises(ValueError): + self.device.remote_file_copy(src, file_system="bootflash:") + + def test_remote_file_copy_query_string_not_supported(self): + src = FileCopyModel( + download_url="https://example.com/nxos.bin?token=foo", + checksum="abc123", + file_name="nxos.bin", + hashing_algorithm="md5", + timeout=30, + ) + with self.assertRaises(ValueError): + self.device.remote_file_copy(src, file_system="bootflash:") + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_infra.py b/tests/unit/test_infra.py index 36568f3f..881bd12a 100644 --- a/tests/unit/test_infra.py +++ b/tests/unit/test_infra.py @@ -15,10 +15,11 @@ @mock.patch("pyntc.devices.f5_device.ManagementRoot") @mock.patch("pyntc.devices.asa_device.ASADevice.open") @mock.patch("pyntc.devices.ios_device.IOSDevice.open") +@mock.patch("pyntc.devices.nxos_device.NXOSDevice.open") @mock.patch("pyntc.devices.jnpr_device.JunosNativeSW") @mock.patch("pyntc.devices.jnpr_device.JunosNativeDevice.open") @mock.patch("pyntc.devices.jnpr_device.JunosNativeDevice.timeout") -def test_device_creation(j_timeout, j_open, j_nsw, i_open, a_open, f_mr, air_open, device_type, expected): +def test_device_creation(j_timeout, j_open, j_nsw, nx_open, i_open, a_open, f_mr, air_open, device_type, expected): device = ntc_device(device_type, "host", "user", "pass") assert isinstance(device, expected) @@ -29,8 +30,9 @@ def test_unsupported_device(): @mock.patch("pyntc.devices.ios_device.IOSDevice.open") +@mock.patch("pyntc.devices.nxos_device.NXOSDevice.open") @mock.patch("pyntc.devices.jnpr_device.JunosDevice.open") -def test_device_by_name(j_open, i_open): +def test_device_by_name(j_open, nx_open, i_open): config_filepath = os.path.join(FIXTURES_DIR, ".ntc.conf.sample") nxos_device = ntc_device_by_name("test_nxos", filename=config_filepath)