利用Python将Hive查询结果保存到MySQL

Python脚本连接hive获取返回值代码

  1. #!/usr/bin/env python   
  2.   
  3. import sys  
  4.   
  5. from hive_service import ThriftHive  
  6. from hive_service.ttypes import HiveServerException  
  7. from thrift import Thrift  
  8. from thrift.transport import TSocket  
  9. from thrift.transport import TTransport  
  10. from thrift.protocol import TBinaryProtocol  
  11.   
  12. try:  
  13.     transport = TSocket.TSocket('localhost', 10000)  
  14.     transport = TTransport.TBufferedTransport(transport)  
  15.     protocol = TBinaryProtocol.TBinaryProtocol(transport)  
  16.   
  17.     client = ThriftHive.Client(protocol)  
  18.     transport.open()  
  19.   
  20.     client.execute('ADD jar /home/soft/Hadoop/hive-0.7.0/lib/hive-contrib-0.7.0.jar')  
  21.     query = '''  
  22.         select count(1) from apilog                                '''  
  23.   
  24.     client.execute(query)  
  25.     row = client.fetchOne()  
  26.     print row  
  27.   
  28.     transport.close()  
  29. except Thrift.TException, tx:  
  30.     print '%s' % (tx.message)  

此脚本支持add jar/file

用户Hive查询结果的返回值更新MySQL指定表指定字段(待修改)

  1. def mysqlExe(sql):  
  2.   
  3.         conn = MySQLdb.connect (host = "10.10.111.111",  
  4.   
  5.                                    user = "user",  
  6.   
  7.                                    passwd = "password",  
  8.   
  9.                                    db = "database")  
  10.   
  11.         cursor = conn.cursor ()  
  12.   
  13.         cursor.execute (sql)  
  14.   
  15.         cursor.close ()  
  16.   
  17.         conn.close ()  
  18.   
  19.   
  20.   
  21.   
  22. def hiveExeUpdate(sql,db,tableName,column,date):  
  23.   
  24.     try:  
  25.   
  26.        transport = TSocket.TSocket('10.20.134.199', 10000)  
  27.   
  28.        transport = TTransport.TBufferedTransport(transport)  
  29.   
  30.        protocol = TBinaryProtocol.TBinaryProtocol(transport)  
  31.   
  32.        client = ThriftHive.Client(protocol)  
  33.   
  34.        transport.open()  
  35.   
  36.        client.execute(sql)  
  37.   
  38.        update_sql= " update  " + tableName + " set " + column + " = " + client.fetchOne() + " where id = '" + date + "'"  
  39.   
  40.        mysqlExe(update_sql) //执行一条SQL语句   
  41.   
  42.        transport.close()  
  43.   
  44.        except Thrift.TException, tx:  
  45.   
  46.        print '%s' % (tx.message)