用DuckDB和Go构建强大的GraphQL API服务,让您的S3数据湖文件堪称利器
1. 简介
数据湖表主要由使用大数据计算引擎(如Spark或Flink)的数据工程团队使用,以及使用重型SQL查询引擎(如Trino或Redshift)创建模型和报告的数据分析员和科学家。这些计算引擎已经成为访问数据湖中的数据的标准,因为它们被设计成能够有效地处理大数据处理所涉及的挑战:扫描大量数据、处理基于云的对象存储、读取和写入查询优化格式的文件,如Parquet或ORC等等。
然而,通常还需要将大型数据产品(或其中的某些聚合视图)提供给较轻量级的客户端,例如内部微服务,使用某种API。假设我们有一个数据湖表,它存储了由某个Spark应用程序生成的关于我们客户的实时统计数据。这些数据可能主要用于内部报告,但也可能对我们组织中的其他服务很有价值。尽管这是一个常见的要求,但它远非简单,主要是因为它需要相当不同的工具集。将S3存储桶中的parquet文件用于低延迟的基于HTTP的API并不简单(特别是当文件不断更新且在使其可访问之前需要进行某些转换时)。
为了使这种用例工作,通常需要一个能够以快速面向客户的延迟处理查询的数据库。类似地,我们需要一些能够处理和转换S3中的数据文件并将其加载到数据库中的ETL作业。最后,我们必须创建一个合适的API端点来处理客户端的查询。
的确,正如插图所示,为薄客户端提供查询数据湖文件的能力通常会增加更多的移动部件和流程到我们的流水线中,以便将数据复制和摄入至更昂贵的面向客户的数据仓库或者将其聚合和转换以适应低延迟数据库。
本文的目的是探索和演示一种不同且更简单的方法来解决使用较轻量级进程内查询引擎处理此需求的方法。具体地说,我将展示如何使用DuckDB和Arrow Data Fusion等进程内引擎来创建既能处理数据湖文件和数据量,又能充当快速内存存储并提供低延迟API调用的服务。使用这种方法,我们可以高效地将所需功能整合到一个可横向扩展的单个查询服务中,它将加载数据、聚合并存储在内存中,并高效快速地提供API调用。
接下来,第2节将概述此服务的主要要求和构建块,并解释它们是如何帮助我们解决所涉及的主要挑战的。第3节将深入探讨服务的核心部分-数据加载和查询功能,并演示如何使用DuckDB和Go来实现它(本文将专注于DuckDB和Go,但您可以在下方链接的存储库中找到使用Arrow Data Fusion的Rust实现)。第4节将在其上添加GraphQL API服务层。第5节将做出总结。
2. 主要构建块
值得注意的是,这种方法的基本假设是我们要提供给API的数据可以适应我们的服务的内存,或者运行它的机器的内存。对于某些用例,这可能是一个有问题的限制,但我相信这比看起来的限制要少。例如,我曾使用这种方法处理由2百万行和10列组成的内存关系表,其内存使用量约为350MB。我们往往忘记我们所服务的数据通常比我们存储或处理的数据要小得多。无论如何,这是一个重要的考虑因素。
独立服务应该作为一个比上述普通架构更简单、更吸引人的选择,至少应满足以下要求:
- 能够轻松读取和转换来自数据湖或对象存储的数据文件。
- 能够在内存中存储关系数据,并以低延迟响应查询。
- 具备水平可扩展性。
- 能够轻松且声明式地查询、转换和加载数据,SQL 是最方便的方法。
简单来说,我们不希望通过向应用程序基础架构添加数据库和额外的 ETL 过程来扩展。理想情况下,我们希望创建一个服务,可以直接从数据源加载和转换数据,高效存储数据,并能够快速查询。
在我看来,DuckDB(本文的重点)和 Arrow Data Fusion 结合的这3个功能是最大的优势之一。虽然内存数据库并不是什么新鲜事物,但 DuckDB 和 Arrow Data Fusion 的可扩展性是改变游戏规则的因素,它们使我们能够使用扩展来轻松地添加能力,直接在不同格式和位置读写数据,而且能够以大规模和高速度进行。
因此,我们的服务将由 3 个主要的组件或层包装或封装在一起:一个低级数据组件将封装 DuckDB 连接(我将其称为 DataDriver),一个 DAO 组件将使用该驱动程序执行查询并处理 API 请求,以及一个 API 解析器将提供服务。
换句话说,就依赖关系而言,我们有以下结构:
API-解析器封装了 DAO struct,DAO struct 封装了 DataDriver struct,DataDriver 封装了 DuckDB 连接
下一节将重点关注底层组件(DAO struct 和 DataDriver),而下一节将讨论顶层 API 层以及如何将它们全部整合在一起。
3、使用 DuckDB 加载和查询数据
在本节中,我们将创建一个包装 DuckDB 连接的驱动程序组件。它将负责初始化 DuckDB 并公开一个接口来执行 SQL 语句和查询。我们将使用优秀的 go-duckdb 库,该库通过静态链接到其 C lib,为 DuckDB 提供了一个 database/sql 接口。
初始化连接到 DuckDB 的 sql.DB
如前所述,我们将使用一个名为 DuckDBDriver 的结构体将 go-duckdb 库提供的 sql.DB 接口进行封装,以确保正确地进行初始化。我们通过执行一些初始化语句(bootQueries
)使用一个连接器对象初始化 DuckDB。该连接器执行设置 AWS 凭证的语句(因为我们希望从 S3 加载数据),以及加载和安装我们服务所需的扩展名:parquet 扩展(用于读取 parquet 文件)和 httpfs(用于直接从基于 HTTP 的对象存储(如 S3)中读取数据)。
如上面的代码块所示,函数 getBootQueries() 简单地返回一组作为字符串的初始化语句(可以在这里查看语句)。连接器执行这些初始化语句,所以当我们调用 OpenDB() 时,我们获得一个带有所需扩展和机密信息的 DuckDB 作为 sql.DB 连接。因为 go-duckdb 提供了一个将 DuckDB 连接作为 database/sql 接口的功能,所以它的主要查询功能可以很简单地实现和暴露为:
如前所述,DuckDB 数据驱动程序结构体只是作为一个实用类来封装到 DuckDB 的 DB 连接,所有查询执行将由一个 DAO 结构体有效地管理,该结构体具有使用驱动程序的方法的一组业务逻辑函数。DAO 结构体将再次封装 DuckDBDriver 结构体。
加载缓存数据
在服务数据后端初始化的最后阶段,我们从S3的parquet文件中加载需要服务的数据到内存表中。为此,我们将使用我们的驱动程序的execute()函数,使用CTAS查询创建一个命名表,该表可以使用任何我们可以用SQL表达的转换来自read_parquet()函数。一个示例将使这一点更清楚。
假设我们有一个包含描述用户的数据的parquet表。我们希望创建一个服务,仅向快速API访问公开来自此parquet表的3个字段:姓名、姓氏和年龄。我们还希望确保年龄字段可以作为整数访问,尽管它保存在parquet文件中是字符串形式。
为此,在我们的DuckDB驱动程序初始化所需的扩展名和设置所需的AWS凭据之后,我们只需执行一个选择我们想要的数据的SQL语句,直接从S3的parquet文件中读取到内存中,使用read_parquet()函数。
CREATE TABLE Users AS SELECT NAME, LAST_NAME, CAST(AGE as integer)FROM read_parquet('s3://somewhere/data/*')
在此语句中,我们实际上创建了一个名为Users的内存表,该表由从read_parquet()函数给定的位置的parquet文件中选择的字段组成。我们可以使用DuckDB支持的任何SQL函数和语法,包括复杂的查询语句和聚合。下面是如何使用此方法来初始化服务的更完整示例。
这实际上是该服务的核心-当服务被创建和启动时,它执行的一个语句实际上是将数据从S3加载到内存中。
服务初始化和数据加载完成后,我们可以直接在内存表上执行任何需要的SQL查询,以便以亚秒级的延迟为API提供服务。
4. 提供GraphQL服务
现在我们已经连接到一个加载有缓存parquet数据的内存表,最后一步是在其上创建一个GraphQL端点,以高效地回答数据查询。为此,我们将使用99designs的库gqlgen,这使得这个任务变得非常简单。
对gqlgen的深入介绍是超出我在此范围内的。对GraphQL不太熟悉的读者可以浏览它的文档,文档非常清晰明了。然而,我相信对GraphQL概念的一些了解足以理解本节并掌握其思想。
使用gqlgen来公开GraphQL端点通常涉及3个主要步骤:(1)创建一个模式,(2)生成解析器代码和存根,和(3)添加实现API函数的解析器代码。
我们将从描述我们表中的用户和2个主要函数来获取用户数据的模式开始。
scalar Timetype User { name: String! last_name: String! email: String! age: Int!}type Query { users: [User!]! getUsersByEmail(email: String!): [User!]!}
创建了模式之后,我们在项目目录中调用gqlgen代码生成过程:
go run github.com/99designs/gqlgen generate
运行生成过程将生成大量代码,包括实际的User struct(我们的数据模型struct),相应的解析器声明模板和解析器实现。让我们按顺序讨论它们。
解析器struct被生成在一个名为resolver.go的文件中,只有两个语句-一个没有属性或成员的结构类型声明,以及一个构造器(一个new()方法)用于初始化它。正如我们很快将看到的,解析器是我们的API服务层,对于API的每个方法都实现了一个函数。resolver.go文件的目的是让我们将任何所需的依赖项注入到解析器中,或者为了为API提供查询而添加任何其他内容。回忆一下,这正是我们DAO struct的目的。我们的DAO struct包装了DuckDB数据驱动程序,该驱动程序具有与我们内存表的连接,并负责将数据的API请求“翻译”为SQL查询。因此,我们只需将一个初始化的DAO对象注入到解析器中,以便解析器可以使用它来执行查询。
// resolver.gotype Resolver struct { dao *data.DAO // 将DAO注入}func NewResolver(dao *data.DAO) *Resolver { return &Resolver{dao: dao} //初始化DAO}
接下来生成的文件(每次运行gqlgen generate过程都会重新生成)是schema.resolvers.go,它是resolver方法的实现。生成的schema.resolvers文件实际上包含了在schema中声明的API函数的方法签名。在我们的情况下,它将包含这两个方法
// schema.resolvers.gofunc (r *queryResolver) GetUsersByEmail(ctx context.Context, email string) ([]*model.User, error) {}func (s *DAO) GetUsers() ([]*model.User, error) {}
要实现这些函数,我们首先需要在DAO结构中有对应的方法,但让我们先实现一个例子,然后完成所需的DAO代码。
// schema.resolvers.gofunc (r *queryResolver) Users(ctx context.Context) ([]*model.User, error) { res, err := r.dao.GetUsers() if err != nil { log.Printf("error getting users: %v", err) return nil, err } return res, nil}
正如您所见,因为我们将DAO注入到resolver结构中,我们可以简单地使用我们的resolver调用其函数。这种结构使得API层的代码非常干净和简洁。
现在让我们在DAO结构中编写所需函数的实际实现。如下所示,所需的代码非常简单。虽然我在使用一些辅助函数(您可以在附带的github存储库中看到),但GetUsers()函数只是在我们的内存DuckDB表上执行SQL查询并构建用户列表(回想一下,model.User结构是由gqlgen使用我们的schema生成的)。
//dao.gofunc (s *DAO) GetUsers() ([]*model.User, error) {//QryAllUsers := "select * from users" rows, err := s.driver.Query(QryAllUsers) if err != nil { return nil, err } defer rows.Close() resultset, err := sqlhelper.ResultSetFromRows(rows) if err != nil { return nil, err } users := make([]*model.User, 0) for _, row := range resultset { user := newUserFromRow(row) // populate the user struct users = append(users, user) } return users, nil}
现在我们基本上拥有了所有需要链接在一起的必需层。也就是说,一个数据驱动程序结构(封装了一个db连接),它被注入到实现并作为所有必需API函数的接口的DAO结构中,这些函数由resolver(我们的API处理程序)调用。这些组件之间的关系和角色在主server.go文件中的链接方式清晰地表达出来,它引导我们的服务及其依赖项。
// server.godataDriver := data.NewDuckDBDriver(awsCred)dataStore := data.NewStore(dataDriver) resolver := graph.NewResolver(dataStore)srv := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{Resolvers: resolver}))http.Handle("/query", srv)
当我们的服务初始化时,我们首先初始化我们的驱动程序,它获取与我们的DuckDB内存存储的连接。接下来,我们将驱动程序注入到创建DAO的NewStore方法中,并使用驱动程序将数据从parquet文件加载到内存中。最后,我们将DAO结构注入到API处理程序中,在处理API请求时调用其函数。
5. 结论
本文的目的是提供一种用于为轻量级客户端启用数据湖表的HTTP API访问的替代方法。这个用例越来越常见,通常需要添加许多组件、监控和资源到我们的流水线中。在本文中,我提出了一种更简单的替代方法,我相信它适用于许多用例。我演示了如何使用DuckDB的查询性能和扩展来使我们的服务能够从远程对象存储加载数据,将其保存在内存中的关系表中,并能够以子秒级别的延迟进行查询。更一般地说,我试图通过一个例子展示DuckDB扩展的强大功能,以及它可以嵌入的简易性。
希望这对您有用!
**除非特别注明,所有图片均由作者提供