Building a Microservice Gateway on Nginx: Implementation and Best Practices for an Enterprise-Grade API Gateway from Scratch
Introduction: Why Does Your Microservice Architecture Need a Strong Gateway?
Remember the last production incident? A service was suddenly flooded with requests and, with no rate limiting in place, knocked over its downstream services. Or the security audit that found API endpoints exposed on the public internet with no authentication at all. Or the ops engineer woken up at midnight because one service's logs were scattered across a dozen machines, making the problem impossible to locate quickly...
These pain points all point to the same solution: you need a unified API gateway.
In this post I will share how our team built a microservice gateway on Nginx that handles over 1 billion requests per day, along with the pitfalls we hit. The setup has been running stably for over two years and has survived several major promotion events.
1. Architecture Design: More Than Just a Reverse Proxy
1.1 Overall Architecture
Our gateway architecture has four layers:
├── Access layer (DNS + CDN)
├── Gateway layer (Nginx + OpenResty)
├── Service layer (microservice clusters)
└── Data layer (Redis + MySQL + MongoDB)
Core design principles:
• High availability: active-active deployment with automatic failover
• High performance: fully exploit Nginx's event-driven model
• Extensibility: Lua scripting on top of OpenResty
• Observability: complete monitoring and logging
1.2 Technology Comparison
| Option | Strengths | Weaknesses | Best fit |
|---|---|---|---|
| Nginx + OpenResty | Extremely fast, very stable, mature operations | Relatively bare-bones, requires custom development | High concurrency, low latency |
| Kong | Rich features, good plugin ecosystem | Noticeable performance overhead, complex to operate | Small to mid scale, quick setup |
| Spring Cloud Gateway | Java-friendly, full-featured | Mediocre performance, high resource usage | Java stacks |
| Envoy | Cloud native, very powerful | Steep learning curve, complex configuration | Kubernetes environments |
2. Core Features: From Configuration to Code
2.1 Dynamic Routing
A traditional Nginx setup needs a reload for configuration changes to take effect, which is unacceptable in production. Our solution:
# nginx.conf core configuration
http {
    # load Lua modules
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_shared_dict routes_cache 100m;
    lua_shared_dict upstream_cache 100m;
    # zones referenced by later modules (breaker, auth, canary, metrics)
    lua_shared_dict breaker_cache 10m;
    lua_shared_dict permissions_cache 50m;
    lua_shared_dict canary_cache 10m;
    lua_shared_dict prometheus_metrics 10m;
    # load routes at initialization
    init_by_lua_block {
        local route_manager = require "gateway.route_manager"
        route_manager.init()
    }
    # refresh route configuration periodically
    init_worker_by_lua_block {
        local route_manager = require "gateway.route_manager"
        -- pull the latest routes from the config center every 10 seconds
        ngx.timer.every(10, route_manager.sync_routes)
    }
    server {
        listen 80;
        server_name api.example.com;
        location / {
            # $upstream must be declared before Lua can assign to it
            set $upstream "";
            # dynamic routing
            access_by_lua_block {
                local router = require "gateway.router"
                router.route()
            }
            # dynamic upstream
            proxy_pass http://$upstream;
            # standard proxy headers
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Request-Id $request_id;
        }
    }
}
The corresponding Lua routing module:
-- gateway/router.lua
local _M = {}
local routes_cache = ngx.shared.routes_cache
local cjson = require "cjson"

function _M.route()
    local uri = ngx.var.uri
    local method = ngx.var.request_method
    -- look up the route in the shared-dict cache
    local route_key = method .. ":" .. uri
    local route_data = routes_cache:get(route_key)
    if not route_data then
        -- fall back to fuzzy (path-parameter) matching
        route_data = _M.fuzzy_match(uri, method)
    end
    if route_data then
        local route = cjson.decode(route_data)
        -- select the upstream
        ngx.var.upstream = route.upstream
        -- add custom headers
        if route.headers then
            for k, v in pairs(route.headers) do
                ngx.req.set_header(k, v)
            end
        end
        -- path rewriting
        -- (note: on older ngx_lua versions ngx.req.set_uri is only
        -- allowed in rewrite_by_lua*)
        if route.rewrite then
            ngx.req.set_uri(route.rewrite)
        end
    else
        ngx.exit(ngx.HTTP_NOT_FOUND)
    end
end

function _M.fuzzy_match(uri, method)
    -- match path parameters, e.g. /api/user/{id} -> /api/user/123
    local all_routes = routes_cache:get("all_routes")
    if not all_routes then
        return nil
    end
    local routes = cjson.decode(all_routes)
    for _, route in ipairs(routes) do
        local pattern = route.path:gsub("{.-}", "([^/]+)")
        -- ngx.re.match returns a single table: m[0] is the whole
        -- match, m[1..n] are the captures
        local m = ngx.re.match(uri, "^" .. pattern .. "$")
        if m and route.method == method then
            -- extract the path parameters
            local params = {}
            for i = 1, #m do
                params[route.params[i]] = m[i]
            end
            -- pass the parameters downstream
            ngx.ctx.path_params = params
            return cjson.encode(route)
        end
    end
    return nil
end

return _M
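For reference, the route entries these functions consume might look like the following — a minimal sketch, since the post never shows the schema that route_manager syncs from the config center; every field name here is an assumption inferred from the router code:

-- hypothetical route entry (stored JSON-encoded in routes_cache)
local sample_route = {
    method   = "GET",
    path     = "/api/user/{id}",           -- fuzzy_match turns {id} into ([^/]+)
    params   = { "id" },                    -- names for the captured path segments
    upstream = "user_service",              -- value assigned to ngx.var.upstream
    rewrite  = "/internal/user",            -- optional URI rewrite
    headers  = { ["X-Service"] = "user" }   -- optional extra request headers
}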
2.2 Smart Load Balancing
Beyond plain round-robin, we adjust weights dynamically based on observed response times:
-- gateway/balancer.lua
local _M = {}
local cjson = require "cjson"
local upstream_cache = ngx.shared.upstream_cache

function _M.get_server(upstream_name)
    local servers_key = "servers:" .. upstream_name
    local servers_data = upstream_cache:get(servers_key)
    if not servers_data then
        return nil
    end
    local servers = cjson.decode(servers_data)
    -- pick a server by response-time-weighted random selection
    local total_weight = 0
    local weighted_servers = {}
    for _, server in ipairs(servers) do
        -- fetch the server's runtime statistics
        local stats_key = "stats:" .. server.host .. ":" .. server.port
        local stats = upstream_cache:get(stats_key)
        if stats then
            stats = cjson.decode(stats)
            -- shorter response time -> higher weight
            local weight = 1000 / (stats.avg_response_time + 1)
            -- penalize the error rate
            weight = weight * (1 - stats.error_rate)
            -- factor in the statically configured base weight
            weight = weight * server.weight
            table.insert(weighted_servers, {
                server = server,
                weight = weight,
                range_start = total_weight,
                range_end = total_weight + weight
            })
            total_weight = total_weight + weight
        else
            -- new server with no stats yet: use its base weight
            table.insert(weighted_servers, {
                server = server,
                weight = server.weight,
                range_start = total_weight,
                range_end = total_weight + server.weight
            })
            total_weight = total_weight + server.weight
        end
    end
    -- weighted random pick
    local random_weight = math.random() * total_weight
    for _, ws in ipairs(weighted_servers) do
        if random_weight >= ws.range_start and random_weight < ws.range_end then
            return ws.server
        end
    end
    -- fallback: return the first server
    return servers[1]
end

-- update a server's statistics after each request
function _M.update_stats(server, response_time, is_error)
    local stats_key = "stats:" .. server.host .. ":" .. server.port
    local stats = upstream_cache:get(stats_key)
    if not stats then
        stats = {
            total_requests = 0,
            total_response_time = 0,
            avg_response_time = 0,
            error_count = 0,
            error_rate = 0
        }
    else
        stats = cjson.decode(stats)
    end
    -- update the counters
    stats.total_requests = stats.total_requests + 1
    stats.total_response_time = stats.total_response_time + response_time
    stats.avg_response_time = stats.total_response_time / stats.total_requests
    if is_error then
        stats.error_count = stats.error_count + 1
    end
    stats.error_rate = stats.error_count / stats.total_requests
    -- persist with a TTL so stale entries do not leak memory
    upstream_cache:set(stats_key, cjson.encode(stats), 300)
end

return _M
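The module above only picks a server; it still has to be hooked into Nginx's proxying. A minimal sketch of the wiring via ngx.balancer from lua-resty-core — the upstream block and the gateway.balancer_hook module name are assumptions, not part of the original setup:

-- in nginx.conf:
-- upstream dynamic_backend {
--     server 0.0.0.1;   # placeholder; never actually used
--     balancer_by_lua_block {
--         require("gateway.balancer_hook").balance()
--     }
-- }

-- gateway/balancer_hook.lua (hypothetical module)
local ngx_balancer = require "ngx.balancer"
local balancer = require "gateway.balancer"
local _M = {}

function _M.balance()
    -- ngx.ctx.upstream_name would be set by the router in an earlier phase
    local server = balancer.get_server(ngx.ctx.upstream_name or "default")
    if not server then
        ngx.log(ngx.ERR, "no upstream server available")
        return ngx.exit(502)
    end
    local ok, err = ngx_balancer.set_current_peer(server.host, server.port)
    if not ok then
        ngx.log(ngx.ERR, "failed to set peer: ", err)
        return ngx.exit(502)
    end
end

return _M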
2.3 Rate Limiting and Circuit Breaking
Distributed rate limiting based on the token-bucket algorithm:
-- gateway/rate_limiter.lua
local _M = {}
local redis = require "resty.redis"

-- token-bucket rate limiting
function _M.token_bucket_limit(key, rate, capacity)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect to Redis: ", err)
        return true -- degrade gracefully: let the request through
    end
    -- atomic refill-and-take via a Redis Lua script
    local script = [[
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4] or 1)
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1] or capacity)
        local last_refill = tonumber(bucket[2] or now)
        -- refill tokens for the elapsed interval
        local elapsed = math.max(0, now - last_refill)
        local tokens_to_add = elapsed * rate
        tokens = math.min(capacity, tokens + tokens_to_add)
        -- EXPIRE requires an integer number of seconds
        local ttl = math.ceil(capacity / rate) + 1
        if tokens >= requested then
            tokens = tokens - requested
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, ttl)
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, ttl)
            return 0
        end
    ]]
    local now = ngx.now()
    local res = red:eval(script, 1, key, rate, capacity, now, 1)
    red:set_keepalive(10000, 100)
    return res == 1
end

-- circuit breaker
function _M.circuit_breaker(service_name)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache
    -- current breaker state
    local state = breaker_cache:get(breaker_key .. ":state") or "closed"
    if state == "open" then
        -- has the half-open probe window arrived?
        local open_time = breaker_cache:get(breaker_key .. ":open_time")
        if ngx.now() - open_time > 30 then -- try half-open after 30 seconds
            breaker_cache:set(breaker_key .. ":state", "half_open")
            state = "half_open"
        else
            return false, "Circuit breaker is open"
        end
    end
    if state == "half_open" then
        -- half-open: let a small number of probe requests through
        local half_open_count = breaker_cache:incr(breaker_key .. ":half_open_count", 1, 0)
        if half_open_count > 5 then -- allow at most 5 probes
            return false, "Circuit breaker is half open, limit exceeded"
        end
    end
    return true
end

-- record request outcomes and drive state transitions
function _M.update_breaker(service_name, is_success)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache
    local state = breaker_cache:get(breaker_key .. ":state") or "closed"
    if state == "closed" then
        if not is_success then
            -- bump the failure counter (auto-expires after 60 seconds)
            local fail_count = breaker_cache:incr(breaker_key .. ":fail_count", 1, 0, 60)
            -- 10 failures within the 60-second window open the breaker
            if fail_count >= 10 then
                breaker_cache:set(breaker_key .. ":state", "open")
                breaker_cache:set(breaker_key .. ":open_time", ngx.now())
                ngx.log(ngx.WARN, "Circuit breaker opened for: ", service_name)
            end
        end
    elseif state == "half_open" then
        if is_success then
            -- probe succeeded: close the breaker
            breaker_cache:set(breaker_key .. ":state", "closed")
            breaker_cache:delete(breaker_key .. ":fail_count")
            breaker_cache:delete(breaker_key .. ":half_open_count")
            ngx.log(ngx.INFO, "Circuit breaker closed for: ", service_name)
        else
            -- probe failed: reopen
            breaker_cache:set(breaker_key .. ":state", "open")
            breaker_cache:set(breaker_key .. ":open_time", ngx.now())
        end
    end
end

return _M
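For context, here is how the two mechanisms might be combined in the access phase — a minimal sketch under assumed parameters (100 req/s with a burst of 200, keyed by client IP); the post does not show this glue itself:

access_by_lua_block {
    local limiter = require "gateway.rate_limiter"
    -- per-client token bucket: rate 100 tokens/s, capacity 200
    local key = "rl:" .. ngx.var.remote_addr
    if not limiter.token_bucket_limit(key, 100, 200) then
        ngx.status = 429
        ngx.say('{"error":"too many requests"}')
        return ngx.exit(429)
    end
    -- refuse early if the target service's breaker is open
    local service = ngx.var.upstream or "default"
    local ok, reason = limiter.circuit_breaker(service)
    if not ok then
        ngx.status = 503
        ngx.say('{"error":"' .. reason .. '"}')
        return ngx.exit(503)
    end
    -- update_breaker(service, ngx.status < 500) would then run in the log phase
}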
2.4 Unified Authentication and Authorization
JWT authentication plus fine-grained permission control:
-- gateway/auth.lua
local _M = {}
local cjson = require "cjson"
local jwt = require "resty.jwt"
local redis = require "resty.redis"

-- JWT verification
function _M.verify_jwt()
    local auth_header = ngx.var.http_authorization
    if not auth_header then
        return false, "Missing authorization header"
    end
    local _, _, token = string.find(auth_header, "Bearer%s+(.+)")
    if not token then
        return false, "Invalid authorization header format"
    end
    -- verify the JWT
    local jwt_secret = os.getenv("JWT_SECRET") or "your-secret-key"
    local jwt_obj = jwt:verify(jwt_secret, token)
    if not jwt_obj.verified then
        return false, jwt_obj.reason
    end
    -- check the token against a blacklist (for proactive revocation)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if ok then
        -- red:get returns ngx.null (which is truthy) for a missing key,
        -- so it must be compared explicitly
        local blacklisted = red:get("blacklist:" .. token)
        if blacklisted and blacklisted ~= ngx.null then
            red:set_keepalive(10000, 100)
            return false, "Token has been revoked"
        end
        red:set_keepalive(10000, 100)
    end
    -- stash the user info in the per-request context
    ngx.ctx.user = jwt_obj.payload
    return true
end
-- permission check
function _M.check_permission(required_permission)
    local user = ngx.ctx.user
    if not user then
        return false, "User not authenticated"
    end
    -- fetch the user's permissions from cache or the backing store
    local permissions = _M.get_user_permissions(user.user_id)
    -- wildcard-aware matching
    for _, perm in ipairs(permissions) do
        if _M.match_permission(perm, required_permission) then
            return true
        end
    end
    return false, "Permission denied"
end

-- permission matching (with wildcard support)
function _M.match_permission(user_perm, required_perm)
    -- turn the permission string into a regex pattern,
    -- e.g. user* matches userprofile
    -- ("%*" escapes the asterisk; a bare "*" is a malformed Lua pattern)
    local pattern = user_perm:gsub("%*", ".*")
    pattern = "^" .. pattern .. "$"
    return ngx.re.match(required_perm, pattern) ~= nil
end

-- fetch user permissions (with local caching)
function _M.get_user_permissions(user_id)
    local cache_key = "permissions:" .. user_id
    local permissions_cache = ngx.shared.permissions_cache
    -- try the worker-local shared dict first
    local cached = permissions_cache:get(cache_key)
    if cached then
        return cjson.decode(cached)
    end
    -- fall back to Redis
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect to Redis: ", err)
        return {}
    end
    local permissions = red:smembers("permissions:" .. user_id)
    red:set_keepalive(10000, 100)
    -- cache for 5 minutes
    permissions_cache:set(cache_key, cjson.encode(permissions), 300)
    return permissions
end
-- API signature verification (anti-replay)
function _M.verify_signature()
    local signature = ngx.var.http_x_signature
    local timestamp = ngx.var.http_x_timestamp
    local nonce = ngx.var.http_x_nonce
    if not signature or not timestamp or not nonce then
        return false, "Missing signature headers"
    end
    -- check the timestamp (valid for 5 minutes)
    timestamp = tonumber(timestamp)
    if not timestamp then
        return false, "Invalid timestamp"
    end
    local current_time = ngx.now()
    if math.abs(current_time - timestamp) > 300 then
        return false, "Request expired"
    end
    -- has this nonce been used already?
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if ok then
        local nonce_key = "nonce:" .. nonce
        local exists = red:get(nonce_key)
        if exists and exists ~= ngx.null then
            red:set_keepalive(10000, 100)
            return false, "Nonce already used"
        end
        -- record the nonce with a 5-minute expiry
        red:setex(nonce_key, 300, "1")
        red:set_keepalive(10000, 100)
    end
    -- verify the signature
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    ngx.req.read_body() -- required before get_body_data()
    local body = ngx.req.get_body_data() or ""
    local sign_string = method .. uri .. timestamp .. nonce .. body
    local app_secret = _M.get_app_secret(ngx.var.http_x_app_id)
    -- ngx_lua only ships HMAC-SHA1 natively; for HMAC-SHA256 use a
    -- library such as lua-resty-openssl
    local expected_signature = ngx.encode_base64(
        ngx.hmac_sha1(app_secret, sign_string)
    )
    if signature ~= expected_signature then
        return false, "Invalid signature"
    end
    return true
end

return _M
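verify_signature calls _M.get_app_secret, which the post never defines. A minimal sketch, assuming app secrets live in Redis under app_secret:<app_id> and are cached in a shared dict (both key names are assumptions); it would sit above the module's return _M:

-- hypothetical helper: look up (and cache) an application's signing secret
function _M.get_app_secret(app_id)
    if not app_id then
        return nil
    end
    local cache = ngx.shared.permissions_cache -- reusing an existing zone
    local cache_key = "app_secret:" .. app_id
    local secret = cache:get(cache_key)
    if secret then
        return secret
    end
    local red = redis:new()
    red:set_timeout(1000)
    local ok = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil
    end
    secret = red:get(cache_key)
    red:set_keepalive(10000, 100)
    if secret and secret ~= ngx.null then
        cache:set(cache_key, secret, 300) -- cache for 5 minutes
        return secret
    end
    return nil
end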
2.5 Request and Response Transformation
Handling compatibility across API versions:
-- gateway/transformer.lua
local _M = {}
local cjson = require "cjson"

-- request transformation
function _M.transform_request()
    local uri = ngx.var.uri
    local version = ngx.var.http_x_api_version or "v2"
    -- convert the request according to the requested version
    if version == "v1" then
        _M.transform_v1_to_v2_request()
    end
    -- add a trace header if the client did not send one
    if not ngx.var.http_x_request_id then
        ngx.req.set_header("X-Request-Id", ngx.var.request_id)
    end
    -- add origin markers
    ngx.req.set_header("X-Gateway-Time", tostring(ngx.now()))
    ngx.req.set_header("X-Forwarded-Host", ngx.var.host)
    ngx.req.set_header("X-Forwarded-Proto", ngx.var.scheme)
end

-- V1 -> V2 request conversion
function _M.transform_v1_to_v2_request()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if body then
        local ok, data = pcall(cjson.decode, body)
        if not ok then
            return -- leave non-JSON bodies untouched
        end
        -- field mapping
        local field_mapping = {
            user_name = "username",
            user_id = "userId",
            create_time = "createdAt"
        }
        for old_field, new_field in pairs(field_mapping) do
            if data[old_field] then
                data[new_field] = data[old_field]
                data[old_field] = nil
            end
        end
        -- write back the request body
        ngx.req.set_body_data(cjson.encode(data))
    end
end
-- response body transformation (runs in body_filter_by_lua_block)
function _M.transform_response()
    local version = ngx.var.http_x_api_version or "v2"
    if version == "v1" then
        -- current body chunk
        local resp_body = ngx.arg[1]
        if resp_body then
            local ok, data = pcall(cjson.decode, resp_body)
            if ok then
                -- V2 -> V1 response conversion
                data = _M.transform_v2_to_v1_response(data)
                -- replace the outgoing chunk
                ngx.arg[1] = cjson.encode(data)
            end
        end
    end
end

-- response headers must be set in header_filter_by_lua_block, before
-- the body is sent; setting ngx.header from a body filter fails
function _M.transform_response_headers()
    if ngx.ctx.start_time then
        ngx.header["X-Gateway-Response-Time"] = ngx.now() - ngx.ctx.start_time
    end
    ngx.header["X-Request-Id"] = ngx.var.request_id
    -- the body size may change, so drop Content-Length
    ngx.header.content_length = nil
end

-- V2 -> V1 response conversion
function _M.transform_v2_to_v1_response(data)
    -- field mapping (reversed)
    local field_mapping = {
        username = "user_name",
        userId = "user_id",
        createdAt = "create_time"
    }
    local function transform_object(obj)
        if type(obj) ~= "table" then
            return obj
        end
        for new_field, old_field in pairs(field_mapping) do
            if obj[new_field] then
                obj[old_field] = obj[new_field]
                obj[new_field] = nil
            end
        end
        -- recurse into nested objects
        for k, v in pairs(obj) do
            obj[k] = transform_object(v)
        end
        return obj
    end
    return transform_object(data)
end
-- protocol conversion (GraphQL to REST)
function _M.graphql_to_rest()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        return
    end
    local graphql_query = cjson.decode(body)
    -- extract the operation name from the query
    local operation = graphql_query.query:match("(%w+)%s*{")
    -- map operations to REST endpoints
    local endpoint_mapping = {
        getUser = {method = "GET", path = "/api/users/"},
        createUser = {method = "POST", path = "/api/users"},
        updateUser = {method = "PUT", path = "/api/users/"},
        deleteUser = {method = "DELETE", path = "/api/users/"}
    }
    local mapping = endpoint_mapping[operation]
    if mapping then
        -- pull out the variables
        local params = graphql_query.variables or {}
        -- rewrite into a REST request
        -- (the method constants are ngx.HTTP_GET etc., not ngx.GET)
        ngx.req.set_method(ngx["HTTP_" .. mapping.method])
        if params.id then
            ngx.req.set_uri(mapping.path .. params.id)
            params.id = nil
        else
            ngx.req.set_uri(mapping.path)
        end
        -- set the request body
        if mapping.method ~= "GET" then
            ngx.req.set_body_data(cjson.encode(params))
        end
    end
end

return _M
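One caveat with transform_response: in a body filter the response usually arrives in multiple chunks, and decoding ngx.arg[1] chunk by chunk will fail on any body larger than one buffer. A minimal buffering sketch for body_filter_by_lua_block, assuming the whole body fits in memory:

-- body_filter_by_lua_block: buffer chunks, transform only the complete body
local chunk, eof = ngx.arg[1], ngx.arg[2]
local buffered = ngx.ctx.buffered or {}
if chunk ~= "" then
    buffered[#buffered + 1] = chunk
    ngx.ctx.buffered = buffered
end
if not eof then
    -- suppress intermediate output until the body is complete
    ngx.arg[1] = nil
    return
end
ngx.arg[1] = table.concat(buffered)
-- ...decode, run transform_v2_to_v1_response, and re-encode here...
-- remember: Content-Length must be cleared in a header filter,
-- as transform_response_headers above does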
3. Performance Optimization: Making the Gateway Fly
3.1 Caching Strategy
A multi-level cache architecture that dramatically improves response times:
# local proxy cache for API responses
proxy_cache_path /var/cache/nginx/api_cache
    levels=1:2
    keys_zone=api_cache:100m
    max_size=10g
    inactive=60m
    use_temp_path=off;
# separate cache for static assets
proxy_cache_path /var/cache/nginx/static_cache
    levels=1:2
    keys_zone=static_cache:50m
    max_size=5g
    inactive=7d
    use_temp_path=off;
server {
    location /api/ {
        # cache key definition
        set $cache_key "$scheme$request_method$host$request_uri$is_args$args";
        # Lua-driven cache handling
        access_by_lua_block {
            local cache = require "gateway.cache"
            cache.handle_cache()
        }
        # Nginx proxy-cache settings
        proxy_cache api_cache;
        proxy_cache_key $cache_key;
        proxy_cache_valid 200 304 5m;
        proxy_cache_valid 404 1m;
        proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504;
        proxy_cache_background_update on;
        proxy_cache_lock on;
        proxy_cache_lock_timeout 5s;
        # expose the cache status
        add_header X-Cache-Status $upstream_cache_status;
        proxy_pass http://backend;
    }
    location /static/ {
        proxy_cache static_cache;
        proxy_cache_valid 200 304 7d;
        proxy_cache_valid any 1h;
        # support range requests
        proxy_set_header Range $http_range;
        proxy_set_header If-Range $http_if_range;
        proxy_pass http://static_backend;
    }
}
The Lua script for smarter cache control:
-- gateway/cache.lua
local _M = {}
local cjson = require "cjson"
local redis = require "resty.redis"

function _M.handle_cache()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    -- only cache GET requests
    if method ~= "GET" then
        return
    end
    -- build a cache key that accounts for the user's identity
    local cache_key = _M.generate_cache_key()
    -- try Redis first
    local cached_response = _M.get_from_redis(cache_key)
    if cached_response then
        -- serve from cache if still fresh
        if not _M.is_stale(cached_response) then
            ngx.header["Content-Type"] = cached_response.content_type
            ngx.header["X-Cache-Hit"] = "redis"
            ngx.say(cached_response.body)
            ngx.exit(200)
        else
            -- stale: refresh asynchronously
            ngx.timer.at(0, _M.refresh_cache, cache_key, uri)
        end
    end
    -- mark the request so the response can be cached later
    ngx.ctx.cache_key = cache_key
    ngx.ctx.should_cache = true
end

function _M.generate_cache_key()
    local user = ngx.ctx.user
    local uri = ngx.var.uri
    local args = ngx.var.args or ""
    -- personalize per user
    local user_id = user and user.user_id or "anonymous"
    -- derive the cache key
    local cache_key = ngx.md5(user_id .. ":" .. uri .. ":" .. args)
    return cache_key
end

function _M.get_from_redis(key)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil
    end
    local res = red:get("cache:" .. key)
    red:set_keepalive(10000, 100)
    if res and res ~= ngx.null then
        return cjson.decode(res)
    end
    return nil
end
function _M.is_stale(cached_response)
    local ttl = cached_response.ttl or 300
    local cached_time = cached_response.cached_at or 0
    return (ngx.now() - cached_time) > ttl
end

-- timer callbacks receive `premature` as their first argument
function _M.refresh_cache(premature, cache_key, uri)
    if premature then
        return
    end
    -- fetch from the backend asynchronously and refresh the cache
    local httpc = require("resty.http").new()
    local res, err = httpc:request_uri("http://backend" .. uri, {
        method = "GET",
        headers = {
            ["X-Cache-Refresh"] = "true"
        }
    })
    if res and res.status == 200 then
        _M.save_to_redis(cache_key, res.body, res.headers)
    end
end

function _M.save_to_redis(key, body, headers)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return
    end
    local cache_data = {
        body = body,
        content_type = headers["Content-Type"],
        cached_at = ngx.now(),
        ttl = 300
    }
    red:setex("cache:" .. key, 300, cjson.encode(cache_data))
    red:set_keepalive(10000, 100)
end

return _M
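handle_cache sets ngx.ctx.should_cache, but the post never shows the code that actually writes responses back to Redis. A minimal sketch of that write path for the log phase, assuming the full body was buffered into ngx.ctx.resp_body by a body filter like the one in section 2.5 (the variable names are assumptions):

-- log_by_lua_block: persist cacheable responses
local cache = require "gateway.cache"
if ngx.ctx.should_cache and ngx.status == 200 and ngx.ctx.resp_body then
    local key, body = ngx.ctx.cache_key, ngx.ctx.resp_body
    local content_type = ngx.resp.get_headers()["Content-Type"]
    -- cosockets are unavailable in the log phase, so defer to a timer
    ngx.timer.at(0, function(premature)
        if premature then
            return
        end
        cache.save_to_redis(key, body, { ["Content-Type"] = content_type })
    end)
end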
3.2 Connection Pool Tuning
# upstream connection pool configuration
upstream backend {
    # keep idle upstream connections alive
    keepalive 256;
    keepalive_requests 1000;
    keepalive_timeout 60s;
    # server list
    server 192.168.1.10:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.11:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.12:8080 max_fails=2 fail_timeout=10s backup;
    # least-connections load balancing
    least_conn;
}
http {
    # client-side keepalive tuning
    keepalive_timeout 65;
    keepalive_requests 100;
    # proxy timeouts and buffers
    proxy_connect_timeout 5s;
    proxy_send_timeout 60s;
    proxy_read_timeout 60s;
    proxy_buffer_size 32k;
    proxy_buffers 4 64k;
    proxy_busy_buffers_size 128k;
    proxy_temp_file_write_size 256k;
    # HTTP/2 header limits (note: deprecated in nginx 1.19.7+)
    http2_max_field_size 16k;
    http2_max_header_size 32k;
    # required for upstream keepalive to take effect
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}
3.3 内存管理优化
-- gateway/memory_manager.lua
local _M = {}
-- 定期清理过期缓存
function _M.cleanup_expired_cache()
local cache_dict = ngx.shared.routes_cache
local keys = cache_dict:get_keys(0) -- 获取所有键
for _, key in ipairs(keys) do
local ttl = cache_dict:ttl(key)
-- 清理即将过期的键
if ttl and ttl < 10 then
cache_dict:delete(key)
end
end
end
-- 监控内存使用
function _M.monitor_memory()
local cache_list = {
"routes_cache",
"upstream_cache",
"permissions_cache",
"breaker_cache"
}
local memory_stats = {}
for _, cache_name in ipairs(cache_list) do
local cache = ngx.shared[cache_name]
if cache then
memory_stats[cache_name] = {
capacity = cache:capacity(),
free_space = cache:free_space()
}
end
end
-- 当内存使用超过80%时告警
for name, stats in pairs(memory_stats) do
local usage = (stats.capacity - stats.free_space) / stats.capacity
if usage > 0.8 then
ngx.log(ngx.WARN, string.format(
"Memory usage warning: %s is %.2f%% full",
name,
usage * 100
))
-- 触发清理
_M.force_cleanup(name)
end
end
return memory_stats
end
-- 强制清理缓存
function _M.force_cleanup(cache_name)
local cache = ngx.shared[cache_name]
if not cache then
return
end
-- 使用LRU策略清理
cache:flush_expired()
-- 如果还是不够,清理最旧的10%
local keys = cache:get_keys(0)
local to_delete = math.floor(#keys * 0.1)
for i = 1, to_delete do
cache:delete(keys[i])
end
end
-- 初始化定时任务
function _M.init_timers()
-- 每分钟清理一次
ngx.timer.every(60, _M.cleanup_expired_cache)
-- 每5分钟监控一次内存
ngx.timer.every(300, _M.monitor_memory)
end
return _M
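The timers only run if init_timers is invoked once per worker; a one-line wiring sketch (its placement in nginx.conf is an assumption):

init_worker_by_lua_block {
    require("gateway.memory_manager").init_timers()
}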
4. Monitoring and Alerting: Building Observability
4.1 Log Collection
-- gateway/logger.lua
local _M = {}
local cjson = require "cjson"

-- structured access log (runs in log_by_lua_block)
function _M.access_log()
    local log_data = {
        -- basics
        timestamp = ngx.now(),
        request_id = ngx.var.request_id,
        -- request
        method = ngx.var.request_method,
        uri = ngx.var.uri,
        args = ngx.var.args,
        host = ngx.var.host,
        -- client
        client_ip = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        referer = ngx.var.http_referer,
        -- response
        status = ngx.var.status,
        bytes_sent = ngx.var.bytes_sent,
        request_time = ngx.var.request_time,
        upstream_response_time = ngx.var.upstream_response_time,
        -- upstream
        upstream_addr = ngx.var.upstream_addr,
        upstream_status = ngx.var.upstream_status,
        -- cache
        cache_status = ngx.var.upstream_cache_status,
        -- user
        user_id = ngx.ctx.user and ngx.ctx.user.user_id or nil,
        -- tracing
        trace_id = ngx.var.http_x_trace_id,
        span_id = ngx.var.http_x_span_id
    }
    -- ship the log asynchronously
    _M.write_log(log_data)
    -- slow-request alert
    if tonumber(ngx.var.request_time) > 3 then
        _M.alert_slow_request(log_data)
    end
    -- server-error alert
    if tonumber(ngx.var.status) >= 500 then
        _M.alert_error(log_data)
    end
end
-- ship logs to Kafka
function _M.write_log(log_data)
    local kafka_producer = require "resty.kafka.producer"
    local broker_list = {
        {host = "127.0.0.1", port = 9092}
    }
    -- async producers are cached per cluster, so :new() here is cheap
    local producer = kafka_producer:new(broker_list, {
        producer_type = "async",
        batch_num = 200,
        batch_size = 1048576,
        max_buffering = 50000
    })
    local ok, err = producer:send("gateway-logs", nil, cjson.encode(log_data))
    if not ok then
        ngx.log(ngx.ERR, "Failed to send log to Kafka: ", err)
        -- fall back to a local file
        _M.write_local_log(log_data)
    end
end

-- local log fallback
function _M.write_local_log(log_data)
    local file = io.open("/var/log/nginx/gateway_access.log", "a+")
    if file then
        file:write(cjson.encode(log_data) .. "\n")
        file:close()
    end
end
-- slow-request alert
function _M.alert_slow_request(log_data)
    local alert = {
        type = "SLOW_REQUEST",
        level = "WARNING",
        service = "api-gateway",
        message = string.format(
            "Slow request detected: %s %s took %.2fs",
            log_data.method,
            log_data.uri,
            log_data.request_time
        ),
        details = log_data,
        timestamp = ngx.now()
    }
    _M.send_alert(alert)
end

-- error alert (the post calls this but leaves it undefined; minimal version)
function _M.alert_error(log_data)
    _M.send_alert({
        type = "SERVER_ERROR",
        level = "ERROR",
        service = "api-gateway",
        message = string.format("HTTP %s on %s %s",
            log_data.status, log_data.method, log_data.uri),
        details = log_data,
        timestamp = ngx.now()
    })
end

-- deliver an alert (cosockets are unavailable in the log phase,
-- so the HTTP call is deferred to a zero-delay timer)
function _M.send_alert(alert)
    ngx.timer.at(0, function(premature)
        if premature then
            return
        end
        local httpc = require("resty.http").new()
        httpc:request_uri("http://alert-system/api/alerts", {
            method = "POST",
            body = cjson.encode(alert),
            headers = {
                ["Content-Type"] = "application/json"
            }
        })
    end)
end

return _M
4.2 Metrics Collection
-- gateway/metrics.lua
-- uses the nginx-lua-prometheus library, loaded as `require "prometheus"`
-- (the post loads "nginx.prometheus"; the upstream module name is "prometheus")
local _M = {}
local prometheus_lib = require "prometheus"
local prometheus

-- initialize the Prometheus registry (call from init_worker_by_lua_block)
function _M.init()
    -- backed by `lua_shared_dict prometheus_metrics 10m;`
    prometheus = prometheus_lib.init("prometheus_metrics")
    -- metric definitions
    _M.request_count = prometheus:counter(
        "gateway_requests_total",
        "Total number of requests",
        {"method", "path", "status"}
    )
    _M.request_duration = prometheus:histogram(
        "gateway_request_duration_seconds",
        "Request duration in seconds",
        {"method", "path"}
    )
    _M.upstream_duration = prometheus:histogram(
        "gateway_upstream_duration_seconds",
        "Upstream response time in seconds",
        {"upstream", "method", "path"}
    )
    _M.active_connections = prometheus:gauge(
        "gateway_active_connections",
        "Number of active connections"
    )
    _M.rate_limit_hits = prometheus:counter(
        "gateway_rate_limit_hits_total",
        "Number of rate limit hits",
        {"client", "rule"}
    )
    _M.circuit_breaker_state = prometheus:gauge(
        "gateway_circuit_breaker_state",
        "Circuit breaker state (0=closed, 1=open, 2=half-open)",
        {"service"}
    )
end
-- record per-request metrics (log phase)
function _M.log()
    local method = ngx.var.request_method
    -- raw URIs can explode label cardinality; prefer route templates in production
    local path = ngx.var.uri
    local status = ngx.var.status
    -- request count
    _M.request_count:inc(1, {method, path, status})
    -- request latency
    local request_time = tonumber(ngx.var.request_time) or 0
    _M.request_duration:observe(request_time, {method, path})
    -- upstream latency
    local upstream_time = tonumber(ngx.var.upstream_response_time) or 0
    local upstream = ngx.var.upstream_addr or "unknown"
    _M.upstream_duration:observe(upstream_time, {upstream, method, path})
    -- active connections
    _M.active_connections:set(tonumber(ngx.var.connections_active) or 0)
end

-- expose the metrics endpoint
function _M.collect()
    if prometheus then
        prometheus:collect()
    end
end

return _M
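Both modules still need to be attached to the request lifecycle; a minimal wiring sketch (the exact placement in nginx.conf is an assumption — the /metrics endpoint itself appears in the next section):

init_worker_by_lua_block {
    require("gateway.metrics").init()
}
log_by_lua_block {
    -- runs after the response has been sent, off the latency path
    require("gateway.logger").access_log()
    require("gateway.metrics").log()
}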
4.3 Health Checks
# health check configuration
# note: the `check*` directives require nginx_upstream_check_module
# (bundled with Tengine); they are not part of stock nginx
upstream backend {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    # active health checks
    check interval=3000 rise=2 fall=3 timeout=1000 type=http;
    check_http_send "GET /health HTTP/1.0\r\n\r\n";
    check_http_expect_alive http_2xx http_3xx;
}
server {
    # the gateway's own health endpoint
    location /health {
        access_log off;
        content_by_lua_block {
            local health = require "gateway.health"
            health.check()
        }
    }
    # Prometheus metrics endpoint
    location /metrics {
        access_log off;
        content_by_lua_block {
            local metrics = require "gateway.metrics"
            metrics.collect()
        }
    }
    # upstream health status page
    location /upstream_status {
        access_log off;
        check_status;
        access_by_lua_block {
            -- simple IP allowlist; a plain table lookup cannot match
            -- CIDR ranges, so 10.0.0.0/8 is checked by prefix instead
            local client_ip = ngx.var.remote_addr
            local allowed = client_ip == "127.0.0.1"
                or ngx.re.find(client_ip, [[^10\.]], "jo")
            if not allowed then
                ngx.exit(403)
            end
        }
    }
}
The corresponding health check module:
-- gateway/health.lua
local _M = {}
local cjson = require "cjson"

function _M.check()
    local checks = {}
    local healthy = true
    -- Redis connectivity
    local redis_health = _M.check_redis()
    checks.redis = redis_health
    healthy = healthy and redis_health.healthy
    -- upstream services
    local upstream_health = _M.check_upstreams()
    checks.upstreams = upstream_health
    healthy = healthy and upstream_health.healthy
    -- memory usage
    local memory_health = _M.check_memory()
    checks.memory = memory_health
    healthy = healthy and memory_health.healthy
    -- respond
    local status = healthy and 200 or 503
    ngx.status = status
    ngx.header["Content-Type"] = "application/json"
    ngx.say(cjson.encode({
        status = healthy and "UP" or "DOWN",
        timestamp = ngx.now(),
        checks = checks
    }))
end

function _M.check_redis()
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return {
            healthy = false,
            message = "Redis connection failed: " .. err
        }
    end
    -- round-trip test
    local res = red:ping()
    red:set_keepalive(10000, 100)
    return {
        healthy = res == "PONG",
        message = res == "PONG" and "Redis is healthy" or "Redis ping failed"
    }
end
function _M.check_upstreams()
    local upstream_cache = ngx.shared.upstream_cache
    local all_upstreams = upstream_cache:get("all_upstreams")
    if not all_upstreams then
        return {
            healthy = false,
            message = "No upstreams configured"
        }
    end
    local upstreams = cjson.decode(all_upstreams)
    local healthy_count = 0
    local total_count = 0
    for name, servers in pairs(upstreams) do
        for _, server in ipairs(servers) do
            total_count = total_count + 1
            local stats_key = "stats:" .. server.host .. ":" .. server.port
            local stats = upstream_cache:get(stats_key)
            if stats then
                stats = cjson.decode(stats)
                if stats.error_rate < 0.1 then -- error rate below 10%
                    healthy_count = healthy_count + 1
                end
            end
        end
    end
    -- guard against division by zero
    if total_count == 0 then
        return { healthy = false, message = "No upstream servers found" }
    end
    local health_ratio = healthy_count / total_count
    return {
        healthy = health_ratio > 0.5, -- more than half healthy is acceptable
        message = string.format("%d/%d upstreams healthy", healthy_count, total_count),
        details = {
            healthy = healthy_count,
            total = total_count,
            ratio = health_ratio
        }
    }
end

function _M.check_memory()
    local memory_manager = require "gateway.memory_manager"
    local stats = memory_manager.monitor_memory()
    local max_usage = 0
    for name, stat in pairs(stats) do
        local usage = (stat.capacity - stat.free_space) / stat.capacity
        if usage > max_usage then
            max_usage = usage
        end
    end
    return {
        healthy = max_usage < 0.9, -- memory usage below 90%
        message = string.format("Memory usage: %.2f%%", max_usage * 100),
        details = stats
    }
end

return _M
5. High-Availability Deployment
5.1 Active-Active Architecture
# docker-compose.yml
version: '3.8'

services:
  # gateway node 1
  gateway-1:
    image: openresty/openresty:alpine
    container_name: gateway-1
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-1:/var/log/nginx
    ports:
      - "8080:80"
    environment:
      - GATEWAY_NODE_ID=node-1
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G

  # gateway node 2
  gateway-2:
    image: openresty/openresty:alpine
    container_name: gateway-2
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-2:/var/log/nginx
    ports:
      - "8081:80"
    environment:
      - GATEWAY_NODE_ID=node-2
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G

  # Keepalived + HAProxy for high availability
  haproxy:
    image: haproxy:2.4-alpine
    container_name: haproxy
    volumes:
      - ./conf/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    ports:
      - "80:80"
      - "443:443"
      - "8404:8404"  # stats page
    depends_on:
      - gateway-1
      - gateway-2
    networks:
      - gateway-network

  # Redis
  redis:
    image: redis:6-alpine
    container_name: redis
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks:
      - gateway-network

  # Consul service discovery
  consul:
    image: consul:1.10
    container_name: consul
    command: agent -server -bootstrap-expect=1 -ui -client=0.0.0.0
    ports:
      - "8500:8500"
      - "8600:8600/udp"
    networks:
      - gateway-network

  # Prometheus monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./conf/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - gateway-network

  # Grafana dashboards
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    volumes:
      - grafana-data:/var/lib/grafana
      - ./conf/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./conf/grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    networks:
      - gateway-network

networks:
  gateway-network:
    driver: bridge

volumes:
  redis-data:
  prometheus-data:
  grafana-data:
5.2 Canary Releases
-- gateway/canary.lua
local _M = {}
local cjson = require "cjson"

-- canary routing strategies
function _M.route_canary()
    local uri = ngx.var.uri
    local headers = ngx.req.get_headers()
    -- strategy 1: header-based canary
    if headers["X-Canary"] == "true" then
        return _M.get_canary_upstream()
    end
    -- strategy 2: cookie-based canary
    local cookie_canary = ngx.var.cookie_canary
    if cookie_canary == "true" then
        return _M.get_canary_upstream()
    end
    -- strategy 3: user-ID-based canary
    local user = ngx.ctx.user
    if user and _M.is_canary_user(user.user_id) then
        return _M.get_canary_upstream()
    end
    -- strategy 4: percentage-based traffic split
    local canary_percentage = _M.get_canary_percentage(uri)
    if canary_percentage > 0 then
        local random = math.random(100)
        if random <= canary_percentage then
            return _M.get_canary_upstream()
        end
    end
    -- default: the stable version
    return _M.get_stable_upstream()
end
-- is this user in the canary group?
function _M.is_canary_user(user_id)
    local canary_users = ngx.shared.canary_cache:get("canary_users")
    if not canary_users then
        return false
    end
    canary_users = cjson.decode(canary_users)
    -- explicit user list
    for _, id in ipairs(canary_users) do
        if id == user_id then
            return true
        end
    end
    -- user-ID bucketing
    local user_id_num = tonumber(user_id)
    if user_id_num and user_id_num % 100 < 10 then -- 10% of users
        return true
    end
    return false
end

-- canary percentage for a URI
function _M.get_canary_percentage(uri)
    local canary_rules = ngx.shared.canary_cache:get("canary_rules")
    if not canary_rules then
        return 0
    end
    canary_rules = cjson.decode(canary_rules)
    for _, rule in ipairs(canary_rules) do
        if ngx.re.match(uri, rule.pattern) then
            return rule.percentage
        end
    end
    return 0
end

-- canary upstream
function _M.get_canary_upstream()
    ngx.header["X-Canary-Version"] = "canary"
    return "canary_backend"
end

-- stable upstream
function _M.get_stable_upstream()
    ngx.header["X-Canary-Version"] = "stable"
    return "stable_backend"
end
-- canary control API
function _M.control_api()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    if uri == "/api/canary/percentage" then
        if method == "GET" then
            _M.get_percentage_api()
        elseif method == "POST" then
            _M.set_percentage_api()
        end
    elseif uri == "/api/canary/users" then
        if method == "GET" then
            _M.get_users_api()
        elseif method == "POST" then
            _M.set_users_api()
        end
    end
end

return _M
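The four *_api handlers that control_api dispatches to are elided in the post. A minimal sketch of one of them, assuming rules are POSTed as a JSON array; it would sit above the module's return _M:

-- hypothetical handler: replace the canary percentage rules
function _M.set_percentage_api()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        return ngx.exit(400)
    end
    local ok, rules = pcall(cjson.decode, body)
    if not ok then
        return ngx.exit(400)
    end
    -- e.g. [{"pattern": "^/api/orders", "percentage": 10}]
    ngx.shared.canary_cache:set("canary_rules", cjson.encode(rules))
    ngx.say('{"status":"ok"}')
end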
6. Real-World Results and Performance Data
6.1 Performance Test Results
Performance data for the tuned gateway in our production environment:
| Metric | Value | Conditions |
|---|---|---|
| QPS | 100,000+ | single 8-core / 16 GB node |
| P99 latency | < 10 ms | excluding business processing time |
| P95 latency | < 5 ms | excluding business processing time |
| CPU usage | 40-60% | at peak |
| Memory usage | 2-4 GB | including caches |
| Connections | 50,000+ | concurrent connections |
6.2 Incident Case Studies
Case 1: downstream service avalanche
• Problem: a core service failed and requests piled up en masse
• Fix: the circuit breaker opened automatically and served degraded responses, containing the failure
• Result: overall availability stayed at 99.9%
Case 2: DDoS attack
• Problem: an attack peaking at millions of requests per second
• Fix: multi-layer rate limiting plus automatic IP blacklisting
• Result: the business was completely unaffected
7. Pitfalls and Best Practices
7.1 Pitfalls We Hit
1. Nginx reloads dropping connections
• Problem: configuration updates required a reload, interrupting in-flight connections
• Fix: dynamic configuration that avoids reloads
2. Memory leaks
• Problem: memory leaks in Lua scripts
• Fix: use connection pools correctly and release references promptly
3. DNS resolution caching
• Problem: upstream IP changes were not picked up in time
• Fix: configure a resolver with a sensible DNS cache TTL (see the sketch below)
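A minimal sketch of the DNS fix — the resolver address and hostname are placeholders: using a variable in proxy_pass forces Nginx to re-resolve the name at request time, honoring the valid= TTL, instead of resolving once at startup:

resolver 10.0.0.2 valid=30s ipv6=off;  # internal DNS server (placeholder)
location /api/ {
    set $backend_host "user-service.internal";  # placeholder hostname
    # variable-based proxy_pass triggers runtime DNS resolution
    proxy_pass http://$backend_host:8080;
}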
7.2 Best Practice Recommendations
1. Migrate incrementally: don't rebuild everything at once; roll out in phases
2. Test thoroughly: load testing and failure drills are non-negotiable
3. Monitoring first: solid monitoring is the foundation of stability
4. Keep documentation current: maintain detailed runbooks and incident playbooks
5. Drill regularly: rehearse failure scenarios to validate the high-availability design
Summary
Building a microservice gateway on Nginx is a systems engineering effort that demands careful thought and practice across architecture, feature implementation, performance tuning, and high availability. The design and code shared in this post have been validated in production; I hope they serve as a useful reference.
As the choke point of a microservice architecture, the gateway's importance is hard to overstate. A well-designed gateway not only provides a unified entry point, but also greatly reduces the complexity of individual services and improves the maintainability of the system as a whole.