Kaynağa Gözat

1. 创建临时表并加载数据(从指定路径合并文件流批量导入MySQL)
2. 通过原子性RENAME操作切换新旧表(设置超时防止锁等待)
3. 异步删除旧表并清空相关缓存
4. 全程使用事务管理,失败时回滚并发送告警邮件

njs 5 ay önce
ebeveyn
işleme
51c83f3fb7

+ 105 - 41
suishenbang-sync/suishenbang-sync-common/src/main/java/com/dgtly/sync/service/HanaOrderComponent.java

@@ -1393,15 +1393,23 @@ public class HanaOrderComponent {
                     log.info("importing salesOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
                 }
 
-                // 3. 原子切换表(RENAME是原子操作)
-                stmt.execute("RENAME TABLE meta_hana_sales_order TO meta_hana_sales_order_old, " +
-                        "meta_hana_sales_order_temp TO meta_hana_sales_order");
+                // 1. 先设置会话级锁等待超时为3秒
+                try (Statement timeOut = conn.createStatement()) {
+                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
+                }
+
+                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
+                try (Statement renameTable = conn.createStatement()) {
+                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
+                    renameTable.execute("RENAME TABLE meta_hana_sales_order TO meta_hana_sales_order_old, " +
+                            "meta_hana_sales_order_temp TO meta_hana_sales_order");
+                }
 
                 // 提交事务
                 conn.commit();
 
                 // 4. 异步删除旧表
-                CompletableFuture.runAsync(() -> {
+               /* CompletableFuture.runAsync(() -> {
                     try (Connection tempConn = getMysqlConnection();
                          Statement tempStmt = tempConn.createStatement()) {
                         tempStmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_old");
@@ -1412,7 +1420,7 @@ public class HanaOrderComponent {
                 }).exceptionally(ex -> {
                     log.error("异步删除旧表操作异常", ex);
                     return null;
-                });
+                });*/
 
                 // 清空缓存
                 String result = HttpUtils.sendGet(orderGetDataHook);
@@ -1494,43 +1502,90 @@ public class HanaOrderComponent {
 
 
     public void runLoadSelfReviewedDeliverOrder()throws Exception{
-        //生产表
-        Connection conn = null;
-        PreparedStatement pstm =null;
-        try {
-            conn = getMysqlConnection();
-            //生产表
-            String sql = "TRUNCATE TABLE meta_hana_self_deliver_order";
-            pstm = conn.prepareStatement(sql);
-            pstm.execute();
-            String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_self_deliver_order fields terminated by '$$' lines terminated by '\\n'";
-            long beginTime=System.currentTimeMillis();
-            int rows=this.bulkLoadFromInputStream(testSql, mergeGetStream(new File((Global.getTemdataPath()+"selfDeliverOrder/"))));
-            long endTime=System.currentTimeMillis();
-            log.info("importing selfDeliverOrder := "+rows+" rows data into mysql and cost "+(endTime-beginTime)+" ms!");
-        } catch (SQLException e) {
-            String name ="meta_hana_self_deliver_order 自提提前过账表同步异常";
-            userOrderAuthorService.sendMailHanaWarning(name);
-            log.error(name, e);
-            e.printStackTrace();
-        }finally {
-            if(pstm!=null){
+        try (
+                Connection conn = getMysqlConnection();
+                Statement stmt = conn.createStatement()
+        ) {
+            // 设置事务隔离级别
+            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            // 开启事务
+            conn.setAutoCommit(false);
+
+            try {
+                // 1. 创建临时表
+                stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_self_deliver_order_temp LIKE meta_hana_self_deliver_order");
+
+                // 2. 加载数据到临时表
+                String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_self_deliver_order_temp fields terminated by '$$' lines terminated by '\\n'";
+                long beginTime = System.currentTimeMillis();
+                String filePath = Global.getTemdataPath() + "selfDeliverOrder/";
+                try (InputStream inputStream = mergeGetStream(new File(filePath))) {
+                    int rows = this.bulkLoadFromInputStream(testSql, inputStream);
+                    long endTime = System.currentTimeMillis();
+                    log.info("importing selfDeliverOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
+                }
+
+                // 1. 先设置会话级锁等待超时为3秒
+                try (Statement timeOut = conn.createStatement()) {
+                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
+                }
+
+                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
+                try (Statement renameTable = conn.createStatement()) {
+                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
+                    renameTable.execute("RENAME TABLE meta_hana_self_deliver_order TO meta_hana_self_deliver_order_old, " +
+                            "meta_hana_self_deliver_order_temp TO meta_hana_self_deliver_order");
+                }
+
+
+                // 提交事务
+                conn.commit();
+
+                // 4. 异步删除旧表
+              /*  CompletableFuture.runAsync(() -> {
+                    try (Connection tempConn = getMysqlConnection();
+                         Statement tempStmt = tempConn.createStatement()) {
+                        tempStmt.execute("DROP TABLE IF EXISTS meta_hana_self_deliver_order_old");
+                        log.info("成功删除旧表: meta_hana_self_deliver_order_old");
+                    } catch (Exception e) {
+                        log.error("删除旧表失败,表名: meta_hana_self_deliver_order_old", e);
+                    }
+                }).exceptionally(ex -> {
+                    log.error("异步删除旧表操作异常", ex);
+                    return null;
+                });*/
+
+
+            } catch (Exception e) {
+                // 回滚事务
                 try {
-                    pstm.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.rollback();
+                    // 清理临时表
+                    try {
+                        stmt.execute("DROP TABLE IF EXISTS meta_hana_self_deliver_order_temp");
+                    } catch (SQLException ex) {
+                        log.error("清理临时表失败", ex);
+                    }
+                } catch (SQLException ex) {
+                    log.error("事务回滚失败", ex);
                 }
-            }
-            if(conn!=null){
+
+                throw new RuntimeException(e);
+            } finally {
                 try {
-                    conn.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.setAutoCommit(true);
+                } catch (SQLException ex) {
+                    log.error("恢复自动提交模式失败", ex);
                 }
             }
+        } catch (Exception e) {
+            String name = "meta_hana_self_deliver_order自提提前过账表同步异常";
+            userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "selfDeliverOrder/", e);
+            throw new RuntimeException(e);
         }
+
+
     }
 
 
@@ -1558,15 +1613,24 @@ public class HanaOrderComponent {
                     log.info("importing deliverOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
                 }
 
-                // 3. 原子切换表(RENAME是原子操作)
-                stmt.execute("RENAME TABLE meta_hana_deliver_order TO meta_hana_deliver_order_old, " +
-                        "meta_hana_deliver_order_temp TO meta_hana_deliver_order");
+                // 1. 先设置会话级锁等待超时为3秒
+                try (Statement timeOut = conn.createStatement()) {
+                    timeOut.execute("SET SESSION innodb_lock_wait_timeout = 3");
+                }
+
+                // 2. 执行RENAME并设置语句级超时(原子切换表(RENAME是原子操作))
+                try (Statement renameTable = conn.createStatement()) {
+                    renameTable.setQueryTimeout(3); // 语句执行超时3秒
+                    renameTable.execute("RENAME TABLE meta_hana_deliver_order TO meta_hana_deliver_order_old, " +
+                            "meta_hana_deliver_order_temp TO meta_hana_deliver_order");
+                }
+
 
                 // 提交事务
                 conn.commit();
 
                 // 4. 异步删除旧表
-                CompletableFuture.runAsync(() -> {
+               /* CompletableFuture.runAsync(() -> {
                     try (Connection tempConn = getMysqlConnection();
                          Statement tempStmt = tempConn.createStatement()) {
                         tempStmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_old");
@@ -1577,7 +1641,7 @@ public class HanaOrderComponent {
                 }).exceptionally(ex -> {
                     log.error("异步删除旧表操作异常", ex);
                     return null;
-                });
+                });*/
 
                 // 清空缓存
                 String result = HttpUtils.sendGet(orderGetDataHook);