构建基于 Python 的数据湖

本文介绍如何使用 MinIO、PyIceberge、PyArrow 和 Postgres,构建一个基于 Python 的数据湖。

构建基于 Python 的数据湖

数据湖已成为现代数据生态系统中的重要组成部分,使组织能够存储和分析来自多样化来源的大量数据。数据湖的主要优势之一是其能够以“原样”存储数据,而无需预定义的模式。这种灵活性,加上使用目录服务来发现数据结构和形式,大大减少了获得洞察所需的时间和精力。

之前,我们探索了使用 MinIO、Hive Metastore/Nessie、Trino/Dremio 等工具设置数据湖的方法。虽然 Hive Metastore 和 Nessie 目录功能强大且适合大规模数据操作,但它们的复杂性可能对较小的设置来说过于繁重。今天,我们将专注于一个基于 Python 的数据湖实现,简化配置过程,使其非常适合小型到中型部署。

完整的代码可在 GitHub 仓库 中找到,并在 YouTube 上有代码演示 视频

1、为什么选择 Python?

Python 的广泛库和易用性使其成为构建和管理数据湖的理想选择。在这个设置中,我们将使用:

  • PyIceberg:一个用于访问 Iceberg 表的 Python 库,无需 JVM。
  • PyArrow:Apache Arrow 的 Python API,支持高效的内存列式数据存储。
  • SQL Catalog:将元数据存储在 PostgreSQL 数据库中,同时与对象存储(如 S3)交互以存储数据文件。

2、设置数据湖

2.1 安装所需的库

首先,在您的 Python 环境中安装必要的库:

pip install pyiceberg pyarrow pandas psycopg2

2.2 定义配置

我们将设置目录架构、表名和 S3 存储凭据。在这个示例中,我们将使用 MinIO 作为兼容 S3 的存储,它运行在一个 Docker 容器中(已在之前的教程中设置)。MinIO 将充当我们的对象存储,而 PostgreSQL 将作为目录后端。

# 目录  
schemaname = "dw"   
tablename = "sales"  
# 访问密钥和秘密密钥  
pwd = os.environ['S3PASS']  
uid = os.environ['S3ACCESS']  
s3location = "s3://pyiceberg"  
# PostgreSQL 凭据  
pswd = os.environ['PGPASS']  
puid = os.environ['PGUID']

2.3 创建 SQL 目录

使用 PyIceberg 和 PyArrow,我们可以配置 SQL 目录以与对象存储交互。目录将元数据存储在 PostgreSQL 中,并指向 MinIO 进行数据存储。如下所示:

# 设置目录  
catalog = SqlCatalog(  
    "docs",  
    **{  
        "uri": f"postgresql+psycopg2://{puid}:{pswd}[@192](http://twitter.com/192).168.1.39:5432/iceberg",  
        "warehouse": "s3://pyiceberg",  
        "s3.endpoint": "<http://localhost:9000>",  
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",  
        "s3.access-key-id": uid,  
        "s3.secret-access-key": pwd,  
    },  
)

2.4 加载和转换数据

让我们从 Parquet 文件读取数据到 Pandas DataFrame 中。可以使用 Pandas 进行转换或清理,然后将 DataFrame 转换为 PyArrow 表以加载到 Iceberg 中。

import pandas as pd  
df = pd.read_parquet('data\\sales_summary.parquet', engine='pyarrow')  
df.head()  
# 将 Pandas 转换为 PyArrow  
tbl = pa.Table.from_pandas(df)  
# 将数据追加到表中  
table.append(tbl)

2.5 创建模式和 Iceberg 表

定义模式并在目录中创建表。如果表已存在,则该过程会优雅地处理。

catalog.create_namespace_if_not_exists(schemaname)  
#  
try:  
    table = catalog.create_table(  
        f'{schemaname}.{tablename}',  
           schema=tbl.schema,  
           location=s3location, )  
except:  
    print("表已存在,追加 " + tablename)      
    table = catalog.load_table(f'{schemaname}.{tablename}')

2.6 查询和过滤数据

Iceberg 表达式有助于查询和过滤数据。例如,您可以使用比较、小于和大于运算符来过滤行。然而,使用 Pandas 进行数据转换会更加容易。

from pyiceberg.expressions import And, LessThan, GreaterThan  

print(  
  table \  
    .scan(  
      row_filter=And(  
        GreaterThan('SalesAmount', 5),  
        LessThan('SalesAmount', 15),  
      ),  
      selected_fields=['SalesOrderNumber', 'ProductName']  
    ) \  
    .to_arrow() \  
    .to_string(preview_cols=10)  
)

2.7 模式演化

模式演化是 Iceberg 的关键特性之一。让我们向表模式中添加新列并将其持久化到 Iceberg 表中:

# 查询要更新的行。  
id_1_tbl = table \  
  .scan(row_filter=EqualTo('SalesOrderNumber', 'SO43697')) \  
  .to_arrow()  
#  
# 确定值列的索引并检索列的字段  
value_column_index = id_1_tbl.column_names.index('SalesAmount')  
value_column_field = id_1_tbl.field(value_column_index)
# 通过替换值列修改结果的 PyArrow 表  
id_1_tbl = id_1_tbl.set_column(  
  value_column_index,   
  value_column_field,   
  pa.array([3400.00], type=pa.float64()) # 确保数据类型一致  
)  
# 通过覆盖行更新 Iceberg 表  
table.overwrite(  
  df=id_1_tbl,  
  overwrite_filter=EqualTo('SalesOrderNumber', 'SO43697')  
)

2.8 探索 S3 桶

Iceberg 在 S3 桶中存储数据和元数据。导航到您的 MinIO 界面或使用 MinIO Python 库查看 Iceberg 如何组织这些文件。

3、使用 DuckDB 的高级操作

对于 SQL 爱好者,DuckDB 可以直接查询 Pandas DataFrame。如下所示:

import duckdb  
import pandas as pd  
pd.options.display.float_format = '{:,.0f}'.format  
# 使用 SQL 查询 DataFrame  
result = duckdb.query("SELECT sum(SalesAmount) FROM df").to_df()

4、结束语

本教程展示了如何配置和与基于 Python 的数据湖进行交互,而不依赖于 JVM 工具。使用 PyIceberg、PyArrow 和由 PostgreSQL 支持的 SQL 目录,我们创建了一个轻量级但功能强大的数据湖设置。这种方法特别适合中小型部署,其中简单性和易用性至关重要。


原文链接:Building a Python-Based Data Lake

汇智网翻译整理,转载请标明出处