大数据Presto(四):Presto自定义函数和JDBC连接
admin
2023-09-22 18:25:54
0

Presto自定义函数和JDBC连接

一、Presto 自定义函数

我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。

1、UDF函数

自定义UDF函数及使用可以按照下面步骤来实现。

1.1、创建Maven项目,加入如下依赖


com.facebook.presto
presto-spi
0.259


com.facebook.presto
presto-array
0.259


io.airlift
stats
0.163






maven-assembly-plugin
2.4




jar-with-dependencies



com.lw.java.myflink.Streaming.example.FlinkReadSocketData





make-assembly
package

assembly







1.2、创建Presto注册插件类

package com.lansonjy.prestocode;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;
//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
@Override
public Set> getFunctions()
{
return ImmutableSet.>builder()
//注册UDF,这里填写对应的UDF类
.add(MyUDF.class)
.build();
}
}

1.3、创建“MyUDF”类,实现自定义UDF逻辑

这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

//自定义UDF函数
public class MyUDF {
//自定义UDF函数使用时的名称
@ScalarFunction("myudf")
//函数的描述
@Description("转换字母大写为小写")
//指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
@SqlType(StandardTypes.VARCHAR)
public static Slice lowercase(@SqlType(StandardTypes.VARCHAR) Slice in)
{
String argument = in.toStringUtf8();
return Slices.utf8Slice(argument.toLowerCase());
}
}

1.4、创建“resources”资源目录

在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:

1.5、将项目打包,上传到Presto集群

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

1.6、使用自定义UDF函数

#登录Presto客户端
./presto --server node3:8080 --catalog mysql --schema presto_db

#查询所有函数
presto:presto_db> show functions;



#使用这个函数查询转换数据
presto:presto_db> select myudf('ABCDEF');
_col0
--------
abcdef
(1 row)

2、UDAF函数

UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:

2.1、在项目中创建“MyUDAF”类

package com.lansonjy.prestocode;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.*;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.StandardTypes;

//presto 自定义聚合函数实现-实现平均数计算
//自定义聚合函数使用时的名称
@AggregationFunction("myudaf")
//自定义聚合函数注释
@Description("我的自定义聚合函数,实现计算平均数")
public class MyUDAF {
//输入数据注释
@InputFunction
public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
//针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}

//聚合数据注释
@CombineFunction
public static void combine(LongAndDoubleState state, LongAndDoubleState otherState) {
//将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}

//输出数据注释
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out) {
//最终输出结果到一个 BlockBuilder。
long count = state.getLong();
if (count == 0) {
out.appendNull();
} else {
double value = state.getDouble();
DoubleType.DOUBLE.writeDouble(out, value / count);
}
}
}

以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.AccumulatorState;

public interface LongAndDoubleState extends AccumulatorState {
long getLong();

void setLong(long value);

double getDouble();

void setDouble(double value);
}

2.2、在“MyFunctionPlugin”中注册UDAF

//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
@Override
public Set> getFunctions()
{
return ImmutableSet.>builder()
//注册UDF,这里填写对应的UDF类
.add(MyUDF.class)
//注册UDAF,这里填写对应的UDAF 类
.add(MyUDAF.class)
.build();
}
}

2.3、打包,上传到各个Presto

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

2.4、在presto中执行如下命令

#登录Presto客户端
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db

#查看函数
presto:presto_db> show functions;



#执行聚合查询
presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;



二、Presto JDBC连接

使用JDBC连接Presto需要在项目中导入以下依赖:


io.prestosql
presto-jdbc
312

JDBC连接代码如下:

public class ReadDataFromPresto {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
Connection conn = DriverManager.getConnection("jdbc:presto://node3:8080/mysql/presto_db","root",null);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
while (rs.next()) {
String pkgName = rs.getString("pkg_name");
double totalAmount = rs.getDouble("total_amount");
System.out.println("pkgName = "+pkgName+",totalAmount="+totalAmount);
}
rs.close();
conn.close();
}
}

相关内容