上亿数据解决方案,SpringBoot整合Sharding Jdbc+Mysql,雪花算法做分布式主键

    技术2026-01-12  8

    SpringBoot整合 Sharding Jdbc

    Sharding Jdbc介绍

    Sharding-JDBC 是一款数据库中间件,专用于分库分表和读写分离,其所属的 ShardingSphere 是 Apache 基金会的顶级项目

    pom依赖

    <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-boot.version>2.1.3.RELEASE</spring-boot.version> <spring.version>5.1.5.RELEASE</spring.version> <mybatisplus.version>3.0.7.1</mybatisplus.version> <mysql.version>5.1.38</mysql.version> <druid.version>1.1.13</druid.version> <shard-jdbc.version>3.1.0</shard-jdbc.version> <fastjson.version>1.2.47</fastjson.version> <pagehelper.version>1.2.5</pagehelper.version> </properties> <dependencies> <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-core</artifactId> <version>5.2.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>${spring-boot.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <version>${spring-boot.version}</version> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <!-- 使用XA事务时,需要引入此模块 --> <!--<dependency>--> <!--<groupId>org.apache.shardingsphere</groupId>--> <!--<artifactId>shardingsphere-transaction-xa-core</artifactId>--> <!--<version>${shard-jdbc.version}</version>--> <!--</dependency>--> <dependency> <groupId>com.baomidou</groupId> 
<artifactId>mybatis-plus-boot-starter</artifactId> <version>${mybatisplus.version}</version> <exclusions> <exclusion> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus</artifactId> <version>${mybatisplus.version}</version> </dependency> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> <version>${pagehelper.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>io.shardingsphere</groupId> <artifactId>sharding-jdbc-core</artifactId> <version>${shard-jdbc.version}</version> </dependency> <dependency> <groupId>io.shardingsphere</groupId> <artifactId>sharding-transaction-spring</artifactId> <version>3.1.0</version> </dependency>

    整合spring的配置

    package com.shard.jdbc.config;

    import io.shardingsphere.api.config.rule.ShardingRuleConfiguration; import io.shardingsphere.api.config.rule.TableRuleConfiguration; import io.shardingsphere.api.config.strategy.StandardShardingStrategyConfiguration; import io.shardingsphere.core.keygen.DefaultKeyGenerator; import io.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

    import javax.annotation.Resource; import javax.sql.DataSource; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; @Configuration public class ShardingDataSourceConfig { @Resource private ShardJdbcCon shardJdbcCon; /** * Shard-JDBC 分库配置 */ @Bean public DataSource dataSource () throws Exception { ShardingRuleConfiguration shardJdbcConfig = new ShardingRuleConfiguration(); shardJdbcConfig.getTableRuleConfigs().add(getTableRule01()); // shardJdbcConfig.getTableRuleConfigs().add(getTableRule02()); shardJdbcConfig.setDefaultDataSourceName(“ds_0”); Properties prop = new Properties(); prop.setProperty(“sql.show”,“true”);//开启日志sql prop.put(“executor.size”,“100”);//sharding 线程池 Map<String,DataSource> dataMap = new LinkedHashMap<>() ; String prefix=“ds_”; String data=“shard_”; String url=“jdbc:mysql://localhost:3306/%s?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false”; for (int i = 0; i <DataSourceConstant.DATABASE_COUNT ; i++) { String s=String.format(url,data+i); dataMap.put(prefix+i,shardJdbcCon.dataOneSource(s)); } return ShardingDataSourceFactory.createDataSource(dataMap, shardJdbcConfig, new HashMap<>(), prop); }

    /** * Shard-JDBC 分表配置 */ private static TableRuleConfiguration getTableRule01() { TableRuleConfiguration result = new TableRuleConfiguration(); result.setLogicTable("table_one"); result.setActualDataNodes(DataSourceConstant.get()); DefaultKeyGenerator defaultKeyGenerator=new DefaultKeyGenerator(); result.setKeyGenerator(defaultKeyGenerator); result.setKeyGeneratorColumnName("id"); result.setDatabaseShardingStrategyConfig(new StandardShardingStrategyConfiguration("phone", new DataSourceAlg())); result.setTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("phone", new TableOneAlg())); return result; }

    }

    配置分库分表的算法

    package com.shard.jdbc.config;

    import com.shard.jdbc.utils.HashUtil; import io.shardingsphere.api.algorithm.sharding.PreciseShardingValue; import io.shardingsphere.api.algorithm.sharding.standard.PreciseShardingAlgorithm; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection;

    /**

    数据库映射计算 */ public class DataSourceAlg implements PreciseShardingAlgorithm {

    private static Logger LOG = LoggerFactory.getLogger(DataSourceAlg.class); @Override public String doSharding(Collection names, PreciseShardingValue value) { LOG.debug(“分库算法参数 {},{}”,names,value); int hash = HashUtil.getIndex(DataSourceConstant.DATABASE_COUNT,value.getValue()); return “ds_” + hash; } }

    package com.shard.jdbc.config;

    import com.shard.jdbc.utils.HashUtil; import io.shardingsphere.api.algorithm.sharding.PreciseShardingValue; import io.shardingsphere.api.algorithm.sharding.standard.PreciseShardingAlgorithm; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection;

    /**

    分表算法 */ public class TableOneAlg implements PreciseShardingAlgorithm {

    private static Logger LOG = LoggerFactory.getLogger(TableOneAlg.class);

    /**

    该表每个库分5张表 */ @Override public String doSharding(Collection names, PreciseShardingValue value) { LOG.debug(“分表算法参数 {},{}”,names,value); int hash = HashUtil.getIndex(DataSourceConstant.TABLE_COUNT,String.valueOf(value.getValue())); return “table_one_”+ hash; } }

    生成数据库表配置

    这里使用 Flyway 结合 MySQL 存储过程来生成各分库中的表结构

    package com.shard.jdbc.config;

    import io.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
    import lombok.extern.slf4j.Slf4j;
    import org.flywaydb.core.api.configuration.ClassicConfiguration;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.util.Map;

    /**
     * Runs Flyway migrations against every physical DataSource behind the
     * sharding DataSource, so all shard schemas are migrated identically.
     */
    @Component
    @Slf4j
    public class FlywayInit {

        @Resource
        private ShardingDataSource dataSource;

        /**
         * Migrates each underlying data source once this bean is constructed.
         * FIX: the original declared this as a void @Bean method; Spring rejects
         * void @Bean factory methods at startup. @PostConstruct is the correct
         * lifecycle hook for side-effect-only initialization like this.
         * WARNING: cleanOnValidationError drops the schema on a failed
         * validation — dangerous outside development environments.
         */
        @PostConstruct
        public void flyway2() {
            Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
            for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
                ClassicConfiguration configuration = new ClassicConfiguration();
                configuration.setBaselineOnMigrate(true);
                configuration.setDataSource(entry.getValue());
                configuration.setCleanOnValidationError(true);
                org.flywaydb.core.Flyway flyway = new org.flywaydb.core.Flyway(configuration);
                flyway.migrate();
            }
        }
    }

    //存储过程代码:

    DELIMITER $$

    DROP PROCEDURE IF EXISTS pro_TableCreate$$

    -- Creates the physical tables table_one_0 .. table_one_22 in the current
    -- database (TABLE_COUNT = 23 in DataSourceConstant).
    -- Fixes vs. the published listing: typographic quotes restored to ASCII
    -- (the script would not parse otherwise); the IF i<10 / ELSE branches were
    -- byte-identical (no zero padding actually happened) and are removed; the
    -- loop bound was 25, which would create two tables the sharding algorithm
    -- (hash % 23) can never route to.
    CREATE PROCEDURE pro_TableCreate()
    BEGIN
        DECLARE i INT;
        DECLARE table_name VARCHAR(20);
        SET i = 0;

        WHILE i < 23 DO
            SET table_name = CONCAT('table_one_', i);

            SET @csql = CONCAT(
                'CREATE TABLE ', table_name, '(
                    id bigint NOT NULL auto_increment COMMENT"主键ID",
                    phone varchar(20) comment"注释",
                    back_one varchar(50) comment"注释",
                    back_two varchar(50) comment"注释",
                    back_three varchar(50) comment"注释",
                    PRIMARY KEY(id)
                )ENGINE=Innodb default charset=utf8;'
            );

            PREPARE create_stmt FROM @csql;
            EXECUTE create_stmt;
            SET i = i + 1;
        END WHILE;

    END$$
    DELIMITER ;

    call pro_TableCreate();

    哈希一致性算法

    主要用来做哈希路由使用。 package com.shard.jdbc.utils;

    import org.springframework.util.CollectionUtils;

    import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors;

    /**
     * String-hash utilities used for shard routing (database and table index
     * selection) plus a helper to check how evenly a (db, table) layout
     * distributes keys.
     * Fixes vs. the published listing: typographic quotes restored to ASCII;
     * cal() now tests the candidate pair (c, d) instead of always calling
     * getCountMap(x, y); Spring's CollectionUtils.isEmpty replaced with the
     * stdlib List.isEmpty.
     */
    public class HashUtil {

        private HashUtil() {
            // utility class, no instances
        }

        /**
         * BKDR string hash (seed 131), masked to a non-negative int.
         */
        public static int bkdrhash(String str) {
            final int seed = 131;
            int hash = 0;
            for (int i = 0; i < str.length(); i++) {
                hash = hash * seed + (int) str.charAt(i);
            }
            return hash & 0x7FFFFFFF;
        }

        /**
         * PJW string hash, masked to a non-negative int.
         */
        public static int pjwHash(String str) {
            int BitsInUnignedInt = 32;
            int ThreeQuarters = 24;
            int OneEighth = 4;
            int HighBits = (int) (0xFFFFFFFF) << (BitsInUnignedInt - OneEighth);
            int hash = 0;
            int test = 0;
            for (int i = 0; i < str.length(); i++) {
                hash = (hash << OneEighth) + (int) str.charAt(i);
                if ((test = hash & HighBits) != 0) {
                    hash = ((hash ^ (test >> ThreeQuarters)) & (~HighBits));
                }
            }
            return hash & 0x7FFFFFFF;
        }

        /**
         * RS string hash, masked to a non-negative int.
         */
        public static int rsHash(String value) {
            int one = 378551;
            int two = 63689;
            int hash = 0;
            for (int i = 0; i < value.length(); i++) {
                hash = hash * two + value.charAt(i);
                two = two * one;
            }
            return (hash & 0x7FFFFFFF);
        }

        /** Route index in [0, num) using the PJW hash. */
        public static int getIndex1(Integer num, String str) {
            return pjwHash(str) % num;
        }

        /** Route index in [0, num) using the RS hash; used by the sharding algorithms. */
        public static int getIndex(Integer num, String str) {
            return rsHash(str) % num;
        }

        /**
         * Enumerates database/table count pairs (c, d) with c &lt; x and d &lt; y
         * for which hashing 10000 sample keys leaves no empty (db, table) bucket,
         * i.e. layouts that distribute reasonably evenly.
         * BUG FIX: the original called getCountMap(x, y) inside the loops,
         * ignoring the candidate pair being tested.
         *
         * @return map from concatenated pair key "cd" to "c$d"
         */
        public static Map<String, String> cal(int x, int y) {
            Map<String, String> result = new TreeMap<>();
            for (int c = 1; c < x; c++) {
                for (int d = 1; d < y; d++) {
                    Map<String, Integer> map = getCountMap(c, d);
                    List<Integer> empties = map.values().stream()
                            .filter(e -> e.equals(0))
                            .collect(Collectors.toList());
                    if (empties.isEmpty()) {
                        result.put(c + "" + d, c + "$" + d);
                    }
                }
            }
            return result;
        }

        /**
         * Hashes the keys "0".."9999" and counts how many land in each
         * (db index, table index) bucket for a layout of a databases * b tables.
         * NOTE(review): bucket keys are built by plain concatenation (i + "" + j),
         * which is ambiguous once either index exceeds 9 — acceptable for the
         * small counts used here, but worth confirming for larger layouts.
         */
        public static Map<String, Integer> getCountMap(int a, int b) {
            Map<String, Integer> map = new HashMap<>();
            for (int i = 0; i < a; i++) {
                for (int j = 0; j < b; j++) {
                    map.put(i + "" + j, 0);
                }
            }
            for (int i = 0; i < 10000; i++) {
                int data = getIndex(a, String.valueOf(i));
                int col = getIndex(b, String.valueOf(i));
                String s = data + "" + col;
                map.put(s, map.get(s) + 1);
            }
            return map;
        }
    }

    配置分库数量和每个数据库的表数量

    package com.shard.jdbc.config;

    public class DataSourceConstant {

        /** Number of physical databases (shard_0 .. shard_10 / ds_0 .. ds_10). */
        public static Integer DATABASE_COUNT = 11;

        /** Number of table_one_* tables per database. */
        public static Integer TABLE_COUNT = 23;

        /**
         * Builds the Sharding-JDBC actual-data-nodes inline expression, e.g.
         * "ds_${0..10}.table_one_${0..22}" for the current counts.
         * NOTE(review): the published article garbled this string literal
         * ("KaTeX parse error"); it is reconstructed here from the standard
         * ShardingSphere inline-expression syntax and the surviving tail
         * "..%s}.table_one_{0..%s}" — confirm against the original repository.
         */
        public static String get() {
            String s = "ds_${0..%s}.table_one_${0..%s}";
            String endD = String.valueOf(DATABASE_COUNT - 1);
            String table = String.valueOf(TABLE_COUNT - 1);
            String nodes = String.format(s, endD, table);
            System.out.println(nodes);
            return nodes;
        }
    }

    效果图展示

    这里我使用了11个数据库,23张表,HashUtils方法可以计算250张表如果要均匀分配需要几个库并且每个库几张表。

    完整的项目链接下载地址

    csdn下载:https://download.csdn.net/download/qq_38926472/12635187 github地址:https://github.com/yanhua0/shard-jdbc

    Processed: 0.014, SQL: 9