记一次HBASE数据跨集群迁移

填坑过程记录

Posted by xuefly on July 6, 2019

背景

考虑到用户数据的监管需求,必须将HBASE数据迁移。 数据特点:

  • T级别。要支撑线上业务,只迁移若干table中的一个。
  • HBASE表的数据是通过消费kafka数据,调用Phoenix, 落入HBASE

迁移方向选择:

  • 需要攻克的方案。直接迁移HBASE的表到新集群。跨国迁移的VPN带宽有限,先从aws的HBASE==>aws==>HDFS==>aws s3==>aliyun HDFS ==>aliyun hbase
  • 备用方案。等kafka迁到新集群后,在新集群重新消费一遍到HBASE。

调研

参考:各个方案说明

  • 停止HBASE 服务
    • DistCp
  • 不用停HBASE服务
    • copytable
    • export
    • Replication

考虑到HBASE服务不必停止,只停止table的写入即可,快照不是必须,选择export方案。

准备:

  1. 生成测试数据:python3 generate_hbase_data.py /home/ubuntu/data/striver/hbase_data.txt 10000000
  2. 数据存至hdfs:hdfs dfs -put /home/ubuntu/data/striver/hbase_data.txt /tmp
  3. 建表:
    hbse shell
    create 'default:people', {NAME=>'basic_info'}, {NAME=>'other_info'}, SPLITS=>['10|','20|','30|','40|','50|','60|','70|','80|','90|']
    

具体过程:

  1. 防止数据不一致,在操作过程中,亚马逊HBASE表不再写入(disable ‘table’执行与否不影响操作),需要在阿里云HBASE上先创建好表
  2. hbase tabel export to hdfs hbase org.apache.hadoop.hbase.mapreduce.Export people /tmp/people
  3. hdfs to s3 nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -update -delete /tmp/people s3a://aku-hdfs-backup/test/hbase/people > /tmp/people.log 2>&1 &
  4. s3 to aliyun hdfs hadoop distcp -Dmapred.job.name=hbase_test_people -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120 -Dyarn.app.mapreduce.am.resource.mb=4096 -bandwidth 500 -m 40 -numListstatusThreads 40 -update -delete -strategy dynamic s3a://aku-hdfs-backup/test/hbase/people /tmp/people ;
  5. aliyun hdfs toaliyun hbase table hbase org.apache.hadoop.hbase.mapreduce.Import people /tmp/people -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL

测试

1.导出hdfs
hbase org.apache.hadoop.hbase.mapreduce.Export -D mapred.output.compress=true ACL_USERACTIONLOG /tmp/ACL_USERACTIONLOG
参数解析:
	-D mapred.output.compress=true 导出数据压缩,可缩减十倍,同时间接提升import速度一倍
	-Dhbase.export.scanner.batch=2000 若某个row数据量过大导致timeout错误,可加此参数限制
2.传输
hadoop distcp  \
-Dmapred.job.name=hbase_acl_useractionlog \
-Dmapreduce.map.memory.mb=6120  \
-Dyarn.app.mapreduce.am.resource.mb=4096  \
-bandwidth 500 \
-m 40   \
-numListstatusThreads 40 \
/tmp/ACL_USERACTIONLOG \
hdfs://172.31.17.54:8020/tmp/ACL_USERACTIONLOG
3.先建好phoenix表后,导入数据
hbase org.apache.hadoop.hbase.mapreduce.Import ACL_USERACTIONLOG  /tmp/ACL_USERACTIONLOG   -Dmapreduce.map.speculative=false   -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL

问题:

值不一致的情况

直接在HBASE shell中查看也是负的。最终通过测试发现:

只有主键为最原始的单列的情况下,解码的出的值跟phoenix查出来的一致。当使用了联合主键或者salt backet之后就不能还原出原值了。

附解码字节编码为值的方法:

用Pythonhbase中查看被编码的中文:

print '\x7F\xFF\xFF\xFF\xFF\x15'.decode('utf-8')

查看被编码的主键的值

hbase(main):005:0* scan 'USERACTIONLOG_NEW', {'LIMIT' => 5}
ROW                                COLUMN+CELL
 \x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x00\x00\x00\x00, timestamp=1504153335068, value=x
 \x80\x00\x01^6\x86u\x1C
 \x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0B, timestamp=1504153335068, value=00000000-4873-4a79-0000-000000c1924f
 \x80\x00\x01^6\x86u\x1C
 \x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0C, timestamp=1504153335068, value=\x80\x00\x00\x04
 \x80\x00\x01^6\x86u\x1C
 \x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0D, timestamp=1504153335068, value=2.0.5

#:Decode a long value (such as of time type) using Phoenix 4.4.x
hbase(main):007:0> import org.apache.phoenix.schema.types.PLong
=> Java::OrgApachePhoenixSchemaTypes::PLong
hbase(main):008:0> import org.apache.phoenix.schema.SortOrder
=> Java::OrgApachePhoenixSchema::SortOrder
hbase(main):009:0> PLong::INSTANCE.getCodec.decodeLong("\x80\x00\x01N\xA5`H\xA6".to_java_bytes, 0, SortOrder::ASC)
=> 1437293627558
hbase(main):010:0> PLong::INSTANCE.getCodec.decodeLong("\x80\x00\x01N\xA5`H\xA5".to_java_bytes, 0, SortOrder::ASC)
=> 1437293627557
hbase(main):011:0>

实战

准备工作

  1. 业务方停止1特征程序停止
  2. Phoenix表USERACTIONLOG_NEW和USERACTIONLOGUID0_NEW停止写入,确认写入停止后,记录当前mysql中的offset

过程

aws hbase ==> aws hdfs

  1. 启动Phoenix两张表的export数据脚本
    nohup /home/ubuntu/data/striver/useractionlog3.sh > /home/ubuntu/data/striver/useractionlog3.log &
    nohup /home/ubuntu/data/striver/useractionlog3uid0.sh > /home/ubuntu/data/striver/useractionlog3uid0.log &
    
  2. 监控export脚本100%完成 tail -f /home/ubuntu/data/striver/useractionlog3.log tail -f /home/ubuntu/data/striver/useractionlog3uid0.log

  3. Phoenix表USERACTIONLOG_NEW和USERACTIONLOGUID0_NEW继续写入,同步数据至最新 修改status后,脚本自动调起任务: update yarn_apps_watcher set status =1 where app_name = ‘kafka-hbase-UserActionLog3’; update yarn_apps_watcher set status =1 where app_name = ‘kafka-hbase-UserActionLog3Uid0’;
  4. 特征程序启动(@土豆)
  5. 导出结束 8.

    aws hdfs ==> aws s3

    nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk  -Dfs.s3a.secret.key=x8Lkkkkkk  -update -delete /tmp/USERACTIONLOGUID0_NEW  s3a://aku-hdfs-backup/test/hbase/USERACTIONLOGUID0_NEW   > /tmp/useractionloguid0.log
    nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk  -Dfs.s3a.secret.key=x8Lkkkkkk -Dyarn.app.mapreduce.am.resource.mb=4096 -Dmapreduce.map.maxattempts=100  -bandwidth 1000 -m 60   -numListstatusThreads 40 -update -delete /tmp/USERACTIONLOG_NEW  s3a://aku-hdfs-backup/hbase/data/USERACTIONLOG_NEW   > /tmp/useractionlog.log
    

    经常失败,为了充分发挥Hadoop的容错能力,修改相关容错参数,参考:hadoop 优化配置 结果遇到下面问题:

    问题:使用distcp传文件到s3,会在任务所在节点的本地磁盘/tmp/hadoop-yarn/s3a缓存数据,造成一半节点系统盘写满的惨状

尝试方法:

  • 指定Hadoop运行的缓存目录到数据盘 通过查找这个目录的作用,发现是缓存目录,更改缓存目录,创建了目录/home/ubuntu/data/buffer-dir,修改权限为777。执行命令时添加选项:-Dfs.s3a.buffer.dir=/home/ubuntu/data/buffer-dir。没有走成。
  • 大文件问题:怎么拆分
  • 最终成功的方法:修改配置重启服务。 解决方法:设置fs.s3a.fast.upload 为true 参考 这样可以避免缓存到本地,而是直接从内存推到s3.

aws s3 ==> aliyun hdfs

ssh 172.22.8.89(aliyun master4)
hadoop distcp  -Dmapred.job.name=hbase_test_people -Dfs.s3a.access.key=Akkkkk  -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120  -Dyarn.app.mapreduce.am.resource.mb=4096  -bandwidth 500 -m 40   -numListstatusThreads 40   -update -delete -strategy dynamic  s3a://aku-hdfs-backup/test/hbase/USERACTIONLOGUID0_NEW    /tmp/USERACTIONLOGUID0_NEW
hadoop distcp  -Dmapred.job.name=hbase_useractionlog_new -Dfs.s3a.access.key=Akkkkk  -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120  -Dyarn.app.mapreduce.am.resource.mb=4096  -bandwidth 500 -m 40   -numListstatusThreads 40   -update -delete -strategy dynamic  s3a://aku-hdfs-backup/hbase/data/USERACTIONLOG_NEW    /tmp/USERACTIONLOG_NEW

在走公网的情况下,每个节点单独的带宽都是一定的,在越多的节点上并行运行map任务,导入速度就越快。

aliyun hdfs ==> aliyun phoenix

nohup hbase org.apache.hadoop.hbase.mapreduce.Import USERACTIONLOGUID0_NEW  /tmp/USERACTIONLOGUID0_NEW   -Dmapreduce.map.speculative=false   -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL > /tmp/uid0_import.log &
nohup hbase org.apache.hadoop.hbase.mapreduce.Import USERACTIONLOG_NEW /tmp/USERACTIONLOG_NEW -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL > /tmp/actionlog_import.log

在越多的regionserver上导入数据就越快,所以进行预分区。否则开始只会用到少数几个regionsever 建表语句:

-- USERACTIONLOGUID0_NEW
CREATE TABLE USERACTIONLOGUID0_NEW ( uid BIGINT , device_id VARCHAR not null, device_type INTEGER, app_version VARCHAR, screen_num INTEGER, screen_action INTEGER, screen_value VARCHAR, control_num INTEGER, control_action VARCHAR, control_type VARCHAR, control_value VARCHAR, device_time BIGINT, adjust_time BIGINT not null, country_id INTEGER, kafka_time BIGINT CONSTRAINT PK PRIMARY KEY (device_id,adjust_time)) SPLIT ON('00000000-00','00000000-01','00000000-03','00000000-05','00000000-07','00000000-08','00000000-09','00000000-0a','00000000-0b','00000000-0c','00000000-0d','00000000-0e','00000000-0f','00000000-11','00000000-13','00000000-14','00000000-15','00000000-16','00000000-17','00000000-18','00000000-19','00000000-1b','00000000-1d','00000000-1f','00000000-20','00000000-21','00000000-22','00000000-23','00000000-25','00000000-26','00000000-27','00000000-29','00000000-2a','00000000-2c','00000000-2d','00000000-2e','00000000-30','00000000-31','00000000-32','00000000-34','00000000-36','00000000-37','00000000-38','00000000-39','00000000-3a','00000000-3b','00000000-3c','00000000-3d','00000000-3e','00000000-40','00000000-41','00000000-42','00000000-43','00000000-44','00000000-45','00000000-46','00000000-47','00000000-48','00000000-4a','00000000-4b','00000000-4c','00000000-4d','00000000-4e','00000000-4f','00000000-50','00000000-52','00000000-53','00000000-54','00000000-56','00000000-57','00000000-58','00000000-5a','00000000-5b','00000000-5c','00000000-5d','00000000-5e','00000000-5f','00000000-61','00000000-63','00000000-65','00000000-66','00000000-67','00000000-69','00000000-6b','00000000-6d','00000000-6e','00000000-6f','00000000-70','00000000-71','00000000-72','00000000-73','00000000-75','00000000-76','00000000-77','00000000-78','00000000-79','00000000-7a','00000000-7b','00000000-7c','00000000-7d','00000000-7e','00000000-7f','ffffffff-80','ffffffff-81','ffffffff-83','ffffffff-84','ffffffff-85','ffffffff-87','ffffffff-88','ffffffff-89','ffffffff-8a','ffffffff-8b','ffffffff-8c','ffffffff-8d','ffffffff-8e','ffffffff-8f','ffffffff-91','ffffffff-92','ffffffff-93','ffffffff-94','ffffffff-95','ffffffff-97','ffffffff-98','ffffffff-99','ffffffff-9b','ffffffff-9c','ffffffff-9d','ffffffff-a0','ffffffff-a2','ffffffff-a3','ffffffff-a4','ffffffff-a5','ffffffff-a6','ffffffff-a8','ffffffff-a9','ffffffff-aa','ffffffff-ab','ffffffff-ac','ffffffff-ae','ffffffff-b0','ffffffff-b1','ffffffff-b2','ffffffff-b4','ffffffff-b6','ffffffff-b7','ffffffff-b8','ffffffff-ba','ffffffff-bb','ffffffff-bc','ffffffff-bd','ffffffff-be','ffffffff-c0','ffffffff-c2','ffffffff-c3','ffffffff-c4','ffffffff-c5','ffffffff-c6','ffffffff-c8','ffffffff-ca','ffffffff-cb','ffffffff-cc','ffffffff-cd','ffffffff-ce','ffffffff-d0','ffffffff-d2','ffffffff-d4','ffffffff-d5','ffffffff-d7','ffffffff-d9','ffffffff-da','ffffffff-db','ffffffff-dd','ffffffff-de','ffffffff-df','ffffffff-e0','ffffffff-e1','ffffffff-e2','ffffffff-e3','ffffffff-e5','ffffffff-e7','ffffffff-e9','ffffffff-eb','ffffffff-ec','ffffffff-ed','ffffffff-ef','ffffffff-f1','ffffffff-f2','ffffffff-f3','ffffffff-f4','ffffffff-f5','ffffffff-f7','ffffffff-f8','ffffffff-f9','ffffffff-fa','ffffffff-fb','ffffffff-fd','ffffffff-fe');

-- ACTIONLOG_NEW建表语句:
CREATE TABLE USERACTIONLOG_NEW ( uid BIGINT not null, device_id VARCHAR, device_type INTEGER, app_version VARCHAR, screen_num INTEGER, screen_action INTEGER, screen_value VARCHAR, control_num INTEGER, control_action VARCHAR, control_type VARCHAR, control_value VARCHAR, device_time BIGINT, adjust_time BIGINT not null, country_id INTEGER, kafka_time BIGINT CONSTRAINT PK PRIMARY KEY (uid,adjust_time)) SALT_BUCKETS = 34;

这一步坑太多:

当写入过快时会遇见什么问题?

写入过快时,memstore的水位会马上被推高。 你可能会看到以下类似日志: RegionTooBusyException: Above memstore limit, regionName=xxxxx ... 这个是Region的memstore占用内存大小超过正常的4倍,这时候会抛异常,写入请求会被拒绝,客户端开始重试请求。

当达到128M的时候会触发flush memstore,当达到128M * 4还没法触发flush时候会抛异常来拒绝写入。 两个相关参数的默认值如下:

hbase.hregion.memstore.flush.size=128M
hbase.hregion.memstore.block.multiplier=4

或者这样的日志: regionserver.MemStoreFlusher: Blocking updates on hbase.example.host.com,16020,1522286703886: the global memstore size 1.3 G is >= than blocking 1.3 G sizeregionserver.MemStoreFlusher: Memstore is above high water mark and block 528ms 这是所有region的memstore内存总和开销超过配置上限,默认是配置heap的40%,这会导致写入被阻塞。目的是等待flush的线程把内存里的数据flush下去,否则继续允许写入memestore会把内存写爆

hbase.regionserver.global.memstore.upperLimit=0.4 # 较旧版本,新版本兼容
hbase.regionserver.global.memstore.size=0.4 # 新版本

当写入被阻塞,队列会开始积压,如果运气不好最后会导致OOM,你可能会发现JVM由于OOM crash或者看到如下类似日志:ipc.RpcServer: /192.168.x.x:16020 is unable to read call parameter from client 10.47.x.xjava.lang.OutOfMemoryError: Java heap spaceHBase 这里我认为有个很不好的设计,捕获了OOM异常却没有终止进程。这时候进程可能已经没法正常运行下去了,你还会在日志里发现很多其它线程也抛OOM异常。比如stop可能根本stop不了,RS可能会处于一种僵死状态。

如何避免RS OOM?

  1. 加快flush速度:
    hbase.hstore.blockingWaitTime = 90000
    mshbase.hstore.flusher.count = 2
    hbase.hstore.blockingStoreFiles = 10
    

    当达到hbase.hstore.blockingStoreFiles配置上限时,会导致flush阻塞等到compaction工作完成。阻塞时间是hbase.hstore.blockingWaitTime,可以改小这个时间。hbase.hstore.flusher.count可以根据机器型号去配置,可惜这个数量不会根据写压力去动态调整,配多了,非导入数据多场景也没用,改配置还得重启。

  2. 同样的道理,如果flush加快,意味这compaction也要跟上,不然文件会越来越多,这样scan性能会下降,开销也会增大。
    hbase.regionserver.thread.compaction.small = 1
    hbase.regionserver.thread.compaction.large = 1
    

    增加compaction线程会增加CPU和带宽开销,可能会影响正常的请求。如果不是导入数据,一般而言是够了。好在这个配置在云HBase内是可以动态调整的,不需要重启。

  3. 上述配置都需要人工干预,如果干预不及时server可能已经OOM了,这时候有没有更好的控制方法?
    hbase.ipc.server.max.callqueue.size = 1024 * 1024 * 1024 # 1G
    

    直接限制队列堆积的大小。在CDH中没找到这个配置。

  4. Java Heap Size of HBase RegionServer in Bytes

当堆积到一定程度后,事实上后面的请求等不到server端处理完,可能客户端先超时了。并且一直堆积下去会导致OOM,1G的默认配置需要相对大内存的型号。当达到queue上限,客户端会收到CallQueueTooBigException 然后自动重试。通过这个可以防止写入过快时候把server端写爆,有一定反压作用。线上使用这个在一些小型号稳定性控制上效果不错。

  1. 增大yarn的container最小内存从1G到3G。

数据验证

总结

好的也是莫名其妙。优化人崩溃,只为最大的满足业务的迁移时间需求。