05-Go微服务实践
微服务架构将大型应用拆分为多个可独立开发、部署和扩展的小型服务;Go语言编译产物轻量、原生支持并发,是构建微服务的常用选择。
微服务架构
服务划分原则
按业务能力划分:
├── user-service # 用户管理
├── order-service # 订单处理
├── payment-service # 支付
├── inventory-service # 库存
└── notification-service # 通知
按技术边界:
├── auth-gateway # 认证网关
├── api-gateway # API网关
└── backend-services # 后端服务
服务间通信
同步通信:
HTTP/RESTful(简单,广泛支持)
gRPC(高性能,类型安全)
异步通信:
消息队列(RabbitMQ、Kafka)
事件驱动
API网关
使用Gin构建
package main
import (
"github.com/gin-gonic/gin"
"net/http"
"net/http/httputil"
"net/url"
)
// Service describes one backend that the gateway forwards requests to.
type Service struct {
// Name identifies the backend; in the services map it matches the entry's key.
Name string
// URL is the backend's base URL, handed to proxy as the forwarding target.
URL string
}
// services is the gateway's static routing table, keyed by short service name.
// Every backend is expected on localhost, each on its own port.
var services = map[string]*Service{
"user": {Name: "user", URL: "http://localhost:8081"},
"order": {Name: "order", URL: "http://localhost:8082"},
"product": {Name: "product", URL: "http://localhost:8083"},
}
// proxy returns a gin handler that forwards every request it receives to the
// backend at targetURL through a single-host reverse proxy.
//
// targetURL must be an absolute URL with both a scheme and a host. When it is
// not, the returned handler answers every request with 502 Bad Gateway rather
// than handing an unusable target to the reverse proxy.
func proxy(targetURL string) gin.HandlerFunc {
	target, err := url.Parse(targetURL)
	if err != nil || target.Scheme == "" || target.Host == "" {
		return func(c *gin.Context) {
			c.AbortWithStatus(http.StatusBadGateway)
		}
	}

	// Build the reverse proxy once and share it across all requests.
	rp := httputil.NewSingleHostReverseProxy(target)
	return func(c *gin.Context) {
		rp.ServeHTTP(c.Writer, c.Request)
	}
}
// main wires the gateway routes to their backends and serves them on :8080.
func main() {
	r := gin.Default()

	// Route each API prefix, including any sub-path, to its backend service.
	r.Any("/api/users/*path", proxy(services["user"].URL))
	r.Any("/api/orders/*path", proxy(services["order"].URL))
	r.Any("/api/products/*path", proxy(services["product"].URL))

	// Run blocks while serving and only returns on failure (for example when
	// the port is already in use). Surface that failure instead of letting
	// the process exit silently with status 0.
	if err := r.Run(":8080"); err != nil {
		panic(err)
	}
}
服务注册与发现
Consul
import (
"github.com/hashicorp/consul/api"
)
// registerService registers one instance of the named service with the Consul
// agent selected by the default client configuration, together with an HTTP
// health check against http://address:port/health.
//
// The instance ID combines name, address and port so that several instances
// running on the same host do not overwrite each other's registration.
// The check runs every 10s with a 3s timeout, and the instance is
// deregistered after it has been critical for 30s.
func registerService(name, address string, port int) error {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		return fmt.Errorf("creating consul client: %w", err)
	}

	registration := &api.AgentServiceRegistration{
		ID:      fmt.Sprintf("%s-%s-%d", name, address, port),
		Name:    name,
		Address: address,
		Port:    port,
		Check: &api.AgentServiceCheck{
			HTTP:                           fmt.Sprintf("http://%s:%d/health", address, port),
			Interval:                       "10s",
			Timeout:                        "3s",
			DeregisterCriticalServiceAfter: "30s",
		},
	}
	if err := client.Agent().ServiceRegister(registration); err != nil {
		return fmt.Errorf("registering service %q: %w", name, err)
	}
	return nil
}
// discoverService looks the named service up in Consul and returns the
// "host:port" address of the first instance that passes its health checks.
//
// It returns an error when the lookup fails or when no healthy instance is
// registered; the returned address is only meaningful when the error is nil.
func discoverService(name string) (string, error) {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		return "", fmt.Errorf("creating consul client: %w", err)
	}

	// The third argument (passingOnly) drops instances with failing checks.
	entries, _, err := client.Health().Service(name, "", true, nil)
	if err != nil {
		return "", fmt.Errorf("querying consul for service %q: %w", name, err)
	}
	if len(entries) == 0 {
		// Previously this case returned ("", nil), which callers could not
		// tell apart from a successful lookup.
		return "", fmt.Errorf("no healthy instance of service %q", name)
	}

	entry := entries[0]
	host := entry.Service.Address
	if host == "" && entry.Node != nil {
		// A service registered without its own address is reachable at the
		// address of the node it runs on.
		host = entry.Node.Address
	}
	return fmt.Sprintf("%s:%d", host, entry.Service.Port), nil
}
配置中心
Viper + Consul
import (
"github.com/spf13/viper"
_ "github.com/spf13/viper/remote"
)
// initConfig loads the application configuration from the Consul provider at
// localhost:8500 (key "config/myapp", parsed as JSON) and keeps it refreshed.
//
// It returns an error when the remote provider cannot be registered or the
// initial read fails. On success a background goroutine re-reads the remote
// configuration every five seconds for as long as the process runs; refresh
// failures are logged and retried on the next round.
func initConfig() error {
	if err := viper.AddRemoteProvider("consul", "localhost:8500", "config/myapp"); err != nil {
		return err
	}
	viper.SetConfigType("json")

	if err := viper.ReadRemoteConfig(); err != nil {
		return err
	}

	// Poll for configuration changes. The loop never exits, so this
	// goroutine lives until the process terminates.
	go func() {
		for {
			time.Sleep(5 * time.Second)
			if err := viper.WatchRemoteConfig(); err != nil {
				log.Println("watch config error:", err)
			}
		}
	}()
	return nil
}
链路追踪
OpenTelemetry
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// initTracer is a placeholder for tracer setup; as written it does nothing.
func initTracer() {
// Initialize the tracer here (e.g. with a Jaeger or Zipkin exporter).
// ...
}
// businessLogic does its work inside a span named "businessLogic" created
// from the "myservice" tracer, and passes the span's context on to the
// downstream call. It always returns nil.
func businessLogic(ctx context.Context) error {
	spanCtx, span := otel.Tracer("myservice").Start(ctx, "businessLogic")
	defer span.End()

	// Attach an attribute to the span.
	userAttr := attribute.String("user.id", "123")
	span.SetAttributes(userAttr)

	// Call the other service with the span-carrying context.
	callOtherService(spanCtx)
	return nil
}
熔断与限流
hystrix-go(熔断)
import "github.com/afex/hystrix-go/hystrix"
// init configures the hystrix command "my_command" used by callService.
func init() {
	cfg := hystrix.CommandConfig{
		Timeout:               1000, // time out after 1 second
		MaxConcurrentRequests: 100,  // maximum concurrent executions
		ErrorPercentThreshold: 25,   // a 25% error rate trips the breaker
	}
	hystrix.ConfigureCommand("my_command", cfg)
}
func callService() (interface{}, error) {
output := make(chan interface{}, 1)
errors := hystrix.Go("my_command", func() error {
// 正常逻辑
result, err := doSomething()
if err != nil {
return err
}
output <- result
return nil
}, func(err error) error {
// 降级逻辑
output <- getDefaultValue()
return nil
})
select {
case out := <-output:
return out, nil
case err := <-errors:
return nil, err
}
}
令牌桶限流
import "golang.org/x/time/rate"
// RateLimiter wraps a token-bucket limiter so it can be used as gin middleware.
type RateLimiter struct {
// limiter is the underlying limiter consulted once per request.
limiter *rate.Limiter
}
// NewRateLimiter builds a RateLimiter backed by rate.NewLimiter(r, b), where
// r is the sustained rate and b the burst size.
func NewRateLimiter(r rate.Limit, b int) *RateLimiter {
	rl := new(RateLimiter)
	rl.limiter = rate.NewLimiter(r, b)
	return rl
}
// Middleware returns a gin handler that lets a request through when the
// limiter allows it, and otherwise answers 429 with a JSON error body and
// aborts the handler chain.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		if rl.limiter.Allow() {
			c.Next()
			return
		}

		// Limiter refused the request: reject and stop the chain.
		c.JSON(429, gin.H{"error": "too many requests"})
		c.Abort()
	}
}
// Usage: install one limiter on the engine (rate 10 per second, burst 100).
// The single limiter instance is shared by every request passing through r.
r := gin.Default()
r.Use(NewRateLimiter(10, 100).Middleware()) // 10 req/s
消息队列
RabbitMQ
import "github.com/streadway/amqp"
// publishMessage publishes body as a JSON message to the given exchange with
// the given routing key on the local RabbitMQ broker.
//
// A new connection and channel are opened for every call and closed before
// returning. Any error from dialing, opening the channel or publishing is
// returned to the caller.
func publishMessage(exchange, routingKey string, body []byte) error {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		// conn is unusable here; returning avoids the nil dereference the
		// deferred Close would otherwise hit.
		return err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	return ch.Publish(
		exchange,
		routingKey,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
		},
	)
}
// consumeMessages subscribes to queueName on the local RabbitMQ broker and
// passes the body of every delivery to handler.
//
// Deliveries are consumed with auto-acknowledgement, so handler cannot reject
// or requeue a message. The call blocks until the delivery channel is closed
// and then returns nil; errors from dialing, opening the channel or starting
// the consumer are returned immediately.
func consumeMessages(queueName string, handler func([]byte)) error {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		return err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	// Arguments after the consumer tag: autoAck, exclusive, noLocal, noWait.
	msgs, err := ch.Consume(queueName, "", true, false, false, false, nil)
	if err != nil {
		return err
	}

	for msg := range msgs {
		handler(msg.Body)
	}
	return nil
}
Kafka
import "github.com/Shopify/sarama"
// produceKafka sends message to topic through a synchronous producer
// connected to the broker at localhost:9092. A producer is created for the
// call and closed before returning; creation and send errors are returned.
func produceKafka(topic string, message []byte) error {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	brokers := []string{"localhost:9092"}
	p, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return err
	}
	defer p.Close()

	// Partition and offset of the stored message are not needed here.
	_, _, err = p.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	})
	return err
}
// consumeKafka reads partition 0 of topic from the broker at localhost:9092,
// starting at the newest offset, and passes each message value to handler.
// It blocks until the partition consumer's message channel is closed and then
// returns nil; setup errors are returned immediately.
func consumeKafka(topic string, handler func([]byte)) error {
	brokers := []string{"localhost:9092"}
	c, err := sarama.NewConsumer(brokers, nil)
	if err != nil {
		return err
	}
	defer c.Close()

	// Only partition 0 is consumed.
	pc, err := c.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		return err
	}
	defer pc.Close()

	incoming := pc.Messages()
	for m := range incoming {
		handler(m.Value)
	}
	return nil
}
服务健康检查
// healthCheck returns a gin handler that pings the database and then Redis.
// It answers 503 naming the first dependency that fails, or 200 with
// {"status": "healthy"} when both respond. Redis is only checked when the
// database ping succeeded.
func healthCheck() gin.HandlerFunc {
	return func(c *gin.Context) {
		code, body := 200, gin.H{"status": "healthy"}

		if err := db.Ping(); err != nil {
			// Database check failed.
			code, body = 503, gin.H{
				"status":   "unhealthy",
				"database": "down",
			}
		} else if err := redis.Ping().Err(); err != nil {
			// Redis check failed.
			code, body = 503, gin.H{
				"status": "unhealthy",
				"redis":  "down",
			}
		}

		c.JSON(code, body)
	}
}
// Serve the health check at GET /health.
r.GET("/health", healthCheck())
分布式事务
Saga模式
// SagaStep is one unit of work in a saga, paired with the action that undoes it.
type SagaStep struct {
	// Action performs the step. A non-nil error aborts the saga.
	Action func() error
	// Compensate undoes a previously successful Action. It may be nil for
	// steps that need no undo.
	Compensate func() error
}

// Saga is an ordered list of steps executed as one logical transaction.
type Saga struct {
	steps []SagaStep
}

// CompensationError is returned by Saga.Execute when a step failed and at
// least one compensation of an earlier step failed as well, which means the
// rollback is incomplete.
type CompensationError struct {
	// Cause is the error returned by the step that aborted the saga.
	Cause error
	// Failures holds the errors returned by compensations, in the order the
	// compensations ran (newest step first).
	Failures []error
}

// Error reports the aborting error followed by every compensation failure.
func (e *CompensationError) Error() string {
	msg := e.Cause.Error() + " (compensation failed"
	for _, f := range e.Failures {
		msg += ": " + f.Error()
	}
	return msg + ")"
}

// Unwrap exposes the aborting step error to errors.Is and errors.As.
func (e *CompensationError) Unwrap() error { return e.Cause }

// Execute runs the steps in order. When a step's Action fails, the
// compensations of all previously completed steps run in reverse order.
//
// It returns nil when every step succeeded, and the failing step's error when
// the rollback completed cleanly. If any compensation failed too, it returns a
// *CompensationError wrapping that step's error. A failed compensation does
// not stop the remaining ones from running.
func (s *Saga) Execute() error {
	for i, step := range s.steps {
		err := step.Action()
		if err == nil {
			continue
		}

		// Every step before index i completed; undo them newest-first.
		var failures []error
		for j := i - 1; j >= 0; j-- {
			undo := s.steps[j].Compensate
			if undo == nil {
				continue
			}
			if cerr := undo(); cerr != nil {
				failures = append(failures, cerr)
			}
		}
		if len(failures) > 0 {
			return &CompensationError{Cause: err, Failures: failures}
		}
		return err
	}
	return nil
}
// Usage: an order flow of three steps, each paired with its compensation.
// If a step fails, Execute runs the compensations of the steps that already
// completed, in reverse order, and the failure is logged here.
saga := &Saga{
steps: []SagaStep{
{
Action: createOrder,
Compensate: cancelOrder,
},
{
Action: reserveInventory,
Compensate: releaseInventory,
},
{
Action: processPayment,
Compensate: refundPayment,
},
},
}
if err := saga.Execute(); err != nil {
log.Println("Transaction failed:", err)
}
服务监控
Prometheus + Grafana
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// HTTP metrics recorded by prometheusMiddleware.
var (
// httpRequestsTotal counts requests, labelled by method, endpoint and status.
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
// httpRequestDuration observes request latency in seconds, labelled by
// method and endpoint, using the client library's default buckets.
httpRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
)
)
// init registers both HTTP collectors with the default Prometheus registry.
// MustRegister panics if a registration fails.
func init() {
	prometheus.MustRegister(
		httpRequestsTotal,
		httpRequestDuration,
	)
}
// prometheusMiddleware returns a gin handler that times each request and,
// once the rest of the chain has run, increments the request counter and
// records the elapsed seconds in the duration histogram. The endpoint label is
// the matched route pattern from c.FullPath().
func prometheusMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		began := time.Now()
		c.Next()
		elapsed := time.Since(began).Seconds()

		// The status is read after c.Next() so it reflects the response written.
		code := strconv.Itoa(c.Writer.Status())
		method, route := c.Request.Method, c.FullPath()

		httpRequestsTotal.WithLabelValues(method, route, code).Inc()
		httpRequestDuration.WithLabelValues(method, route).Observe(elapsed)
	}
}
// Expose the collected metrics at GET /metrics, then install the metrics
// middleware on the engine.
// NOTE(review): the middleware is added after /metrics is registered, so it
// presumably applies only to routes registered later — confirm this is intended.
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
r.Use(prometheusMiddleware())
微服务项目结构
microservice/
├── cmd/
│   └── server/
│       └── main.go            # 程序入口
├── internal/
│   ├── handler/               # HTTP处理器
│   ├── service/               # 业务逻辑
│   ├── repository/            # 数据访问
│   └── model/                 # 数据模型
├── pkg/                       # 可导出的库
│   ├── middleware/
│   └── util/
├── api/
│   └── proto/                 # gRPC定义
├── config/
│   └── config.yaml
├── deployments/
│   ├── docker-compose.yml
│   └── k8s/
├── go.mod
└── README.md
Docker Compose部署
# docker-compose.yml
# Local development stack: two backend services and the API gateway, plus the
# infrastructure they depend on (PostgreSQL, Redis, Kafka with ZooKeeper, Consul).
version: '3.8'

services:
  user-service:
    build: ./user-service
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgres://postgres:postgres@db:5432/users
    depends_on:
      - db
      - redis

  order-service:
    build: ./order-service
    ports:
      - "8082:8080"
    environment:
      - DATABASE_URL=postgres://postgres:postgres@db:5432/orders
    depends_on:
      - db
      - kafka

  api-gateway:
    build: ./api-gateway
    ports:
      - "8080:8080"
    depends_on:
      - user-service
      - order-service

  db:
    image: postgres:14
    environment:
      POSTGRES_PASSWORD: postgres
    # NOTE(review): the "users" and "orders" databases referenced by the
    # services above are not created here; presumably an init script or
    # migration does that — confirm.

  redis:
    image: redis:7-alpine

  # Kafka below is configured through KAFKA_ZOOKEEPER_CONNECT, so it needs a
  # ZooKeeper instance reachable under the hostname "zookeeper".
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    # Pinned to a release line that still runs in ZooKeeper mode.
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Address other containers on the Compose network use to reach the broker.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # Single broker: internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  consul:
    # Consul images are published under hashicorp/consul; the bare "consul"
    # repository is no longer updated.
    image: hashicorp/consul:latest
    ports:
      - "8500:8500"
Kubernetes部署
# deployment.yaml
# Runs three replicas of user-service and exposes them through a
# LoadBalancer Service on port 80.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 8080
          env:
            # The database URL is read from the "db-secret" Secret rather
            # than being written into the manifest.
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    # Service port 80 forwards to the container's port 8080.
    - port: 80
      targetPort: 8080
  type: LoadBalancer
最佳实践
单一职责:每个服务专注一个业务能力
独立部署:服务独立构建、测试、部署
数据隔离:每个服务独立数据库
API网关:统一入口,认证、限流、路由
服务发现:动态注册和发现
配置中心:集中管理配置
链路追踪:分布式追踪请求
熔断降级:防止雪崩
异步通信:解耦服务,消息队列
监控告警:Prometheus + Grafana
核心: 微服务提升灵活性,但增加复杂度,需完善基础设施支撑。