Java多线程向数据库写入数据

任务:将 SQL Server 中的表 A(约 16 万条数据)导入到 MySQL 中对应的表 B。

思路:先把 A 表的数据按行号分段,每段交给一个线程读取,再由多个线程同时写入 B 表。

关键代码

//将数据库中的数据条数分段
 public void division(){
  //获取要导入的总的数据条数
  String sql3="SELECT  count(*)  FROM [CMD].[dbo].[mycopy1]";
  try {
   pss=cons.prepareStatement(sql3);
   rss=pss.executeQuery();
   
   while(rss.next()){
   System.out.println("总记录条数:"+rss.getInt(1));
   sum=rss.getInt(1);
   }
   //每30000条记录作为一个分割点
   if(sum>=30000){
    n=sum/30000;
    residue=sum%30000;
   }else{
    residue=sum;
   }
   
   System.out.println(n+"  "+residue);
   
  } catch (SQLException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 
 }

线程类

/**
 * Creates a worker for one segment of the source table and opens both of its
 * database connections.
 *
 * <p>{@code start} and {@code end} are stored as the row-number bounds that
 * {@code getAll()} later puts into its {@code between} clause. The MySQL
 * connection has auto-commit switched off.
 *
 * @param start first row number of the segment
 * @param end last row number of the segment
 * @throws IllegalStateException if a driver cannot be loaded or a connection
 *         cannot be opened; any connection opened before the failure is
 *         closed first, so no half-initialised worker is handed back
 */
public MyThread(int start,int end) {
  this.end=end;
  this.start=start;
  System.out.println("处理掉余数");
  try {
    // A constructor executes on the thread that calls it, so this prints the
    // name of the creating thread.
    System.out.println("--------"+Thread.currentThread().getName()+"------------");
    Class.forName(SQLSERVERDRIVER);
    System.out.println("加载sqlserver驱动...");
    cons = DriverManager.getConnection(CONTENTS,UNS,UPS);
    stas = cons.createStatement();
    System.out.println("连接SQLServer数据库成功!!");

    System.out.println("加载mysql驱动.....");
    Class.forName(MYSQLDRIVER);
    con = DriverManager.getConnection(CONTENT,UN,UP);
    sta = con.createStatement();
    // Disable auto-commit so the inserts are committed as one transaction.
    con.setAutoCommit(false);
    System.out.println("连接mysql数据库成功!!");

  } catch (Exception e) {
    // Do not leak whichever connection was already opened.
    if(con!=null){
      try {
        con.close();
      } catch (SQLException closeError) {
        closeError.printStackTrace();
      }
    }
    if(cons!=null){
      try {
        cons.close();
      } catch (SQLException closeError) {
        closeError.printStackTrace();
      }
    }
    // Fail fast with the cause attached instead of returning a worker whose
    // connection fields are null.
    throw new IllegalStateException(
        "Could not open database connections for rows "+start+" to "+end, e);
  }
}

 

 

 public ArrayList<Member> getAll(){
  Member member;
  String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +
    " from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end;
  try {
   System.out.println("正在获取数据...");
   allmembers=new ArrayList();
   rss=stas.executeQuery(sql1);
   while(rss.next()){
    member=new Member();
    member.setAddress1(rss.getString("address1"));
    member.setBnpoints(rss.getString("bnpoints"));
    member.setDbno(rss.getString("dbno"));
    member.setExpiry(rss.getString("expiry"));
    member.setHispoints(rss.getString("hispoints"));
    member.setKypoints(rss.getString("kypoints"));
    member.setLevels(rss.getString("levels"));
    member.setNames(rss.getString("names"));
    member.setPmcode(rss.getString("pmcode"));
    member.setRemark(rss.getString("remark"));
    member.setSex(rss.getString("sex"));
    member.setTelephone(rss.getString("telephone"));
    member.setWxno(rss.getString("wxno"));
    member.setPmdate(rss.getString("pmdate"));
    allmembers.add(member);
   // System.out.println(member.getNames());
   }
   System.out.println("成功获取sqlserver数据库数据!");
   return allmembers;
   
  } catch (SQLException e) {
   // TODO Auto-generated catch block
   System.out.println("获取sqlserver数据库数据发送异常!");
   e.printStackTrace();
  }
  try {
   rss.close();
   stas.close();
  } catch (SQLException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  return null;
 }
 
 public void inputAll(ArrayList<Member> allmembers){
  System.out.println("开始向mysql中写入");
  String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
  try {
   ps=con.prepareStatement(sql2);
   System.out.println("-------------------------等待写入数据条数: "+allmembers.size());
   for(int i=0;i<allmembers.size();i++){
    ps.setString(1, allmembers.get(i).getPmcode());
    ps.setString(2, allmembers.get(i).getNames());
    //System.out.println(allmembers.get(i).getNames());
    ps.setString(3, allmembers.get(i).getSex());
    ps.setString(4, allmembers.get(i).getTelephone());
    ps.setString(5, allmembers.get(i).getAddress1());
    ps.setString(6, allmembers.get(i).getPmdate());
    ps.setString(7, allmembers.get(i).getExpiry());
    ps.setString(8, allmembers.get(i).getLevels());
    ps.setString(9, allmembers.get(i).getDbno());
    ps.setString(10, allmembers.get(i).getHispoints());
    ps.setString(11, allmembers.get(i).getBnpoints());
    ps.setString(12, allmembers.get(i).getKypoints());
    ps.setString(13, allmembers.get(i).getWxno());
    ps.setString(14, allmembers.get(i).getRemark());
    //插入命令列表
    //ps.addBatch();
    ps.executeUpdate();
   }
   //ps.executeBatch();
   con.commit();
   
   ps.close();
   con.close();
   this.flag=false;
   System.out.println(Thread.currentThread().getName()+"--->OK");
  } catch (SQLException e) {
   // TODO Auto-generated catch block
   System.out.println("向mysql中更新数据时发生异常!");
   e.printStackTrace(); 
  }
 }

 @Override
 public void run() {
  // TODO Auto-generated method stub

  while(true&&flag){
   this.inputAll(getAll());
  }
 }

相关阅读:

相关推荐