动态创建月维度表及sharding-jdbc单库分表实现

单库分表

一张表数据达到一定级别后,查询、插入、修改、DDL等效率降低,需要优化。这时可以使用单库分表,进行数据的拆分。

主要解决单表数据量过大的问题。

未解决:CPU、内存、带宽等单库的资源限制。

sharding-jdbc的actual-data-nodes在配置文件中是固定的,而按月分表需要动态生成新表,因此采用定时任务创建下个月的表,并在创建后刷新sharding-jdbc的actual-data-nodes。

创建配置文件:application-shardingjdbc.yml

1
2
3
# Logical table names that the scheduled task creates monthly tables for
sharding-jdbc:
  tableNames: order_card_consume

application.yml引用

1
2
3
# Pull application-shardingjdbc.yml into the active configuration
spring:
  profiles:
    include: shardingjdbc

创建表mapper:CreateTableMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
* MyBatis mapper used to look up and create the physical (monthly) tables
* behind a sharded logical table.
*/
public interface CreateTableMapper {

/**
* Looks up a table by name to check whether it exists.
* @param tableName physical table name to look for
* @return the matching table name; the scheduled task treats an empty result as "table does not exist"
*/
String getTableName(@Param("tableName") String tableName);

/**
* Lists the physical tables that belong to a logical table.
* The mapper XML matches names of the form logicTableName_yyyy_m.
* @param logicTableName logical (unsuffixed) table name
* @return names of all matching physical tables
*/
List<String> getTableNameForLogicTableName(@Param("logicTableName") String logicTableName);

/**
* Creates a new table with the same structure as an existing table.
* Both names are substituted into the SQL text as-is by the mapper XML,
* so they must come from trusted configuration, never from user input.
* @param newTableName name of the table to create
* @param oldTableName name of the existing table whose structure is copied
* @return update count reported for the statement (NOTE(review): value returned for DDL not verified)
*/
int createTable(@Param("newTableName") String newTableName,
@Param("oldTableName") String oldTableName);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.trans.backbusi.mapper.CreateTableMapper">

    <!-- Returns the table name when a table with exactly this name exists. -->
    <!-- NOTE(review): there is no table_schema filter, so a same-named table in another schema also matches; confirm whether a schema condition is needed. -->
    <select id="getTableName" resultType="java.lang.String">
        SELECT `table_name` FROM information_schema.TABLES WHERE `table_name` = #{tableName};
    </select>

    <!-- Lists the monthly tables of a logical table. The pattern is anchored with ^ and $ so that
         only names of the exact form logicTableName_yyyy_m match; backup or prefixed tables that
         merely contain such a name are not registered as sharding data nodes. -->
    <select id="getTableNameForLogicTableName" resultType="java.lang.String">
        SELECT
            `table_name`
        FROM
            information_schema.TABLES
        WHERE
            `table_name` REGEXP CONCAT('^', #{logicTableName}, '_[0-9]{4}_[0-9]{1,2}$')
    </select>

    <!-- Identifiers cannot be bound as parameters, so both names are substituted into the SQL text.
         Callers must pass only trusted, configuration-derived table names. -->
    <update id="createTable">
        CREATE TABLE ${newTableName} like ${oldTableName}
    </update>

</mapper>

定时任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import com.trans.backbusi.mapper.CreateTableMapper;
import com.trans.common.utils.StringUtils;
import com.trans.common.utils.uuid.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.api.hint.HintManager;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.apache.shardingsphere.underlying.common.rule.DataNode;
import org.joda.time.DateTime;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/**
 * Scheduled task for sharding-jdbc monthly tables.
 *
 * <p>Creates next month's physical table for every configured logical table and
 * rewrites the actual data nodes of the matching sharding-jdbc table rules so that
 * they reflect the tables found in the database.
 */
@Service("shardingJDBCCreateTableTask")
@Slf4j
public class ShardingJDBCCreateTableTask {

    /** MDC key used to correlate the log lines of one task run. */
    private static final String TRACE_ID_KEY = "traceId";

    /** Logical table names, read from the {@code sharding-jdbc.tableNames} property. */
    @Value("${sharding-jdbc.tableNames}")
    private String[] tableNames;

    @Autowired
    private CreateTableMapper createTableMapper;

    @Autowired
    private DataSource dataSource;

    /**
     * Creates the table for next month ({@code <logicTable>_<year>_<month>}, month not
     * zero-padded) for each configured logical table that does not have one yet, then
     * refreshes the sharding-jdbc data nodes.
     */
    public void createTable() {
        // Only remove the trace id at the end if this method put it there; a trace id
        // owned by the caller must survive this call.
        boolean traceIdAssigned = false;
        if (StringUtils.isEmpty(MDC.get(TRACE_ID_KEY))) {
            MDC.put(TRACE_ID_KEY, UUID.getUUIDString());
            traceIdAssigned = true;
        }
        try {
            if (tableNames == null || tableNames.length == 0) {
                log.info("分表为空");
                return;
            }
            // plusMonths handles the December -> January roll-over.
            DateTime nextMonth = DateTime.now().plusMonths(1);
            String suffix = "_" + nextMonth.getYear() + "_" + nextMonth.getMonthOfYear();
            for (String tableName : tableNames) {
                String newTableName = tableName + suffix;
                String existingTableName = createTableMapper.getTableName(newTableName);
                if (StringUtils.isEmpty(existingTableName)) {
                    createTableMapper.createTable(newTableName, tableName);
                    actualTablesRefresh();
                }
            }
        } finally {
            if (traceIdAssigned) {
                MDC.remove(TRACE_ID_KEY);
            }
        }
    }

    /**
     * Loads the real table nodes into sharding-jdbc once the bean has been constructed.
     */
    @PostConstruct
    public void initActualTablesNode() {
        actualTablesRefresh();
    }

    /**
     * Replaces the actual data nodes of every configured logical table with the
     * physical tables currently present in the database.
     *
     * <p>Failures are logged and not propagated, so a refresh problem never aborts
     * application start-up or the calling task.
     *
     * <p>NOTE(review): only the {@code actualDataNodes} field and the state rebuilt by
     * {@code cacheActualDatasourcesAndTables} are updated; confirm that no other
     * TableRule caches need refreshing for the ShardingSphere version in use.
     */
    public void actualTablesRefresh() {
        try {
            ShardingDataSource shardingDataSource = (ShardingDataSource) this.dataSource;
            if (tableNames == null || tableNames.length == 0) {
                log.info("分表为空");
                return;
            }
            // setAccessible(true) is sufficient to write a non-static final field; the
            // former Field.modifiers hack is not needed and fails on JDK 12+.
            Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes");
            actualDataNodesField.setAccessible(true);
            Method cacheNodesMethod = TableRule.class.getDeclaredMethod("cacheActualDatasourcesAndTables");
            cacheNodesMethod.setAccessible(true);

            for (String tableName : tableNames) {
                TableRule tableRule = shardingDataSource.getRuntimeContext().getRule().getTableRule(tableName);
                List<DataNode> currentNodes = tableRule.getActualDataNodes();
                if (currentNodes.isEmpty()) {
                    log.warn("逻辑表{}没有任何真实表节点,跳过刷新", tableName);
                    continue;
                }
                // Single database only: every node shares the first node's data source.
                // Sharding across several databases needs a different mapping here.
                String dataSourceName = currentNodes.get(0).getDataSourceName();

                List<String> realTableNames;
                // Read from the master so a freshly created table is visible. The hint is
                // thread-bound, so it is closed right after the query: a leaked hint would
                // pin later statements on this thread to the master and make the next
                // HintManager.getInstance() call on the same thread fail.
                try (HintManager hintManager = HintManager.getInstance()) {
                    hintManager.setMasterRouteOnly();
                    realTableNames = createTableMapper.getTableNameForLogicTableName(tableName);
                }
                if (realTableNames == null || realTableNames.isEmpty()) {
                    // Keep the configured nodes: an empty node list would leave the rule
                    // unusable and make every later refresh fail on get(0).
                    log.warn("逻辑表{}未查询到真实表,保留原有表节点", tableName);
                    continue;
                }

                List<DataNode> newDataNodes = new ArrayList<>(realTableNames.size());
                for (String realTableName : realTableNames) {
                    newDataNodes.add(new DataNode(dataSourceName + "." + realTableName));
                }
                actualDataNodesField.set(tableRule, newDataNodes);
                // Let the rule rebuild its data source / table lookup from the new nodes.
                cacheNodesMethod.invoke(tableRule);
            }
        } catch (Exception e) {
            log.error("动态分表表名添加sharding-jdbc错误", e);
        }
    }

}

定时任务每月最后一天执行创建下一月的表

定时任务cron表达式:0 0 0 L * ?

添加sharding-jdbc

trans-core的pom.xml中druid修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    <!-- The Druid Spring Boot starter is disabled and replaced by the plain druid artifact below. -->
    <!-- NOTE(review): presumably so the starter's auto-configured DataSource does not compete with the one built by sharding-jdbc; confirm. -->
    <!--<dependency>-->
<!--<groupId>com.alibaba</groupId>-->
<!--<artifactId>druid-spring-boot-starter</artifactId>-->
<!--<version>${druid.version}</version>-->
<!--</dependency>-->
<!-- Plain Druid connection pool, same version property as the disabled starter. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- sharding-jdbc Spring Boot starter, pinned to 4.1.1. -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>

trans-framework中pom.xml修改:

1
2
3
4
<!-- Plain Druid pool; no version is declared here. NOTE(review): presumably managed by the trans-core pom; confirm. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>

trans-system中pom.xml修改

1
2
3
4
<!-- sharding-jdbc starter; no version is declared here. NOTE(review): presumably managed by the trans-core pom; confirm. -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
</dependency>

注释掉DruidConfig.java、DataSourceAspect.java、DruidProperties.java

application-shardingjdbc.yml修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Logical table names that the scheduled task creates monthly tables for
sharding-jdbc:
  tableNames: order_card_consume

# sharding-jdbc configuration
spring:
  shardingsphere:
    props:
      sql:
        # Log the logical and the actual (routed) SQL
        show: true
    datasource:
      names: master01,slave01
      master01:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        url:
        username:
        password:
        # Initial number of connections
        initialSize: 5
        # Minimum number of pooled connections
        minIdle: 10
        # Maximum number of pooled connections
        maxActive: 20
        # Timeout when waiting for a connection, in milliseconds
        maxWait: 60000
        # Interval between checks for idle connections that should be closed, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection stays in the pool, in milliseconds
        minEvictableIdleTimeMillis: 300000
        # Maximum time a connection stays in the pool, in milliseconds
        maxEvictableIdleTimeMillis: 900000
        # Query used to check that a connection is still valid
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall,log4j2
        # No backslash before '=': that escape belongs to .properties files and would
        # become part of the key in YAML.
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
      slave01:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        url:
        username: root
        password:
        # Initial number of connections
        initialSize: 5
        # Minimum number of pooled connections
        minIdle: 10
        # Maximum number of pooled connections
        maxActive: 20
        # Timeout when waiting for a connection, in milliseconds
        maxWait: 60000
        # Interval between checks for idle connections that should be closed, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection stays in the pool, in milliseconds
        minEvictableIdleTimeMillis: 300000
        # Maximum time a connection stays in the pool, in milliseconds
        maxEvictableIdleTimeMillis: 900000
        # Query used to check that a connection is still valid
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall,log4j2
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    sharding:
      # Default data source for tables without a sharding rule. It must name a data
      # source known to the sharding rule; with read/write splitting that is the
      # master-slave rule name (ms0), no data source called "master" is defined.
      default-data-source-name: ms0
      # Read/write splitting
      master-slave-rules:
        ms0:
          load-balance-algorithm-type: round_robin
          master-data-source-name: master01
          slave-data-source-names: slave01
      # Table sharding
      tables:
        order_card_consume:
          # Placeholder node; the real monthly tables are registered at runtime
          actual-data-nodes: ms0.order_card_consume
          table-strategy:
            standard:
              sharding-column: trade_time
              precise-algorithm-class-name: com.trans.framework.shardingjdbc.MonthTableShardingAlgorithm
              range-algorithm-class-name: com.trans.framework.shardingjdbc.MonthTableShardingAlgorithm

添加分表算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import com.google.common.collect.Range;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue;
import org.joda.time.DateTime;

import java.util.*;

/**
 * Sharding algorithm that maps a date sharding value to monthly tables named
 * {@code <logicTable>_<year>_<month>}; the month is not zero-padded.
 */
public class MonthTableShardingAlgorithm implements PreciseShardingAlgorithm<Date>, RangeShardingAlgorithm<Date> {

    private static final String SEPARATOR = "_";

    /**
     * Resolves the table for {@code =} and {@code IN} conditions.
     *
     * @param collection           candidate table names (not consulted)
     * @param preciseShardingValue sharding value carrying the logic table name and the date
     * @return the monthly table name derived from the date
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {
        DateTime dateTime = new DateTime(preciseShardingValue.getValue());
        return buildTableName(preciseShardingValue.getLogicTableName(), dateTime);
    }

    /**
     * Resolves the tables for range conditions such as {@code BETWEEN ... AND ...}.
     *
     * <p>Ranges that are open on one or both sides (a lone {@code >=} or {@code <})
     * have no endpoint to enumerate from, so the available monthly tables are filtered
     * by the bound that is present instead of failing on the missing endpoint.
     *
     * @param availableTargetNames table names currently known to the sharding rule
     * @param rangeShardingValue   sharding value carrying the logic table name and the range
     * @return the available monthly tables that overlap the range
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Date> rangeShardingValue) {
        String logicTableName = rangeShardingValue.getLogicTableName();
        Range<Date> valueRange = rangeShardingValue.getValueRange();
        List<String> tableNameList = new ArrayList<>();

        if (valueRange.hasLowerBound() && valueRange.hasUpperBound()) {
            List<DateTime> months = rangeMonthToList(valueRange.lowerEndpoint(), valueRange.upperEndpoint());
            for (DateTime month : months) {
                String tableName = buildTableName(logicTableName, month);
                // Only route to tables that actually exist in the rule.
                if (availableTargetNames.contains(tableName)) {
                    tableNameList.add(tableName);
                }
            }
            return tableNameList;
        }

        int lowerIndex = valueRange.hasLowerBound()
                ? monthIndex(new DateTime(valueRange.lowerEndpoint()))
                : Integer.MIN_VALUE;
        int upperIndex = valueRange.hasUpperBound()
                ? monthIndex(new DateTime(valueRange.upperEndpoint()))
                : Integer.MAX_VALUE;
        for (String candidate : availableTargetNames) {
            int candidateIndex = parseMonthIndex(logicTableName, candidate);
            if (candidateIndex >= 0 && candidateIndex >= lowerIndex && candidateIndex <= upperIndex) {
                tableNameList.add(candidate);
            }
        }
        return tableNameList;
    }

    /**
     * Lists the first-of-month instants (at 01:01) for every month from the month of
     * {@code lowerDate} to the month of {@code upperDate}, both inclusive.
     *
     * @param lowerDate earliest date
     * @param upperDate latest date
     * @return one DateTime per month in chronological order; empty when the lower month is after the upper month
     */
    public static List<DateTime> rangeMonthToList(Date lowerDate, Date upperDate) {
        DateTime lower = new DateTime(lowerDate);
        DateTime upper = new DateTime(upperDate);
        int upperYear = upper.getYear();
        int upperMonth = upper.getMonthOfYear();
        List<DateTime> result = new ArrayList<>();
        int year = lower.getYear();
        int month = lower.getMonthOfYear();
        while (year < upperYear || (year == upperYear && month <= upperMonth)) {
            result.add(new DateTime(year, month, 1, 1, 1));
            if (month == 12) {
                year++;
                month = 1;
            } else {
                month++;
            }
        }
        return result;
    }

    /** Builds {@code <logicTable>_<year>_<month>} for the month containing {@code dateTime}. */
    private static String buildTableName(String logicTableName, DateTime dateTime) {
        return logicTableName + SEPARATOR + dateTime.getYear() + SEPARATOR + dateTime.getMonthOfYear();
    }

    /** Maps a date to a single integer that increases by one per calendar month. */
    private static int monthIndex(DateTime dateTime) {
        return dateTime.getYear() * 12 + dateTime.getMonthOfYear() - 1;
    }

    /**
     * Extracts the month index from a table name of the form {@code <logicTable>_<year>_<month>}.
     *
     * @return the month index, or -1 when the name is not a monthly table of the logic table
     */
    private static int parseMonthIndex(String logicTableName, String tableName) {
        String prefix = logicTableName + SEPARATOR;
        if (tableName == null || !tableName.startsWith(prefix)) {
            return -1;
        }
        String[] parts = tableName.substring(prefix.length()).split(SEPARATOR);
        if (parts.length != 2) {
            return -1;
        }
        try {
            int year = Integer.parseInt(parts[0]);
            int month = Integer.parseInt(parts[1]);
            if (year < 0 || month < 1 || month > 12) {
                return -1;
            }
            return year * 12 + month - 1;
        } catch (NumberFormatException ignored) {
            // Not a monthly table name (for example the bare logic table); skip it.
            return -1;
        }
    }

}

测试

测试service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Demo service used by the test controller to exercise read/write routing.
 */
@Service
@Slf4j
public class APITestServiceImpl {

    @Autowired
    private MerRegisterRecordMapper merRegisterRecordMapper;

    /**
     * Runs {@link #test4(Long)} inside a transaction.
     *
     * @param id record id
     * @return the record as re-read after the update
     */
    @Transactional
    public MerRegisterRecord test1(Long id) {
        return test4(id);
    }

    /**
     * Loads a record, stamps its update time with the current time, saves it and reads it again.
     *
     * @param id record id
     * @return the record as re-read after the update
     */
    public MerRegisterRecord test4(Long id) {
        MerRegisterRecord record = merRegisterRecordMapper.selectMerRegisterRecordById(id);
        record.setUpdateTime(new Date());
        merRegisterRecordMapper.updateMerRegisterRecord(record);
        return merRegisterRecordMapper.selectMerRegisterRecordById(id);
    }

}

测试controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Test endpoints for checking master/slave routing and table sharding by hand.
 */
@Api(tags = "test")
@RestController
@RequestMapping("/api/test")
@Slf4j
public class APITestController {

    /** Id of the fixed record that the routing tests read and update. */
    private static final long TEST_RECORD_ID = 931548600963096576L;

    @Autowired
    private MerRegisterRecordMapper merRegisterRecordMapper;

    @Autowired
    private OrderCardConsumeMapper orderCardConsumeMapper;

    @Autowired
    private APITestServiceImpl apiTestService;

    /** Update followed by a read, executed through the transactional service method. */
    @PostMapping("/test1")
    @ApiOperation("测试主库写后读走主库")
    public AjaxResult test1() throws Exception {
        MerRegisterRecord updated = apiTestService.test1(TEST_RECORD_ID);
        return AjaxResult.success(updated);
    }

    /** Plain read of the fixed record. */
    @PostMapping("/test2")
    @ApiOperation("测试从库")
    public AjaxResult test2() throws Exception {
        return AjaxResult.success(merRegisterRecordMapper.selectMerRegisterRecordById(TEST_RECORD_ID));
    }

    /** Paged query on the sharded table, filtered by a trade time in March 2022. */
    @PostMapping("/test3")
    @ApiOperation("测试分表")
    public AjaxResult test3() throws Exception {
        OrderCardConsume criteria = new OrderCardConsume();
        DateTime tradeTime = new DateTime(2022, 3, 1, 1, 1);
        criteria.setTradeTime(tradeTime.toDate());
        // startPage must directly precede the query it paginates.
        PageHelper.startPage(1, 1);
        List<OrderCardConsume> page = orderCardConsumeMapper.selectOrderCardConsumeList(criteria);
        return AjaxResult.success(page);
    }

    /** Update followed by a read, executed through the non-transactional service method. */
    @PostMapping("/test4")
    @ApiOperation("测试主库只有修改走主库")
    public AjaxResult test4() throws Exception {
        MerRegisterRecord updated = apiTestService.test4(TEST_RECORD_ID);
        return AjaxResult.success(updated);
    }

}

调用


动态创建月维度表及sharding-jdbc单库分表实现
http://hanqichuan.com/2022/02/09/分库分表/动态创建月维度表及sharding-jdbc单库分表实现/
作者
韩启川
发布于
2022年2月9日
许可协议