如何基于Nginx构建微服务网关

描述

基于Nginx的微服务网关实现与最佳实践:从零打造企业级API Gateway

引言:为什么你的微服务架构需要一个强大的网关?

还记得上次生产环境的那个事故吗?某个服务突然涌入大量请求,没有限流保护,直接把下游服务打挂了。或者是那次安全审计,发现有些API接口裸奔在公网上,没有任何认证机制。又或者是运维同学半夜被叫起来,因为某个服务的日志分散在十几台机器上,根本无法快速定位问题...

这些痛点,其实都指向同一个解决方案:你需要一个统一的API网关

今天,我将分享我们团队如何基于Nginx构建了一个日均处理10亿+请求的微服务网关,以及踩过的那些坑。这套方案已经稳定运行2年+,经历过多次大促考验。

一、架构设计:不只是反向代理那么简单

1.1 整体架构设计

我们的网关架构分为四层:

 

├── 接入层(DNS + CDN)
├── 网关层(Nginx + OpenResty)
├── 服务层(微服务集群)
└── 数据层(Redis + MySQL + MongoDB)

 

核心设计理念:

• 高可用:多活部署,自动故障转移

• 高性能:充分利用Nginx的事件驱动模型

• 可扩展:基于OpenResty的Lua脚本扩展

• 可观测:完整的监控和日志体系

1.2 技术选型对比

方案 优势 劣势 适用场景
Nginx + OpenResty 性能极高、稳定性好、运维成熟 功能相对简单、需要二次开发 高并发、低延迟要求
Kong 功能丰富、插件生态好 性能损耗较大、运维复杂 中小规模、快速搭建
Spring Cloud Gateway Java生态友好、功能完善 性能一般、资源占用高 Java技术栈
Envoy 云原生、功能强大 学习曲线陡、配置复杂 K8s环境

二、核心功能实现:从配置到代码

2.1 动态路由配置

传统的Nginx配置需要reload才能生效,这在生产环境是不可接受的。我们的解决方案:

 

# nginx.conf 核心配置
http {
    # 引入Lua模块
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_shared_dict routes_cache 100m;
    lua_shared_dict upstream_cache 100m;
    
    # 初始化阶段加载路由
    init_by_lua_block {
        local route_manager = require "gateway.route_manager"
        route_manager.init()
    }
    
    # 定时更新路由配置
    init_worker_by_lua_block {
        local route_manager = require "gateway.route_manager"
        -- 每10秒从配置中心拉取最新路由
        ngx.timer.every(10, route_manager.sync_routes)
    }
    
    server {
        listen 80;
        server_name api.example.com;
        
        location / {
            # 动态路由处理
            access_by_lua_block {
                local router = require "gateway.router"
                router.route()
            }
            
            # 动态upstream
            proxy_pass http://$upstream;
            
            # 标准代理配置
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Request-Id $request_id;
        }
    }
}

 

对应的Lua路由模块:

 

-- gateway/router.lua
local _M = {}
local routes_cache = ngx.shared.routes_cache
local cjson = require "cjson"

function _M.route()
    local uri = ngx.var.uri
    local method = ngx.var.request_method
    
    -- 从缓存获取路由配置
    local route_key = method .. ":" .. uri
    local route_data = routes_cache:get(route_key)
    
    if not route_data then
        -- 模糊匹配逻辑
        route_data = _M.fuzzy_match(uri, method)
    end
    
    if route_data then
        local route = cjson.decode(route_data)
        
        -- 设置upstream
        ngx.var.upstream = route.upstream
        
        -- 添加自定义header
        if route.headers then
            for k, v in pairs(route.headers) do
                ngx.req.set_header(k, v)
            end
        end
        
        -- 路径重写
        if route.rewrite then
            ngx.req.set_uri(route.rewrite)
        end
    else
        ngx.exit(404)
    end
end

function _M.fuzzy_match(uri, method)
    -- 实现路径参数匹配 /api/user/{id} -> /api/user/123
    local all_routes = routes_cache:get("all_routes")
    if not all_routes then
        return nil
    end
    
    local routes = cjson.decode(all_routes)
    for _, route in ipairs(routes) do
        local pattern = route.path:gsub("{.-}", "([^/]+)")
        local matches = {ngx.re.match(uri, "^" .. pattern .. "$")}
        
        if matches and route.method == method then
            -- 提取路径参数
            local params = {}
            for i, match in ipairs(matches) do
                if i > 1 then
                    params[route.params[i-1]] = match
                end
            end
            
            -- 将参数传递给upstream
            ngx.ctx.path_params = params
            return cjson.encode(route)
        end
    end
    
    return nil
end

return _M

 

2.2 智能负载均衡

不仅仅是轮询,我们实现了基于响应时间的动态权重调整:

 

-- gateway/balancer.lua
local _M = {}
local upstream_cache = ngx.shared.upstream_cache

function _M.get_server(upstream_name)
    local servers_key = "servers:" .. upstream_name
    local servers_data = upstream_cache:get(servers_key)
    
    if not servers_data then
        return nil
    end
    
    local servers = cjson.decode(servers_data)
    
    -- 基于加权响应时间选择服务器
    local total_weight = 0
    local weighted_servers = {}
    
    for _, server in ipairs(servers) do
        -- 获取服务器统计信息
        local stats_key = "stats:" .. server.host .. ":" .. server.port
        local stats = upstream_cache:get(stats_key)
        
        if stats then
            stats = cjson.decode(stats)
            -- 响应时间越短,权重越高
            local weight = 1000 / (stats.avg_response_time + 1)
            -- 考虑错误率
            weight = weight * (1 - stats.error_rate)
            -- 考虑服务器配置的基础权重
            weight = weight * server.weight
            
            table.insert(weighted_servers, {
                server = server,
                weight = weight,
                range_start = total_weight,
                range_end = total_weight + weight
            })
            
            total_weight = total_weight + weight
        else
            -- 新服务器,给予默认权重
            table.insert(weighted_servers, {
                server = server,
                weight = server.weight,
                range_start = total_weight,
                range_end = total_weight + server.weight
            })
            total_weight = total_weight + server.weight
        end
    end
    
    -- 加权随机选择
    local random_weight = math.random() * total_weight
    
    for _, ws in ipairs(weighted_servers) do
        if random_weight >= ws.range_start and random_weight < ws.range_end then
            return ws.server
        end
    end
    
    -- 兜底返回第一个
    return servers[1]
end

-- 更新服务器统计信息
function _M.update_stats(server, response_time, is_error)
    local stats_key = "stats:" .. server.host .. ":" .. server.port
    local stats = upstream_cache:get(stats_key)
    
    if not stats then
        stats = {
            total_requests = 0,
            total_response_time = 0,
            avg_response_time = 0,
            error_count = 0,
            error_rate = 0
        }
    else
        stats = cjson.decode(stats)
    end
    
    -- 更新统计
    stats.total_requests = stats.total_requests + 1
    stats.total_response_time = stats.total_response_time + response_time
    stats.avg_response_time = stats.total_response_time / stats.total_requests
    
    if is_error then
        stats.error_count = stats.error_count + 1
    end
    
    stats.error_rate = stats.error_count / stats.total_requests
    
    -- 保存统计,设置过期时间防止内存泄漏
    upstream_cache:set(stats_key, cjson.encode(stats), 300)
end

return _M

 

2.3 限流熔断机制

基于令牌桶算法的分布式限流实现:

 

-- gateway/rate_limiter.lua
local _M = {}
local redis = require "resty.redis"

-- 令牌桶限流
function _M.token_bucket_limit(key, rate, capacity)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "Redis连接失败: ", err)
        return true  -- 降级放行
    end
    
    -- Lua脚本原子操作
    local script = [[
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4] or 1)
        
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1] or capacity)
        local last_refill = tonumber(bucket[2] or now)
        
        -- 计算应该添加的令牌数
        local elapsed = math.max(0, now - last_refill)
        local tokens_to_add = elapsed * rate
        tokens = math.min(capacity, tokens + tokens_to_add)
        
        if tokens >= requested then
            tokens = tokens - requested
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, capacity / rate + 1)
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, capacity / rate + 1)
            return 0
        end
    ]]
    
    local now = ngx.now()
    local res = red:eval(script, 1, key, rate, capacity, now, 1)
    
    red:set_keepalive(10000, 100)
    
    return res == 1
end

-- 熔断器实现
function _M.circuit_breaker(service_name)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache
    
    -- 获取熔断器状态
    local state = breaker_cache:get(breaker_key .. ":state") or "closed"
    
    if state == "open" then
        -- 检查是否到了半开时间
        local open_time = breaker_cache:get(breaker_key .. ":open_time")
        if ngx.now() - open_time > 30 then  -- 30秒后尝试半开
            breaker_cache:set(breaker_key .. ":state", "half_open")
            state = "half_open"
        else
            return false, "Circuit breaker is open"
        end
    end
    
    if state == "half_open" then
        -- 半开状态,允许少量请求通过
        local half_open_count = breaker_cache:incr(breaker_key .. ":half_open_count", 1, 0)
        if half_open_count > 5 then  -- 只允许5个请求
            return false, "Circuit breaker is half open, limit exceeded"
        end
    end
    
    return true
end

-- 更新熔断器状态
function _M.update_breaker(service_name, is_success)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache
    
    local state = breaker_cache:get(breaker_key .. ":state") or "closed"
    
    if state == "closed" then
        if not is_success then
            -- 增加失败计数
            local fail_count = breaker_cache:incr(breaker_key .. ":fail_count", 1, 0, 60)
            
            -- 10秒内失败10次,打开熔断器
            if fail_count >= 10 then
                breaker_cache:set(breaker_key .. ":state", "open")
                breaker_cache:set(breaker_key .. ":open_time", ngx.now())
                ngx.log(ngx.WARN, "Circuit breaker opened for: ", service_name)
            end
        end
    elseif state == "half_open" then
        if is_success then
            -- 半开状态成功,关闭熔断器
            breaker_cache:set(breaker_key .. ":state", "closed")
            breaker_cache:delete(breaker_key .. ":fail_count")
            breaker_cache:delete(breaker_key .. ":half_open_count")
            ngx.log(ngx.INFO, "Circuit breaker closed for: ", service_name)
        else
            -- 半开状态失败,重新打开
            breaker_cache:set(breaker_key .. ":state", "open")
            breaker_cache:set(breaker_key .. ":open_time", ngx.now())
        end
    end
end

return _M

 

2.4 统一认证鉴权

JWT认证和细粒度权限控制:

 

-- gateway/auth.lua
local _M = {}
local jwt = require "resty.jwt"
local redis = require "resty.redis"

-- JWT验证
function _M.verify_jwt()
    local auth_header = ngx.var.http_authorization
    
    if not auth_header then
        return false, "Missing authorization header"
    end
    
    local _, _, token = string.find(auth_header, "Bearer%s+(.+)")
    
    if not token then
        return false, "Invalid authorization header format"
    end
    
    -- 验证JWT
    local jwt_secret = os.getenv("JWT_SECRET") or "your-secret-key"
    local jwt_obj = jwt:verify(jwt_secret, token)
    
    if not jwt_obj.verified then
        return false, jwt_obj.reason
    end
    
    -- 检查token是否在黑名单中(用于主动失效)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    
    if ok then
        local blacklisted = red:get("blacklist:" .. token)
        if blacklisted then
            red:set_keepalive(10000, 100)
            return false, "Token has been revoked"
        end
        red:set_keepalive(10000, 100)
    end
    
    -- 将用户信息存入上下文
    ngx.ctx.user = jwt_obj.payload
    
    return true
end

-- 权限检查
function _M.check_permission(required_permission)
    local user = ngx.ctx.user
    
    if not user then
        return false, "User not authenticated"
    end
    
    -- 从缓存或数据库获取用户权限
    local permissions = _M.get_user_permissions(user.user_id)
    
    -- 支持通配符匹配
    for _, perm in ipairs(permissions) do
        if _M.match_permission(perm, required_permission) then
            return true
        end
    end
    
    return false, "Permission denied"
end

-- 权限匹配(支持通配符)
function _M.match_permission(user_perm, required_perm)
    -- 将权限字符串转换为模式
    -- user* 可以匹配 userprofile
    local pattern = user_perm:gsub("*", ".*")
    pattern = "^" .. pattern .. "$"
    
    return ngx.re.match(required_perm, pattern) ~= nil
end

-- 获取用户权限(带缓存)
function _M.get_user_permissions(user_id)
    local cache_key = "permissions:" .. user_id
    local permissions_cache = ngx.shared.permissions_cache
    
    -- 先从本地缓存获取
    local cached = permissions_cache:get(cache_key)
    if cached then
        return cjson.decode(cached)
    end
    
    -- 从Redis获取
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    
    if not ok then
        ngx.log(ngx.ERR, "Redis连接失败: ", err)
        return {}
    end
    
    local permissions = red:smembers("user" .. user_id)
    red:set_keepalive(10000, 100)
    
    -- 缓存5分钟
    permissions_cache:set(cache_key, cjson.encode(permissions), 300)
    
    return permissions
end

-- API签名验证(防重放攻击)
function _M.verify_signature()
    local signature = ngx.var.http_x_signature
    local timestamp = ngx.var.http_x_timestamp
    local nonce = ngx.var.http_x_nonce
    
    if not signature or not timestamp or not nonce then
        return false, "Missing signature headers"
    end
    
    -- 检查时间戳(5分钟内有效)
    local current_time = ngx.now()
    if math.abs(current_time - tonumber(timestamp)) > 300 then
        return false, "Request expired"
    end
    
    -- 检查nonce是否已使用
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    
    if ok then
        local nonce_key = "nonce:" .. nonce
        local exists = red:get(nonce_key)
        
        if exists then
            red:set_keepalive(10000, 100)
            return false, "Nonce already used"
        end
        
        -- 记录nonce,5分钟过期
        red:setex(nonce_key, 300, "1")
        red:set_keepalive(10000, 100)
    end
    
    -- 验证签名
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    local body = ngx.req.get_body_data() or ""
    
    local sign_string = method .. uri .. timestamp .. nonce .. body
    local app_secret = _M.get_app_secret(ngx.var.http_x_app_id)
    
    local expected_signature = ngx.encode_base64(
        ngx.hmac_sha256(app_secret, sign_string)
    )
    
    if signature ~= expected_signature then
        return false, "Invalid signature"
    end
    
    return true
end

return _M

 

2.5 请求响应转换

处理不同版本API的兼容性:

 

-- gateway/transformer.lua
local _M = {}
local cjson = require "cjson"

-- 请求转换
function _M.transform_request()
    local uri = ngx.var.uri
    local version = ngx.var.http_x_api_version or "v2"
    
    -- 根据版本转换请求
    if version == "v1" then
        _M.transform_v1_to_v2_request()
    end
    
    -- 添加追踪头
    if not ngx.var.http_x_request_id then
        ngx.req.set_header("X-Request-Id", ngx.var.request_id)
    end
    
    -- 添加来源标识
    ngx.req.set_header("X-Gateway-Time", ngx.now())
    ngx.req.set_header("X-Forwarded-Host", ngx.var.host)
    ngx.req.set_header("X-Forwarded-Proto", ngx.var.scheme)
end

-- V1到V2的请求转换
function _M.transform_v1_to_v2_request()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    
    if body then
        local data = cjson.decode(body)
        
        -- 字段映射
        local field_mapping = {
            user_name = "username",
            user_id = "userId",
            create_time = "createdAt"
        }
        
        for old_field, new_field in pairs(field_mapping) do
            if data[old_field] then
                data[new_field] = data[old_field]
                data[old_field] = nil
            end
        end
        
        -- 更新请求体
        ngx.req.set_body_data(cjson.encode(data))
    end
end

-- 响应转换
function _M.transform_response()
    local version = ngx.var.http_x_api_version or "v2"
    
    if version == "v1" then
        -- 获取响应体
        local resp_body = ngx.arg[1]
        
        if resp_body then
            local ok, data = pcall(cjson.decode, resp_body)
            
            if ok then
                -- V2到V1的响应转换
                data = _M.transform_v2_to_v1_response(data)
                
                -- 更新响应体
                ngx.arg[1] = cjson.encode(data)
            end
        end
    end
    
    -- 添加响应头
    ngx.header["X-Gateway-Response-Time"] = ngx.now() - ngx.ctx.start_time
    ngx.header["X-Request-Id"] = ngx.var.request_id
end

-- V2到V1的响应转换
function _M.transform_v2_to_v1_response(data)
    -- 字段映射(反向)
    local field_mapping = {
        username = "user_name",
        userId = "user_id",
        createdAt = "create_time"
    }
    
    local function transform_object(obj)
        if type(obj) ~= "table" then
            return obj
        end
        
        for new_field, old_field in pairs(field_mapping) do
            if obj[new_field] then
                obj[old_field] = obj[new_field]
                obj[new_field] = nil
            end
        end
        
        -- 递归处理嵌套对象
        for k, v in pairs(obj) do
            obj[k] = transform_object(v)
        end
        
        return obj
    end
    
    return transform_object(data)
end

-- 协议转换(GraphQL to REST)
function _M.graphql_to_rest()
    local body = ngx.req.get_body_data()
    
    if not body then
        return
    end
    
    local graphql_query = cjson.decode(body)
    
    -- 解析GraphQL查询
    local operation = graphql_query.query:match("(%w+)%s*{")
    
    -- 映射到REST端点
    local endpoint_mapping = {
        getUser = {method = "GET", path = "/api/users/"},
        createUser = {method = "POST", path = "/api/users"},
        updateUser = {method = "PUT", path = "/api/users/"},
        deleteUser = {method = "DELETE", path = "/api/users/"}
    }
    
    local mapping = endpoint_mapping[operation]
    
    if mapping then
        -- 提取参数
        local params = graphql_query.variables or {}
        
        -- 转换为REST请求
        ngx.req.set_method(ngx[mapping.method])
        
        if params.id then
            ngx.req.set_uri(mapping.path .. params.id)
            params.id = nil
        else
            ngx.req.set_uri(mapping.path)
        end
        
        -- 设置请求体
        if mapping.method ~= "GET" then
            ngx.req.set_body_data(cjson.encode(params))
        end
    end
end

return _M

 

三、性能优化:让网关飞起来

3.1 缓存策略

多级缓存架构,大幅提升响应速度:

 

# 配置本地缓存
proxy_cache_path /var/cache/nginx/api_cache 
    levels=1:2 
    keys_zone=api_cache:100m 
    max_size=10g 
    inactive=60m 
    use_temp_path=off;

# 配置缓存清理
proxy_cache_path /var/cache/nginx/static_cache 
    levels=1:2 
    keys_zone=static_cache:50m 
    max_size=5g 
    inactive=7d 
    use_temp_path=off;

server {
    location /api/ {
        # 定义缓存key
        set $cache_key "$scheme$request_method$host$request_uri$is_args$args";
        
        # Lua处理缓存逻辑
        access_by_lua_block {
            local cache = require "gateway.cache"
            cache.handle_cache()
        }
        
        # Nginx缓存配置
        proxy_cache api_cache;
        proxy_cache_key $cache_key;
        proxy_cache_valid 200 304 5m;
        proxy_cache_valid 404 1m;
        proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504;
        proxy_cache_background_update on;
        proxy_cache_lock on;
        proxy_cache_lock_timeout 5s;
        
        # 添加缓存状态头
        add_header X-Cache-Status $upstream_cache_status;
        
        proxy_pass http://backend;
    }
    
    location /static/ {
        proxy_cache static_cache;
        proxy_cache_valid 200 304 7d;
        proxy_cache_valid any 1h;
        
        # 支持断点续传
        proxy_set_header Range $http_range;
        proxy_set_header If-Range $http_if_range;
        
        proxy_pass http://static_backend;
    }
}

 

智能缓存控制Lua脚本:

 

-- gateway/cache.lua
local _M = {}
local redis = require "resty.redis"

function _M.handle_cache()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    
    -- 只缓存GET请求
    if method ~= "GET" then
        return
    end
    
    -- 根据用户身份生成缓存key
    local cache_key = _M.generate_cache_key()
    
    -- 先从Redis获取缓存
    local cached_response = _M.get_from_redis(cache_key)
    
    if cached_response then
        -- 检查缓存是否过期
        if not _M.is_stale(cached_response) then
            ngx.header["Content-Type"] = cached_response.content_type
            ngx.header["X-Cache-Hit"] = "redis"
            ngx.say(cached_response.body)
            ngx.exit(200)
        else
            -- 异步更新缓存
            ngx.timer.at(0, _M.refresh_cache, cache_key, uri)
        end
    end
    
    -- 设置响应处理
    ngx.ctx.cache_key = cache_key
    ngx.ctx.should_cache = true
end

function _M.generate_cache_key()
    local user = ngx.ctx.user
    local uri = ngx.var.uri
    local args = ngx.var.args or ""
    
    -- 考虑用户个性化
    local user_id = user and user.user_id or "anonymous"
    
    -- 生成缓存key
    local cache_key = ngx.md5(user_id .. ":" .. uri .. ":" .. args)
    
    return cache_key
end

function _M.get_from_redis(key)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil
    end
    
    local res = red:get("cache:" .. key)
    red:set_keepalive(10000, 100)
    
    if
    ```lua
    if res and res ~= ngx.null then
        return cjson.decode(res)
    end
    
    return nil
end

function _M.is_stale(cached_response)
    local ttl = cached_response.ttl or 300
    local cached_time = cached_response.cached_at or 0
    
    return (ngx.now() - cached_time) > ttl
end

function _M.refresh_cache(cache_key, uri)
    -- 异步请求后端更新缓存
    local httpc = require("resty.http").new()
    
    local res, err = httpc:request_uri("http://backend" .. uri, {
        method = "GET",
        headers = {
            ["X-Cache-Refresh"] = "true"
        }
    })
    
    if res and res.status == 200 then
        _M.save_to_redis(cache_key, res.body, res.headers)
    end
end

function _M.save_to_redis(key, body, headers)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return
    end
    
    local cache_data = {
        body = body,
        content_type = headers["Content-Type"],
        cached_at = ngx.now(),
        ttl = 300
    }
    
    red:setex("cache:" .. key, 300, cjson.encode(cache_data))
    red:set_keepalive(10000, 100)
end

return _M

 

3.2 连接池优化

 

# upstream连接池配置
upstream backend {
    # 使用keepalive保持长连接
    keepalive 256;
    keepalive_requests 1000;
    keepalive_timeout 60s;
    
    # 动态服务器列表
    server 192.168.1.10:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.11:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.12:8080 max_fails=2 fail_timeout=10s backup;
    
    # 使用least_conn负载均衡算法
    least_conn;
}

http {
    # 优化连接配置
    keepalive_timeout 65;
    keepalive_requests 100;
    
    # 优化代理设置
    proxy_connect_timeout 5s;
    proxy_send_timeout 60s;
    proxy_read_timeout 60s;
    proxy_buffer_size 32k;
    proxy_buffers 4 64k;
    proxy_busy_buffers_size 128k;
    proxy_temp_file_write_size 256k;
    
    # 开启HTTP/2
    http2_max_field_size 16k;
    http2_max_header_size 32k;
    
    # 上游连接池复用
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}

 

3.3 内存管理优化

 

-- gateway/memory_manager.lua
local _M = {}

-- 定期清理过期缓存
function _M.cleanup_expired_cache()
    local cache_dict = ngx.shared.routes_cache
    local keys = cache_dict:get_keys(0)  -- 获取所有键
    
    for _, key in ipairs(keys) do
        local ttl = cache_dict:ttl(key)
        
        -- 清理即将过期的键
        if ttl and ttl < 10 then
            cache_dict:delete(key)
        end
    end
end

-- 监控内存使用
function _M.monitor_memory()
    local cache_list = {
        "routes_cache",
        "upstream_cache", 
        "permissions_cache",
        "breaker_cache"
    }
    
    local memory_stats = {}
    
    for _, cache_name in ipairs(cache_list) do
        local cache = ngx.shared[cache_name]
        if cache then
            memory_stats[cache_name] = {
                capacity = cache:capacity(),
                free_space = cache:free_space()
            }
        end
    end
    
    -- 当内存使用超过80%时告警
    for name, stats in pairs(memory_stats) do
        local usage = (stats.capacity - stats.free_space) / stats.capacity
        if usage > 0.8 then
            ngx.log(ngx.WARN, string.format(
                "Memory usage warning: %s is %.2f%% full", 
                name, 
                usage * 100
            ))
            
            -- 触发清理
            _M.force_cleanup(name)
        end
    end
    
    return memory_stats
end

-- 强制清理缓存
function _M.force_cleanup(cache_name)
    local cache = ngx.shared[cache_name]
    
    if not cache then
        return
    end
    
    -- 使用LRU策略清理
    cache:flush_expired()
    
    -- 如果还是不够,清理最旧的10%
    local keys = cache:get_keys(0)
    local to_delete = math.floor(#keys * 0.1)
    
    for i = 1, to_delete do
        cache:delete(keys[i])
    end
end

-- 初始化定时任务
function _M.init_timers()
    -- 每分钟清理一次
    ngx.timer.every(60, _M.cleanup_expired_cache)
    
    -- 每5分钟监控一次内存
    ngx.timer.every(300, _M.monitor_memory)
end

return _M

 

四、监控告警:可观测性建设

4.1 日志采集方案

 

-- gateway/logger.lua
local _M = {}
local cjson = require "cjson"

-- 结构化日志
function _M.access_log()
    local log_data = {
        -- 基础信息
        timestamp = ngx.now(),
        request_id = ngx.var.request_id,
        
        -- 请求信息
        method = ngx.var.request_method,
        uri = ngx.var.uri,
        args = ngx.var.args,
        host = ngx.var.host,
        
        -- 客户端信息
        client_ip = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        referer = ngx.var.http_referer,
        
        -- 响应信息
        status = ngx.var.status,
        bytes_sent = ngx.var.bytes_sent,
        request_time = ngx.var.request_time,
        upstream_response_time = ngx.var.upstream_response_time,
        
        -- 上游信息
        upstream_addr = ngx.var.upstream_addr,
        upstream_status = ngx.var.upstream_status,
        
        -- 缓存信息
        cache_status = ngx.var.upstream_cache_status,
        
        -- 用户信息
        user_id = ngx.ctx.user and ngx.ctx.user.user_id or nil,
        
        -- 追踪信息
        trace_id = ngx.var.http_x_trace_id,
        span_id = ngx.var.http_x_span_id
    }
    
    -- 异步写入日志
    _M.write_log(log_data)
    
    -- 慢请求告警
    if tonumber(ngx.var.request_time) > 3 then
        _M.alert_slow_request(log_data)
    end
    
    -- 错误告警
    if tonumber(ngx.var.status) >= 500 then
        _M.alert_error(log_data)
    end
end

-- 写入日志到Kafka
function _M.write_log(log_data)
    local kafka_producer = require "resty.kafka.producer"
    
    local broker_list = {
        {host = "127.0.0.1", port = 9092}
    }
    
    local producer = kafka_producer:new(broker_list, {
        producer_type = "async",
        batch_num = 200,
        batch_size = 1048576,
        max_buffering = 50000
    })
    
    local ok, err = producer:send("gateway-logs", nil, cjson.encode(log_data))
    
    if not ok then
        ngx.log(ngx.ERR, "Failed to send log to Kafka: ", err)
        -- 降级写入本地文件
        _M.write_local_log(log_data)
    end
end

-- 本地日志备份
function _M.write_local_log(log_data)
    local file = io.open("/var/log/nginx/gateway_access.log", "a+")
    if file then
        file:write(cjson.encode(log_data) .. "
")
        file:close()
    end
end

-- 慢请求告警
function _M.alert_slow_request(log_data)
    local alert = {
        type = "SLOW_REQUEST",
        level = "WARNING",
        service = "api-gateway",
        message = string.format(
            "Slow request detected: %s %s took %.2fs",
            log_data.method,
            log_data.uri,
            log_data.request_time
        ),
        details = log_data,
        timestamp = ngx.now()
    }
    
    _M.send_alert(alert)
end

-- 发送告警
function _M.send_alert(alert)
    local httpc = require("resty.http").new()
    
    -- 发送到告警平台
    ngx.timer.at(0, function()
        httpc:request_uri("http://alert-system/api/alerts", {
            method = "POST",
            body = cjson.encode(alert),
            headers = {
                ["Content-Type"] = "application/json"
            }
        })
    end)
end

return _M

 

4.2 Metrics采集

 

-- gateway/metrics.lua
local _M = {}
local prometheus = require "nginx.prometheus"

-- 初始化Prometheus metrics
function _M.init()
    prometheus.init("prometheus_metrics")
    
    -- 定义metrics
    _M.request_count = prometheus:counter(
        "gateway_requests_total",
        "Total number of requests",
        {"method", "path", "status"}
    )
    
    _M.request_duration = prometheus:histogram(
        "gateway_request_duration_seconds",
        "Request duration in seconds",
        {"method", "path"}
    )
    
    _M.upstream_duration = prometheus:histogram(
        "gateway_upstream_duration_seconds", 
        "Upstream response time in seconds",
        {"upstream", "method", "path"}
    )
    
    _M.active_connections = prometheus:gauge(
        "gateway_active_connections",
        "Number of active connections"
    )
    
    _M.rate_limit_hits = prometheus:counter(
        "gateway_rate_limit_hits_total",
        "Number of rate limit hits",
        {"client", "rule"}
    )
    
    _M.circuit_breaker_state = prometheus:gauge(
        "gateway_circuit_breaker_state",
        "Circuit breaker state (0=closed, 1=open, 2=half-open)",
        {"service"}
    )
end

-- 记录请求metrics
function _M.log()
    local method = ngx.var.request_method
    local path = ngx.var.uri
    local status = ngx.var.status
    
    -- 请求计数
    _M.request_count:inc(1, {method, path, status})
    
    -- 请求耗时
    local request_time = tonumber(ngx.var.request_time) or 0
    _M.request_duration:observe(request_time, {method, path})
    
    -- 上游耗时
    local upstream_time = tonumber(ngx.var.upstream_response_time) or 0
    local upstream = ngx.var.upstream_addr or "unknown"
    _M.upstream_duration:observe(upstream_time, {upstream, method, path})
    
    -- 活跃连接数
    _M.active_connections:set(ngx.var.connections_active)
end

-- 暴露metrics端点
function _M.collect()
    prometheus:collect()
end

return _M

 

4.3 健康检查

 

# 健康检查配置
upstream backend {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    
    # 主动健康检查
    check interval=3000 rise=2 fall=3 timeout=1000 type=http;
    check_http_send "GET /health HTTP/1.0

";
    check_http_expect_alive http_2xx http_3xx;
}

server {
    # 网关自身健康检查端点
    location /health {
        access_log off;
        
        content_by_lua_block {
            local health = require "gateway.health"
            health.check()
        }
    }
    
    # Prometheus metrics端点
    location /metrics {
        access_log off;
        
        content_by_lua_block {
            local metrics = require "gateway.metrics"
            metrics.collect()
        }
    }
    
    # 上游健康状态页面
    location /upstream_status {
        access_log off;
        
        check_status;
        
        access_by_lua_block {
            -- 简单的IP白名单
            local allowed_ips = {
                ["127.0.0.1"] = true,
                ["10.0.0.0/8"] = true
            }
            
            local client_ip = ngx.var.remote_addr
            if not allowed_ips[client_ip] then
                ngx.exit(403)
            end
        }
    }
}

 

对应的健康检查Lua模块:

 

-- gateway/health.lua
local _M = {}

function _M.check()
    local checks = {}
    local healthy = true
    
    -- 检查Redis连接
    local redis_health = _M.check_redis()
    checks.redis = redis_health
    healthy = healthy and redis_health.healthy
    
    -- 检查上游服务
    local upstream_health = _M.check_upstreams()
    checks.upstreams = upstream_health
    healthy = healthy and upstream_health.healthy
    
    -- 检查内存使用
    local memory_health = _M.check_memory()
    checks.memory = memory_health
    healthy = healthy and memory_health.healthy
    
    -- 返回结果
    local status = healthy and 200 or 503
    
    ngx.status = status
    ngx.header["Content-Type"] = "application/json"
    ngx.say(cjson.encode({
        status = healthy and "UP" or "DOWN",
        timestamp = ngx.now(),
        checks = checks
    }))
end

function _M.check_redis()
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    
    if not ok then
        return {
            healthy = false,
            message = "Redis connection failed: " .. err
        }
    end
    
    -- 测试读写
    local res = red:ping()
    red:set_keepalive(10000, 100)
    
    return {
        healthy = res == "PONG",
        message = res == "PONG" and "Redis is healthy" or "Redis ping failed"
    }
end

function _M.check_upstreams()
    local upstream_cache = ngx.shared.upstream_cache
    local all_upstreams = upstream_cache:get("all_upstreams")
    
    if not all_upstreams then
        return {
            healthy = false,
            message = "No upstreams configured"
        }
    end
    
    local upstreams = cjson.decode(all_upstreams)
    local healthy_count = 0
    local total_count = 0
    
    for name, servers in pairs(upstreams) do
        for _, server in ipairs(servers) do
            total_count = total_count + 1
            
            local stats_key = "stats:" .. server.host .. ":" .. server.port
            local stats = upstream_cache:get(stats_key)
            
            if stats then
                stats = cjson.decode(stats)
                if stats.error_rate < 0.1 then  -- 错误率小于10%
                    healthy_count = healthy_count + 1
                end
            end
        end
    end
    
    local health_ratio = healthy_count / total_count
    
    return {
        healthy = health_ratio > 0.5,  -- 超过50%健康即可
        message = string.format("%d/%d upstreams healthy", healthy_count, total_count),
        details = {
            healthy = healthy_count,
            total = total_count,
            ratio = health_ratio
        }
    }
end

function _M.check_memory()
    local memory_manager = require "gateway.memory_manager"
    local stats = memory_manager.monitor_memory()
    
    local max_usage = 0
    for name, stat in pairs(stats) do
        local usage = (stat.capacity - stat.free_space) / stat.capacity
        if usage > max_usage then
            max_usage = usage
        end
    end
    
    return {
        healthy = max_usage < 0.9,  -- 内存使用率小于90%
        message = string.format("Memory usage: %.2f%%", max_usage * 100),
        details = stats
    }
end

return _M

 

五、高可用部署方案

5.1 多活架构

 

# docker-compose.yml
version: '3.8'

services:
  # 网关节点1
  gateway-1:
    image: openresty/openresty:alpine
    container_name: gateway-1
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-1:/var/log/nginx
    ports:
      - "8080:80"
    environment:
      - GATEWAY_NODE_ID=node-1
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G
  
  # 网关节点2
  gateway-2:
    image: openresty/openresty:alpine
    container_name: gateway-2
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-2:/var/log/nginx
    ports:
      - "8081:80"
    environment:
      - GATEWAY_NODE_ID=node-2
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
          
  # Keepalived + HAProxy实现高可用
  haproxy:
    image: haproxy:2.4-alpine
    container_name: haproxy
    volumes:
      - ./conf/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    ports:
      - "80:80"
      - "443:443"
      - "8404:8404"  # Stats页面
    depends_on:
      - gateway-1
      - gateway-2
    networks:
      - gateway-network
      
  # Redis集群
  redis:
    image: redis:6-alpine
    container_name: redis
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks:
      - gateway-network
      
  # Consul服务发现
  consul:
    image: consul:1.10
    container_name: consul
    command: agent -server -bootstrap-expect=1 -ui -client=0.0.0.0
    ports:
      - "8500:8500"
      - "8600:8600/udp"
    networks:
      - gateway-network
      
  # Prometheus监控
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./conf/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - gateway-network
      
  # Grafana可视化
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    volumes:
      - grafana-data:/var/lib/grafana
      - ./conf/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./conf/grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    networks:
      - gateway-network

networks:
  gateway-network:
    driver: bridge

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

 

5.2 灰度发布

 

-- gateway/canary.lua
local _M = {}

-- 灰度发布策略
function _M.route_canary()
    local uri = ngx.var.uri
    local headers = ngx.req.get_headers()
    
    -- 策略1:基于Header的灰度
    if headers["X-Canary"] == "true" then
        return _M.get_canary_upstream()
    end
    
    -- 策略2:基于Cookie的灰度
    local cookie_canary = ngx.var.cookie_canary
    if cookie_canary == "true" then
        return _M.get_canary_upstream()
    end
    
    -- 策略3:基于用户ID的灰度
    local user = ngx.ctx.user
    if user and _M.is_canary_user(user.user_id) then
        return _M.get_canary_upstream()
    end
    
    -- 策略4:基于流量百分比的灰度
    local canary_percentage = _M.get_canary_percentage(uri)
    if canary_percentage > 0 then
        local random = math.random(100)
        if random <= canary_percentage then
            return _M.get_canary_upstream()
        end
    end
    
    -- 默认返回稳定版本
    return _M.get_stable_upstream()
end

-- 判断是否为灰度用户
function _M.is_canary_user(user_id)
    local canary_users = ngx.shared.canary_cache:get("canary_users")
    
    if not canary_users then
        return false
    end
    
    canary_users = cjson.decode(canary_users)
    
    -- 支持用户列表
    for _, id in ipairs(canary_users) do
        if id == user_id then
            return true
        end
    end
    
    -- 支持用户ID范围
    local user_id_num = tonumber(user_id)
    if user_id_num and user_id_num % 100 < 10 then  -- 10%的用户
        return true
    end
    
    return false
end

-- 获取灰度百分比
function _M.get_canary_percentage(uri)
    local canary_rules = ngx.shared.canary_cache:get("canary_rules")
    
    if not canary_rules then
        return 0
    end
    
    canary_rules = cjson.decode(canary_rules)
    
    for _, rule in ipairs(canary_rules) do
        if ngx.re.match(uri, rule.pattern) then
            return rule.percentage
        end
    end
    
    return 0
end

-- 获取灰度上游
function _M.get_canary_upstream()
    ngx.header["X-Canary-Version"] = "canary"
    return "canary_backend"
end

-- 获取稳定版上游
function _M.get_stable_upstream()
    ngx.header["X-Canary-Version"] = "stable"
    return "stable_backend"
end

-- 灰度发布控制API
function _M.control_api()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    
    if uri == "/api/canary/percentage" then
        if method == "GET" then
            _M.get_percentage_api()
        elseif method == "POST" then
            _M.set_percentage_api()
        end
    elseif uri == "/api/canary/users" then
        if method == "GET" then
            _M.get_users_api()
        elseif method == "POST" then
            _M.set_users_api()
        end
    end
end

return _M

 

六、实战案例与性能数据

6.1 性能测试结果

在我们的生产环境中,经过优化后的网关性能数据:

指标 数值 测试条件
QPS 100,000+ 8核16G单节点
P99延迟 < 10ms 不包含业务处理时间
P95延迟 < 5ms 不包含业务处理时间
CPU使用率 40-60% 高峰期
内存使用 2-4GB 含缓存
连接数 50,000+ 并发连接

6.2 故障处理案例

Case 1: 下游服务雪崩

• 问题:某个核心服务故障,导致大量请求堆积

• 解决:熔断器自动开启,返回降级响应,避免故障扩散

• 效果:整体可用性保持99.9%

Case 2: DDoS攻击

• 问题:遭受每秒百万级请求攻击

• 解决:多层限流+IP黑名单自动封禁

• 效果:业务完全无感知

七、踩坑总结与最佳实践

7.1 踩过的坑

1. Nginx reload导致连接断开

• 问题:配置更新需要reload,导致连接中断

• 解决:使用动态配置,避免reload

2. 内存泄漏问题

• 问题:Lua脚本内存泄漏

• 解决:正确使用连接池,及时清理变量

3. DNS解析缓存

• 问题:上游服务IP变更后无法及时感知

• 解决:配置resolver和合理的DNS缓存时间

7.2 最佳实践建议

1. 渐进式改造:不要一次性改造所有功能,分阶段实施

2. 充分测试:压测、故障演练必不可少

3. 监控先行:完善的监控是稳定性的基础

4. 文档完善:维护详细的运维文档和故障处理手册

5. 定期演练:定期进行故障演练,验证高可用方案

总结

基于Nginx构建微服务网关是一个系统工程,需要在架构设计、功能实现、性能优化、高可用等多个方面深入思考和实践。本文分享的方案和代码都经过生产环境验证,希望能给大家一些参考。

网关作为微服务架构的咽喉要道,其重要性不言而喻。一个设计良好的网关不仅能提供统一的入口,还能大幅简化微服务的复杂度,提升整体系统的可维护性。

 

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分