话不多说直接上代码
封装连接
@staticmethod
def connect(ip, server_user, server_port, server_path):
“””
连接服务器
:param :
:return:
“””
ssh = paramiko.SSHClient()
private_key = paramiko.RSAKey.from_private_key_file({}.ssh/id_rsa.format(server_path))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(ip, port=server_port, username=server_user, pkey=private_key, timeout=5, allow_agent=True)
return ssh
except Exception as e:
app.logger.info(connect_error info is {}.format(e.__repr__()))
return {
result: 1,
ip: ip,
msg: e.__repr__(),
}
在服务器执行命令
def fabric_run_cmd(self, ip, server_user, server_port, server_path, cmd):
“””
批量在服务器执行命令
:return:
“””
conn = self.connect(ip, server_user, server_port, server_path)
try:
if type(conn) == dict:
return conn
else:
stdin, stdout, stderr = conn.exec_command(cmd)
app.logger.info(fabric_run_cmd_stdout info is {}.format(stdout.readlines()))
app.logger.info(fabric_run_cmd_stderr info is {}.format(stderr.readlines()))
channel = stdout.channel
status = channel.recv_exit_status()
conn.close()
app.logger.info(fabric_run_cmd_status info is {}.format(status))
if status == 0:
return {
ip: ip,
msg: 执行成功,
result: 0
}
else:
return {
ip: ip,
msg: stderr.readlines(),
result: 1
}
except Exception as e:
app.logger.info(fabric_run_cmd_error info is {}.format(e.__repr__()))
return {
ip: ip,
msg: e.__repr__(),
result: 1
}
将文件发送到服务器
def fabric_put_file(self, ip, src_file, dst_file, server_user, server_port, server_path):
“””
发送文件到服务器
:return:
“””
conn = self.connect(ip, server_user, server_port, server_path)
if type(conn) == dict:
return conn
else:
try:
ftp = conn.open_sftp()
ftp.put(dst_file, src_file) # 发送文件之前先判断有没有目标文件夹 如果没有现在服务器上创建文件夹
ftp.close() # 在此判断文件是否发送成功
return {
ip: ip,
msg: 上传成功,
result: 0
}
except Exception as e:
app.logger.info(fabric_put_file_error info is {}.format(e.__repr__()))
return {
ip: ip,
msg: e.__repr__(),
result: 1
}
能力有限,如有不妥请留言指正
以上就是python如何利用paramiko执行服务器命令的详细内容。
