Sfoglia il codice sorgente

修改同步订单/交货单 方法采用事务和临时表更名的方式来处理

njs 5 mesi fa
parent
commit
d825b12428

+ 158 - 68
suishenbang-sync/suishenbang-sync-common/src/main/java/com/dgtly/sync/service/HanaOrderComponent.java

@@ -19,6 +19,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 @Component
 public class HanaOrderComponent {
@@ -1298,6 +1299,7 @@ public class HanaOrderComponent {
         }catch (Exception e){
             String name ="meta_hana_order_maabc订单C类标识表同步异常";
             userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name, e);
             e.printStackTrace();
         }finally {
             if(pstm!=null){
@@ -1342,6 +1344,7 @@ public class HanaOrderComponent {
         }catch (Exception e){
             String name ="meta_hana_deliver_sign电子单签收表同步异常";
             userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name, e);
             e.printStackTrace();
         }finally {
             if(pstm!=null){
@@ -1366,48 +1369,92 @@ public class HanaOrderComponent {
 
 //订单
     public void runLoadSalesOrder(){
-        //生产表
-        Connection conn = null;
-        PreparedStatement pstm =null;
-        try{
-            conn = getMysqlConnection();
-            String sql = "TRUNCATE TABLE meta_hana_sales_order";
-            pstm = conn.prepareStatement(sql);
-            pstm.execute();
-            String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_sales_order fields terminated by '$$' lines terminated by '\\n'";
-            long beginTime=System.currentTimeMillis();
-            int rows=this.bulkLoadFromInputStream(testSql, mergeGetStream(new File((Global.getTemdataPath()+"salesOrder/"))));
-            long endTime=System.currentTimeMillis();
-            log.info("importing salesOrder := "+rows+" rows data into mysql and cost "+(endTime-beginTime)+" ms!");
-            String result = HttpUtils.sendGet(orderGetDataHook);
-            log.info("清空salesOrder缓存:"+result);
-        }catch (Exception e){
-            String name ="meta_hana_sales_order订单表同步异常";
-            userOrderAuthorService.sendMailHanaWarning(name);
-            e.printStackTrace();
-        }finally {
-            if(pstm!=null){
+
+        try (
+                Connection conn = getMysqlConnection();
+                Statement stmt = conn.createStatement()
+        ) {
+            // 设置事务隔离级别
+            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            // 开启事务
+            conn.setAutoCommit(false);
+
+            try {
+                // 1. 创建临时表
+                stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_sales_order_temp LIKE meta_hana_sales_order");
+
+                // 2. 加载数据到临时表
+                String testSql = "LOAD DATA LOCAL INFILE ? into table meta_hana_sales_order_temp fields terminated by '$$' lines terminated by '\\n'";
+                long beginTime = System.currentTimeMillis();
+                String filePath = Global.getTemdataPath() + "salesOrder/";
+                try (InputStream inputStream = mergeGetStream(new File(filePath))) {
+                    int rows = this.bulkLoadFromInputStream(testSql, inputStream);
+                    long endTime = System.currentTimeMillis();
+                    log.info("importing salesOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
+                }
+
+                // 3. 原子切换表(RENAME是原子操作)
+                stmt.execute("RENAME TABLE meta_hana_sales_order TO meta_hana_sales_order_old, " +
+                        "meta_hana_sales_order_temp TO meta_hana_sales_order");
+
+                // 提交事务
+                conn.commit();
+
+                // 4. 异步删除旧表
+                CompletableFuture.runAsync(() -> {
+                    try (Connection tempConn = getMysqlConnection();
+                         Statement tempStmt = tempConn.createStatement()) {
+                        tempStmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_old");
+                        log.info("成功删除旧表: meta_hana_sales_order_old");
+                    } catch (Exception e) {
+                        log.error("删除旧表失败,表名: meta_hana_sales_order_old", e);
+                    }
+                }).exceptionally(ex -> {
+                    log.error("异步删除旧表操作异常", ex);
+                    return null;
+                });
+
+                // 清空缓存
+                String result = HttpUtils.sendGet(orderGetDataHook);
+                log.info("清空salesOrder缓存:{}", result);
+            } catch (Exception e) {
+                // 回滚事务
                 try {
-                    pstm.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.rollback();
+                    // 清理临时表
+                    try {
+                        stmt.execute("DROP TABLE IF EXISTS meta_hana_sales_order_temp");
+                    } catch (SQLException ex) {
+                        log.error("清理临时表失败", ex);
+                    }
+                } catch (SQLException ex) {
+                    log.error("事务回滚失败", ex);
                 }
-            }
-            if(conn!=null){
+                String name = "meta_hana_sales_order订单表同步异常";
+                userOrderAuthorService.sendMailHanaWarning(name);
+                log.error(name + ", 文件路径: " + Global.getTemdataPath() + "salesOrder/", e);
+                throw new RuntimeException(e);
+            } finally {
                 try {
-                    conn.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.setAutoCommit(true);
+                } catch (SQLException ex) {
+                    log.error("恢复自动提交模式失败", ex);
                 }
             }
+        } catch (Exception e) {
+            String name = "meta_hana_sales_order订单表同步异常";
+            userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "salesOrder/", e);
+            throw new RuntimeException(e);
         }
-        //测试表
-//        String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table sap_hana_sales_order fields terminated by '?' lines terminated by '\\n'";
-        //生产表
+
+
+
     }
 
+
+
+
     public void runLoadFreezeCustomer()throws Exception{
         //生产表
         Connection conn = null;
@@ -1426,6 +1473,7 @@ public class HanaOrderComponent {
         } catch (SQLException e) {
             String name ="meta_hana_not_freeze_customer 未冻结经销商表同步异常";
             userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name, e);
             e.printStackTrace();
         }finally {
             if(pstm!=null){
@@ -1466,6 +1514,7 @@ public class HanaOrderComponent {
         } catch (SQLException e) {
             String name ="meta_hana_self_deliver_order 自提提前过账表同步异常";
             userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name, e);
             e.printStackTrace();
         }finally {
             if(pstm!=null){
@@ -1489,46 +1538,87 @@ public class HanaOrderComponent {
 
 
     public void runLoadDeliverOrder() throws Exception{
-        Connection conn = null;
-        PreparedStatement pstm =null;
-        //测试表
-//        String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table sap_hana_deliver_order fields terminated by '?' lines terminated by '\\n'";
-        //生产表
-        try {
-            conn = getMysqlConnection();
-            //生产表
-            String sql = "TRUNCATE TABLE meta_hana_deliver_order";
-            pstm = conn.prepareStatement(sql);
-            pstm.execute();
-            String testSql = "LOAD DATA LOCAL INFILE 'testIO.txt' into table meta_hana_deliver_order fields terminated by '$$' lines terminated by '\\n'";
-            long beginTime=System.currentTimeMillis();
-            int rows=this.bulkLoadFromInputStream(testSql, mergeGetStream(new File((Global.getTemdataPath()+"deliverOrder/"))));
-            long endTime=System.currentTimeMillis();
-            log.info("importing deliverOrder := "+rows+" rows data into mysql and cost "+(endTime-beginTime)+" ms!");
-            String result = HttpUtils.sendGet(deliverGetDataHook);
-            log.info("清空deliverOrder缓存:"+result);
-        } catch (SQLException e) {
-            String name ="meta_hana_deliver_order 交货单表同步异常";
-            userOrderAuthorService.sendMailHanaWarning(name);
-            e.printStackTrace();
-        }finally {
-            if(pstm!=null){
+        try (
+                Connection conn = getMysqlConnection();
+                Statement stmt = conn.createStatement()
+        ) {
+            // 设置事务隔离级别
+            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+            // 开启事务
+            conn.setAutoCommit(false);
+
+            try {
+                // 1. 创建临时表
+                stmt.execute("CREATE TABLE IF NOT EXISTS meta_hana_deliver_order_temp LIKE meta_hana_deliver_order");
+
+                // 2. 加载数据到临时表
+                String testSql = "LOAD DATA LOCAL INFILE ? into table meta_hana_deliver_order_temp fields terminated by '$$' lines terminated by '\\n'";
+                long beginTime = System.currentTimeMillis();
+                String filePath = Global.getTemdataPath() + "deliverOrder/";
+                try (InputStream inputStream = mergeGetStream(new File(filePath))) {
+                    int rows = this.bulkLoadFromInputStream(testSql, inputStream);
+                    long endTime = System.currentTimeMillis();
+                    log.info("importing deliverOrder := {} rows data into mysql from {} and cost {} ms!", rows, filePath, (endTime - beginTime));
+                }
+
+                // 3. 原子切换表(RENAME是原子操作)
+                stmt.execute("RENAME TABLE meta_hana_deliver_order TO meta_hana_deliver_order_old, " +
+                        "meta_hana_deliver_order_temp TO meta_hana_deliver_order");
+
+                // 提交事务
+                conn.commit();
+
+                // 4. 异步删除旧表
+                CompletableFuture.runAsync(() -> {
+                    try (Connection tempConn = getMysqlConnection();
+                         Statement tempStmt = tempConn.createStatement()) {
+                        tempStmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_old");
+                        log.info("成功删除旧表: meta_hana_deliver_order_old");
+                    } catch (Exception e) {
+                        log.error("删除旧表失败,表名: meta_hana_deliver_order_old", e);
+                    }
+                }).exceptionally(ex -> {
+                    log.error("异步删除旧表操作异常", ex);
+                    return null;
+                });
+
+                // 清空缓存
+                String result = HttpUtils.sendGet(orderGetDataHook);
+                log.info("清空deliverOrder缓存:{}", result);
+            } catch (Exception e) {
+                // 回滚事务
                 try {
-                    pstm.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.rollback();
+                    // 清理临时表
+                    try {
+                        stmt.execute("DROP TABLE IF EXISTS meta_hana_deliver_order_temp");
+                    } catch (SQLException ex) {
+                        log.error("清理临时表失败", ex);
+                    }
+                } catch (SQLException ex) {
+                    log.error("事务回滚失败", ex);
                 }
-            }
-            if(conn!=null){
+                String name = "meta_hana_deliver_order订单表同步异常";
+                userOrderAuthorService.sendMailHanaWarning(name);
+                log.error(name + ", 文件路径: " + Global.getTemdataPath() + "deliverOrder/", e);
+                throw new RuntimeException(e);
+            } finally {
                 try {
-                    conn.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                    throw new RuntimeException(e);
+                    conn.setAutoCommit(true);
+                } catch (SQLException ex) {
+                    log.error("恢复自动提交模式失败", ex);
                 }
             }
+        } catch (Exception e) {
+            String name = "meta_hana_deliver_order订单表同步异常";
+            userOrderAuthorService.sendMailHanaWarning(name);
+            log.error(name + ", 文件路径: " + Global.getTemdataPath() + "deliverOrder/", e);
+            throw new RuntimeException(e);
         }
+
+
+
+
     }
 
     public static InputStream mergeGetStream(File sourceFile) throws FileNotFoundException {