-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathshell.py
More file actions
194 lines (164 loc) · 7.48 KB
/
shell.py
File metadata and controls
194 lines (164 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Communicates with the Shell on Linux and Windows.
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2025060401'
import os
import re
import shlex
import subprocess
from . import txt
# Human-readable descriptions keyed by integer code. Judging by the name and the
# message texts these are the non-zero exit codes of the `sshpass` utility
# (NOTE(review): confirm against the sshpass man page). Nothing in the visible part
# of this module reads the table, so it is presumably consumed by callers.
RETC_SSHPASS = {
    1: 'Invalid command line argument',
    2: 'Conflicting arguments given',
    3: 'General runtime error',
    4: 'Unrecognized response from ssh (parse error)',
    5: 'Invalid/incorrect password',
    6: 'Host public key is unknown. sshpass exits without confirming the new key.',
    7: 'IP public key changed. sshpass exits without confirming the new key.',
}
def get_command_output(cmd, regex=None):
    """
    Run a command via `shell_exec()` and return its (optionally regex-filtered) output.

    Processing steps:

    - If `shell_exec()` reports a failure, an empty string is returned.
    - The stripped stdout is used; if that is empty, the stripped stderr is used instead.
      The exit code is ignored.
    - Without `regex`, that text is returned as is.
    - With `regex`, the pattern is searched in the text and the stripped content of the
      first capturing group is returned. No match, or any error raised while matching
      (e.g. an invalid pattern or a pattern without a capturing group), yields an empty
      string.

    ### Parameters
    - **cmd** (`str`): The command to execute.
    - **regex** (`str`, optional): A regular expression with at least one capturing group.
      Defaults to None.

    ### Returns
    - **str**: The command output, the captured substring, or an empty string.

    ### Example
    >>> get_command_output('nano --version')
    GNU nano, version 5.3
    (C) 1999-2011, 2013-2020 Free Software Foundation, Inc.
    (C) 2014-2020 the contributors to nano
    Compiled options: --enable-utf8

    >>> get_command_output('nano --version', regex=r'version (.*)\\n')
    5.3
    """
    ok, payload = shell_exec(cmd)
    if not ok:
        return ''

    out, err, _retc = payload

    # prefer stdout; fall back to stderr only when stdout has nothing to offer
    text = out.strip()
    if not text:
        text = err.strip()

    if not regex:
        return text

    try:
        found = re.search(regex, text)
        if found is None:
            return ''
        return found.group(1).strip()
    except Exception:
        # best effort by contract: any matching problem results in "nothing found"
        return ''
def _kill_all(procs):
    """Kill and reap every process in `procs` and close the pipes the parent still holds."""
    for proc in procs:
        # kill() is a no-op for a process that has already been reaped
        proc.kill()
    for proc in procs:
        for pipe in (proc.stdin, proc.stdout, proc.stderr):
            if pipe is not None:
                # closing an already closed file object is harmless
                pipe.close()
        proc.wait()


def shell_exec(cmd, env=None, shell=False, stdin='', cwd=None, timeout=None, lc_all='C'):
    """
    Execute a command in a subprocess with flexible options for shell execution, environment
    variables, piping, standard input, working directory, and a timeout.

    On Windows, the function changes the code page to 65001 (UTF-8) so that command output is
    handled in UTF-8.

    ### Parameters
    - **cmd** (`str`):
      The command string to execute. With `shell=False` the string is split on every `|`
      and the segments are run as a pipeline. With `shell=True` the entire string is passed
      to the shell.
    - **env** (`dict`, optional):
      Environment variables to merge over the current OS environment.
      Defaults to the current environment.
    - **shell** (`bool`, optional):
      If True, execute the command through the shell. Required when using shell features
      (e.g., redirection, globbing). Defaults to False.
    - **stdin** (`str`, optional):
      A string to pass as standard input to the command. If non-empty, the command is
      always run through the shell, on every platform. Defaults to an empty string.
    - **cwd** (`str`, optional):
      Working directory in which to execute the command. Defaults to None (current directory).
    - **timeout** (`int` or `float`, optional):
      Maximum time (in seconds) to wait for the command, in both the shell and the
      pipeline code path. If exceeded, the started process(es) are killed.
      Defaults to None (no timeout).
    - **lc_all** (`str`, optional):
      Value to set for the `LC_ALL` environment variable, forcing command output locale.
      Defaults to `'C'`.

    ### Returns
    - **tuple**:
      - On success: `(True, (stdout, stderr, return_code))`
        - **stdout**: Standard output, converted with `txt.to_text()`.
        - **stderr**: Standard error, converted with `txt.to_text()`.
        - **return_code** (`int`): Exit status of the command (of the last pipeline stage).
      - On failure: `(False, error_message)` - a string describing the error.

    ### Notes
    - `LC_ALL=<lc_all>` always overrides a value given in `env`.
    - On Windows (`os.name == 'nt'`), `cmd` is prefixed with `chcp 65001 &&`, `shell` is
      forced to True, and the `Active code page: 65001` banner is removed from stdout.
    - In a pipeline only the stdout, stderr and exit code of the last stage are returned.
      stderr of earlier stages is discarded.
    - The pipeline split is a plain `cmd.split('|')`: a `|` inside quotes is also treated
      as a pipe. Use `shell=True` for such commands.
    - Any exception raised while starting a process is reported as
      `(False, 'Error "<exception>" while calling command "<command>"')`.
    - A timeout is reported as `(False, 'Timeout after <timeout> seconds.')`. With
      `shell=True` only the shell itself is killed.
    """
    # Caller-supplied variables win over the inherited ones; LC_ALL wins over both so that
    # command output is always in a predictable locale.
    env = {**os.environ, **(env or {})}
    env['LC_ALL'] = lc_all

    if os.name == 'nt':
        # chaining with `&&` is a shell feature, hence shell=True
        cmd = f'chcp 65001 && {cmd}'
        shell = True

    if shell or stdin:
        try:
            p = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                shell=True,
                cwd=cwd,
            )
        except Exception as e:
            return False, f'Error "{e}" while calling command "{cmd}"'

        try:
            stdout, stderr = p.communicate(
                input=txt.to_bytes(stdin) if stdin else None,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # the child is not killed automatically when the timeout expires
            p.kill()
            p.communicate()
            return False, f'Timeout after {timeout} seconds.'

        # drop the banner that `chcp` prints on Windows
        stdout = txt.to_text(stdout).replace('Active code page: 65001\r\n', '')
        stderr = txt.to_text(stderr)
        return True, (stdout, stderr, p.returncode)

    parts = cmd.split('|')
    final = len(parts) - 1
    procs = []
    for idx, part in enumerate(parts):
        upstream = procs[-1] if procs else None
        try:
            args = shlex.split(part.strip())
            p = subprocess.Popen(
                args,
                stdin=upstream.stdout if upstream else subprocess.PIPE,
                stdout=subprocess.PIPE,
                # Only the last stage's stderr is returned. An unread PIPE on an earlier
                # stage could fill up and block that stage forever.
                stderr=subprocess.PIPE if idx == final else subprocess.DEVNULL,
                env=env,
                shell=False,
                cwd=cwd,
            )
        except Exception as e:
            # do not leave already started stages running
            _kill_all(procs)
            return False, f'Error "{e}" while calling command "{part}"'

        if upstream is not None:
            # The child owns its copy now. Dropping ours lets the upstream stage receive
            # SIGPIPE if this stage exits early.
            upstream.stdout.close()
            if upstream.stdin is not None:
                # no input is ever sent in this code path, so signal EOF to the first stage
                upstream.stdin.close()
        procs.append(p)

    last = procs[-1]
    try:
        stdout, stderr = last.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        _kill_all(procs)
        return False, f'Timeout after {timeout} seconds.'

    for proc in procs[:-1]:
        # reap stages that have already finished, without blocking on the others
        proc.poll()
    return True, (txt.to_text(stdout), txt.to_text(stderr), last.returncode)