一.背景
要做数据模拟,需要在测试环境创建7千万的流水数据,进行迁移的模拟动作。
二.具体代码
private static final String DB_URL = "jdbc:mysql://IP:3306/twallet_dev?zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useAffectedRows=true&rewriteBatchedStatements=true";private static final String USER = "root";private static final String PASS = "123456";private static final String TABLE_NAME = "tstd_jour_copy1";private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private static final DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyyMMdd");private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(0,0);public void testinsert() {try {Class.forName("com.mysql.cj.jdbc.Driver");Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);String sql = "INSERT INTO " + TABLE_NAME + " (`code`, `type`, `user_id`, `account_number`, `account_type`, `currency`, `biz_type`, `biz_note`, `en_biz_note`, `trans_object`, `trans_object_user_id`, `trans_amount`, `pre_amount`, `post_amount`, `status`, `push_status`, `channel_type`, `channel_order`, `prev_jour_code`, `ref_no`, `remark`, `create_datetime`, `work_date`, `check_user`, `check_note`, `check_datetime`, `adjust_user`, `adjust_note`, `adjust_datetime`, `trans_order_no`, `subsidiary_code`)" +" VALUES (?, ?, ?, ?, ?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";PreparedStatement pstmt = conn.prepareStatement(sql);conn.setAutoCommit(false);LocalDate startDate = LocalDate.of(2022, 1, 1);for (int i = 1; i < 15000000; i++) {int randomDay = random.nextInt(365) + 1; // 添LocalDate randomDate = startDate.plusDays(randomDay);String formattedDate = randomDate.atStartOfDay().format(formatter);String formattedDatesd = randomDate.atStartOfDay().format(formatter2);pstmt.setString(1,"AJ" + snowflakeIdWorker.nextId() + 
"");//codepstmt.setString(2,"1");pstmt.setString(3,"U20200218112349891757358");pstmt.setString(4,"A202002181123499425288");pstmt.setString(5,"C");pstmt.setString(6,"Points");pstmt.setString(7,"otc_sell_frozen");pstmt.setString(8,"出售冻结提交购买订单");pstmt.setString(9,null);pstmt.setString(10,null);pstmt.setString(11,null);pstmt.setBigDecimal(12,new BigDecimal(28000000000L));pstmt.setBigDecimal(13,new BigDecimal(0L));pstmt.setBigDecimal(14,new BigDecimal(28000000000L));pstmt.setString(15,"1");pstmt.setString(16,null);pstmt.setString(17,"0");pstmt.setString(18,null);pstmt.setString(19,null);pstmt.setString(20,"OO20200218150028928325362");pstmt.setString(21,"记得对账哦");pstmt.setString(22,formattedDate);pstmt.setString(23,formattedDatesd);pstmt.setString(24,null);pstmt.setString(25,null);pstmt.setString(26,null);pstmt.setString(27,null);pstmt.setString(28,null);pstmt.setString(29,null);pstmt.setString(30,null);pstmt.setString(31,"tbay000000001");pstmt.addBatch();if (i % 10000 == 0) {pstmt.executeBatch();pstmt.clearBatch();System.out.println("Inserted records up to index: " + i);}}pstmt.executeBatch();pstmt.clearBatch();conn.setAutoCommit(true);pstmt.close();conn.close();} catch (ClassNotFoundException | SQLException se) {// Handle errors for JDBCse.printStackTrace();}}public class SnowflakeIdWorker {private static final long twepoch = 1288834974657L; // Twitter Snowflake epoch时间戳private static final long workerIdBits = 5L; // 工作节点ID位数private static final long datacenterIdBits = 5L; // 数据中心ID位数private static final long maxWorkerId = -1L ^ (-1L << workerIdBits); // 最大工作节点IDprivate static final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); // 最大数据中心IDprivate static final long sequenceBits = 12L; // 序列号位数private static final long workerIdShift = sequenceBits; // 工作节点ID左移位数private static final long datacenterIdShift = sequenceBits + workerIdBits; // 数据中心ID左移位数private static final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; // 时间戳左移位数private 
static final long sequenceMask = -1L ^ (-1L << sequenceBits); // 序列号掩码private long workerId; // 工作节点IDprivate long datacenterId; // 数据中心IDprivate long sequence = 0L; // 序列号private long lastTimestamp = -1L; // 上次生成ID的时间戳public SnowflakeIdWorker(long workerId, long datacenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));}if (datacenterId > maxDatacenterId || datacenterId < 0) {throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));}this.workerId = workerId;this.datacenterId = datacenterId;}/*** 生成下一个ID** @return 下一个ID*/public synchronized long nextId() {long timestamp = timeGen();// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}// 如果是同一毫秒内生成的,则进行序列号自增if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;// 检查是否溢出:每个节点每毫秒能够产生的序列数最大是4096,如果溢出则需要等待到下一毫秒if (sequence == 0) {timestamp = tilNextMillis(lastTimestamp);}} else {// 时间戳改变,重置序列号sequence = 0L;}// 更新上次生成ID的时间戳lastTimestamp = timestamp;// 移位并通过或运算拼接成64位IDreturn ((timestamp - twepoch) << timestampLeftShift)| (datacenterId << datacenterIdShift)| (workerId << workerIdShift)| sequence;}/*** 获取当前时间戳** @return 当前时间戳*/protected long timeGen() {return System.currentTimeMillis();}/*** 等待到下一毫秒** @param lastTimestamp 上次生成ID的时间戳* @return 下一毫秒的时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}}
三.注意
1. MySQL 的连接 URL 需要增加参数 rewriteBatchedStatements=true,否则驱动不会把批量语句改写成多值 INSERT,批量插入会很慢。
2. 自动提交要关掉,由代码手动提交事务。