SSH2FA 在保持双因素认证的安全性的同时,提供了类似扫码的便捷体验, 适用于个人/小团体使用

SSH2FA ssh实现自定义API服务器二次登录验证
10 mins

服务器 SSH 登录的安全性,我实现了 SSH2FA,在保持双因素认证的安全性的同时,提供了类似扫码的便捷体验, 适用于个人/小团体使用

核心功能h3

  1. Web 审批:登录时终端输出一个链接,点击后在浏览器确认即可放行。
  2. 断网降级:若无法连接认证服务器,自动回退到传统 TOTP 验证,确保不被锁在门外。

url认证

认证服务器失联TOPT

实现思路h3

  • 使用 sshd_configForceCommand 在登录成功后先运行自定义程序,验证通过后再启动用户的 Shell。
  • 项目分为两部分:
    • Auth Server (Python):提供 /start_login/check_status/approve 接口,管理登录会话。
    • Client (Go):编译为单文件二进制,负责请求审批链接、轮询状态以及离线 TOTP 验证。

代码实现h2

1. Auth Server (Python)h3

/usr/local/bin/auth_server.py
#!/data/miniforge3/bin/python3
from flask import Flask, request, jsonify
import uuid
import time
import threading
app = Flask(__name__)
# 内存存储:
# tokens: {session_token: {"status": "pending/approved/rejected", "username": "root", "display_token": "xxx", "ts": 12345}}
# display_to_session: {display_token: session_token}
# 生产环境建议用 Redis
tokens = {}
display_to_session = {}
def cleanup_tokens():
"""定期清理过期 Token"""
while True:
now = time.time()
expired = [t for t, v in tokens.items() if now - v['ts'] > 300]
for t in expired:
# 清理 display_to_session 映射
display_token = tokens[t].get('display_token')
if display_token and display_token in display_to_session:
del display_to_session[display_token]
del tokens[t]
time.sleep(60)
threading.Thread(target=cleanup_tokens, daemon=True).start()
@app.route('/start_login', methods=['POST'])
def start_login():
data = request.json
username = data.get('username')
# 生成两个独立的 token
session_token = str(uuid.uuid4()) # 客户端用于轮询 (私密)
display_token = str(uuid.uuid4()) # 用户看到的批准链接 (公开)
tokens[session_token] = {
"status": "pending",
"username": username,
"display_token": display_token,
"ts": time.time()
}
# 建立 display_token -> session_token 的映射
display_to_session[display_token] = session_token
# 这里模拟把链接打印到服务器日志,实际场景你可以把这个链接发短信/钉钉给管理员
print(f"\n[SERVER] New Login Request for {username}")
print(f"[SERVER] Approve Link: http://192.168.10.10:5000/approve?token={display_token}")
print(f"[SERVER] Session Token: {session_token} (kept secret)\n")
return jsonify({
"success": True,
"session_token": session_token,
"display_token": display_token
})
@app.route('/check_status', methods=['POST'])
def check_status():
data = request.json
token = data.get('token')
if token not in tokens:
return jsonify({"status": "expired"})
return jsonify({"status": tokens[token]["status"]})
@app.route('/approve', methods=['GET'])
def approve():
display_token = request.args.get('token')
# 通过 display_token 找到对应的 session_token
session_token = display_to_session.get(display_token)
if session_token and session_token in tokens:
tokens[session_token]['status'] = 'approved'
username = tokens[session_token]['username']
print(f"[SERVER] ✅ Login APPROVED for {username}")
return "<h1>Login APPROVED! You can close this window.</h1>"
return "<h1>Token Invalid or Expired</h1>", 404
@app.route('/reject', methods=['GET'])
def reject():
display_token = request.args.get('token')
# 通过 display_token 找到对应的 session_token
session_token = display_to_session.get(display_token)
if session_token and session_token in tokens:
tokens[session_token]['status'] = 'rejected'
username = tokens[session_token]['username']
print(f"[SERVER] ❌ Login REJECTED for {username}")
return "<h1>Login REJECTED.</h1>"
return "<h1>Token Invalid or Expired</h1>", 404
if __name__ == '__main__':
# 监听 0.0.0.0 方便你从手机/浏览器访问
app.run(host='0.0.0.0', port=5000)

2. Client (Go)h3

package main
import (
"bufio"
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"strings"
"syscall"
"time"
"github.com/pquerna/otp/totp"
)
// ================= Configuration =================
// const (
// // Backend API address
// APIHost = "http://127.0.0.1:5000"
// // Enable 2FA for non-interactive connections (SCP, SFTP, Git, rsync, etc.)
// // true = These commands must also perform 2FA authentication
// // false = These commands bypass authentication (recommended for better UX)
// EnableNonInteractive2FA = true
// // Authentication timeout (seconds)
// AuthTimeout = 60
// // Status polling interval (seconds)
// PollInterval = 2
// )
// =================================================
// SessionType represents the type of SSH session
type SessionType string
const (
Interactive SessionType = "interactive"
NonInteractive SessionType = "non-interactive"
)
// StartLoginRequest represents the request to start login
type StartLoginRequest struct {
Username string `json:"username"`
}
// StartLoginResponse represents the response from start_login
type StartLoginResponse struct {
Success bool `json:"success"`
SessionToken string `json:"session_token"` // 私密,用于轮询
DisplayToken string `json:"display_token"` // 公开,显示给用户
}
// CheckStatusRequest represents the request to check status
type CheckStatusRequest struct {
Token string `json:"token"`
}
// CheckStatusResponse represents the response from check_status
type CheckStatusResponse struct {
Status string `json:"status"` // "pending", "approved", "rejected"
}
// getSessionType determines if this is an interactive or non-interactive session
func getSessionType() (SessionType, string) {
cmd := os.Getenv("SSH_ORIGINAL_COMMAND")
if cmd != "" {
return NonInteractive, cmd
}
return Interactive, ""
}
// getUsername retrieves the current username from environment
func getUsername() string {
if user := os.Getenv("PAM_USER"); user != "" {
return user
}
if user := os.Getenv("USER"); user != "" {
return user
}
return "unknown"
}
// httpPost sends a POST request with JSON data
func httpPost(url string, data interface{}) (map[string]interface{}, error) {
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return nil, err
}
return result, nil
}
// readTOTPSecret reads the TOTP secret for a given username from the secrets file
func readTOTPSecret(username, secretsFile string) (string, error) {
file, err := os.Open(secretsFile)
if err != nil {
return "", fmt.Errorf("cannot open TOTP secrets file: %w", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, ":", 2)
if len(parts) == 2 && parts[0] == username {
return strings.TrimSpace(parts[1]), nil
}
}
if err := scanner.Err(); err != nil {
return "", fmt.Errorf("error reading secrets file: %w", err)
}
return "", fmt.Errorf("no TOTP secret found for user: %s", username)
}
// promptTOTPCode prompts the user to enter their TOTP code
func promptTOTPCode(username string) (string, error) {
fmt.Println()
fmt.Println("---------------------------------------------")
fmt.Printf("SERVER UNREACHABLE - OFFLINE MODE\n")
fmt.Printf("TOTP VERIFICATION FOR: %s\n", username)
fmt.Println("---------------------------------------------")
fmt.Print("Enter TOTP code from your authenticator app: ")
reader := bufio.NewReader(os.Stdin)
code, err := reader.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimSpace(code), nil
}
// performTOTPAuth handles offline TOTP authentication
func performTOTPAuth(username, secretsFile string, sessionType SessionType) (bool, error) {
// Read TOTP secret for user
secret, err := readTOTPSecret(username, secretsFile)
if err != nil {
return false, err
}
// For non-interactive sessions, we cannot prompt for TOTP
if sessionType == NonInteractive {
return false, fmt.Errorf("TOTP authentication not available for non-interactive sessions")
}
// Prompt user for TOTP code
code, err := promptTOTPCode(username)
if err != nil {
return false, fmt.Errorf("failed to read TOTP code: %w", err)
}
// Validate TOTP code with time skew tolerance
valid := totp.Validate(code, secret)
if !valid {
return false, fmt.Errorf("invalid TOTP code")
}
fmt.Println("Access Granted.")
return true, nil
}
// performAuth handles the complete 2FA authentication flow with TOTP fallback
func performAuth(username string, sessionType SessionType, pollInterval int, timeout int, apiHost string, enableTOTPFallback bool, totpSecretsFile string) (bool, error) {
startURL := fmt.Sprintf("%s/start_login", apiHost)
checkURL := fmt.Sprintf("%s/check_status", apiHost)
// 1. Request login tokens
resp, err := httpPost(startURL, StartLoginRequest{Username: username})
if err != nil {
// Server unreachable - try TOTP fallback if enabled
if enableTOTPFallback {
fmt.Fprintf(os.Stderr, "Warning: 2FA server unreachable, falling back to TOTP authentication\n")
return performTOTPAuth(username, totpSecretsFile, sessionType)
}
return false, fmt.Errorf("2FA Service Unreachable: %w", err)
}
success, ok := resp["success"].(bool)
if !ok || !success {
return false, fmt.Errorf("failed to start login session")
}
// 提取两个 token
sessionToken, ok := resp["session_token"].(string)
if !ok || sessionToken == "" {
return false, fmt.Errorf("invalid session_token received")
}
displayToken, ok := resp["display_token"].(string)
if !ok || displayToken == "" {
return false, fmt.Errorf("invalid display_token received")
}
// 使用 display_token 构建批准链接 (公开给用户)
authLink := fmt.Sprintf("%s/approve?token=%s", apiHost, displayToken)
// 2. Display prompt based on session type
if sessionType == Interactive {
// Interactive SSH: Display formatted prompt
fmt.Println()
fmt.Println("---------------------------------------------")
fmt.Printf("SECURITY CHECK FOR: %s\n", username)
fmt.Println("---------------------------------------------")
fmt.Print("Waiting for approval..\n\n")
} else {
// Non-interactive: Minimal output to stderr
fmt.Fprintf(os.Stderr, "2FA Required. Approve at: %s\n", authLink)
}
// 3. Poll for approval status (使用 session_token,私密)
maxAttempts := timeout / pollInterval
for i := 0; i < maxAttempts; i++ {
statusResp, err := httpPost(checkURL, CheckStatusRequest{Token: sessionToken})
if err == nil {
if status, ok := statusResp["status"].(string); ok {
switch status {
case "approved":
if sessionType == Interactive {
fmt.Println("Access Granted.")
}
return true, nil
case "rejected":
return false, fmt.Errorf("access denied")
}
}
}
time.Sleep(time.Duration(pollInterval) * time.Second)
}
return false, fmt.Errorf("2FA timeout")
}
// executeCommand executes the original SSH command or starts a shell
func executeCommand(originalCmd string) error {
if originalCmd != "" {
// Execute the original command
cmd := exec.Command("sh", "-c", originalCmd)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
// Start interactive shell
shell := os.Getenv("SHELL")
if shell == "" {
shell = "/bin/bash"
}
// Use syscall.Exec to replace current process with shell
return syscall.Exec(shell, []string{"-l"}, os.Environ())
}
func main() {
apiHost := flag.String("apiHost", "http://127.0.0.1:5000", "API host")
timeout := flag.Int("timeout", 30, "Timeout in seconds")
pollInterval := flag.Int("pollInterval", 1, "Poll interval in seconds")
enableNonInteractive2FA := flag.Bool("enableNonInteractive2FA", false, "Enable 2FA for non-interactive sessions")
totpSecretsFile := flag.String("totpSecretsFile", "/etc/ssh2fa/totp_secrets", "Path to TOTP secrets file")
enableTOTPFallback := flag.Bool("enableTOTPFallback", true, "Enable TOTP fallback when server is unreachable")
flag.Parse()
// Ensure we exit with error code on any panic (security-first)
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", r)
os.Exit(1)
}
}()
username := getUsername()
sessionType, originalCmd := getSessionType()
// === Policy Decision ===
if sessionType == NonInteractive && !*enableNonInteractive2FA {
// Bypass 2FA for non-interactive sessions if disabled
// Uncomment for debugging:
// fmt.Fprintf(os.Stderr, "DEBUG: Skipping 2FA for command: %s\n", originalCmd)
if err := executeCommand(originalCmd); err != nil {
os.Exit(1)
}
os.Exit(0)
}
// === Perform Authentication ===
success, err := performAuth(username, sessionType, *pollInterval, *timeout, *apiHost, *enableTOTPFallback, *totpSecretsFile)
if !success {
if err != nil {
fmt.Fprintf(os.Stderr, "\n❌ %s\n", err.Error())
} else {
fmt.Fprintf(os.Stderr, "\n❌ Authentication failed\n")
}
os.Exit(1)
}
// === Execute Command After Successful Auth ===
if err := executeCommand(originalCmd); err != nil {
fmt.Fprintf(os.Stderr, "Error executing command: %v\n", err)
os.Exit(1)
}
}

使用指南h2

  1. 编译客户端
    Terminal window
    go mod init ssh2fa
    go get github.com/pquerna/otp/totp
    go build -o ssh2fa ssh2fa.go
    sudo mv ssh2fa /usr/local/bin/
  2. 配置离线 TOTP 密钥
    Terminal window
    sudo mkdir -p /etc/ssh2fa
    echo "root:JBSWY3DPEHPK3PXP" | sudo tee /etc/ssh2fa/totp_secrets
    sudo chmod 600 /etc/ssh2fa/totp_secrets
  3. 修改 sshd_config
ForceCommand /root/project/ssh2fa/ssh2fa -apiHost http://127.0.0.1:5000 -enableNonInteractive2FA true -pollInterval 1 -timeout 240

重启 SSH 服务:systemctl restart sshd


实际体验h2

  • 登录时终端输出审批链接,点击后即放行。
  • 若 Auth Server 不可用,系统提示离线模式并要求输入 TOTP,仍可正常登录。

这套方案在实际使用中提升了体验,同时保持了安全性,欢迎自行部署并根据需求改进。 ⚠️ 无法连接认证服务器 - 启用离线模式 这时候输入之前的 TOTP 码,照样能进。