构建基于 Python 的数据湖
本文介绍如何使用 MinIO、PyIceberge、PyArrow 和 Postgres,构建一个基于 Python 的数据湖。

数据湖已成为现代数据生态系统中的重要组成部分,使组织能够存储和分析来自多样化来源的大量数据。数据湖的主要优势之一是其能够以“原样”存储数据,而无需预定义的模式。这种灵活性,加上使用目录服务来发现数据结构和形式,大大减少了获得洞察所需的时间和精力。
之前,我们探索了使用 MinIO、Hive Metastore/Nessie、Trino/Dremio 等工具设置数据湖的方法。虽然 Hive Metastore 和 Nessie 目录功能强大且适合大规模数据操作,但它们的复杂性可能对较小的设置来说过于繁重。今天,我们将专注于一个基于 Python 的数据湖实现,简化配置过程,使其非常适合小型到中型部署。
完整的代码可在 GitHub 仓库 中找到,并在 YouTube 上有代码演示 视频。
1、为什么选择 Python?
Python 的广泛库和易用性使其成为构建和管理数据湖的理想选择。在这个设置中,我们将使用:
- PyIceberg:一个用于访问 Iceberg 表的 Python 库,无需 JVM。
- PyArrow:Apache Arrow 的 Python API,支持高效的内存列式数据存储。
- SQL Catalog:将元数据存储在 PostgreSQL 数据库中,同时与对象存储(如 S3)交互以存储数据文件。
2、设置数据湖
2.1 安装所需的库
首先,在您的 Python 环境中安装必要的库:
pip install pyiceberg pyarrow pandas psycopg2
2.2 定义配置
我们将设置目录架构、表名和 S3 存储凭据。在这个示例中,我们将使用 MinIO 作为兼容 S3 的存储,它运行在一个 Docker 容器中(已在之前的教程中设置)。MinIO 将充当我们的对象存储,而 PostgreSQL 将作为目录后端。
# 目录
schemaname = "dw"
tablename = "sales"
# 访问密钥和秘密密钥
pwd = os.environ['S3PASS']
uid = os.environ['S3ACCESS']
s3location = "s3://pyiceberg"
# PostgreSQL 凭据
pswd = os.environ['PGPASS']
puid = os.environ['PGUID']
2.3 创建 SQL 目录
使用 PyIceberg 和 PyArrow,我们可以配置 SQL 目录以与对象存储交互。目录将元数据存储在 PostgreSQL 中,并指向 MinIO 进行数据存储。如下所示:
# 设置目录
catalog = SqlCatalog(
"docs",
**{
"uri": f"postgresql+psycopg2://{puid}:{pswd}[@192](http://twitter.com/192).168.1.39:5432/iceberg",
"warehouse": "s3://pyiceberg",
"s3.endpoint": "<http://localhost:9000>",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": uid,
"s3.secret-access-key": pwd,
},
)
2.4 加载和转换数据
让我们从 Parquet 文件读取数据到 Pandas DataFrame 中。可以使用 Pandas 进行转换或清理,然后将 DataFrame 转换为 PyArrow 表以加载到 Iceberg 中。
import pandas as pd
df = pd.read_parquet('data\\sales_summary.parquet', engine='pyarrow')
df.head()
# 将 Pandas 转换为 PyArrow
tbl = pa.Table.from_pandas(df)
# 将数据追加到表中
table.append(tbl)
2.5 创建模式和 Iceberg 表
定义模式并在目录中创建表。如果表已存在,则该过程会优雅地处理。
catalog.create_namespace_if_not_exists(schemaname)
#
try:
table = catalog.create_table(
f'{schemaname}.{tablename}',
schema=tbl.schema,
location=s3location, )
except:
print("表已存在,追加 " + tablename)
table = catalog.load_table(f'{schemaname}.{tablename}')
2.6 查询和过滤数据
Iceberg 表达式有助于查询和过滤数据。例如,您可以使用比较、小于和大于运算符来过滤行。然而,使用 Pandas 进行数据转换会更加容易。
from pyiceberg.expressions import And, LessThan, GreaterThan
print(
table \
.scan(
row_filter=And(
GreaterThan('SalesAmount', 5),
LessThan('SalesAmount', 15),
),
selected_fields=['SalesOrderNumber', 'ProductName']
) \
.to_arrow() \
.to_string(preview_cols=10)
)
2.7 模式演化
模式演化是 Iceberg 的关键特性之一。让我们向表模式中添加新列并将其持久化到 Iceberg 表中:
# 查询要更新的行。
id_1_tbl = table \
.scan(row_filter=EqualTo('SalesOrderNumber', 'SO43697')) \
.to_arrow()
#
# 确定值列的索引并检索列的字段
value_column_index = id_1_tbl.column_names.index('SalesAmount')
value_column_field = id_1_tbl.field(value_column_index)
# 通过替换值列修改结果的 PyArrow 表
id_1_tbl = id_1_tbl.set_column(
value_column_index,
value_column_field,
pa.array([3400.00], type=pa.float64()) # 确保数据类型一致
)
# 通过覆盖行更新 Iceberg 表
table.overwrite(
df=id_1_tbl,
overwrite_filter=EqualTo('SalesOrderNumber', 'SO43697')
)
2.8 探索 S3 桶
Iceberg 在 S3 桶中存储数据和元数据。导航到您的 MinIO 界面或使用 MinIO Python 库查看 Iceberg 如何组织这些文件。
3、使用 DuckDB 的高级操作
对于 SQL 爱好者,DuckDB 可以直接查询 Pandas DataFrame。如下所示:
import duckdb
import pandas as pd
pd.options.display.float_format = '{:,.0f}'.format
# 使用 SQL 查询 DataFrame
result = duckdb.query("SELECT sum(SalesAmount) FROM df").to_df()
4、结束语
本教程展示了如何配置和与基于 Python 的数据湖进行交互,而不依赖于 JVM 工具。使用 PyIceberg、PyArrow 和由 PostgreSQL 支持的 SQL 目录,我们创建了一个轻量级但功能强大的数据湖设置。这种方法特别适合中小型部署,其中简单性和易用性至关重要。
原文链接:Building a Python-Based Data Lake
汇智网翻译整理,转载请标明出处
