Migrating MongoDB data to HBase
Published: 2019-06-14


  • Import the packages
# encoding: utf-8
'''
@author: zcc
@license: (C) Copyright 2013-2017, Node Supply Chain Manager Corporation Limited.
@software: pycharm
@file: ggsn_to_hbase.py
@time: 9/1/17 2:43 PM
@desc:
'''
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
from hbase.ttypes import IOError, AlreadyExists
from hbase import Hbase
from hbase.ttypes import *
  • Class for working with HBase
import struct


def encode(n):
    # Pack an integer as a native C int
    return struct.pack("i", n)


class HbaseControl(object):
    def __init__(self, table, col_name, host='192.168.1.10', port=9090):
        self.table = table
        self.host = host
        self.port = port
        # Connect to HBase Thrift server
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create and open the client connection
        self.client = Hbase.Client(self.protocol)
        self.transport.open()
        # Set the column families, then create the table if needed
        self.set_column_families(col_name)
        self._build_column_families()

    def set_column_families(self, col_list=['name', 'sex', 'age']):
        '''
        Set the names of the column families
        :param col_list: list of column family names
        :return:
        '''
        self.columnFamilies = col_list

    def _build_column_families(self):
        '''
        Create the table if it does not exist in HBase yet
        :return:
        '''
        tables = self.client.getTableNames()
        if self.table not in tables:
            self.__create_table(self.table)

    def __create_table(self, table):
        '''
        Create a table in HBase
        :param table: table name
        :return:
        '''
        columnFamilies = []
        for columnFamily in self.columnFamilies:
            name = ColumnDescriptor(name=columnFamily)
            columnFamilies.append(name)
        self.client.createTable(table, columnFamilies)

    def del_row(self, row_key):
        '''
        Delete a row
        :param row_key: key of the row to delete
        :return:
        '''
        self.client.deleteAllRow(self.table, row_key, {})

    def __del__(self):
        '''
        Close the HBase connection before the object is destroyed
        :return:
        '''
        self.transport.close()

    def _del_table(self, table):
        '''
        Delete a table from HBase (it has to be disabled first)
        :param table: table name
        :return:
        '''
        self.client.disableTable(table)
        self.client.deleteTable(table)

    def getColumnDescriptors(self):
        '''
        Get the column family descriptors of the HBase table
        :return:
        '''
        return self.client.getColumnDescriptors(self.table)

    def put(self, record, day):
        '''
        Insert a single record into HBase
        :param record: dict with the keys 'tel' and 'suminfo'
        :param day: date string used as the second part of the row key
        :return:
        '''
        assert isinstance(record, dict)
        mutations = []
        # The row key in HBase is built from tel and the date
        row_key = '{0}_{1}'.format(record['tel'], day)
        # Insert tel
        mutations.append(Mutation(column='baseinfo:tel', value=str(record['tel'])))
        # Insert day
        mutations.append(Mutation(column='baseinfo:day', value=str(day)))
        # Insert suminfo
        mutations.append(Mutation(column='suminfo:context', value=str(record['suminfo'])))
        self.client.mutateRow(self.table, row_key, mutations, {})

    def puts(self, records, day):
        '''
        Batch insert into HBase
        :param records: list of dicts, each with the keys 'tel' and 'suminfo'
        :param day: date string used as the second part of the row key
        :return:
        '''
        assert isinstance(records, list)
        mutationsBatch = []
        for record in records:
            mutations = []
            # The row key in HBase is built from tel and the date
            row_key = '{0}_{1}'.format(record['tel'], day)
            # Insert tel
            mutations.append(Mutation(column='baseinfo:tel', value=str(record['tel'])))
            # Insert day
            mutations.append(Mutation(column='baseinfo:day', value=str(day)))
            # Insert suminfo
            mutations.append(Mutation(column='suminfo:context', value=str(record['suminfo'])))
            mutationsBatch.append(BatchMutation(row=row_key, mutations=mutations))
        self.client.mutateRows(self.table, mutationsBatch, {})
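To see what a record turns into before it is sent over Thrift, the row layout used by put() and puts() can be tried out without an HBase server. This is only a sketch: build_row is a hypothetical helper that is not part of the original script, and the sample record is made up for illustration. It returns plain tuples in place of Mutation objects.

```python
def build_row(record, day):
    """Return (row_key, cells) using the same layout as put() above."""
    # Row key: "<tel>_<day>"
    row_key = '{0}_{1}'.format(record['tel'], day)
    # Each cell is a ("family:qualifier", value) pair; values are stringified
    cells = [
        ('baseinfo:tel', str(record['tel'])),
        ('baseinfo:day', str(day)),
        ('suminfo:context', str(record['suminfo'])),
    ]
    return row_key, cells


# Made-up sample record, for illustration only
row_key, cells = build_row({'tel': '13800000000', 'suminfo': {'bytes': 42}}, '20170806')
print(row_key)
for column, value in cells:
    print('{0} = {1}'.format(column, value))
```

Because the key starts with tel, all days of one phone number end up next to each other in HBase's sorted key space.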
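  • Class that reads from MongoDB and imports the data into HBase
from pymongo import MongoClient


class MongDBControl(object):
    def __init__(self, table_name, host='192.168.1.20', port=27017):
        self.client = MongoClient(host, port)
        # "table" is the name of the MongoDB database
        db = self.client.table
        self.collect = db[table_name]
        self.table = table_name

    def __del__(self):
        self.client.close()

    def record_to_hbase(self, hc):
        assert isinstance(hc, HbaseControl)
        num = 0
        total = 0
        while True:
            # Read the collection in pages of 1000 documents
            records = list(self.collect.find().skip(1000 * num).limit(1000))
            # Materialize the page first: the cursor object itself is not
            # a reliable emptiness test, so the loop would never end
            if not records:
                break
            # The collection name doubles as the day part of the row key
            hc.puts(records, self.table)
            num += 1
            total += len(records)
            print('Imported {0} records from MongoDB into HBase'.format(total))
        print('Data migration finished')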
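The paging loop above issues a new query with a larger skip() for every batch, and the server has to walk past the skipped documents each time. For a large collection it may be cheaper to open one cursor and cut it into fixed-size chunks on the client. A minimal sketch of that idea follows; iter_batches is a hypothetical helper that is not in the original script, and a plain range stands in for the pymongo cursor so the example runs without a database.

```python
from itertools import islice


def iter_batches(iterable, batch_size=1000):
    """Yield lists of up to batch_size items until the iterable is exhausted."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# A range stands in for a cursor such as collection.find()
total = 0
for batch in iter_batches(range(2500), batch_size=1000):
    total += len(batch)
    print('batch of {0}, {1} so far'.format(len(batch), total))
```

In record_to_hbase the same loop could wrap self.collect.find() and call hc.puts(batch, self.table) for each batch, under the assumption that a single long-lived cursor is acceptable for the migration.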
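  • Main function
if __name__ == '__main__':
    hc = HbaseControl(table='table', col_name=['baseinfo', 'count', 'suminfo', 'nodebll', 'nodebzl'])
    mc = MongDBControl('20170806')
    # Uncomment to run the migration
    # mc.record_to_hbase(hc)
    # Drops the HBase table again; remove this call to keep the migrated data
    hc._del_table('table')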

Reposted from: https://www.cnblogs.com/crazysquirrel/p/7471641.html
