在数据库中,通常有三类不同的函数,简称UDF,UDAF和UDTF。
UDF是指用户自定义函数,在ClickHouse中,这类函数不会改变数据的行数,其会对输入的列进行相应的计算,产生新的数据列。
UDAF是指用户自定义聚合函数,数据库中常见的sum
、count
等函数即为聚合函数,这类函数对输入的数据进行聚合计算,最终只输出一个聚合后的数据。
UDTF是指用户自定义表函数,正如名字所示,这类函数会返回一个临时的表(Table),从而可以通过SELECT
语句从中读取数据,甚至通过INSERT
语句插入数据。
下面,我们开始介绍如何在ClickHouse中添加这三类不同的自定义函数。
在ClickHouse中,普通函数的代码文件位于src/Functions/
目录下,当我们要在其中添加新的函数时,通常只需继承IFunction
接口类(注:在ClickHouse中,所有的接口类都是以大写字母I开头的,即Interface),并实现一些相应的接口函数。
下面,我们以一个简单的示例来展示如何在ClickHouse中添加UDF。我们想要添加的这个函数接收一个String
类型的参数,并返回String
的长度。首先,在src/Functions/
目录下新建代码文件StrLen.cpp
(由于该函数较为简单,不再将头文件和源文件分开),下面开始编写代码。
首先,引入一些必要的头文件,在实现函数时,需要和ClickHouse中的DataType
和Column
相关的类型和方法打交道。
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionImpl.h>
之后,我们开始实现继承自IFunction
接口类的子类。
class FunctionStrLen : public IFunction
{
public:
static constexpr auto name = "strLen";
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
....
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
...
}
}
首先,我们声明一个静态的String
类型的成员变量name
,值为strLen
,也就是该函数的名字,在getName
方法中返回该变量。
getNumberOfArguments
函数返回1,表示函数的参数个数为1。如果函数的参数个数是可变的,那么该函数需要返回0,同时需要继承isVariadic
函数,并返回true
。
useDefaultImplementationForConstants
表示如果对于所有参数都为常量的情况下,是否使用系统的默认实现,通常返回true
即可。当函数参数为"hello","world"之类的即为常量,而如果参数是某个表中的一个String
列则不是常量。当返回true
时,对于常量参数,系统会将其转为一个非常量的列,从而按列的方式进行计算,因此,在计算逻辑中我们只用实现非常量的情况。
下面来看另外两个较为复杂的函数。
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();
}
getReturnTypeImpl
函数完成对参数个数和数据类型的检查,同时返回函数返回值对应的DataType。在这里,我们需要检查函数参数为String
类型,如果不是,则抛出异常。然后返回函数的返回值类型。
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & strcolumn = arguments[0].column;
if (const ColumnString * col = checkAndGetColumn<ColumnString>(strcolumn.get()))
{
const auto & offsets = col->getOffsets();
auto col_res = ColumnVector<UInt64>::create();
auto & res_data = col_res->getData();
res_data.resize(offsets.size());
for (size_t i = 0; i < offsets.size(); ++i)
res_data[i] = offsets[i] - offsets[i - 1] - 1;
return col_res;
}
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
executeImpl
函数实现函数具体的执行逻辑,第一个参数为输入的参数,可通过下标访问对应的参数,第二个参数为函数返回值的类型,第三个参数为输入的行数。在该函数,需要根据输入参数(对应的列),执行相应计算,并构造一个新的列,其中存储计算产生的结果值,最后返回新生成的列,其即为函数作用在这一参数对应的Block上面生成的结果。
在上面的代码中,输入参数是一个ColumnString
列,首先从参数中获取到该列。我们要计算的是该列中每一个String
的长度,并返回一个长度列。ColumnString
的内存表示是两个数组,其中一个数组保存String
的数组,两个String
之间有一个0值,另一个数组为offset数组,其中的值表示每一个String
在前一个数组中的偏移,为了计算String
的长度,我们只需访问offset数组。首先,创建一个UInt64
类型的ColumnVector
,并将大小resize为和ColumnString
中的String
个数相同,然后通过offset数组计算出每一个String
的长度并填到ColumnVector
列中。最后返回创建的整数列。
事实上,IFunction
接口类中还有许多其他的方法,可以通过继承并实现那些方法进一步控制函数的行为和功能等,同时,实现函数并非一定是继承该接口类。更多信息可参考src/Functions/IFunction.h
, src/Functions/IFunctionImpl.h
。
最后,当实现完函数类之后,我们需要在FunctionFactory
中完成该函数的注册,FunctionFactory使用一个std::unordered_map
来保存函数名和对应的函数,从而在执行SQL时,能够通过名字来找到对应的函数并执行。
注册函数时,我们需要在src/Functions/registerFunctions.cpp
中声明函数registerFunctionStrLen
,并在StrLen
函数中实现:
void registerFunctionStrLen(FunctionFactory & factory) { factory.registerFunction<FunctionStrLen>(); }
最后,在src/Functions/registerFunctions.cpp
中的registerFunctions
函数中调用该函数完成函数的注册。
下面,是StrLen.cpp
文件的完整内容:
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionImpl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int TOO_LARGE_STRING_SIZE;
}
namespace
{
class FunctionStrLen : public IFunction
{
public:
static constexpr auto name = "strLen";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionStrLen>(); }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & strcolumn = arguments[0].column;
if (const ColumnString * col = checkAndGetColumn<ColumnString>(strcolumn.get()))
{
const auto & offsets = col->getOffsets();
auto col_res = ColumnVector<UInt64>::create();
auto & res_data = col_res->getData();
res_data.resize(offsets.size());
for (size_t i = 0; i < offsets.size(); ++i)
res_data[i] = offsets[i] - offsets[i - 1] - 1;
return col_res;
}
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
};
}
void registerFunctionStrLen(FunctionFactory & factory)
{
factory.registerFunction<FunctionStrLen>();
}
}
聚合函数是状态函数,其将传入的值激活到某个状态,并可以从该状态中获取值。
聚合函数的代码文件位于src/AggregateFunctions/
目录下。同样地,当添加一个新的聚合函数时,需要继承接口类IAggregateFunctionDataHelper
作为该类的子类来实现。
下面,我们来实现一个简单的聚合函数aggStrLen
,该函数也是获取String
的长度,但与上面的函数不同的是,它获取的是所有String
长度的和。
首先,在src/AggregateFunctions/
目录下创建两个新的文件AggregateFunctionStrLen.h
和AggregateFunctionStrLen.cpp
。在头文件中,我们完成函数实现,在源文件实现注册函数。
同样地,首先引入必要的文件:
#include <type_traits>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <AggregateFunctions/IAggregateFunction.h>
接下来,由于聚合函数是状态函数,我们需要先定义一个结构,保存该聚合函数的中间状态值:
struct AggregateFunctionStrLenData
{
UInt64 sum{};
void ALWAYS_INLINE add(UInt64 value) { sum += value; }
void merge(const AggregateFunctionStrLenData & rhs) { sum += rhs.sum; }
void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}
void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}
UInt64 get() const { return sum; }
};
由于该聚合函数计算的是String
的长度,因此用一个UInt64
的值来保存状态值即可。add
方法将一个新的值加到sum
中,merge
方法完成两个状态的聚合。
下面,开始实现函数类:
template <typename Data>
class AggregateFunctionStrLen final : public IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>
{
public:
String getName() const override { return "aggStrLen"; }
AggregateFunctionStrLen(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>(argument_types_, {})
{}
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColumnString &>(*columns[0]);
const auto & offsets = column.getOffsets();
this->data(place).add(offsets[row_num] - offsets[row_num - 1] - 1);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColumnVector<UInt64> &>(to);
column.getData().push_back(this->data(place).get());
}
};
该聚合函数继承自IAggregateFunctionDataHelper
接口类,getName
方法返回函数的名字aggStrLen
,防止和上面的strLen
混淆。
getReturnType
方法返回函数的返回值类型。
add
方法将columns
中对应列的row_num
行的数据取出来,和place
指针指向的聚合状态进行聚合计算,place
指向的即为一个上面定义的struct
对象。
merge
方法将rhs
对应的聚合状态取出来,和place
对应的聚合状态进行聚合。在并发执行聚合函数的过程中,需要将对应的聚合结果进行合并。
serialize
和deserialize
方法完成聚合状态的序列化和反序列化。序列化和反序列化方法在分布式查询执行进行网络传输和内存不够的时候被使用。
最后insertResultInto
方法将最终计算得到的值插入到IColumn
to中。
对于更复杂的聚合函数实现,可能还需要继承实现更多的接口,具体可参考src/AggregateFunctions/IAggregateFunction.h
下面,我们需要在AggregateFunctionStrLen.cpp
中实现注册函数。
AggregateFunctionPtr createAggregateFunctionStrLen(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
DataTypePtr data_type = argument_types[0];
if (!isString(data_type))
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<AggregateFunctionStrLen<AggregateFunctionStrLenData>>(argument_types);
}
void registerAggregateFunctionStrLen(AggregateFunctionFactory & factory)
{
factory.registerFunction("aggStrLen", createAggregateFunctionStrLen);
}
在该文件中,我们新实现一个函数createAggregateFunctionStrLen
,该函数检查聚合函数的参数是否符合要求,然后创建一个AggregateFunctionStrLen
对象返回。
registerAggregateFunctionStrLen
函数是在src/AggregateFunctions/registerAggregateFunctions.cpp
中声明的,最后,我们同样需要在registerAggregateFunctions
中调用该函数以完成该聚合函数的注册。
下面,是AggregateFunctionStrLen.h
和AggregateFunctionStrLen.cpp
的完整代码:
AggregateFunctionStrLen.h
#pragma once
#include <type_traits>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
struct AggregateFunctionStrLenData
{
UInt64 sum{};
void ALWAYS_INLINE add(UInt64 value) { sum += value; }
void merge(const AggregateFunctionStrLenData & rhs) { sum += rhs.sum; }
void write(WriteBuffer & buf) const
{
writeBinary(sum, buf);
}
void read(ReadBuffer & buf)
{
readBinary(sum, buf);
}
UInt64 get() const { return sum; }
};
template <typename Data>
class AggregateFunctionStrLen final : public IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>
{
public:
String getName() const override { return "aggStrLen"; }
AggregateFunctionStrLen(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionStrLen<Data>>(argument_types_, {})
{}
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColumnString &>(*columns[0]);
const auto & offsets = column.getOffsets();
this->data(place).add(offsets[row_num] - offsets[row_num - 1] - 1);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).write(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColumnVector<UInt64> &>(to);
column.getData().push_back(this->data(place).get());
}
};
}
AggregateFunctionStrLen.cpp
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionStrLen.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
AggregateFunctionPtr createAggregateFunctionStrLen(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
DataTypePtr data_type = argument_types[0];
if (!isString(data_type))
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<AggregateFunctionStrLen<AggregateFunctionStrLenData>>(argument_types);
}
}
void registerAggregateFunctionStrLen(AggregateFunctionFactory & factory)
{
factory.registerFunction("aggStrLen", createAggregateFunctionStrLen);
}
}
表函数的代码文件位于src/TableFunctions/
目录下。事实上,当实现一个UDTF
时,首先需要实现对应的表引擎,在这里,我们暂时不实现新的表引擎,使用一个系统中已有的表引擎:StorageSystemContributors
,该表引擎文件位于src/Storages/System/
目录下,其包含一个名为name
的String
列,列中的每一个值为一个ClickHouse Contributor的GitHub名字。
下面,我们来实现一个简单的表函数,该函数返回一个临时的StorageSystemContributors
表。
首先,在src/TableFunctions/
目录下创建两个新的文件:TableFunctionContributors.h
和TableFunctionContributors.cpp
。
在头文件中,完成类的定义:
class TableFunctionContributors : public ITableFunction
{
public:
static constexpr auto name = "contributors";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "StorageSystemContributors"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
表函数需要继承ITableFunction
接口。同样地,getName
返回该函数的名字:contributors
。getStorageTypeName
返回表引擎类型名字。
下面,我们在源文件中实现另外两个个函数以及注册函数。
ColumnsDescription TableFunctionContributors::getActualTableStructure(const Context & /*context*/) const
{
return ColumnsDescription{{{"name", std::make_shared<DataTypeString>()}}};
}
StoragePtr TableFunctionContributors::executeImpl(
const ASTPtr & /*ast_function*/, const Context & , const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageSystemContributors::create(StorageID(getDatabaseName(), table_name));
res->startup();
return res;
}
void registerTableFunctionContributors(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionContributors>();
}
getActualTableStructure
函数返回ColumnDescription
,即返回的临时表的列列信息(列名以及列的数据类型)。
executeImpl
函数实现表函数的具体执行逻辑,在这儿,只需要创建一个对应的StorageSystemContributors
表并返回即可。
registerTableFunctionContributors
函数同样是在src/TableFunctions/registerTableFunctions.h
中声明中,在这儿实现,然后在src/TableFunctions/registerTableFunctions/
中的registerTableFunctions
中调用完成函数注册。
下面,是这两个文件的完整内容:
TableFunctionContributors.h
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Core/Types.h>
namespace DB
{
class TableFunctionContributors : public ITableFunction
{
public:
static constexpr auto name = "contributors";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Contributors"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
};
}
TableFunctionContributors.cpp
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/System/StorageSystemContributors.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionContributors.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
ColumnsDescription TableFunctionContributors::getActualTableStructure(const Context & /*context*/) const
{
return ColumnsDescription{{{"name", std::make_shared<DataTypeString>()}}};
}
StoragePtr TableFunctionContributors::executeImpl(
const ASTPtr & /*ast_function*/, const Context & , const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageSystemContributors::create(StorageID(getDatabaseName(), table_name));
res->startup();
return res;
}
void registerTableFunctionContributors(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionContributors>();
}
}
添加完这些函数之后,需要重新编译ClickHouse并重启。
下面,我们开始展示如何使用上面实现的这些函数。
:) select strLen('hello,world')
SELECT strLen('hello,world')
Query id: 4f3be5f4-5001-4f5e-95dd-620edbd99132
┌─strLen('hello,world')─┐
│ 11 │
└───────────────────────┘
1 rows in set. Elapsed: 0.002 sec.
strLen
函数接收一个参数,返回输入参数的长度
:) select aggStrLen('hello,world')
SELECT aggStrLen('hello,world')
Query id: ff829043-e241-429e-9db1-a68d6b439457
┌─aggStrLen('hello,world')─┐
│ 11 │
└──────────────────────────┘
1 rows in set. Elapsed: 0.002 sec.
同样地,aggStrLen
也接收String
参数,在这里,我们看到返回值和上面的strLen
一样,这是因为参数为常量,只有一个String
,故两个函数的返回值相同。
:) select * from contributors() limit 10
SELECT *
FROM contributors()
LIMIT 10
Query id: fd2efbc2-ca5b-4414-a6d0-76ae7c88987f
┌─name─────────────┐
│ franklee │
│ tiger.yan │
│ ikopylov │
│ alexey-milovidov │
│ Andrey Mironov │
│ Alexander Lukin │
│ maxulan │
│ ageraab │
│ Nikolay Kirsh │
│ Stepan Herold │
└──────────────────┘
10 rows in set. Elapsed: 0.002 sec.
上面展示了contributors
表函数的使用,其会返回一个临时表,我们可以通过SELECT
从中获取数据,从上面的结果中可以看出,临时表含有一个name
列,列中的每一个值为一个人名,即为ClickHouse contributor。
下面,我们展示一下如何将这三个函数结合使用,strLen
和aggStrLen
可以作用在非常量列上面。
:) select strLen(name) from contributors() limit 10
SELECT strLen(name)
FROM contributors()
LIMIT 10
Query id: ec0abeb5-28df-41b2-9db0-47932a2d9202
┌─strLen(name)─┐
│ 20 │
│ 4 │
│ 6 │
│ 8 │
│ 12 │
│ 9 │
│ 12 │
│ 15 │
│ 6 │
│ 6 │
└──────────────┘
10 rows in set. Elapsed: 0.002 sec.
:) select aggStrLen(name) from contributors()
SELECT aggStrLen(name)
FROM contributors()
Query id: e5fa1c7d-4d53-4a64-babf-90c058682b69
┌─aggStrLen(name)─┐
│ 9727 │
└─────────────────┘
1 rows in set. Elapsed: 0.003 sec.
从上面的结果中,我们能够清晰地看出strLen
和aggStrLen
的区别了,前者计算每一个名字的长度,返回一个新的列,后者计算所有名字长度的和,最后只返回一个聚合后的值。
从上面的教程中,我们可以看到,ClickHouse具有良好的代码结构,易于扩展,这使得我们可以很容易地在其中添加自定义函数,完成各种各样丰富的功能。