背景
考虑到用户数据的监管需求,必须将HBASE数据迁移。 数据特点:
- T级别。要支撑线上业务,只迁移若干table中的一个。
- HBASE表的数据是通过消费kafka数据,调用Phoenix, 落入HBASE
迁移方向选择:
- 需要攻克的方案。直接迁移HBASE的表到新集群。跨国迁移的VPN带宽有限,先从aws的HBASE==>aws==>HDFS==>aws s3==>aliyun HDFS ==>aliyun hbase
- 备用方案。等kafka迁到新集群后,在新集群重新消费一遍到HBASE。
调研
参考:各个方案说明
- 停止HBASE 服务
- DistCp
- 不用停HBASE服务
- copytable
- export
- Replication
考虑到HBASE服务不必停止,只停止table的写入即可,快照不是必须,选择export方案。
准备:
- 生成测试数据:
python3 generate_hbase_data.py /home/ubuntu/data/striver/hbase_data.txt 10000000
- 数据存至hdfs:
hdfs dfs -put /home/ubuntu/data/striver/hbase_data.txt /tmp
- 建表:
hbse shell create 'default:people', {NAME=>'basic_info'}, {NAME=>'other_info'}, SPLITS=>['10|','20|','30|','40|','50|','60|','70|','80|','90|']
具体过程:
- 防止数据不一致,在操作过程中,亚马逊HBASE表不再写入(disable ‘table’执行与否不影响操作),需要在阿里云HBASE上先创建好表
- hbase tabel export to hdfs
hbase org.apache.hadoop.hbase.mapreduce.Export people /tmp/people
- hdfs to s3
nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -update -delete /tmp/people s3a://aku-hdfs-backup/test/hbase/people > /tmp/people.log 2>&1 &
- s3 to aliyun hdfs
hadoop distcp -Dmapred.job.name=hbase_test_people -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120 -Dyarn.app.mapreduce.am.resource.mb=4096 -bandwidth 500 -m 40 -numListstatusThreads 40 -update -delete -strategy dynamic s3a://aku-hdfs-backup/test/hbase/people /tmp/people ;
- aliyun hdfs toaliyun hbase table
hbase org.apache.hadoop.hbase.mapreduce.Import people /tmp/people -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL
测试
1.导出hdfs
hbase org.apache.hadoop.hbase.mapreduce.Export -D mapred.output.compress=true ACL_USERACTIONLOG /tmp/ACL_USERACTIONLOG
参数解析:
-D mapred.output.compress=true 导出数据压缩,可缩减十倍,同时间接提升import速度一倍
-Dhbase.export.scanner.batch=2000 若某个row数据量过大导致timeout错误,可加此参数限制
2.传输
hadoop distcp \
-Dmapred.job.name=hbase_acl_useractionlog \
-Dmapreduce.map.memory.mb=6120 \
-Dyarn.app.mapreduce.am.resource.mb=4096 \
-bandwidth 500 \
-m 40 \
-numListstatusThreads 40 \
/tmp/ACL_USERACTIONLOG \
hdfs://172.31.17.54:8020/tmp/ACL_USERACTIONLOG
3.先建好phoenix表后,导入数据
hbase org.apache.hadoop.hbase.mapreduce.Import ACL_USERACTIONLOG /tmp/ACL_USERACTIONLOG -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL
问题:
直接在HBASE shell中查看也是负的。最终通过测试发现:
只有主键为最原始的单列的情况下,解码的出的值跟phoenix查出来的一致。当使用了联合主键或者salt backet之后就不能还原出原值了。
附解码字节编码为值的方法:
用Pythonhbase中查看被编码的中文:
print '\x7F\xFF\xFF\xFF\xFF\x15'.decode('utf-8')
查看被编码的主键的值
hbase(main):005:0* scan 'USERACTIONLOG_NEW', {'LIMIT' => 5}
ROW COLUMN+CELL
\x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x00\x00\x00\x00, timestamp=1504153335068, value=x
\x80\x00\x01^6\x86u\x1C
\x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0B, timestamp=1504153335068, value=00000000-4873-4a79-0000-000000c1924f
\x80\x00\x01^6\x86u\x1C
\x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0C, timestamp=1504153335068, value=\x80\x00\x00\x04
\x80\x00\x01^6\x86u\x1C
\x00\x80\x00\x00\x00\x00\x00\x00e column=0:\x80\x0D, timestamp=1504153335068, value=2.0.5
#:Decode a long value (such as of time type) using Phoenix 4.4.x
hbase(main):007:0> import org.apache.phoenix.schema.types.PLong
=> Java::OrgApachePhoenixSchemaTypes::PLong
hbase(main):008:0> import org.apache.phoenix.schema.SortOrder
=> Java::OrgApachePhoenixSchema::SortOrder
hbase(main):009:0> PLong::INSTANCE.getCodec.decodeLong("\x80\x00\x01N\xA5`H\xA6".to_java_bytes, 0, SortOrder::ASC)
=> 1437293627558
hbase(main):010:0> PLong::INSTANCE.getCodec.decodeLong("\x80\x00\x01N\xA5`H\xA5".to_java_bytes, 0, SortOrder::ASC)
=> 1437293627557
hbase(main):011:0>
实战
准备工作
- 业务方停止1特征程序停止
- Phoenix表USERACTIONLOG_NEW和USERACTIONLOGUID0_NEW停止写入,确认写入停止后,记录当前mysql中的offset
过程
aws hbase ==> aws hdfs
- 启动Phoenix两张表的export数据脚本
nohup /home/ubuntu/data/striver/useractionlog3.sh > /home/ubuntu/data/striver/useractionlog3.log & nohup /home/ubuntu/data/striver/useractionlog3uid0.sh > /home/ubuntu/data/striver/useractionlog3uid0.log &
-
监控export脚本100%完成 tail -f /home/ubuntu/data/striver/useractionlog3.log tail -f /home/ubuntu/data/striver/useractionlog3uid0.log
- Phoenix表USERACTIONLOG_NEW和USERACTIONLOGUID0_NEW继续写入,同步数据至最新 修改status后,脚本自动调起任务: update yarn_apps_watcher set status =1 where app_name = ‘kafka-hbase-UserActionLog3’; update yarn_apps_watcher set status =1 where app_name = ‘kafka-hbase-UserActionLog3Uid0’;
- 特征程序启动(@土豆)
- 导出结束
8.
aws hdfs ==> aws s3
nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -update -delete /tmp/USERACTIONLOGUID0_NEW s3a://aku-hdfs-backup/test/hbase/USERACTIONLOGUID0_NEW > /tmp/useractionloguid0.log nohup hadoop distcp -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -Dyarn.app.mapreduce.am.resource.mb=4096 -Dmapreduce.map.maxattempts=100 -bandwidth 1000 -m 60 -numListstatusThreads 40 -update -delete /tmp/USERACTIONLOG_NEW s3a://aku-hdfs-backup/hbase/data/USERACTIONLOG_NEW > /tmp/useractionlog.log
经常失败,为了充分发挥Hadoop的容错能力,修改相关容错参数,参考:hadoop 优化配置 结果遇到下面问题:
问题:使用distcp传文件到s3,会在任务所在节点的本地磁盘
/tmp/hadoop-yarn/s3a
缓存数据,造成一半节点系统盘写满的惨状
尝试方法:
- 指定Hadoop运行的缓存目录到数据盘
通过查找这个目录的作用,发现是缓存目录,更改缓存目录,创建了目录
/home/ubuntu/data/buffer-dir
,修改权限为777。执行命令时添加选项:-Dfs.s3a.buffer.dir=/home/ubuntu/data/buffer-dir。没有走成。 - 大文件问题:怎么拆分
- 最终成功的方法:修改配置重启服务。 解决方法:设置fs.s3a.fast.upload 为true 参考 这样可以避免缓存到本地,而是直接从内存推到s3.
aws s3 ==> aliyun hdfs
ssh 172.22.8.89(aliyun master4)
hadoop distcp -Dmapred.job.name=hbase_test_people -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120 -Dyarn.app.mapreduce.am.resource.mb=4096 -bandwidth 500 -m 40 -numListstatusThreads 40 -update -delete -strategy dynamic s3a://aku-hdfs-backup/test/hbase/USERACTIONLOGUID0_NEW /tmp/USERACTIONLOGUID0_NEW
hadoop distcp -Dmapred.job.name=hbase_useractionlog_new -Dfs.s3a.access.key=Akkkkk -Dfs.s3a.secret.key=x8Lkkkkkk -Dmapreduce.map.memory.mb=6120 -Dyarn.app.mapreduce.am.resource.mb=4096 -bandwidth 500 -m 40 -numListstatusThreads 40 -update -delete -strategy dynamic s3a://aku-hdfs-backup/hbase/data/USERACTIONLOG_NEW /tmp/USERACTIONLOG_NEW
在走公网的情况下,每个节点单独的带宽都是一定的,在越多的节点上并行运行map任务,导入速度就越快。
aliyun hdfs ==> aliyun phoenix
nohup hbase org.apache.hadoop.hbase.mapreduce.Import USERACTIONLOGUID0_NEW /tmp/USERACTIONLOGUID0_NEW -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL > /tmp/uid0_import.log &
nohup hbase org.apache.hadoop.hbase.mapreduce.Import USERACTIONLOG_NEW /tmp/USERACTIONLOG_NEW -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dimport.wal.durability =SKIP_WAL > /tmp/actionlog_import.log
在越多的regionserver上导入数据就越快,所以进行预分区。否则开始只会用到少数几个regionsever 建表语句:
-- USERACTIONLOGUID0_NEW
CREATE TABLE USERACTIONLOGUID0_NEW ( uid BIGINT , device_id VARCHAR not null, device_type INTEGER, app_version VARCHAR, screen_num INTEGER, screen_action INTEGER, screen_value VARCHAR, control_num INTEGER, control_action VARCHAR, control_type VARCHAR, control_value VARCHAR, device_time BIGINT, adjust_time BIGINT not null, country_id INTEGER, kafka_time BIGINT CONSTRAINT PK PRIMARY KEY (device_id,adjust_time)) SPLIT ON('00000000-00','00000000-01','00000000-03','00000000-05','00000000-07','00000000-08','00000000-09','00000000-0a','00000000-0b','00000000-0c','00000000-0d','00000000-0e','00000000-0f','00000000-11','00000000-13','00000000-14','00000000-15','00000000-16','00000000-17','00000000-18','00000000-19','00000000-1b','00000000-1d','00000000-1f','00000000-20','00000000-21','00000000-22','00000000-23','00000000-25','00000000-26','00000000-27','00000000-29','00000000-2a','00000000-2c','00000000-2d','00000000-2e','00000000-30','00000000-31','00000000-32','00000000-34','00000000-36','00000000-37','00000000-38','00000000-39','00000000-3a','00000000-3b','00000000-3c','00000000-3d','00000000-3e','00000000-40','00000000-41','00000000-42','00000000-43','00000000-44','00000000-45','00000000-46','00000000-47','00000000-48','00000000-4a','00000000-4b','00000000-4c','00000000-4d','00000000-4e','00000000-4f','00000000-50','00000000-52','00000000-53','00000000-54','00000000-56','00000000-57','00000000-58','00000000-5a','00000000-5b','00000000-5c','00000000-5d','00000000-5e','00000000-5f','00000000-61','00000000-63','00000000-65','00000000-66','00000000-67','00000000-69','00000000-6b','00000000-6d','00000000-6e','00000000-6f','00000000-70','00000000-71','00000000-72','00000000-73','00000000-75','00000000-76','00000000-77','00000000-78','00000000-79','00000000-7a','00000000-7b','00000000-7c','00000000-7d','00000000-7e','00000000-7f','ffffffff-80','ffffffff-81','ffffffff-83','ffffffff-84','ffffffff-85','ffffffff-87','ffffffff-88','ffffffff-89','ffffffff-8a','ffffffff-8b','ffffffff-8c','ffffffff-8d','ffffffff-8e','ffffffff-8f','ffffffff-91','ffffffff-92','ffffffff-93','ffffffff-94','ffffffff-95','ffffffff-97','ffffffff-98','ffffffff-99','ffffffff-9b','ffffffff-9c','ffffffff-9d','ffffffff-a0','ffffffff-a2','ffffffff-a3','ffffffff-a4','ffffffff-a5','ffffffff-a6','ffffffff-a8','ffffffff-a9','ffffffff-aa','ffffffff-ab','ffffffff-ac','ffffffff-ae','ffffffff-b0','ffffffff-b1','ffffffff-b2','ffffffff-b4','ffffffff-b6','ffffffff-b7','ffffffff-b8','ffffffff-ba','ffffffff-bb','ffffffff-bc','ffffffff-bd','ffffffff-be','ffffffff-c0','ffffffff-c2','ffffffff-c3','ffffffff-c4','ffffffff-c5','ffffffff-c6','ffffffff-c8','ffffffff-ca','ffffffff-cb','ffffffff-cc','ffffffff-cd','ffffffff-ce','ffffffff-d0','ffffffff-d2','ffffffff-d4','ffffffff-d5','ffffffff-d7','ffffffff-d9','ffffffff-da','ffffffff-db','ffffffff-dd','ffffffff-de','ffffffff-df','ffffffff-e0','ffffffff-e1','ffffffff-e2','ffffffff-e3','ffffffff-e5','ffffffff-e7','ffffffff-e9','ffffffff-eb','ffffffff-ec','ffffffff-ed','ffffffff-ef','ffffffff-f1','ffffffff-f2','ffffffff-f3','ffffffff-f4','ffffffff-f5','ffffffff-f7','ffffffff-f8','ffffffff-f9','ffffffff-fa','ffffffff-fb','ffffffff-fd','ffffffff-fe');
-- ACTIONLOG_NEW建表语句:
CREATE TABLE USERACTIONLOG_NEW ( uid BIGINT not null, device_id VARCHAR, device_type INTEGER, app_version VARCHAR, screen_num INTEGER, screen_action INTEGER, screen_value VARCHAR, control_num INTEGER, control_action VARCHAR, control_type VARCHAR, control_value VARCHAR, device_time BIGINT, adjust_time BIGINT not null, country_id INTEGER, kafka_time BIGINT CONSTRAINT PK PRIMARY KEY (uid,adjust_time)) SALT_BUCKETS = 34;
这一步坑太多:
当写入过快时会遇见什么问题?
写入过快时,memstore的水位会马上被推高。
你可能会看到以下类似日志:
RegionTooBusyException: Above memstore limit, regionName=xxxxx ...
这个是Region的memstore占用内存大小超过正常的4倍,这时候会抛异常,写入请求会被拒绝,客户端开始重试请求。
当达到128M的时候会触发flush memstore,当达到128M * 4还没法触发flush时候会抛异常来拒绝写入。 两个相关参数的默认值如下:
hbase.hregion.memstore.flush.size=128M
hbase.hregion.memstore.block.multiplier=4
或者这样的日志:
regionserver.MemStoreFlusher: Blocking updates on hbase.example.host.com,16020,1522286703886: the global memstore size 1.3 G is >= than blocking 1.3 G sizeregionserver.MemStoreFlusher: Memstore is above high water mark and block 528ms
这是所有region的memstore内存总和开销超过配置上限,默认是配置heap的40%,这会导致写入被阻塞。目的是等待flush的线程把内存里的数据flush下去,否则继续允许写入memestore会把内存写爆
hbase.regionserver.global.memstore.upperLimit=0.4 # 较旧版本,新版本兼容
hbase.regionserver.global.memstore.size=0.4 # 新版本
当写入被阻塞,队列会开始积压,如果运气不好最后会导致OOM,你可能会发现JVM由于OOM crash或者看到如下类似日志:ipc.RpcServer: /192.168.x.x:16020 is unable to read call parameter from client 10.47.x.xjava.lang.OutOfMemoryError: Java heap spaceHBase
这里我认为有个很不好的设计,捕获了OOM异常却没有终止进程。这时候进程可能已经没法正常运行下去了,你还会在日志里发现很多其它线程也抛OOM异常。比如stop可能根本stop不了,RS可能会处于一种僵死状态。
如何避免RS OOM?
- 加快flush速度:
hbase.hstore.blockingWaitTime = 90000 mshbase.hstore.flusher.count = 2 hbase.hstore.blockingStoreFiles = 10
当达到hbase.hstore.blockingStoreFiles配置上限时,会导致flush阻塞等到compaction工作完成。阻塞时间是hbase.hstore.blockingWaitTime,可以改小这个时间。hbase.hstore.flusher.count可以根据机器型号去配置,可惜这个数量不会根据写压力去动态调整,配多了,非导入数据多场景也没用,改配置还得重启。
- 同样的道理,如果flush加快,意味这compaction也要跟上,不然文件会越来越多,这样scan性能会下降,开销也会增大。
hbase.regionserver.thread.compaction.small = 1 hbase.regionserver.thread.compaction.large = 1
增加compaction线程会增加CPU和带宽开销,可能会影响正常的请求。如果不是导入数据,一般而言是够了。好在这个配置在云HBase内是可以动态调整的,不需要重启。
- 上述配置都需要人工干预,如果干预不及时server可能已经OOM了,这时候有没有更好的控制方法?
hbase.ipc.server.max.callqueue.size = 1024 * 1024 * 1024 # 1G
直接限制队列堆积的大小。在CDH中没找到这个配置。
- Java Heap Size of HBase RegionServer in Bytes
当堆积到一定程度后,事实上后面的请求等不到server端处理完,可能客户端先超时了。并且一直堆积下去会导致OOM,1G的默认配置需要相对大内存的型号。当达到queue上限,客户端会收到CallQueueTooBigException 然后自动重试。通过这个可以防止写入过快时候把server端写爆,有一定反压作用。线上使用这个在一些小型号稳定性控制上效果不错。
- 增大yarn的container最小内存从1G到3G。
数据验证
- 数据条数查询
- 使用联合主键过滤查询
总结
好的也是莫名其妙。优化人崩溃,只为最大的满足业务的迁移时间需求。