# mongodb数据迁移到hbase (migrate data from MongoDB to HBase)
# encoding: utf-8'''@author: zcc@license: (C) Copyright 2013-2017, Node Supply Chain Manager Corporation Limited.@software: pycharm@file: ggsn_to_hbase.py@time: 9/1/17 2:43 PM@desc:'''from thrift.transport import TSocket, TTransportfrom thrift.transport import TTransportfrom thrift.protocol import TBinaryProtocolfrom hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfofrom hbase.ttypes import IOError, AlreadyExistsfrom hbase import Hbasefrom hbase.ttypes import *
import struct


def encode(n):
    """Pack the integer ``n`` into bytes with ``struct`` format ``"i"``.

    No byte-order prefix is given, so native byte order and size are used
    and the result depends on the host platform.
    """
    return struct.pack("i", n)


class HbaseControl(object):
    """Wrapper around an HBase Thrift client bound to a single table.

    Construction connects to the Thrift server and creates ``table`` (with the
    given column families) when it is not already present in HBase.
    """

    def __init__(self, table, col_name, host='192.168.1.10', port=9090):
        """Open the Thrift connection and make sure ``table`` exists.

        :param table: name of the HBase table this object operates on.
        :param col_name: list of column-family names, used when the table
            has to be created.
        :param host: HBase Thrift server host.
        :param port: HBase Thrift server port.
        """
        self.table = table
        self.host = host
        self.port = port
        # Connect to the HBase Thrift server.
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create and open the client connection.
        self.client = Hbase.Client(self.protocol)
        self.transport.open()
        # Record the column families and create the table if it is missing.
        self.set_column_families(col_name)
        self._build_column_families()

    def set_column_families(self, col_list=None):
        """Set the column-family names used when creating the table.

        :param col_list: list of column-family names. When omitted, defaults
            to ``['name', 'sex', 'age']``. A new list is built on every call so
            that instances never share (and mutate) one default list object.
        """
        if col_list is None:
            col_list = ['name', 'sex', 'age']
        self.columnFamilies = col_list

    def _build_column_families(self):
        """Create ``self.table`` in HBase if it is not listed by the server."""
        tables = self.client.getTableNames()
        if self.table not in tables:
            self.__create_table(self.table)

    def __create_table(self, table):
        """Create ``table`` with one column family per ``self.columnFamilies`` entry.

        :param table: name of the table to create.
        """
        column_families = [Hbase.ColumnDescriptor(name=family)
                           for family in self.columnFamilies]
        self.client.createTable(table, column_families)

    def del_row(self, row_key):
        """Delete all cells of row ``row_key`` in ``self.table``.

        :param row_key: key of the row to delete.
        """
        self.client.deleteAllRow(self.table, row_key, {})

    def close(self):
        """Close the Thrift transport, if one was created."""
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def __del__(self):
        """Close the HBase connection before the object is destroyed.

        Uses ``close()``, which tolerates a partially constructed instance
        (e.g. when ``__init__`` failed before ``transport`` was assigned).
        """
        self.close()

    def _del_table(self, table):
        """Disable and then delete ``table`` in HBase.

        :param table: name of the table to drop.
        """
        self.client.disableTable(table)
        self.client.deleteTable(table)

    def getColumnDescriptors(self):
        """Return the column-family descriptors of ``self.table``."""
        return self.client.getColumnDescriptors(self.table)

    @staticmethod
    def _build_mutations(record, day):
        """Return ``(row_key, mutations)`` for a single record.

        The row key is ``"<tel>_<day>"`` (phone number plus date). The mutations
        write ``baseinfo:tel``, ``baseinfo:day`` and ``suminfo:context``.

        :param record: mapping with at least the keys ``'tel'`` and ``'suminfo'``.
        :param day: row-key suffix, also stored in ``baseinfo:day``.
        """
        # NOTE(review): under Python 2, str() raises UnicodeEncodeError for a
        # unicode value containing non-ASCII characters - confirm the type of
        # record['suminfo'] coming from the data source.
        row_key = '{0}_{1}'.format(record['tel'], day)
        mutations = [
            Hbase.Mutation(column='baseinfo:tel', value=str(record['tel'])),
            Hbase.Mutation(column='baseinfo:day', value=str(day)),
            Hbase.Mutation(column='suminfo:context', value=str(record['suminfo'])),
        ]
        return row_key, mutations

    def put(self, record, day):
        """Insert one record into ``self.table`` as row ``"<tel>_<day>"``.

        :param record: dict with at least the keys ``'tel'`` and ``'suminfo'``.
        :param day: row-key suffix, also stored in ``baseinfo:day``.
        """
        assert isinstance(record, dict)
        row_key, mutations = self._build_mutations(record, day)
        self.client.mutateRow(self.table, row_key, mutations, {})

    def puts(self, records, day):
        """Insert many records into ``self.table`` with one ``mutateRows`` call.

        :param records: list of records; each must provide ``'tel'`` and
            ``'suminfo'``. An empty list is a no-op.
        :param day: row-key suffix, also stored in ``baseinfo:day``.
        """
        assert isinstance(records, list)
        if not records:
            # Nothing to write: skip the round trip to the Thrift server.
            return
        batch = []
        for record in records:
            row_key, mutations = self._build_mutations(record, day)
            batch.append(Hbase.BatchMutation(row=row_key, mutations=mutations))
        self.client.mutateRows(self.table, batch, {})
from pymongo import MongoClient


class MongDBControl(object):
    """Read every document of one MongoDB collection and copy it into HBase."""

    def __init__(self, table_name, host='192.168.1.20', port=27017, db_name='table'):
        """Connect to MongoDB and select collection ``table_name`` of ``db_name``.

        :param table_name: collection to read. Its name is also passed to
            ``HbaseControl.puts`` as the ``day`` part of every row key.
        :param host: MongoDB host.
        :param port: MongoDB port.
        :param db_name: database holding the collection. Defaults to
            ``'table'``, the database name previously hard-coded here.
        """
        self.client = MongoClient(host, port)
        db = self.client[db_name]
        self.collect = db[table_name]
        self.table = table_name

    def __del__(self):
        """Close the MongoDB connection when the object is destroyed."""
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()

    def record_to_hbase(self, hc, batch_size=1000):
        """Copy every document of the collection into HBase in batches.

        :param hc: ``HbaseControl`` used to write the rows (via ``hc.puts``).
        :param batch_size: number of documents sent per ``hc.puts`` call.
        :return: total number of documents written.
        """
        assert isinstance(hc, HbaseControl)
        # Stream the collection through a single cursor. The previous
        # skip()/limit() paging tested the Cursor object for truthiness, which
        # is always true, so the loop never ended; skip() also rescans all
        # skipped documents on every page.
        total = 0
        batch = []
        for record in self.collect.find():
            batch.append(record)
            if len(batch) >= batch_size:
                hc.puts(batch, self.table)
                total += len(batch)
                batch = []
                print('已经从mongodb向hbase导入{0}条数据!!'.format(total))
        if batch:
            # Flush the final, partially filled batch.
            hc.puts(batch, self.table)
            total += len(batch)
            print('已经从mongodb向hbase导入{0}条数据!!'.format(total))
        print('数据迁移完毕!!!')
        return total
if __name__ == '__main__':
    # Driver: connect to HBase (creating 'table' with these column families if
    # it is missing), open the MongoDB collection '20170806', then drop 'table'.
    # The migration call itself is currently disabled.
    families = ['baseinfo', 'count', 'suminfo', 'nodebll', 'nodebzl']
    hc = HbaseControl(table='table', col_name=families)
    mc = MongDBControl('20170806')
    # mc.record_to_hbase(hc)
    hc._del_table('table')