Sfoglia il codice sorgente

去掉事务,增加原子性切换临时表为主表阶段触发重试机制

njs 4 mesi fa
parent
commit
4be1156af8

+ 209 - 217
suishenbang-sync/suishenbang-sync-common/src/main/java/com/dgtly/sync/service/HanaOrderComponent.java

@@ -1369,97 +1369,94 @@ public class HanaOrderComponent {
 
 //订单
     public void runLoadSalesOrder(){
+            int maxRetries = 3; // 最大重试次数
+            long retryInterval = 30000; // 重试间隔30秒
+            int retryCount = 0;
+            boolean renameSuccess = false;
+            Exception lastException = null;
 
-        try (
-                Connection conn = getMysqlConnection();
-                Statement stmt = conn.createStatement()
-        ) {
-            // 设置事务隔离级别
-            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            // 开启事务
-            conn.setAutoCommit(false);
+            Connection conn = null;
+            Statement stmt = null;
 
             try {
-                // 1. 创建临时表
+                conn = getMysqlConnection();
+                stmt = conn.createStatement();
+
+                // 1. 创建临时表(无重试)
                 stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_sales_order_temp LIKE meta_hana_sales_order");
 
-                // 2. 加载数据到临时表
-                String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_sales_order_temp fields terminated by '$$' lines terminated by '\\n'";
+                // 2. 加载数据到临时表(无重试)
+                String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_sales_order_temp " +
+                        "fields terminated by '$$' lines terminated by '\\n'";
                 long beginTime = System.currentTimeMillis();
                 String filePath = Global.getTemdataPath() + "salesOrder/";
                 try (InputStream inputStream = mergeGetStream(new File(filePath))) {
                     int rows = this.bulkLoadFromInputStream(testSql, inputStream);
                     long endTime = System.currentTimeMillis();
-                    log.info("importing salesOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
-                }catch (Exception e) {
-                    log.error("加载数据到临时表", e);
-                    throw e; // 抛出异常让上层处理
+                    log.info("导入数据到临时表: {} 行, 耗时: {} ms", rows, (endTime - beginTime));
                 }
 
-                //删除旧表
-                // 删除旧表使用同一个连接
-                try (Statement tempStmt = conn.createStatement()) {
-                    tempStmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_old");
-                    log.info("成功删除旧表: meta_hana_sales_order_old");
-                } catch (Exception e) {
-                    log.error("删除旧表失败,表名: meta_hana_sales_order_old", e);
-                    throw e; // 抛出异常让上层处理
-                }
+                // 3. 删除旧表(无重试)
+                stmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_old");
+                log.info("成功删除旧表: meta_hana_sales_order_old");
 
-
-
-                // 1. 先设置会话级锁等待超时为3秒
-                try (Statement timeOut = conn.createStatement()) {
-                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
-                }catch (Exception e) {
-                    log.error("先设置会话级锁等待超时为3秒", e);
-                    throw e; // 抛出异常让上层处理
+                // 4. RENAME操作(带重试机制)
+                while (retryCount < maxRetries && !renameSuccess) {
+                    try {
+                        stmt.execute("RENAME TABLE meta_hana_sales_order TO meta_hana_sales_order_old, " +
+                                "meta_hana_sales_order_temp TO meta_hana_sales_order");
+                        renameSuccess = true;
+                        log.info("表切换成功");
+                    } catch (Exception e) {
+                        lastException = e;
+                        retryCount++;
+                        if (retryCount < maxRetries) {
+                            log.warn("第{}次表切换失败,将在{}ms后重试(剩余重试次数:{})",
+                                    retryCount, retryInterval, maxRetries - retryCount);
+                            try {
+                                Thread.sleep(retryInterval);
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException("重试被中断", ie);
+                            }
+                        }
+                    }
                 }
 
-                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
-                try (Statement renameTable = conn.createStatement()) {
-                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
-                    renameTable.execute("RENAME TABLE meta_hana_sales_order TO meta_hana_sales_order_old, " +
-                            "meta_hana_sales_order_temp TO meta_hana_sales_order");
-                }catch (Exception e) {
-                    log.error("执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))", e);
-                    throw e; // 抛出异常让上层处理
+                // 5. 清空缓存(无重试)
+                if (renameSuccess) {
+                    String result = HttpUtils.sendGet(orderGetDataHook);
+                    log.info("清空salesOrder缓存:{}", result);
                 }
 
-                // 提交事务
-                conn.commit();
-
-
-                // 清空缓存
-                String result = HttpUtils.sendGet(orderGetDataHook);
-                log.info("清空salesOrder缓存:{}", result);
             } catch (Exception e) {
-                // 回滚事务
-                try {
-                    conn.rollback();
-                    // 清理临时表
+                lastException = e;
+                log.error("处理过程中发生异常", e);
+            } finally {
+                // 资源清理
+                if (stmt != null) {
                     try {
-                        stmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_temp");
-                    } catch (SQLException ex) {
-                        log.error("清理临时表失败", ex);
+                        stmt.close();
+                    } catch (SQLException e) {
+                        log.error("关闭Statement失败", e);
                     }
-                } catch (SQLException ex) {
-                    log.error("事务回滚失败", ex);
                 }
-
-            } finally {
-                try {
-                    conn.setAutoCommit(true);
-                } catch (SQLException ex) {
-                    log.error("恢复自动提交模式失败", ex);
+                if (conn != null) {
+                    try {
+                        conn.close();
+                    } catch (SQLException e) {
+                        log.error("关闭Connection失败", e);
+                    }
                 }
             }
-        } catch (Exception e) {
-            String name = "meta_hana_sales_order订单表同步异常";
-            userOrderAuthorService.sendMailHanaWarning(name);
-            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "salesOrder/", e);
-            throw new RuntimeException(e);
-        }
+
+            // 最终失败处理
+            if (!renameSuccess) {
+                String name = "meta_hana_sales_order订单表同步失败(重试3次后)";
+                userOrderAuthorService.sendMailHanaWarning(name);
+                log.error(name + ", 文件路径: " + Global.getTemdataPath() + "salesOrder/", lastException);
+                throw new RuntimeException(name, lastException);
+            }
 
 
 
@@ -1510,193 +1507,188 @@ public class HanaOrderComponent {
 
 
     public void runLoadSelfReviewedDeliverOrder()throws Exception{
-        try (
-                Connection conn = getMysqlConnection();
-                Statement stmt = conn.createStatement()
-        ) {
-            // 设置事务隔离级别
-            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            // 开启事务
-            conn.setAutoCommit(false);
-
-            try {
-                // 1. 创建临时表
-                stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_self_deliver_order_temp LIKE meta_hana_self_deliver_order");
+        int maxRetries = 3; // 最大重试次数
+        long retryInterval = 30000; // 重试间隔30秒
+        int retryCount = 0;
+        boolean renameSuccess = false;
+        Exception lastException = null;
 
-                // 2. 加载数据到临时表
-                String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_self_deliver_order_temp fields terminated by '$$' lines terminated by '\\n'";
-                long beginTime = System.currentTimeMillis();
-                String filePath = Global.getTemdataPath() + "selfDeliverOrder/";
-                try (InputStream inputStream = mergeGetStream(new File(filePath))) {
-                    int rows = this.bulkLoadFromInputStream(testSql, inputStream);
-                    long endTime = System.currentTimeMillis();
-                    log.info("importing selfDeliverOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
-                }catch (Exception e) {
-                    log.error("加载数据到临时表", e);
-                    throw e; // 抛出异常让上层处理
-                }
-
-
-                //删除旧表
-                try (Statement tempStmt = conn.createStatement()) {
-                    tempStmt.execute("DROP TABLE IF EXISTS meta_hana_self_deliver_order_old");
-                    log.info("成功删除旧表: meta_hana_self_deliver_order_old");
-                } catch (Exception e) {
-                    log.error("删除旧表失败,表名: meta_hana_self_deliver_order_old", e);
-                    throw e; // 抛出异常让上层处理
-                }
+        Connection conn = null;
+        Statement stmt = null;
 
+        try {
+            conn = getMysqlConnection();
+            stmt = conn.createStatement();
+
+            // 1. 创建临时表(无重试)
+            stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_self_deliver_order_temp LIKE meta_hana_self_deliver_order");
+
+            // 2. 加载数据到临时表(无重试)
+            String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_self_deliver_order_temp " +
+                    "fields terminated by '$$' lines terminated by '\\n'";
+            long beginTime = System.currentTimeMillis();
+            String filePath = Global.getTemdataPath() + "selfDeliverOrder/";
+            try (InputStream inputStream = mergeGetStream(new File(filePath))) {
+                int rows = this.bulkLoadFromInputStream(testSql, inputStream);
+                long endTime = System.currentTimeMillis();
+                log.info("导入数据到临时表: {} 行, 耗时: {} ms", rows, (endTime - beginTime));
+            }
 
-                // 1. 先设置会话级锁等待超时为3秒
-                try (Statement timeOut = conn.createStatement()) {
-                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
-                }catch (Exception e) {
-                    log.error("先设置会话级锁等待超时为3秒", e);
-                    throw e; // 抛出异常让上层处理
-                }
+            // 3. 删除旧表(无重试)
+            stmt.execute("DROP TABLE IF EXISTS meta_hana_self_deliver_order_old");
+            log.info("成功删除旧表: meta_hana_self_deliver_order_old");
 
-                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
-                try (Statement renameTable = conn.createStatement()) {
-                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
-                    renameTable.execute("RENAME TABLE meta_hana_self_deliver_order TO meta_hana_self_deliver_order_old, " +
+            // 4. RENAME操作(带重试机制)
+            while (retryCount < maxRetries && !renameSuccess) {
+                try {
+                    stmt.execute("RENAME TABLE meta_hana_self_deliver_order TO meta_hana_self_deliver_order_old, " +
                             "meta_hana_self_deliver_order_temp TO meta_hana_self_deliver_order");
-                }catch (Exception e) {
-                    log.error("执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))", e);
-                    throw e; // 抛出异常让上层处理
+                    renameSuccess = true;
+                    log.info("表切换成功");
+                } catch (Exception e) {
+                    lastException = e;
+                    retryCount++;
+                    if (retryCount < maxRetries) {
+                        log.warn("第{}次表切换失败,将在{}ms后重试(剩余重试次数:{})",
+                                retryCount, retryInterval, maxRetries - retryCount);
+                        try {
+                            Thread.sleep(retryInterval);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException("重试被中断", ie);
+                        }
+                    }
                 }
+            }
 
 
-                // 提交事务
-                conn.commit();
-
-
-
-            } catch (Exception e) {
-                // 回滚事务
+        } catch (Exception e) {
+            lastException = e;
+            log.error("处理过程中发生异常", e);
+        } finally {
+            // 资源清理
+            if (stmt != null) {
                 try {
-                    conn.rollback();
-                    // 清理临时表
-                    try {
-                        stmt.execute("DROP TABLE IF EXISTS meta_hana_self_deliver_order_temp");
-                    } catch (SQLException ex) {
-                        log.error("清理临时表失败", ex);
-                    }
-                } catch (SQLException ex) {
-                    log.error("事务回滚失败", ex);
+                    stmt.close();
+                } catch (SQLException e) {
+                    log.error("关闭Statement失败", e);
                 }
-
-                throw new RuntimeException(e);
-            } finally {
+            }
+            if (conn != null) {
                 try {
-                    conn.setAutoCommit(true);
-                } catch (SQLException ex) {
-                    log.error("恢复自动提交模式失败", ex);
+                    conn.close();
+                } catch (SQLException e) {
+                    log.error("关闭Connection失败", e);
                 }
             }
-        } catch (Exception e) {
-            String name = "meta_hana_self_deliver_order自提提前过账表同步异常";
+        }
+
+        // 最终失败处理
+        if (!renameSuccess) {
+            String name = "meta_hana_self_deliver_order自提过账表同步失败(重试3次后)";
             userOrderAuthorService.sendMailHanaWarning(name);
-            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "selfDeliverOrder/", e);
-            throw new RuntimeException(e);
+            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "selfDeliverOrder/", lastException);
+            throw new RuntimeException(name, lastException);
         }
 
 
-    }
 
 
-    public void runLoadDeliverOrder() throws Exception{
-        try (
-                Connection conn = getMysqlConnection();
-                Statement stmt = conn.createStatement()
-        ) {
-            // 设置事务隔离级别
-            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-            // 开启事务
-            conn.setAutoCommit(false);
 
-            try {
-                // 1. 创建临时表
-                stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_deliver_order_temp LIKE meta_hana_deliver_order");
 
-                // 2. 加载数据到临时表
-                String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_deliver_order_temp fields terminated by '$$' lines terminated by '\\n'";
-                long beginTime = System.currentTimeMillis();
-                String filePath = Global.getTemdataPath() + "deliverOrder/";
-                try (InputStream inputStream = mergeGetStream(new File(filePath))) {
-                    int rows = this.bulkLoadFromInputStream(testSql, inputStream);
-                    long endTime = System.currentTimeMillis();
-                    log.info("importing deliverOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
-                }catch (Exception e) {
-                    log.error("加载数据到临时表", e);
-                    throw e; // 抛出异常让上层处理
-                }
-
-                //删除旧表
-                try (Statement tempStmt = conn.createStatement()) {
-                    tempStmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_old");
-                    log.info("成功删除旧表: meta_hana_deliver_order_old");
-                } catch (Exception e) {
-                    log.error("删除旧表失败,表名: meta_hana_deliver_order_old", e);
-                    throw e; // 抛出异常让上层处理
-                }
 
+    }
 
-                // 1. 先设置会话级锁等待超时为3秒
-                try (Statement timeOut = conn.createStatement()) {
-                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
-                }catch (Exception e) {
-                    log.error("先设置会话级锁等待超时为3秒", e);
-                    throw e; // 抛出异常让上层处理
-                }
 
-                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
-                try (Statement renameTable = conn.createStatement()) {
-                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
-                    renameTable.execute("RENAME TABLE meta_hana_deliver_order TO meta_hana_deliver_order_old, " +
-                            "meta_hana_deliver_order_temp TO meta_hana_deliver_order");
-                }catch (Exception e) {
-                    log.error("执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))", e);
-                    throw e; // 抛出异常让上层处理
-                }
+    public void runLoadDeliverOrder() throws Exception{
+        int maxRetries = 3; // 最大重试次数
+        long retryInterval = 30000; // 重试间隔30秒
+        int retryCount = 0;
+        boolean renameSuccess = false;
+        Exception lastException = null;
 
+        Connection conn = null;
+        Statement stmt = null;
 
-                // 提交事务
-                conn.commit();
+        try {
+            conn = getMysqlConnection();
+            stmt = conn.createStatement();
+
+            // 1. 创建临时表(无重试)
+            stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_deliver_order_temp LIKE meta_hana_deliver_order");
+
+            // 2. 加载数据到临时表(无重试)
+            String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_deliver_order_temp " +
+                    "fields terminated by '$$' lines terminated by '\\n'";
+            long beginTime = System.currentTimeMillis();
+            String filePath = Global.getTemdataPath() + "deliverOrder/";
+            try (InputStream inputStream = mergeGetStream(new File(filePath))) {
+                int rows = this.bulkLoadFromInputStream(testSql, inputStream);
+                long endTime = System.currentTimeMillis();
+                log.info("导入数据到临时表: {} 行, 耗时: {} ms", rows, (endTime - beginTime));
+            }
 
+            // 3. 删除旧表(无重试)
+            stmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_old");
+            log.info("成功删除旧表: meta_hana_sales_order_old");
 
+            // 4. RENAME操作(带重试机制)
+            while (retryCount < maxRetries && !renameSuccess) {
+                try {
+                    stmt.execute("RENAME TABLE meta_hana_deliver_order TO meta_hana_deliver_order_old, " +
+                            "meta_hana_deliver_order_temp TO meta_hana_deliver_order");
+                    renameSuccess = true;
+                    log.info("表切换成功");
+                } catch (Exception e) {
+                    lastException = e;
+                    retryCount++;
+                    if (retryCount < maxRetries) {
+                        log.warn("第{}次表切换失败,将在{}ms后重试(剩余重试次数:{})",
+                                retryCount, retryInterval, maxRetries - retryCount);
+                        try {
+                            Thread.sleep(retryInterval);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException("重试被中断", ie);
+                        }
+                    }
+                }
+            }
 
-                // 清空缓存
+            // 5. 清空缓存(无重试)
+            if (renameSuccess) {
                 String result = HttpUtils.sendGet(orderGetDataHook);
                 log.info("清空deliverOrder缓存:{}", result);
-            } catch (Exception e) {
-                // 回滚事务
+            }
+
+        } catch (Exception e) {
+            lastException = e;
+            log.error("处理过程中发生异常", e);
+        } finally {
+            // 资源清理
+            if (stmt != null) {
                 try {
-                    conn.rollback();
-                    // 清理临时表
-                    try {
-                        stmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_temp");
-                    } catch (SQLException ex) {
-                        log.error("清理临时表失败", ex);
-                    }
-                } catch (SQLException ex) {
-                    log.error("事务回滚失败", ex);
+                    stmt.close();
+                } catch (SQLException e) {
+                    log.error("关闭Statement失败", e);
                 }
-
-                throw new RuntimeException(e);
-            } finally {
+            }
+            if (conn != null) {
                 try {
-                    conn.setAutoCommit(true);
-                } catch (SQLException ex) {
-                    log.error("恢复自动提交模式失败", ex);
+                    conn.close();
+                } catch (SQLException e) {
+                    log.error("关闭Connection失败", e);
                 }
             }
-        } catch (Exception e) {
-            String name = "meta_hana_deliver_order订单表同步异常";
-            userOrderAuthorService.sendMailHanaWarning(name);
-            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "deliverOrder/", e);
-            throw new RuntimeException(e);
         }
 
+        // 最终失败处理
+        if (!renameSuccess) {
+            String name = "meta_hana_deliver_order交货表同步失败(重试3次后)";
+            userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "deliverOrder/", lastException);
+            throw new RuntimeException(name, lastException);
+        }