动态创建月维度表及sharding-jdbc单库分表实现

单库分表

一张表数据达到一定级别后,查询、插入、修改、DDL等效率降低,需要优化。这时可以使用单库分表,进行数据的拆分。

主要解决单表数据量过大的问题。

未解决:CPU、内存、带宽等单库的资源限制。

sharding-jdbc固定actual-data-nodes问题,需要动态生成表,采用定时任务实现

创建配置文件:application-shardingjdbc.yml

1
2
3
# 分表的表名用于定时创建表
sharding-jdbc:
tableNames: order_card_consume

application.yml引用

1
2
3
spring:
profiles:
include: shardingjdbc

创建表mapper:CreateTableMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
* 创建表映射
*/
public interface CreateTableMapper {

/**
* 根据旧表创建新表,表结构一样
* @param newTableName 新表名
* @param oldTableName 旧表名
* @return
*/
int createTable(@Param("newTableName") String newTableName,
@Param("oldTableName") String oldTableName);

}
1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.trans.backbusi.mapper.CreateTableMapper">

<update id="createTable">
CREATE TABLE ${newTableName} like ${oldTableName}
</update>

</mapper>

表sharding_jdbc_real_table创建及mapper、及xml

1
2
3
4
5
6
CREATE TABLE `sharding_jdbc_real_table` (
`id` int NOT NULL,
`logic_table_name` varchar(100) DEFAULT NULL COMMENT '逻辑表名',
`real_table_name` varchar(100) DEFAULT NULL COMMENT '逻辑表名',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='sharding-jdbc分表真实表名记录';

定时任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import com.trans.backbusi.mapper.CreateTableMapper;
import com.trans.common.utils.StringUtils;
import com.trans.common.utils.uuid.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.api.hint.HintManager;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.apache.shardingsphere.underlying.common.rule.DataNode;
import org.joda.time.DateTime;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/**
* 定时任务
* shardingJdbc定时创建表
*/
@Service("shardingJDBCCreateTableTask")
@Slf4j
public class ShardingJDBCCreateTableTask {

@Value("${sharding-jdbc.tableNames}")
private String[] tableNames;

@Autowired
private CreateTableMapper createTableMapper;

@Autowired
private DataSource dataSource;

public void createTable() {
if (StringUtils.isEmpty(MDC.get("traceId"))) {
MDC.put("traceId", UUID.getUUIDString());
}
try {
DateTime dateTime = DateTime.now();
int year = dateTime.getYear();
int month = dateTime.getMonthOfYear();
if (month == 12) {
year = year + 1;
month = 1;
} else {
month = month + 1;
}
for (String tableName : tableNames) {
StringBuilder newTableName = new StringBuilder();
newTableName.append(tableName);
newTableName.append("_");
newTableName.append(year);
newTableName.append("_");
newTableName.append(month);
ShardingJdbcRealTable shardingJdbcRealTable = new ShardingJdbcRealTable();
shardingJdbcRealTable.setLogicTableName(tableName);
shardingJdbcRealTable.setRealTableName(newTableName.toString());
List<ShardingJdbcRealTable> list = shardingJdbcRealTableMapper.selectShardingJdbcRealTableList(shardingJdbcRealTable);
if (list.isEmpty()) {
createTableMapper.createTable(newTableName.toString(), tableName);
shardingJdbcRealTable = new ShardingJdbcRealTable();
shardingJdbcRealTable.setId(twitterSnowflakeIdWorker.nextId());
shardingJdbcRealTable.setLogicTableName(tableName);
shardingJdbcRealTable.setRealTableName(newTableName.toString());
shardingJdbcRealTableMapper.insertShardingJdbcRealTable(shardingJdbcRealTable);
actualTablesRefresh();
}
}
} finally {
MDC.remove("traceId");
}

}

/**
* 初始化sharding-jdbc的真实表节点
*/
@PostConstruct
public void initActualTablesNode() {
actualTablesRefresh();
}


/**
* 刷新sharding-jdbc的真实表节点
*/
public void actualTablesRefresh() {
try {
ShardingSphereDataSource dataSource = (ShardingSphereDataSource) this.dataSource;
if (tableNames == null || tableNames.length == 0) {
log.info("分表为空");
return;
}
Collection<ShardingSphereRule> rules = dataSource.getContextManager().getMetaDataContexts().getMetaData("logic_db").getRuleMetaData().getRules();
Map<String, TableRule> tableRuleMap = new HashMap<>();
for (ShardingSphereRule shardingSphereRule : rules) {
if (shardingSphereRule instanceof ShardingRule) {
tableRuleMap = ((ShardingRule) shardingSphereRule).getTableRules();
}
}
for (String tableName : tableNames) {
TableRule tableRule = tableRuleMap.get(tableName);
if (tableRule == null) {
continue;
}
List<DataNode> dataNodes = tableRule.getActualDataNodes();
Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL);
actualDataNodesField.setAccessible(true);
List<DataNode> newDataNodes = new ArrayList<>();
// 这里是单库,如果多库还要修改
String dataSourceName = dataNodes.get(0).getDataSourceName();
// 强制走主库
HintManager.getInstance().setWriteRouteOnly();
ShardingJdbcRealTable shardingJdbcRealTable = new ShardingJdbcRealTable();
shardingJdbcRealTable.setLogicTableName(tableName);
List<ShardingJdbcRealTable> list = shardingJdbcRealTableMapper.selectShardingJdbcRealTableList(shardingJdbcRealTable);
List<String> actualDataNodes = new ArrayList<>();
String databaseTableName = null;
for (ShardingJdbcRealTable realTable : list) {
databaseTableName = dataSourceName + "." + realTable.getRealTableName();
DataNode dataNode = new DataNode(databaseTableName);
newDataNodes.add(dataNode);
actualDataNodes.add(databaseTableName);
}
// 可用数据节点设置
Method generateDataNodesMethod = TableRule.class.getDeclaredMethod("generateDataNodes", List.class, Collection.class);
generateDataNodesMethod.setAccessible(true);
List<String> dataSourceNames = new ArrayList<>();
dataSourceNames.add(dataSourceName);
generateDataNodesMethod.invoke(tableRule, actualDataNodes, dataSourceNames);

actualDataNodesField.set(tableRule, newDataNodes);

Method cacheActualDatasourcesAndTablesMethod = TableRule.class.getDeclaredMethod("getActualTables", null);
cacheActualDatasourcesAndTablesMethod.setAccessible(true);
Object actualTables = cacheActualDatasourcesAndTablesMethod.invoke(tableRule, null);
Field actualTablesField = TableRule.class.getDeclaredField("actualTables");
Field actualTablesmodifiersField = Field.class.getDeclaredField("modifiers");
actualTablesmodifiersField.setAccessible(true);
actualTablesmodifiersField.setInt(actualTablesField, actualTablesField.getModifiers() & ~Modifier.FINAL);
actualTablesField.setAccessible(true);
actualTablesField.set(tableRule, actualTables);
}
} catch (Exception e) {
log.error("动态分表表名添加sharding-jdbc错误", e);
}

}

}

定时任务每月最后一天执行创建下一月的表

定时任务corn表达式:0 0 0 L * ?

添加sharding-jdbc

trans-core的pom.xml中druid修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!--<dependency>-->
<!--<groupId>com.alibaba</groupId>-->
<!--<artifactId>druid-spring-boot-starter</artifactId>-->
<!--<version>${druid.version}</version>-->
<!--</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.0.0</version>
</dependency>

trans-framework中pom.xml修改:

1
2
3
4
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>

trans-system中pom.xml修改

1
2
3
4
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
</dependency>

注释掉DruidConfig.java、DataSourceAspect.java、DruidProperties.java

application-shardingjdbc.yml修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# 分表的表名用于定时创建表
sharding-jdbc:
tableNames: order_card_consume

# sharding-jdbc配置
spring:
shardingsphere:
props:
sql-show: false
datasource:
names: master01,slave01
master01:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.cj.jdbc.Driver
url:
username:
password:
# 初始连接数
initialSize: 5
# 最小连接池数量
minIdle: 10
# 最大连接池数量
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
# 配置一个连接在池中最大生存的时间,单位是毫秒
maxEvictableIdleTimeMillis: 900000
# 配置检测连接是否有效
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,log4j2
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
slave01:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.cj.jdbc.Driver
url:
username:
password:
# 初始连接数
initialSize: 5
# 最小连接池数量
minIdle: 10
# 最大连接池数量
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
# 配置一个连接在池中最大生存的时间,单位是毫秒
maxEvictableIdleTimeMillis: 900000
# 配置检测连接是否有效
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,log4j2
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
rules:
sharding:
# 分表配置
tables:
order_card_consume:
actual-data-nodes: ms0.order_card_consume
table-strategy:
standard:
sharding-column: trade_time
# 必须小写不然找不到
sharding-algorithm-name: monthtableshardingalgorithm
sharding-algorithms:
monthtableshardingalgorithm:
type: CLASS_BASED
props:
strategy: STANDARD
algorithmClassName: com.trans.framework.shardingjdbc.MonthTableShardingAlgorithm
readwrite-splitting:
data-sources:
ms0:
loadBalancerName: my-load
writeDataSourceName: master01
readDataSourceNames: slave01
load-balancers:
my-load:
type: ROUND_ROBIN

添加分表算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import com.google.common.collect.Range;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
import org.joda.time.DateTime;

import java.util.*;

/**
* 按时间的月份分表算法
*/
public class MonthTableShardingAlgorithm implements StandardShardingAlgorithm<Date> {

/**
* = in 的获取表名
*/
@Override
public String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {
Date date = preciseShardingValue.getValue();
DateTime dateTime = new DateTime(date);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(preciseShardingValue.getLogicTableName());
stringBuilder.append("_");
stringBuilder.append(dateTime.getYear());
stringBuilder.append("_");
stringBuilder.append(dateTime.getMonthOfYear());
return stringBuilder.toString();
}

/**
* BETWEEN AND的获取表名
*/
@Override
public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Date> rangeShardingValue) {
List<String> tableNameList = new ArrayList<>();
Range<Date> valueRange = rangeShardingValue.getValueRange();
Date lowerDate = valueRange.lowerEndpoint();
Date upperDate = valueRange.upperEndpoint();
List<DateTime> dateTimes = rangeMonthToList(lowerDate, upperDate);
for (DateTime dateTime : dateTimes) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(rangeShardingValue.getLogicTableName());
stringBuilder.append("_");
stringBuilder.append(dateTime.getYear());
stringBuilder.append("_");
stringBuilder.append(dateTime.getMonthOfYear());
tableNameList.add(stringBuilder.toString());
}
Iterator<String> iterator = tableNameList.iterator();
while (iterator.hasNext()) {
String tableName = iterator.next();
if (!availableTargetNames.contains(tableName)) {
iterator.remove();
}
}
return tableNameList;
}

/**
* 给定月范围返回月份DateTime集合
* @param lowerDate 最小时间
* @param upperDate 最大时间
* @return
*/
public static List<DateTime> rangeMonthToList(Date lowerDate, Date upperDate) {
DateTime lower = new DateTime(lowerDate);
DateTime upper = new DateTime(upperDate);
List<DateTime> result = new ArrayList<>();
if (lower.getYear() > upper.getYear()) {
return result;
} else if (lower.getYear() == upper.getYear()) {
for (int month = lower.getMonthOfYear(); month <= upper.getMonthOfYear(); month ++) {
DateTime dateTime = new DateTime(lower.getYear(), month, 1, 1, 1);
result.add(dateTime);
}
return result;
} else if (lower.getYear() < upper.getYear()) {
int lowerYear = lower.getYear();
int upperYear = upper.getYear();
int lowerMonth = lower.getMonthOfYear();
int upperMonth = upper.getMonthOfYear();
// 最小时间的年到年底的月份
for (int month = lowerMonth; month <= 12; month++) {
DateTime dateTime = new DateTime(lowerYear, month, 1, 1, 1);
result.add(dateTime);
}
// 年差中的月份
for (int year = lowerYear + 1; year < upperYear; year++) {
for (int month = 1; month <= 12; month++) {
DateTime dateTime = new DateTime(year, month, 1, 1, 1);
result.add(dateTime);
}
}
// 最后一年的月份
for (int month = 1; month <= upperMonth; month++) {
DateTime dateTime = new DateTime(upperYear, month, 1, 1, 1);
result.add(dateTime);
}
return result;
}
return result;
}

@Override
public void init() {

}

@Override
public String getType() {
return "MONTH_TABLE_SHARDING_ALGORITHM";
}
}

测试

测试service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 测试类
*/
@Service
@Slf4j
public class APITestServiceImpl {

@Autowired
private MerRegisterRecordMapper merRegisterRecordMapper;

@Transactional
public MerRegisterRecord test1(Long id) {
return test4(id);
}

public MerRegisterRecord test4(Long id) {
MerRegisterRecord merRegisterRecord = merRegisterRecordMapper.selectMerRegisterRecordById(id);
merRegisterRecord.setUpdateTime(new Date());
merRegisterRecordMapper.updateMerRegisterRecord(merRegisterRecord);
merRegisterRecord = merRegisterRecordMapper.selectMerRegisterRecordById(id);
return merRegisterRecord;
}

}

测试controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Api(tags = "test")
@RestController
@RequestMapping("/api/test")
@Slf4j
public class APITestController {

@Autowired
private MerRegisterRecordMapper merRegisterRecordMapper;

@Autowired
private OrderCardConsumeMapper orderCardConsumeMapper;

@Autowired
private APITestServiceImpl apiTestService;

@PostMapping("/test1")
@ApiOperation("测试主库写后读走主库")
public AjaxResult test1() throws Exception {
return AjaxResult.success(apiTestService.test1(931548600963096576L));
}

@PostMapping("/test2")
@ApiOperation("测试从库")
public AjaxResult test2() throws Exception {
MerRegisterRecord merRegisterRecord = merRegisterRecordMapper.selectMerRegisterRecordById(931548600963096576L);
return AjaxResult.success(merRegisterRecord);
}

@PostMapping("/test3")
@ApiOperation("测试分表")
public AjaxResult test3() throws Exception {
OrderCardConsume orderCardConsume = new OrderCardConsume();
orderCardConsume.setTradeTime(new DateTime(2022, 3, 1, 1, 1).toDate());
PageHelper.startPage(1, 1);
List<OrderCardConsume> list = orderCardConsumeMapper.selectOrderCardConsumeList(orderCardConsume);
return AjaxResult.success(list);
}

@PostMapping("/test4")
@ApiOperation("测试主库只有修改走主库")
public AjaxResult test4() throws Exception {
return AjaxResult.success(apiTestService.test4(931548600963096576L));
}

}

调用


动态创建月维度表及sharding-jdbc单库分表实现
http://hanqichuan.com/2022/02/09/分库分表/动态创建月维度表及sharding-jdbc5.0单库分表实现/
作者
韩启川
发布于
2022年2月9日
许可协议