背景
分表组件改造的背景,我在这篇文章《gorm.io/sharding改造:赋能单表,灵活支持多分表策略(上)》中已经做了详细的介绍——这个组件不支持单表多个分表策略,为了突破这个限制做的改造。
在上一篇文章中,我们讨论了注册的改造,注册的改造修改逻辑比较简单,但是,上一篇文章中遗留了一个很重要的议题——在增删改查的实际业务操作中,分表组件究竟如何精准地定位到对应的分表策略,以确保业务逻辑的顺利执行?这篇文章,我们重点讨论这个逻辑。
源码解读
首先,我们需要看一下当我们执行查询,新增,更新或是删除逻辑,其执行流程是什么。比如,这么一个查询。
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
我们大概梳理一下其执行流程。
- 初始化查询:
- 当我们执行查询
err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Error
,首先会通过db.Model(&Order{})
初始化一个查询实例,设置相关的模型信息。
- 当我们执行查询
- 构建查询条件:
- 接着,通过
.Where("user_id = ?", userID)
方法,将查询条件user_id = ?
以及对应的参数userID
添加到查询实例中。
- 接着,通过
- 执行查询:
- 调用
.Find(&orders)
方法时,开始执行查询流程。 - 在
Find
方法中,首先通过db.getInstance()
获取数据库实例。 - 然后,检查是否存在查询条件,如果有,则构建 SQL 条件表达式,并将其添加到查询语句中。
- 设置查询结果的目标对象
dest
,即&orders
。
func (db *DB) Find(dest interface{}, conds ...interface{}) (tx *DB) {tx = db.getInstance()if len(conds) > 0 {if exprs := tx.Statement.BuildCondition(conds[0], conds[1:]...); len(exprs) > 0 {tx.Statement.AddClause(clause.Where{Exprs: exprs})}}tx.Statement.Dest = destreturn tx.callbacks.Query().Execute(tx) }
- 调用
- 执行回调和处理:
- 调用
tx.callbacks.Query().Execute(tx)
执行查询回调链。 - 在
Execute
方法中,会遍历并执行所有注册的查询前和查询后的回调函数。
func (p *processor) Execute(db *DB) *DB {//省略其他代码逻辑 ...... for _, f := range p.fns {f(db)}//省略其他代码逻辑 ...... return db }
- 调用
- 分片和查询执行:
- 最终,调用
pool.QueryContext
方法,根据上下文、SQL 查询语句和参数执行实际的数据库查询。 - 在
QueryContext
方法中,会调用pool.sharding.resolve
方法解析并修改查询语句,以处理数据库分片逻辑。 resolve
方法解析 SQL 查询语句,提取表名,并根据表名获取相应的分片配置。- 根据分片配置,可能会修改原始查询语句,以适应分片策略。
func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {var (curTime = time.Now())//该方法根据传入的SQL查询(及其参数)和上下文信息,动态地解析、修改并返回最终的分片 //查询、原始查询、目标表名以及可能出现的错误。_, stQuery, _, err := pool.sharding.resolve(query, args...)if err != nil {return nil, err}// 省略......return rows, err }
func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) {ftQuery = querystQuery = queryif len(s.configs) == 0 {return}expr, err := sqlparser.NewParser(strings.NewReader(query)).ParseStatement()if err != nil {return ftQuery, stQuery, tableName, nil}// 省略......tableName = table.Name.Namer, ok := s.configs[tableName]if !ok {return} // 省略......return }
- 最终,调用
- 返回结果:
- 执行查询后,将结果填充到目标对象
&orders
中,并返回查询结果或错误。
- 执行查询后,将结果填充到目标对象
我们重点关注resolve方法,这个方法包含了分表逻辑的处理逻辑:r, ok := s.configs[tableName]获取对应表的分表策略。
通过上述代码的解析,我们现在应该有了解决方案。原来的逻辑获取分表策略是根据表明获取的。那我们只要修改这个逻辑,根据表名+分表键名作为唯一键获取对应的分表策略就能实现我们的目标。
方案
接下来,我们需要思考的是,如何把分表键传进来呢?
我一开始想的是通过解析query获取查询条件中的分表键。但是,当我深入的看了这个逻辑之后,发现这个设想不能实现,因为value, id, keyFind, err = s.nonInsertValue(r.ShardingKey, condition, args...)这个方法中获取查询条件的字段是在这个函数内部实现的,不能保持一个统一的结构,而且改造复杂度比较高。
context在go语言有着广泛的使用场景,所以,我想着通过context的方式把分表键传递进来。有了这个想法,改造起来就很简单了。我们只需要resolve方法增加一个context的传参,并且r, ok := s.configs[tableName]这个获取分表策略,改成用表名+从context中获取的分表键作为键来获取分表策略即可。
如此,我们就实现了根据表名+分表键获取对应分表策略的逻辑,至此,我们的改造任务完成。
案例
我目前也只是简单的测试了两种分表策略的场景,仅仅只覆盖了查询和插入的场景。更复杂的场景还没有测试。诸如并发情况下的场景。
package testimport ("context""fmt""testing""time""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/sharding"
)
var globalDB *gorm.DBtype Order struct {ID int64 `gorm:"primaryKey"`OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键UserID int64 `gorm:"sharding:user_id"`ProductID int64OrderDate time.TimeOrderYear int
}
// 自定义 ShardingAlgorithm
func customShardingAlgorithm4(value any) (suffix string, err error) {if year, ok := value.(int); ok {return fmt.Sprintf("_%d", year), nil}return "", fmt.Errorf("invalid order_date")
}func customShardingAlgorithmUserId(value any) (suffix string, err error) {if userId, ok := value.(int64); ok {return fmt.Sprintf("_%d", userId%4), nil}return "", fmt.Errorf("invalid user_id")
}// customePrimaryKeyGeneratorFn 自定义主键生成函数
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {var id int64seqTableName := "gorm_sharding_orders_id_seq" // 序列表名db := globalDB// 使用事务来确保主键生成的原子性tx := db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 锁定序列表以确保并发安全(可选,取决于你的 MySQL 配置和并发级别)// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳选择// 这里仅作为示例,实际应用中可能需要更精细的并发控制策略tx.Exec("LOCK TABLES " + seqTableName + " WRITE")// 查询当前的最大 IDtx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)// 更新序列表(这里直接递增 1,实际应用中可能需要更复杂的逻辑)newID := id + 1tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 这里假设序列表允许插入任意 ID,实际应用中可能需要其他机制来确保 ID 的唯一性和连续性// 释放锁定tx.Exec("UNLOCK TABLES")// 提交事务if err := tx.Commit().Error; err != nil {panic(err) // 实际应用中应该使用更优雅的错误处理机制}return newID
}// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding6(t *testing.T) {// 连接到 MySQL 数据库dsn := "dev:xxxx@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"db, err := gorm.Open(mysql.New(mysql.Config{DSN: dsn,}), &gorm.Config{})if err != nil {panic("failed to connect database")}globalDB = dbconfig1 := sharding.Config{ShardingKey: "order_year",ShardingAlgorithm: customShardingAlgorithm4, // 使用自定义的分片算法//PrimaryKeyGenerator: sharding.PKMySQLSequence,PrimaryKeyGenerator: sharding.PKCustom,PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,}config2 := sharding.Config{ShardingKey: "user_id",NumberOfShards: 4,ShardingAlgorithm: customShardingAlgorithmUserId, // 使用自定义的分片算法PrimaryKeyGenerator: sharding.PKSnowflake, // 使用 Snowflake 算法生成主键}mapConfig := make(map[string]sharding.Config)mapConfig["orders_order_year"] = config1mapConfig["orders_user_id"] = config2// 配置 Gorm Sharding 中间件,使用自定义的分片算法middleware := sharding.RegisterWithKeys(mapConfig) // 逻辑表名为 "orders"db.Use(middleware)// 查询示例var orders []Orderctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "order_year")db = db.WithContext(ctx)err = db.Model(&Order{}).Where("order_year=? and product_id=?", 2025, 102).Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("sharding key order_year Selected orders: %#v\n", orders)// 查询示例FindByUserID2(db, int64(1))// 示例:插入订单数据InsertOrderByUserId(db)InsertOrderByOrderYear(db)
}func FindByUserID2(db *gorm.DB, userID int64) ([]Order, error) {var orders []Order// 查询示例ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "user_id")db = db.WithContext(ctx)err := db.Model(&Order{}).Where("user_id = ?", userID).Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("no sharding key user_id Selected orders: %#v\n", orders)return orders, err
}type OrderByUserId struct {ID int64 `gorm:"primaryKey"`OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键UserID int64 `gorm:"sharding:user_id"`ProductID int64OrderDate time.Time
}func InsertOrderByUserId(db *gorm.DB) error {ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "user_id")db = db.WithContext(ctx)// 示例:插入订单数据order := OrderByUserId{OrderId: "20240101ORDER0001",UserID: 100,ProductID: 100,OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),}err := db.Table("orders").Create(&order).Errorif err != nil {fmt.Println("Error creating order:", err)}order2 := OrderByUserId{OrderId: "20250101ORDER0001",UserID: 105,ProductID: 100,OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),}err = db.Table("orders").Create(&order2).Errorif err != nil {fmt.Println("Error creating order:", err)}return err
}func InsertOrderByOrderYear(db *gorm.DB) error {ctx, cancel := context.WithCancel(context.Background())defer cancel()ctx = context.WithValue(ctx, "sharding_key", "order_year")db = db.WithContext(ctx)orderYear := 2024// 示例:插入订单数据order := Order{OrderId: "20240101ORDER0002",UserID: 1,ProductID: 100,OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),OrderYear: orderYear,}err := db.Create(&order).Errorif err != nil {fmt.Println("Error creating order:", err)}orderYear = 2025order2 := Order{OrderId: "20250101ORDER0002",UserID: 1,ProductID: 100,OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),OrderYear: orderYear,}err = db.Create(&order2).Errorif err != nil {fmt.Println("Error creating order:", err)}return err
}
总结
通过改造gorm.io/sharding
组件,我们实现了根据表名+分表键获取对应分表策略的逻辑。这一改造使得组件能够支持单表多个分表策略,更加灵活和强大。目前,我们已经简单测试了查询和插入场景,更复杂的场景和并发情况还需进一步测试和优化。通过这一改造,我们为业务逻辑的执行提供了更加精准和高效的分表策略定位。