使用paramiko自动化打包部署运行项目
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : package_util.py
@Time : 2023/05/30 17:41:58
@Author : longsongpong
@Version : 1.0
@Desc : None
'''
import os
import tarfile
import time
import paramiko
CONFIG = {
"tar_info": {
"local_path": '/Users/david/uk/gb-visa',
"remote_path": '/home/lsp',
"filename": 'uk_spider.tar.gz' # 解压命令 tar -xf uk_spider.tar.gz
},
# 远程服务器信息和文件路径
"remote_info": {
'hostname': '*.*.*.*',
'port': 1019,
'username': 'root',
},
# 私钥文件路径
"key_file_path": '/Users/david/.ssh/id_rsa',
# 远程执行的命令序列
"command_sequence": 'cd /home/lsp && tar -xf uk_spider.tar.gz && cd gb-visa && bash build.sh d\n'
}
class MyPackageUtils(object):
"""打包项目上传到服务器
Args:
object (_type_): _description_
"""
def __init__(self, **kwargs):
self.local_path = None
self.remote_path = None
self.remote_filename = None
self.key_file_path = None
self.command_sequence = None
self.remote_info = None
self._parse_config(**kwargs)
self.ssh_client = self._init_ssh_client()
def _parse_config(self, **kwargs):
"""
解析配置信息
"""
self.local_path = kwargs.get("tar_info").get('local_path')
self.remote_path = kwargs.get("tar_info").get('remote_path')
self.filename = kwargs.get("tar_info").get('filename')
self.remote_info = kwargs.get("remote_info")
self.key_file_path = kwargs.get('key_file_path')
self.command_sequence = kwargs.get('command_sequence')
def _init_ssh_client(self):
# 创建 SSH 客户端并进行连接
private_key = paramiko.RSAKey.from_private_key_file(self.key_file_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(**self.remote_info, pkey=private_key)
return ssh_client
def _execute_tar(self):
# 创建 tar 文件
self.tarfile_path = os.path.join(self.local_path, self.filename)
with tarfile.open(self.tarfile_path, mode='w') as tarf:
tarf.add(self.local_path, arcname=os.path.basename(self.local_path))
def _execute_sftp(self):
# 创建 SFTP 客户端并上传文件
with self.ssh_client.open_sftp() as sftp_client:
try:
sftp_client.chdir(self.remote_path)
except IOError:
# 目录不存在则创建目录
sftp_client.mkdir(self.remote_path)
sftp_client.chdir(self.remote_path)
sftp_client.put(self.tarfile_path, self.filename)
def _execute_cmd(self):
"""执行远程命令
Args:
command (_type_): _description_
"""
stdin, stdout, stderr = self.ssh_client.exec_command(
self.command_sequence)
output = stdout.readlines()
error = stderr.readlines()
print('Command: {} \nOutput: {} \nError: {}'.format(
self.command_sequence, output, error))
time.sleep(2)
self.ssh_client.close()
def start(self):
self._execute_tar()
self._execute_sftp()
self._execute_cmd()
if __name__ == '__main__':
p = MyPackageUtils(**CONFIG)
p.start()