SSH隧道技术文档:本地逻辑与远程智能体计算架构
概述
本文档介绍如何通过SSH隧道(SSH Tunnel)和Paramiko库实现”本地跑逻辑,远程跑智能体判断”的分布式计算架构。该架构允许本地计算机控制业务流程,同时利用远程服务器的计算资源(如GPU)进行AI模型推理。
架构原理
整体架构
本地机器(控制端) 远程服务器(计算端)
┌─────────────────────┐ ┌─────────────────────┐
│ 业务逻辑控制 │ │ AI服务/智能体 │
│ - 数据预处理 │ │ - 模型推理 │
│ - 并发控制 │ SSH隧道 │ - 计算密集型任务 │
│ - 结果后处理 │◄────────────►│ - 端口服务 │
│ - 存储管理 │ 端口转发 │ │
└─────────────────────┘ └─────────────────────┘
工作原理
SSH隧道通过端口转发技术,将本地端口的网络请求安全地转发到远程服务器的指定端口。对于应用程序来说,访问远程服务就像访问本地服务一样透明。
技术栈
- Python: 主要编程语言
- Paramiko: Python的SSHv2协议实现库
- Socket: 网络通信基础
- Threading: 多线程处理并发连接
核心实现
1. 安装依赖
pip install paramiko2. SSH隧道管理器实现
import paramiko
import socket
import select
import threading
import time
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
class SSHTunnel:
"""SSH隧道管理器"""
def __init__(self, remote_host, remote_port, username, password,
local_port=11434, remote_service_port=11434):
self.remote_host = remote_host
self.remote_port = remote_port # SSH端口
self.username = username
self.password = password
self.local_port = local_port # 本地转发端口
self.remote_service_port = remote_service_port # 远程服务端口
self.ssh = None
self.transport = None
self.server = None
self.running = False
self.accept_thread = None
def connect(self):
"""建立SSH连接"""
logger.info(f"[SSH] Connecting to {self.remote_host}:{self.remote_port}...")
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
self.remote_host,
port=self.remote_port,
username=self.username,
password=self.password,
timeout=30,
)
self.transport = self.ssh.get_transport()
logger.info("[SSH] Connected!")
# 检查远程服务状态
stdin, stdout, stderr = self.ssh.exec_command(
f"curl -s localhost:{self.remote_service_port}/health 2>/dev/null | head -c 100"
)
output = stdout.read().decode()
if output:
logger.info("[SSH] Remote service is responding")
else:
logger.warning("[SSH] Remote service may not be running")
def start_tunnel(self):
"""启动端口转发"""
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(("127.0.0.1", self.local_port))
self.server.listen(100)
self.running = True
logger.info(f"[TUNNEL] localhost:{self.local_port} -> {self.remote_host}:{self.remote_service_port}")
def handle_client(client, addr):
"""处理客户端连接"""
try:
chan = self.transport.open_channel(
"direct-tcpip",
("localhost", self.remote_service_port),
addr
)
if chan is None:
client.close()
return
while self.running:
r, w, x = select.select([client, chan], [], [], 0.1)
if client in r:
data = client.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
client.send(data)
chan.close()
client.close()
except Exception as e:
logger.error(f"[TUNNEL] Error handling client: {e}")
try:
client.close()
except:
pass
def accept_loop():
"""接受连接循环"""
while self.running:
try:
client, addr = self.server.accept()
threading.Thread(target=handle_client, args=(client, addr)).start()
except:
break
self.accept_thread = threading.Thread(target=accept_loop)
self.accept_thread.daemon = True
self.accept_thread.start()
# 等待隧道就绪
time.sleep(2)
# 测试连接
for i in range(5):
try:
import urllib.request
response = urllib.request.urlopen(
f"http://localhost:{self.local_port}/health", timeout=3
)
if response.status == 200:
logger.info("[TUNNEL] Test successful!")
return True
except:
time.sleep(1)
logger.warning("[TUNNEL] Test failed, but tunnel may still work")
return True
def stop(self):
"""停止隧道"""
self.running = False
if self.server:
self.server.close()
if self.ssh:
self.ssh.close()
logger.info("[TUNNEL] Stopped")3. 使用示例
# 配置远程服务器信息
REMOTE_HOST = "your.remote.server.ip"
REMOTE_PORT = 50001 # SSH端口
REMOTE_USER = "your_username"
REMOTE_PASSWORD = "your_password" # 建议使用环境变量或密钥管理
LOCAL_PORT = 11434 # 本地端口
REMOTE_SERVICE_PORT = 11434 # 远程服务端口
def main():
tunnel = None
try:
# 创建并启动SSH隧道
tunnel = SSHTunnel(
remote_host=REMOTE_HOST,
remote_port=REMOTE_PORT,
username=REMOTE_USER,
password=REMOTE_PASSWORD,
local_port=LOCAL_PORT,
remote_service_port=REMOTE_SERVICE_PORT
)
tunnel.connect()
tunnel.start_tunnel()
# 现在可以通过本地端口访问远程服务
# 例如:http://localhost:11434 就等价于远程服务器的11434端口服务
# 执行业务逻辑
your_business_logic()
except KeyboardInterrupt:
logger.info("Interrupted by user")
finally:
if tunnel:
tunnel.stop()
def your_business_logic():
"""示例业务逻辑"""
import httpx
# 通过本地端口访问远程服务
client = httpx.Client(base_url=f"http://localhost:{LOCAL_PORT}")
# 发送请求到远程服务
response = client.post("/api/generate", json={
"model": "your-model-name",
"prompt": "Hello from local machine!"
})
print(f"Response: {response.json()}")
if __name__ == "__main__":
main()命令行方式使用SSH隧道
除了Python实现,也可以使用命令行建立SSH隧道:
# 基本语法
ssh -o StrictHostKeyChecking=no -p <SSH_PORT> -N -L <LOCAL_PORT>:localhost:<REMOTE_SERVICE_PORT> <USER>@<REMOTE_HOST>
# 示例:将本地11434端口转发到远程服务器的11434服务
ssh -o StrictHostKeyChecking=no -p 50001 -N -L 11434:localhost:11434 user@your.remote.server.ip参数说明:
-o StrictHostKeyChecking=no: 跳过主机密钥验证-p 50001: 指定SSH端口-N: 不执行远程命令,仅做端口转发-L: 本地端口转发
安全建议
1. 凭据管理
不要在代码中硬编码密码,推荐使用:
import os
from dotenv import load_dotenv
load_dotenv()
REMOTE_PASSWORD = os.getenv("REMOTE_PASSWORD")或在环境变量中设置:
export REMOTE_PASSWORD="your_secure_password"2. SSH密钥认证(推荐)
使用SSH密钥代替密码:
# 使用私钥文件连接
self.ssh.connect(
self.remote_host,
port=self.remote_port,
username=self.username,
key_filename="/path/to/private/key"
)3. 防火墙配置
确保远程服务器防火墙允许SSH端口访问:
# Ubuntu/Debian
sudo ufw allow 50001/tcp
# CentOS/RHEL
sudo firewall-cmd --permanent --add-port=50001/tcp
sudo firewall-cmd --reload故障排查
1. 连接失败
# 增加详细日志
paramiko.util.log_to_file("paramiko.log")2. 隧道建立但无法访问服务
- 检查远程服务是否在运行
- 确认远程服务绑定到localhost而非仅外部接口
- 验证防火墙规则
3. 性能优化
# 增加并发连接数
self.server.listen(200) # 增加监听队列
# 调整socket缓冲区
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)应用场景
- AI模型推理: 本地控制逻辑,远程GPU服务器运行模型
- 数据库访问: 安全访问远程数据库
- API服务代理: 访问内网API服务
- 开发调试: 远程开发环境本地调试
总结
SSH隧道技术提供了一种安全、透明的方式来实现分布式计算架构。通过Paramiko库,我们可以在Python程序中自动化建立和管理SSH隧道,实现本地业务逻辑与远程计算资源的无缝集成。
这种架构的优势包括:
- 资源优化: 充分利用远程服务器的计算能力
- 网络透明: 对应用程序透明,无需修改代码
- 安全性: SSH加密传输,保护数据安全
- 灵活性: 支持多种服务类型和应用场景