背景
可观测的一个重要的趋势是链路、日志、指标的统一存储和使用。大模型时代,在业务层实现链路、日志、指标的统一查询语言将是一个重要的过渡方案。本文尝试实现一个简单的通过 sql 查询 prometheus 指标的工具。
实现步骤
经调研,我决定基于 Apache Calcite,实现 sql 解析并对接真实的 promql 查询终端。
Apache Calcite 是一个优秀的动态数据管理框架,提供了如:SQL 解析、SQL 校验、SQL 查询优化、SQL 生成以及数据连接查询等典型数据库管理功能1。
在Calcite中,查询处理分为逻辑计划和物理计划两个层次,这种分离使得优化更加灵活和模块化。逻辑计划描述了查询要做什么,而不关心具体如何执行。它是对SQL语句的抽象表示,专注于查询的语义而非执行细节。物理计划描述了查询如何执行,包含了具体的算法选择和执行策略。它是逻辑计划的具体实现,考虑了底层存储系统的特性和约束。
我的思路是,利用calcite自带的 sql 解析和逻辑计划分析能力,手动解析最终的物理计划逻辑,并翻译为 prom ql 和实际的查询动作。
效果预期
select `app_name`, `timestamp` , `value` from prometheus.system_metrics_cpu_util where app_name = 'my_app' and hostname = 'host1' and `timestamp` >= 1696118400000 and `timestamp` <= 1696204800000
应翻译为 prometheus 查询
system_metrics_cpu_util{app_name = 'my_app', hostname = 'host1'}
并查询出 JSON/CSV 数据,示例如下
app_name, timestamp, value
my_app, 1696118460000, 0.01
my_app, 1696118520000, 0.02
查询的方式是标准的 jdbc 连接和查询语法。如下图所示
Properties info = new Properties();
info.setProperty(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL.name());
// 获取连接
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
// 获取Calcite连接
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 获取根模式
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 注册Prometheus Schema
String dstStr = """
[
{
"tableName": "system_metrics_cpu_util",
"fields": [
{"fieldName": "timestamp", "fieldType": "BIGINT"},
{"fieldName": "app_name", "fieldType": "VARCHAR"},
{"fieldName": "hostname", "fieldType": "VARCHAR"},
{"fieldName": "value", "fieldType": "DOUBLE"}
]
}
]
""";
List<DataSourceTable> dataSourceTables = JSON.parseArray(dstStr, DataSourceTable.class);
Sql2PrometheusSchema promSchema = new Sql2PrometheusSchema(dataSourceTables, false);
rootSchema.add("prometheus", promSchema);
// 创建语句对象
Statement statement = connection.createStatement();
// 执行查询语句
String sql = "select `timestamp` , `value` from prometheus.system_metrics_cpu_util where app_name = 'my_app' and hostname = 'host1' and `timestamp` >= 1696118400000 and `timestamp` <= 1696204800000";
ResultSet rs = statement.executeQuery(sql);
List<Map<String, Object>> list = new ArrayList<>(); // 转换 rs 为最终结果
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
设计并实现一个 schema
从上面代码的使用可以看到,我们需要先实现一个 Sql2PrometheusSchema。calcite 里的 schema 和 Postgres 的schema的概念比较像。在PostgreSQL中,schema是一个逻辑容器,用于组织和管理数据库中的对象(如表、视图、函数、索引等)。它类似于一个命名空间,可以将相关的数据库对象分组在一起。在 MySQL中,类似的概念是 database。
public class Sql2PrometheusSchema extends AbstractSchema {
private final Map<String, Table> tableMap = new HashMap<>();
public Sql2PrometheusSchema(List<DataSourceTable> dataSourceTables, boolean enableQuery) {
if (Objects.isNull(dataSourceTables)) {
throw new IllegalArgumentException("dataSourceTables can not be null");
}
for (DataSourceTable dataSourceTable : dataSourceTables) {
String tableName = dataSourceTable.getTableName();
tableMap.put(tableName, new Sql2PrometheusTable(tableName, dataSourceTable.getFields(), enableQuery));
}
}
@Override
protected Map<String, Table> getTableMap() {
return tableMap;
}
}
注意到我们在初始化 Sql2PrometheusSchema 的时候,传入了表的定义 List<DataSourceTable> dataSourceTables,表的定义对应不同的 promtheus 的指标和 label,一个 promtheus 指标对应一张表,每个指标的所有 label 即为表的字段。每个表还有额外的2个字段:timestamp 和 value。
public class DataSourceTable {
private String tableName;
private java.util.List<DataSourceTableField> fields;
}
public class DataSourceTableField {
private String fieldName;
private String fieldType; // e.g., "STRING", "DOUBLE", "LONG"
private Boolean metricField; // 是否为指标字段
}
public record TimeRange(long start, long end) {}
设计并实现一个 table
Sql2PrometheusTable 是该方案的核心。table 的scan方法就是执行解析后的物理计划的地方。只需要解析物理计划,并翻译成 prometheus 的查询语句,然后将查询动作包装成一个Enumerator即可。
public class Sql2PrometheusTable extends AbstractTable implements FilterableTable {
private static final String TIMESTAMP_FIELD = "timestamp";
@Getter
private final String metricTable;
private final List<DataSourceTableField> fields;
private final boolean enableQuery;
private final Map<String, Integer> fieldIndexMap = new HashMap<>();
private final Map<Integer, String> indexFieldMap = new HashMap<>();
@Getter
private boolean active;
@Getter
private String query;
@Getter
private String activeQuery;
public Sql2PrometheusTable(String metricTable, List<DataSourceTableField> fields, boolean enableQuery) {
this.metricTable = metricTable;
this.fields = fields;
this.enableQuery = enableQuery;
for (int i = 0; i < fields.size(); i++) {
fieldIndexMap.put(fields.get(i).getFieldName(), i);
indexFieldMap.put(i, fields.get(i).getFieldName());
}
}
@Override
public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
this.active = true;
JavaTypeFactory typeFactory = root.getTypeFactory();
return new AbstractEnumerable<>() {
public Enumerator<Object[]> enumerator() {
// 在这里处理过滤逻辑
// 具体实现取决于filters中的条件
return new Sql2PrometheusEnumerator(buildMetricsQueryRequest(typeFactory, filters), fieldIndexMap);
}
};
}
@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
// 这里应该返回 table 的 schema
RelDataTypeFactory.Builder builder = relDataTypeFactory.builder();
for (DataSourceTableField field : fields) {
SqlTypeName fieldType = SqlTypeName.get(field.getFieldType());
Objects.requireNonNull(fieldType, "fieldType is null");
builder.add(field.getFieldName(), fieldType);
}
return builder.build();
}
private MetricsQueryRequest buildMetricsQueryRequest(JavaTypeFactory typeFactory, List<RexNode> filters) {
StringBuilder labelSelectors = new StringBuilder(metricTable).append("{");
List<RexNode> filtersRemoveTs = new ArrayList<>();
List<TimeRange> timeRanges = new ArrayList<>();
for (RexNode filter : filters) {
TimestampVisitor timestampVisitor = new TimestampVisitor(fieldIndexMap.get(TIMESTAMP_FIELD), typeFactory);
filtersRemoveTs.add(filter.accept(timestampVisitor));
timeRanges.add(timestampVisitor.getTimeRange());
}
if (timeRanges.isEmpty()) {
throw new IllegalArgumentException("Time range is required");
}
TimeRange timeRange = timeRanges.getFirst();
for (RexNode filter : filtersRemoveTs) {
PromQLConverterVisitor visitor = new PromQLConverterVisitor(typeFactory, indexFieldMap);
String labelSelector = filter.accept(visitor);
if (!labelSelector.isEmpty()) {
labelSelectors.append(labelSelector).append(", ");
}
}
if (labelSelectors.lastIndexOf(",") == labelSelectors.length() - 2) {
labelSelectors.delete(labelSelectors.length() - 2, labelSelectors.length());
}
labelSelectors.append("}");
this.query = labelSelectors.toString();
return new MetricsQueryRequest(this.query, timeRange.start(), timeRange.end(), this.enableQuery);
}
}
包装成Enumerator
上面的scan方法返回的是一个Enumerator:Sql2PrometheusEnumerator。其入参即 prometheus 的 query range 参数。
public class MetricsQueryRequest {
private final String query;
private final long start;
private final long end;
}
在Enumerator里把 prom 查询包装起来,并提供Enumerator接口。
public class Sql2PrometheusEnumerator implements Enumerator<Object[]> {
private Iterator<Object[]> iterator;
private final Map<String, Integer> fieldIndexMap;
private Object[] currentElement; // 用于存储当前元素的成员变量
public Sql2PrometheusEnumerator(MetricsQueryRequest metricsQueryRequest, Map<String, Integer> fieldIndexMap) {
this.fieldIndexMap = fieldIndexMap;
long start = metricsQueryRequest.getStart();
long end = metricsQueryRequest.getEnd();
String query = metricsQueryRequest.getQuery();
System.out.println("MetricsQueryRequest: " + query + ", start: " + start + ", end: " + end);
// todo 执行 prom 查询,并将查询结果转换为 Iterator
this.iterator = fetchData(metricsQueryRequest).iterator();
}
@Override
public Object[] current() {
if (currentElement == null) {
throw new NoSuchElementException("No current element, call moveNext() first.");
}
return currentElement;
}
@Override
public boolean moveNext() {
if (iterator.hasNext()) {
currentElement = iterator.next();
return true;
}
currentElement = null; // 当没有更多元素时,清除当前元素
return false;
}
@Override
public void reset() {
iterator = convertData().iterator();
currentElement = null; // 重置时也应清除当前元素
}
@Override
public void close() {
}
}
解析物理计划
介绍完实现步骤,我们再来重点看下物理计划的解析。物理计划解析的入口是Sql2PrometheusTable 的buildMetricsQueryRequest方法。
时间戳提取
sql 里是包含时间的过滤条件的,但是 prometheus 的查询语法里,时间戳范围和 query 是分开的。因此,我们的第一步是从物理计划里把时间戳相关的查询提取出来。即TimestampVisitor。
public class TimestampVisitor extends RexShuttle {
/**
* 时间戳列的索引
*/
private final int tsIndex;
private final RexBuilder rexBuilder;
private Long start = null;
private Long end = null;
public TimestampVisitor(int tsIndex, JavaTypeFactory typeFactory) {
// Pass true to visit each node only once
this.tsIndex = tsIndex;
this.rexBuilder = new RexBuilder(typeFactory);
}
@Override
public RexNode visitCall(RexCall call) {
if (isTimestampFilter(call)) {
RexNode right = call.getOperands().get(1);
setTimeRange(call.getOperator(), (RexLiteral) right);
return rexBuilder.makeLiteral(true);
}
return super.visitCall(call);
}
private boolean isTimestampFilter(RexCall call) {
if (call.getOperands().size() != 2) {
return false;
}
// 检查是否包含时间戳列
RexNode left = call.getOperands().getFirst();
if (left instanceof RexInputRef ref) {
return Objects.equals(tsIndex, ref.getIndex());
}
return false;
}
private void setTimeRange(SqlOperator sqlOperator, RexLiteral literal) {
SqlKind kind = sqlOperator.getKind();
if (SqlKind.GREATER_THAN.equals(kind) || SqlKind.GREATER_THAN_OR_EQUAL.equals(kind)) {
// ts >, ts >=
log.debug("greater than");
this.start = Optional.ofNullable((BigDecimal) literal.getValue()).map(BigDecimal::longValue).orElse(null);
} else if (SqlKind.LESS_THAN.equals(kind) || SqlKind.LESS_THAN_OR_EQUAL.equals(kind)) {
// ts <, ts <=
log.debug("less than");
this.end = Optional.ofNullable((BigDecimal) literal.getValue()).map(BigDecimal::longValue).orElse(null);
} else if (SqlKind.SEARCH.equals(kind)) {
// ts between
if (literal.getValue() instanceof Sarg<?> sarg) {
log.debug("sarg: {}", sarg);
Range<BigDecimal> range = (Range<BigDecimal>) sarg.rangeSet.span();
this.start = range.lowerEndpoint().longValue();
this.end = range.upperEndpoint().longValue();
}
}
}
public TimeRange getTimeRange() {
// start 不能是 null,但是 end 可以是 null
Objects.requireNonNull(this.start, "start is null");
if (Objects.isNull(end)) {
this.end = new Date().getTime();
}
return new TimeRange(this.start, this.end);
}
}
label查询条件拼接
第二步,是将除了时间戳之外的 where 查询条件拼接为 prometheus 查询的 label 条件。支持的 where 条件包含 =、!=、like、not like,分别对应 prometheus 查询的 =、!=、=~、!~。
public class PromQLConverterVisitor extends RexVisitorImpl<String> {
private final Map<Integer, String> indexFieldMap;
public PromQLConverterVisitor(JavaTypeFactory typeFactory, Map<Integer, String> indexFieldMap) {
super(true);
this.indexFieldMap = indexFieldMap;
}
@Override
public String visitCall(RexCall call) {
SqlOperator operator = call.getOperator();
if (operator == SqlStdOperatorTable.AND) {
return handleAndCondition(call);
} else if (operator == SqlStdOperatorTable.OR) {
return handleOrCondition(call);
} else {
return handleComparisonOperator(call);
}
}
@Override
public String visitInputRef(RexInputRef inputRef) {
return indexFieldMap.get(inputRef.getIndex());
}
@Override
public String visitLiteral(RexLiteral literal) {
if (literal.isAlwaysTrue()) {
return "";
}
if (literal.getValue() instanceof Sarg) {
return handleSargLiteral(literal);
}
Object value = literal.getValue2();
return value != null ? value.toString() : "null";
}
public String visitNode(RexNode node) {
if (node instanceof RexCall) {
return visitCall((RexCall) node);
} else if (node instanceof RexInputRef) {
return visitInputRef((RexInputRef) node);
} else if (node instanceof RexLiteral) {
return visitLiteral((RexLiteral) node);
}
return "";
}
private String handleAndCondition(RexCall call) {
List<String> conditions = new ArrayList<>();
for (RexNode operand : call.getOperands()) {
String condition = visitNode(operand);
if (!condition.isEmpty()) {
conditions.add(condition);
}
}
return String.join(",", conditions);
}
private String handleOrCondition(RexCall call) {
List<List<String>> orGroups = new ArrayList<>();
for (RexNode operand : call.getOperands()) {
List<String> branchConditions = extractBranchConditions(operand);
if (!branchConditions.isEmpty()) {
orGroups.add(branchConditions);
}
}
return combineOrConditions(orGroups);
}
private List<String> extractBranchConditions(RexNode node) {
List<String> conditions = new ArrayList<>();
if (node instanceof RexCall call) {
if (call.getOperator() == SqlStdOperatorTable.AND) {
for (RexNode operand : call.getOperands()) {
String condition = visitNode(operand);
if (!condition.isEmpty()) {
conditions.add(condition);
}
}
} else {
String condition = visitNode(node);
if (!condition.isEmpty()) {
conditions.add(condition);
}
}
}
return conditions;
}
private String handleComparisonOperator(RexCall call) {
if (call.getOperands().size() != 2) {
return "";
}
String columnName = visitNode(call.getOperands().get(0));
String value = visitNode(call.getOperands().get(1));
String operator = convertOperator(call.getOperator());
if (Strings.isNullOrEmpty(columnName) || Strings.isNullOrEmpty(value) || Strings.isNullOrEmpty(operator)) {
return "";
}
return String.format("%s%s\"%s\"", columnName, operator, escapeValue(value));
}
private String combineOrConditions(List<List<String>> orGroups) {
Map<String, Set<String>> labelValueMap = new HashMap<>();
for (List<String> group : orGroups) {
for (String condition : group) {
String[] parts = condition.split("=");
if (parts.length == 2) {
String label = parts[0];
String value = parts[1].replaceAll("\"", "");
labelValueMap.computeIfAbsent(label, k -> new HashSet<>()).add(value);
}
}
}
List<String> combined = new ArrayList<>();
for (Map.Entry<String, Set<String>> entry : labelValueMap.entrySet()) {
String label = entry.getKey();
Set<String> values = entry.getValue();
if (values.size() == 1) {
combined.add(String.format("%s=\"%s\"", label, values.iterator().next()));
} else {
combined.add(String.format("%s=~\"%s\"",
label,
String.join("|", values)));
}
}
return String.join(",", combined);
}
private String convertOperator(SqlOperator operator) {
if (operator == SqlStdOperatorTable.EQUALS) {
return "=";
} else if (operator == SqlStdOperatorTable.NOT_EQUALS) {
return "!=";
} else if (operator == SqlStdOperatorTable.LIKE || operator == SqlStdOperatorTable.SEARCH) {
return "=~";
} else if (operator == SqlStdOperatorTable.NOT_LIKE) {
return "!~";
}
return "";
}
private String escapeValue(String value) {
return value.replace("\"", "\\\"");
}
private String handleSargLiteral(RexLiteral literal) {
Sarg<NlsString> sarg = (Sarg<NlsString>) literal.getValue();
Objects.requireNonNull(sarg, "Sarg value is null");
// 处理范围类型
return handleSargPoints(sarg);
}
private String handleSargPoints(Sarg<NlsString> sarg) {
List<String> points = new ArrayList<>();
for (Range<NlsString> range : sarg.rangeSet.asRanges()) {
if (RangeSets.isPoint(range)) {
points.add(range.lowerEndpoint().getValue());
} else {
log.error("Invalid range: {}", range);
}
}
if (points.size() == 1) {
return String.format("%s", points.getFirst());
} else {
// 多个点使用正则表达式匹配
return String.format("%s", String.join("|", points));
}
}
private String formatValue(Object value) {
if (value == null) {
return "null";
}
if (value instanceof String) {
return "\"" + escapeValue((String) value) + "\"";
}
return value.toString();
}
}
至此,我们实现了一个简易的通过 sql 查询 prometheus 数据的工具。执行的打印日志为:
MetricsQueryRequest: system_metrics_cpu_util{app_name="my_app",hostname="host1"}, start: 1696118400000, end: 1696204800000
-
https://strongduanmu.com/blog/apache-calcite-learning-materials.html 挺不错的系列文章 ↩