本文共 5760 字,大约阅读时间需要 19 分钟。
本篇将介绍如何使用 Instance Tunnel 来获取 Maxcompute Instance 执行结果。
每天我们都会在 Maxcompute 平台上提交 select query,用于查询特定的数据。然而,熟悉平台的同学都知道,从平台获取 sql 查询结果是一个 Restful 请求,可能碰到以下两个问题:
Warning: ODPS request failed, requestID:xxxx, retryCount:1, will retry in xxx seconds.
READ_TABLE_MAX_ROW
(默认为 10000)。这就会出现明明我们的查询结果 2W 条,最后却在 Maxcompute Console 或者 Logview 上只看到 1W条 的诡异情况了。针对上述问题,提出下列解法,其中解法 3 可解决上述两种问题,解法 1、2 仅适用于问题 1:
alter table <source_table_name> merge smallfiles ;
命令将小文件合并之后,再重新执行 select 查询。create table <tmp_table_name> as select ....
命令创建临时表,再参照解法 a 合并临时表的存储小文件。create table <tmp_table_name> as select ....
命令创建临时表,然后通过 Tunnel 命令或者 Tunnel SDK 来将临时表数据下载下来。上面的方法虽然能解决问题,但总是有那么点"绕",有那么点"费"。
有没有更加直接的方法呢?
如果是之前,只能怂了,但是现在我们要大声的说有!在最近的 Maxcompute 版本( >=S27 ) 中,我们开发了Instanc Tunnel功能。
Instance Tunnel 提供使用 Tunnel 来下载 SQL 查询结果的功能,不仅能摆脱上述两类问题,可直接获取查询结果;还丰富了 Maxcompute Tunnel 下载通道,不再局限于表数据。换句话说,以前我们可以用 Tunnel 来下载 Maxcompute 表数据,如今,我们也可以用 Tunnel 来下载 Maxcompute Instance 的数据。Instance Tunnel 的使用方式与 Table Tunnel 基本一致,下面分别介绍使用客户端和 SDK 来下载 Instance 执行结果的方法。
命令:
tunnel download instance://<[project_name/]instance_id> <path>
参数: project_name: instance 所在的项目名称;instance_id: 待下载数据的 instance id
举例:
// 执行一条 select 查询:odps@ odps_test_project>select * from wc_in;ID = 20170724071705393ge3csfb8... ...// 使用 Instance Tunnel Download 命令下载执行结果到本地文件odps@ odps_test_project>tunnel download instance://20170724071705393ge3csfb8 result;2017-07-24 15:18:47 - new session: 2017072415184785b6516400090ca8 total lines: 82017-07-24 15:18:47 - file [0]: [0, 8), resultdownloading 8 records into 1 file2017-07-24 15:18:47 - file [0] start2017-07-24 15:18:48 - file [0] OK. total: 44 bytesdownload OK// 查看结果 cat resultslkdfjhellpappleteapeachappleteateaa
在 Maxcompute Console 中打开 use_instance_tunnel
选项之后,执行的 select query 就会默认使用 Instance tunnel 来下载结果了,再也不会出现文章开头所描述的两种问题了。打开该配置有两种方法:
# download sql results by instance tunneluse_instance_tunnel=true# the max records when download sql results by instance tunnelinstance_tunnel_max_record=10000
其中
instance_tunnel_max_record
表示使用 Instance tunnel 下载 sql 查询结果的条数。若不设置,下载条数不受限。
set console.sql.result.instancetunnel=true;
开启此功能。// 打开 Instance tunnel 选项odps@ odps_test_tunnel_project>set console.sql.result.instancetunnel=true;OK// 运行 select queryodps@ odps_test_tunnel_project>select * from wc_in;ID = 20170724081946458g14csfb8Log view:http://logview/xxxxx.....+------------+| key |+------------+| slkdfj || hellp || apple || tea || peach || apple || tea || teaa |+------------+A total of 8 records fetched by instance tunnel.
可以看到,如果使用 Instance tunnel 的方式来输出 select 查询结果,会在最后打印一条提示。比如上面例子中的提示告诉我们这个 instance 的执行结果一共有 8 条数据。同样也可以
set console.sql.result.instancetunnel=false;
来关闭此功能。
MaxCompute Java SDK 和 Python SDK 都对 Instance tunnel 进行了支持,下面介绍用法。
对于 Java SDK,从 0.27.2-public
( )版本开始,我们提供两种方式来获取数据。
SQLTask.getResultSet()
静态方法获取:Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 对象 Instance i = SQLTask.run(odps, "select * from wc_in;"); i.waitForSuccess(); // 根据 instance 对象,获取结果迭代器 ResultSet rs = SQLTask.getResultSet(i); for (Record r : rs) { // 输出结果条数 System.out.println(rs.getRecordCount()); for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) { // wc_in 表字段均为 STRING, 这里就直接打印输出 System.out.println(r.get(col)); } }
InstanceTunnel.DownloadSession
来获取:Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 对象 Instance i = SQLTask.run(odps, "select * from wc_in;"); i.waitForSuccess(); // 创建 InstanceTunnel InstanceTunnel tunnel = new InstanceTunnel(odps); // 根据 instance id,创建 DownloadSession InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId()); long count = session.getRecordCount(); // 输出结果条数 System.out.println(count); // 获取数据的写法与 TableTunnel 一样 TunnelRecordReader reader = session.openRecordReader(0, count); Record record; while ((record = reader.read()) != null) { for (int col = 0; col < session.getSchema().getColumns().size(); ++col) { // wc_in 表字段均为 STRING, 这里就直接打印输出 System.out.println(record.get(col)); } } reader.close();
对于 PyODPS 来说,我们可以在 instance 上通过 open_reader
来获取数据,而从 0.7.7.1 的版本开始,我们可以通过 open_reader
使用 instance tunnel 来获取全量数据。
instance = o.execute_sql('select * from movielens_ratings limit 20000')with instance.open_reader() as reader: print(reader.count) # for record in reader 就是遍历这2万条数据,这里通过切片只取10条 for record in reader[:10]: print(record)
虽然 Instance Tunnel 为我们提供了非常方便的获取 Instance 执行结果的方式,但是为了保护用户数据安全,此功能也受到了诸多的权限约束:
转载地址:http://kpega.baihongyu.com/