在线零售多模态AI搜索引擎

“哇,那件衬衫看起来真棒。我想买一件一样的!”没有品牌名称,没有时尚术语,甚至说不清是什么面料。只是凭一个大概的印象,在亚马逊上快速搜索。立刻:成千上万的选择,各种尺寸,每种颜色。点击一下,第二天就送到你家门口。这就是AI驱动购物的魔力和客户满意度的新面貌。

最近,AI改变了许多行业的格局,而电子商务是最大的受益者之一。过去,我通过颜色和品牌进行搜索,经常在找不到合适的匹配时放弃搜索。现在,我可以准确地描述我想要的东西,并立即得到它——这就是AI带来的变化。一个高效且个性化的搜索引擎是成功在线业务最关键的功能之一。

AI超越了简单的关键词匹配,通过分析用户意图、上下文和行为,从而减少不相关的结果和被放弃的搜索。根据最近的研究,这种进步可以将收入提高多达40%,并提升客户满意度。

1、超越文本的搜索引擎

现代搜索引擎更进一步,让您可以在文本查询之外,用相似的图像一起进行搜索。喜欢Google的Circle to Search吗?你不是唯一一个。多模态搜索利用不同类型的数据,帮助您更快、更准确地找到所需内容。然而,构建这些系统并不是一项简单的任务。让我们使用来自电子商务网站Shein的数据集来仔细看看。

1.1 异构产品数据

Shein数据集中的每个产品都包含文本描述、图像、结构化属性(如颜色、类别和品牌)以及价格等混合信息。例如,一行可能有:
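下面是一个示意性的行(字段名取自本文稍后加载的数据集列,具体取值为虚构示例):

{
  "product_name": "Women's Casual Floral Print Midi Dress",
  "description": "Lightweight woven fabric, short sleeve, fit-and-flare silhouette ...",
  "color": "White",
  "category": "Dresses",
  "brand": "SHEIN",
  "final_price": 25.99,
  "currency": "USD",
  "main_image": "https://example.com/images/dress_main.jpg"
}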

这种多样性要求一个多模态搜索系统,能够理解和索引所有类型的内容——文本、视觉和结构化数据。用户可以搜索任何东西,从一个短语到一张照片。

1.2 查询歧义

用户可能会输入像 “black dress” 或 “comfy top” 这样的内容。但什么才算 comfy?棉质?宽松?无袖?数据集中的产品并不总是用与用户相同的术语来标注。这种语义差距意味着:除非系统理解的是意图而不仅仅是关键词,否则很难返回相关的结果。

1.3 可扩展性

即使是一个过滤后的Shein数据集也包含数万个SKU,而生产系统可能有数百万个。您的搜索基础设施必须支持实时检索。向量索引、混合搜索模型和高效的过滤管道对于确保结果在毫秒内交付至关重要。

1.4 个性化

两个输入 “floral dress” 的用户可能想要非常不同的东西:一个喜欢波西米亚长裙,另一个喜欢短款滑板风格。为了解决这个问题,搜索引擎需要实时整合用户行为和偏好,使结果不仅与查询匹配,而且与个人匹配。

1.5 元数据过滤

电子商务用户通常希望使用过滤器来细化他们的搜索:价格范围、颜色、尺寸、材料、袖长等。在Shein数据集中,这些是作为结构化字段提供的,但将其集成到无缝的分面搜索体验中而不减慢检索速度并非易事。

1.6 实时需求

搜索引擎不能慢。每次查询都必须命中一个大规模索引、跨多种数据类型进行匹配、应用过滤器,并可选地对结果重新排序,而且即使在高峰负载期间也要在 200ms 以内完成。

解决这些挑战需要结合自然语言处理、计算机视觉、向量搜索、索引调优和实时个性化。尽管Shein数据集只是一个样本,但它展示了现代电子商务搜索引擎必须驾驭的混乱而多模态的世界,以及为什么把这件事做好是一项了不起的工程壮举。

2、搜索引擎中的向量

为了提供更智能、更相关的结果,现代电子商务平台使用向量搜索或语义搜索,这是一种技术,帮助搜索系统理解意义,而不仅仅是匹配关键词。这是通过向量嵌入实现的。

2.1 什么是向量嵌入?

本质上,向量搜索将产品数据如标题、描述和图像转换为称为向量嵌入(或密集向量)的数值表示。这些嵌入捕捉语义意义,因此相似的项目在多维空间中位置更接近。

 文本: "White Floral Dress"  
                      ↓  
            [0.12, 0.75, -0.33, ...]  
                  ← 密集向量 →

例如,想象将几件时尚产品如连衣裙、包和鞋子嵌入到二维空间中进行可视化:

  • “Red satin evening gown” 和 “Black sleeveless cocktail dress” 聚在一起,因为它们都是正式服装,尽管它们的关键词不同。
  • “White canvas tote bag” 和 “Beige leather shoulder bag” 被放在彼此附近,因为它们属于同一类别并具有类似功能。
  • 鞋子如 “Chunky white sneakers”, “Black suede ankle boots”, 和 “Brown leather loafers” 形成自己的聚类,按风格和材料分组。

在二维图上,这些聚类看起来就是彼此靠近的几簇点。当然,实际的嵌入并不是二维表示,而是高维的数字数组。
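作为参考,可以用 fastembed(本文后面也会用到的库)为一条文本生成真实的密集嵌入并查看其维度——下面只是一个最小示意片段:

from fastembed import TextEmbedding

model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
vector = list(model.embed(["White Floral Dress"]))[0]
print(vector.shape)  # (384,) - a 384-dimensional dense vector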

2.2 图像如何融入

文本不是唯一可以转换为嵌入的数据类型;图像也可以。关键是使用视觉-语言模型,如CLIP(Contrastive Language–Image Pretraining),它被训练将图像及其对应的描述映射到共享的嵌入空间中。

这意味着一张“红色高跟鞋”的图片和“红色高跟鞋”这句话会被嵌入到类似的向量表示中。当您上传一张照片时,系统可以找到其在产品描述中的最接近匹配,从而实现反向图像搜索,类似于Google Lens或Circle to Search。

比较向量的一种常见方法是余弦相似度,计算为向量的点积除以它们的大小的乘积。结果越接近1,向量就越相似:

cosine(A, B) = cos(θ) = (A · B) / (||A|| × ||B||)
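用 NumPy 可以很直接地实现这个公式(示意代码,向量取值为虚构示例):

import numpy as np

def cosine_similarity(a, b):
    # dot product divided by the product of the vectors' magnitudes
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = np.array([0.12, 0.75, -0.33])
b = np.array([0.10, 0.80, -0.30])
print(cosine_similarity(a, b))  # close to 1 => very similar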

2.3 什么是稀疏向量?

虽然密集向量在捕捉语义意义方面表现出色,但它们并不总是适合精确的关键词匹配,特别是对于简短的查询或特定术语。这就是稀疏向量发挥作用的地方。

稀疏向量是高维表示,其中大多数值为零,但少数关键维度是活跃的,直接对应于重要的关键词或标记。你可以把它们看作是传统基于关键词的搜索方法(如BM25或TF-IDF)的高级演变,适应现代检索引擎。

文本: "White Floral Dress"  
            ↓  
[0, 0, 0, 0, 1.2, 0, 0, 0, 0, 0.9, 0, 0, 0, 1.5, 0, 0, 0, 0, 0, 0, ...]  
              ↑                ↑             ↑  
           "white"          "floral"      "dress"  
            (1.2)            (0.9)         (1.5)

像 “Nike Air Jordans” 或 “Adidas Messi F50” 这样的产品标题通常很短,却包含很多可直接检索的术语。稀疏向量方法(如BM25或SPLADE)保留了这种词级精度,确保查询能返回确切的产品,即使它们缺乏丰富的描述。

稀疏向量搜索通常通过算法如BM25 (Best Match 25) 实现。BM25通过考虑两个因素来确定给定查询中最相关的文档:

  • 词频 (TF):查询词在每个文档中出现的频率?(越多越好。)
  • 逆文档频率 (IDF):查询词在整个文档集中的罕见程度?(越少见越好。)

文档 D 对于查询 Q 的 BM25 分数是各个查询词得分的总和:

BM25(D, Q) = ∑ IDF(q) * (TF(q, D) * (k1 + 1)) / (TF(q, D) + k1 * (1 - b + b * |D| / avgdl))

其中:

  • IDF (q) = 逆文档频率
  • TF (q,D) = 词频
  • |D| = 文档长度
  • avgdl = 平均文档长度
  • k1、b = 可调节常数
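下面是上述 BM25 公式的一个最小化 Python 示意实现(k1、b 取常见的默认值 1.5 和 0.75,语料仅为演示):

import math
from collections import Counter

def bm25_score(query_terms, doc_terms, corpus, k1=1.5, b=0.75):
    """Score one tokenized document against a query with the BM25 formula above."""
    N = len(corpus)
    avgdl = sum(len(d) for d in corpus) / N
    tf = Counter(doc_terms)
    score = 0.0
    for q in query_terms:
        df = sum(1 for d in corpus if q in d)            # documents containing the term
        idf = math.log((N - df + 0.5) / (df + 0.5) + 1)  # a standard BM25 IDF variant
        denom = tf[q] + k1 * (1 - b + b * len(doc_terms) / avgdl)
        score += idf * (tf[q] * (k1 + 1)) / denom
    return score

corpus = [["white", "floral", "dress"], ["black", "running", "shoes"]]
print(bm25_score(["white", "dress"], corpus[0], corpus))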

3、有效载荷和元数据过滤

在电子商务中,用户搜索的意图往往超越关键词。例如:

  • “价格低于 ₹2000 的红色运动鞋”
  • “H&M 的棉质上衣,尺码 M”

为了支持这种基于属性的搜索,你需要的不仅仅是嵌入;你需要元数据过滤。元数据可以包括与产品相关的任何结构化属性,如:

  • 品牌: H&M, Nike, Shein
  • 颜色: 黑色, 白色, 红色
  • 类别: 连衣裙, 包, 鞋子
  • 价格: ₹499, ₹1299
  • 尺码: S, M, L, XL
  • 材质: 棉, 聚酯纤维, 皮革

这种结构化信息通常存储为有效载荷,即附加到向量数据库中每个产品的键值字典。虽然嵌入处理语义相似性,但元数据过滤强制执行硬性约束,确保用户在询问夏季凉鞋时不会看到冬季靴子。

例如,在一个向量搜索系统中,你可以检索与 “白色夏季连衣裙” 语义相似的物品(通过密集向量),然后按以下条件过滤:

brand = SHEIN; price < 1500; size = M

这种组合允许一些基于规则的精度,从而减少不相关结果的可能性。

以下是在Qdrant格式中的示例有效载荷:

{  
  "id": "SKU123",  
  "vector": [0.12, 0.75, -0.33, ...],  
  "payload": {  
    "brand": "SHEIN",  
    "color": "White",  
    "category": "Dress",  
    "price": 1299,  
    "size": ["M", "L"],  
    "material": "Cotton"  
  }  
}

在搜索时,你可以将这些过滤器与向量搜索结合使用。

filter = {  
  "must": [  
    {"key": "category", "match": {"value": "Dress"}},  
    {"key": "price", "range": {"lt": 1500}},  
    {"key": "size", "match": {"value": "M"}}  
  ]  
}

你将在后面的教程中看到这个效果。

3.1 质量重排序

重排序是现代搜索系统中的一个可选但强大的后处理步骤。它对向量搜索返回的初始结果集进行优化。虽然向量相似性可以高效地根据嵌入距离检索出top-k最相关的候选者,但它可能无法完全捕捉到用户意图、上下文或领域特定的相关性等细微差别。

重排序通过获取top-k结果并将它们以及原始查询传递给一个更强大的模型(通常是交叉编码器或基于Transformer的重排序器)来工作。这个模型会同时检查查询和每个结果(而不是独立地)并根据更深层次的语义理解分配相关性分数。

然后根据这些分数对候选者进行重新排序,通常会将最精确和上下文相关的结果推到顶部。
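作为示意,下面用 sentence-transformers 中公开的交叉编码器模型对候选结果重新打分(这只是演示交叉编码器重排序的思路;本文教程实际使用的是后面介绍的 ColBERT 后期交互模型,候选文本也仅为举例):

from sentence_transformers import CrossEncoder

# a small, publicly available cross-encoder; any reranking model is used the same way
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

query = "women's running shoes"
candidates = [
    "Nike Women's Flex Experience Run 11 Running Sneakers",
    "Women's Glitter Strappy Wedge Heel Platform Sandals",
]

# the model scores each (query, document) pair jointly, not independently
scores = reranker.predict([(query, doc) for doc in candidates])
for doc, score in sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True):
    print(f"{score:.3f}  {doc}")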

现在你已经了解了这些概念,让我们继续构建一个多模态搜索引擎。你将使用 Qdrant 作为向量数据库。

4、开始

4.1 设置Qdrant

如果你在本地工作,请确保安装了 Docker 并运行了Docker引擎。Qdrant 可以通过拉取其Docker镜像来安装:

! docker pull qdrant/qdrant

然后运行Qdrant Docker容器:

! docker run -p 6333:6333 \  
    -v $(pwd)/qdrant_storage:/qdrant/storage \  
    qdrant/qdrant

另一种更方便的选择是使用 Qdrant Cloud。登录云平台,创建一个集群,并获取你的API密钥。

安装所需的Python库。

! pip install qdrant-client datasets fastembed transformers qdrant-client[fastembed] openai

现在你可以初始化客户端了。

from qdrant_client import models, QdrantClient  
from google.colab import userdata  

client = QdrantClient(  
    url="YOUR_QDRANT_CLOUD_INSTANCE_URL",  
    api_key=userdata.get('qdrant_api_key'),  
)

4.2 数据摄入

首先准备数据。我们将使用之前讨论过的 Shein 数据集。

import pandas as pd  
path = "https://raw.githubusercontent.com/luminati-io/eCommerce-dataset-samples/main/shein-products.csv"  
df = pd.read_csv(path)
print(df.columns)
Index(['product_name', 'description', 'initial_price', 'final_price',  
       'currency', 'in_stock', 'color', 'size', 'reviews_count', 'main_image',  
       'category_url', 'url', 'category_tree', 'country_code', 'domain',  
       'image_count', 'image_urls', 'model_number', 'offers',  
       'other_attributes', 'product_id', 'rating', 'related_products',  
       'root_category', 'top_reviews', 'category', 'brand',  
       'all_available_sizes'],  
      dtype='object')

你可以删除某些重要字段中包含空值的行。

df = df.dropna(subset=['color'])

另一个问题是产品描述中都包含短语“Free Returns ✓ Free Shipping✓”。它与搜索无关,应当从所有行中删除;嵌入应基于干净、相关的文本,以确保检索准确。


df['description'] = df['description'].str.replace('Free Returns ✓ Free Shipping✓.', '', regex=False).str.strip()

为了简化,这里只进行了最小的数据清理。

4.3 属性选择与嵌入策略

一旦数据准备好了,决定哪些字段要包含或排除,并确定如何生成嵌入和过滤器以实现最佳准确性和效率。

包含最多文本内容和上下文的字段最适合生成密集向量。在这个数据集中,通过结合以下内容可以获得最高质量的嵌入:

product_name + description + category

documents = []  
for _, row in df.iterrows():  
    doc_text = ""  
    if pd.notna(row.get('product_name')):  
        doc_text += str(row['product_name'])  
    if pd.notna(row.get('description')):  
        if doc_text:  
            doc_text += " " + str(row['description'])  
        else:  
            doc_text = str(row['description'])  
    if pd.notna(row.get('category')):  
        doc_text += " " + str(row['category'])  

    documents.append(doc_text)

对于图像,数据集包含一个 main_image 列,以及一个存放额外产品图片的 image_urls 列。将所有图像下载到本地,以便稍后计算图像嵌入。

import os  
import urllib.request  
import pandas as pd  
import numpy as np  
import json  
from typing import Optional  

def download_images_for_row(row, base_folder="data/images") -> Optional[str]:  
    folder_name = str(row.name)  
    folder_path = os.path.join(base_folder, folder_name)  
    os.makedirs(folder_path, exist_ok=True)  

    urls = []  
    # Handle main_image  
    main_image = row.get("main_image")  
    if isinstance(main_image, str) and main_image.startswith("http"):  
        urls.append(main_image)  

    # Handle image_urls (should be a stringified list)  
    image_urls_raw = row.get("image_urls")  
    try:  
        image_urls = json.loads(image_urls_raw) if isinstance(image_urls_raw, str) else []  
        if isinstance(image_urls, list):  
            urls.extend(image_urls)  
    except Exception as e:  
        print(f"Failed to parse image_urls: {e}")  

    # Download images  
    for i, url in enumerate(urls):  
        try:  
            ext = os.path.splitext(url)[-1].split("?")[0]  
            filename = f"image_{i}{ext or '.jpg'}"  
            filepath = os.path.join(folder_path, filename)  
            if not os.path.exists(filepath):  
                urllib.request.urlretrieve(url, filepath)  
        except Exception as e:  
            print(f"Failed to download {url}: {e}")  

    if os.listdir(folder_path):  # At least one image downloaded  
        return folder_path  
    return None  

# Apply to each row  
df["image_folder_path"] = df.apply(download_images_for_row, axis=1)  

# Drop rows with failed downloads  
df = df.dropna(subset=["image_folder_path"])  

# Preview sample  
display(df[["main_image", "image_folder_path"]].sample(5).T)

像 price、color、size、rating 和 brand 这样的字段是优秀的元数据属性。它们可以从自然语言查询中提取,并在搜索过程中作为过滤器应用。

4.4 创建嵌入

为了计算嵌入,使用 fastembed,这是一个由Qdrant开发的库,可以高效运行量化HuggingFace模型。

from fastembed import TextEmbedding, LateInteractionTextEmbedding, SparseTextEmbedding, ImageEmbedding

密集嵌入

对于密集嵌入,可以使用常见的SOTA模型,如 all-MiniLM-L6-v2

dense_embedding_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")  
dense_embeddings = list(dense_embedding_model.embed(doc for doc in documents))

稀疏嵌入

你可以使用SPLADE或MiniCOIL来进行稀疏嵌入。MiniCOIL (mini-Contextualized Inverted Lists) 是一种用于文本检索的稀疏神经嵌入模型。它为每个单词根生成四维嵌入,捕捉单词的意义。这些意义嵌入然后组合成输入文本的词袋(BoW)表示。如果一个词不在词汇表中,它的权重仅由其BM25分数决定。最终的稀疏表示使用BM25进行术语加权。

minicoil_embedding_model = SparseTextEmbedding("Qdrant/minicoil-v1")  
minicoil_embeddings = list(minicoil_embedding_model.embed(doc for doc in documents))

图像嵌入

在电子商务网站上,产品图像通常从各种角度和位置拍摄。由于所有这些图像都包含相关信息,你可以采用平均方法来获得最佳的整体表示。然而,这种方法假设所有图像的信息量相同,并且没有过度噪声。在此步骤中,我们将使用前面讨论的CLIP视觉变换器模型。

clip_embedding_model = ImageEmbedding(model_name="Qdrant/clip-ViT-B-32-vision")  

def get_average_image_embedding(folder_path: str) -> Optional[np.ndarray]:  
    """Get average embedding of all images in a folder"""  
    if not os.path.exists(folder_path):  
        return None  

    image_files = [f for f in os.listdir(folder_path)  
                   if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'))]  

    if not image_files:  
        return None  

    image_paths = [os.path.join(folder_path, f) for f in image_files]  

    try:  
        # Get embeddings for all images in the folder  
        embeddings = list(clip_embedding_model.embed(image_paths))  

        if embeddings:  
            # Convert to numpy arrays and compute average  
            embedding_arrays = [np.array(emb) for emb in embeddings]  
            average_embedding = np.mean(embedding_arrays, axis=0)  
            return average_embedding  

    except Exception as e:  
        print(f"Error processing images in {folder_path}: {e}")  
        return None  

    return None  

image_embeddings = []  
for _, row in df.iterrows():  
    folder_path = row['image_folder_path']  
    avg_embedding = get_average_image_embedding(folder_path)  
    image_embeddings.append(avg_embedding)  

# Filter out None values and keep track of valid indices  
valid_indices = [i for i, emb in enumerate(image_embeddings) if emb is not None]  
valid_image_embeddings = [image_embeddings[i] for i in valid_indices]

如果只有 main_image 是相关的,你可以使用加权平均,其中 main_image 获得更多权重,或者简单地使用 main_image 单独:

weighted_avg = (0.6 * main_emb + 0.2 * side_emb + 0.2 * detail_emb)
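如果想把这一思路写成可复用的函数,可以像下面这样给主图更高权重(示意代码,假设传入的是前面 CLIP 生成的 numpy 向量,权重值仅为示例):

import numpy as np

def weighted_image_embedding(embeddings, main_weight=0.6):
    """Weighted average where the first embedding (the main image) gets main_weight."""
    embeddings = [np.asarray(e) for e in embeddings]
    if len(embeddings) == 1:
        return embeddings[0]
    rest = (1.0 - main_weight) / (len(embeddings) - 1)
    weights = np.array([main_weight] + [rest] * (len(embeddings) - 1))
    return np.average(np.stack(embeddings), axis=0, weights=weights)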

另一种替代方法是连接所有嵌入,然后使用PCA或投影头降低维度。

from sklearn.decomposition import PCA  

def get_concatenated_image_embedding(folder_path: str) -> Optional[np.ndarray]:  
    """Concatenate embeddings of all images in a folder"""  
    if not os.path.exists(folder_path):  
        return None  

    image_files = [f for f in os.listdir(folder_path)  
                   if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'))]  

    if not image_files:  
        return None  

    image_paths = [os.path.join(folder_path, f) for f in image_files]  

    try:  
        embeddings = list(clip_embedding_model.embed(image_paths))  

        if embeddings:  
            # Concatenate all embeddings into one long vector  
            embedding_arrays = [np.array(emb) for emb in embeddings]  
            concatenated_embedding = np.concatenate(embedding_arrays, axis=0)  
            return concatenated_embedding  

    except Exception as e:  
        print(f"Error processing images in {folder_path}: {e}")  
        return None  

    return None  

concatenated_embeddings = []  
for _, row in df.iterrows():  
    folder_path = row['image_folder_path']  
    concat_emb = get_concatenated_image_embedding(folder_path)  
    concatenated_embeddings.append(concat_emb)  

valid_indices = [i for i, emb in enumerate(concatenated_embeddings) if emb is not None]  
valid_concat_embeddings = [concatenated_embeddings[i] for i in valid_indices]  

pca = PCA(n_components=512)  # Choose your target dimension  
reduced_embeddings = pca.fit_transform(valid_concat_embeddings)
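# Note (assumptions): concatenation only yields equal-length vectors if every product
# folder contains the same number of images; otherwise pad/truncate to a fixed count first.
# PCA with n_components=512 also requires at least 512 samples to fit.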

用于重排序的嵌入

最后,计算用于重排序的嵌入。在这里,我们将使用流行的基于BERT的 colbertv2.0 模型。

late_interaction_embedding_model = LateInteractionTextEmbedding("colbert-ir/colbertv2.0")  
late_interaction_embeddings = list(late_interaction_embedding_model.embed(doc for doc in documents))  

df["dense_embedding"] = dense_embeddings  
df["image_embedding"] = image_embeddings  
df["sparse_embedding"] = minicoil_embeddings  
df["late_interaction_embedding"] = late_interaction_embeddings

以下是嵌入形状的样子。


密集嵌入: (384,)  
miniCOIL 嵌入: 稀疏向量(只存储非零维度的索引及权重)  
图像嵌入: (512,)  
后期交互(late interaction)嵌入: (180, 128),即每个 token 一个 128 维向量


一旦准备好,就可以将数据推送到向量数据库中。第一步是创建一个集合,它就像蓝图一样,指定了我们要推送的各类向量及其维度。

from qdrant_client.models import Distance, VectorParams, models  
client.recreate_collection(  
    "shein_products",  
    vectors_config={  
        "all-MiniLM-L6-v2": models.VectorParams(  
            size=len(dense_embeddings[0]),  
            distance=models.Distance.COSINE,  
        ),  
        "colbertv2.0": models.VectorParams(  
            size=len(late_interaction_embeddings[0][0]),  
            distance=models.Distance.COSINE,  
            multivector_config=models.MultiVectorConfig(  
                comparator=models.MultiVectorComparator.MAX_SIM,  
            ),  
            hnsw_config=models.HnswConfigDiff(m=0)  # Disable HNSW for reranking  
        ),  
        "clip": VectorParams(size=512, distance=Distance.COSINE)  
    },  
    sparse_vectors_config={  
        "minicoil": models.SparseVectorParams(  
            modifier=models.Modifier.IDF  
        )  
    },  
    quantization_config=models.ScalarQuantization(  
        scalar=models.ScalarQuantizationConfig(  
            type=models.ScalarType.INT8,  
            quantile=0.99,  
            always_ram=True,  
        ),  
    ),  
)

4.5 向量量化

正如代码所示,我使用了向量量化来更高效地存储嵌入。随着数据库的增长,可能会有存储空间和搜索延迟的限制。与其将每个向量存储为高精度的 float32,量化将其转换为较低位的格式,通常是 int8 甚至 1 位二进制表示,大大减少了内存使用,同时保留了大部分语义相似性。

标量量化独立处理向量的每个维度,将高精度浮点值映射到一个小整数范围内最近的 bin,相当于对每个元素做 float32 到 int8 的转换。仅标量量化就可以将嵌入大小减少最多 4 倍,而检索性能通常保持在 99% 以上。
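如果需要进一步压缩,Qdrant 还支持二值(binary)量化。下面是创建集合时可以替换使用的配置示意(字段名来自 qdrant_client 的公开 API):

quantization_config=models.BinaryQuantization(
    binary=models.BinaryQuantizationConfig(
        always_ram=True,  # keep the compact binary index in RAM for speed
    ),
),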

4.6 有效载荷索引

在您期望经常进行过滤的任何元数据字段上创建有效载荷索引。例如,category, brand, price, color, size, 或 material。这使得过滤更快,并且有助于有效地将过滤器与向量搜索结果结合起来。

from qdrant_client.models import PayloadSchemaType  

client.create_payload_index(  
    collection_name="shein_products",  
    field_name="color",  
    field_schema=PayloadSchemaType.KEYWORD  # keyword (for string match)  
)  

client.create_payload_index(  
    collection_name="shein_products",  
    field_name="final_price",  
    field_schema=PayloadSchemaType.FLOAT # float (for range queries)  
)  

client.create_payload_index(  
    collection_name="shein_products",  
    field_name="category",  
    field_schema=models.TextIndexParams(  
        type="text",  
        tokenizer=models.TokenizerType.WORD,  
        min_token_len=2,  
        max_token_len=10,  
        lowercase=True,  
    ),  
)  

client.create_payload_index("shein_products", "rating", PayloadSchemaType.FLOAT)  
client.create_payload_index("shein_products", "brand", PayloadSchemaType.KEYWORD)  
client.create_payload_index("shein_products", "category", PayloadSchemaType.KEYWORD)  
client.create_payload_index("shein_products", "product_name", PayloadSchemaType.KEYWORD)  
client.create_payload_index("shein_products", "currency", PayloadSchemaType.KEYWORD)

最后,将数据分批上传到向量数据库中。

from qdrant_client.models import PointStruct, SparseVector, Document  

def upload_points_in_batches(df, documents, batch_size=20):  
    """Upload points in small batches to avoid payload size limits"""  
    # Calculate average length only for documents that correspond to rows in the filtered df  
    # This requires mapping back to the original documents  
    original_indices = df.index.tolist()  
    relevant_documents = [documents[i] for i in original_indices if i < len(documents)]  
    avg_documents_length = sum(len(document.split()) for document in relevant_documents) / len(relevant_documents) if relevant_documents else 0  

    total_uploaded = 0  
    batch_points = []  

    # Use enumerate to get a continuous index for accessing documents list  
    for enum_idx, (df_idx, row) in enumerate(df.iterrows()):  
        if row['image_embedding'] is None:  
            continue  

        # Use the original dataframe index to access the correct document  
        original_doc_idx = df_idx  
        if original_doc_idx >= len(documents):  
            print(f"Warning: Original index {original_doc_idx} out of bounds for documents list. Skipping.")  
            continue  
        dense_emb = row['dense_embedding'].tolist() if isinstance(row['dense_embedding'], np.ndarray) else row['dense_embedding']  
        late_interaction_emb = row['late_interaction_embedding'].tolist() if isinstance(row['late_interaction_embedding'], np.ndarray) else row['late_interaction_embedding']  
        image_emb = row['image_embedding'].tolist() if isinstance(row['image_embedding'], np.ndarray) else row['image_embedding']  
        minicoil_doc = Document(  
            text=documents[original_doc_idx], # Use original index for the correct document text  
            model="Qdrant/minicoil-v1",  
            options={"avg_len": avg_documents_length}  
        )  

        point = PointStruct(  
            id=original_doc_idx, # Use the original df index as the point ID  
            vector={  
                "all-MiniLM-L6-v2": dense_emb,  
                "minicoil": minicoil_doc,  
                "colbertv2.0": late_interaction_emb,  
                "clip": image_emb,  
            },  
            payload={  
                "document": documents[original_doc_idx], # Use original index for payload document  
                "product_name": str(row.get('product_name', '')),  
                "final_price": float(row.get('final_price', 0)) if pd.notna(row.get('final_price')) else 0.0,  
                "currency": str(row.get('currency', ''))[:10],  
                "rating": float(row.get('rating', 0)) if pd.notna(row.get('rating')) else 0.0,  
                "category": str(row.get('category', ''))[:100],  
                "brand": str(row.get('brand', ''))[:100],  
                "image_path": str(row.get('main_image', '')),  
                "color":  str(row.get('color', '')),  
                "image_url": str(row.get('main_image', ''))  
            }  
        )  
        batch_points.append(point)  
        # Upload when batch is full  
        if len(batch_points) >= batch_size:  
            client.upsert(collection_name="shein_products", points=batch_points, wait=True) # Added wait=True for robustness  
            total_uploaded += len(batch_points)  
            print(f"Uploaded batch: {total_uploaded} points")  
            batch_points = []  

    # Upload remaining points  
    if batch_points:  
        client.upsert(collection_name="shein_products", points=batch_points, wait=True)   
        total_uploaded += len(batch_points)  
        print(f"Final batch uploaded: {total_uploaded} total points")  

upload_points_in_batches(df, documents, batch_size=20)

4.7 查询数据库

在多模态搜索引擎中,用户可能使用文本、图像或两者的组合进行搜索。让我们看看是如何做到的。

对于纯文本查询,我们可以将查询转换为密集向量并与数据库中的密集向量进行比较。

query="Women's running shoes"  
dense_vectors = list(dense_embedding_model.query_embed([query]))[0]  

prefetch = [
    models.Prefetch(
        query=dense_vectors,
        using="all-MiniLM-L6-v2",
        limit=limit,
    ),
    models.Prefetch(
        query=models.Document(
            text=query,
            model="Qdrant/minicoil-v1"
        ),
        using="minicoil",
        limit=limit,
    ),
]

results = client.query_points(
    collection_name="shein_products",
    query=dense_vectors,
    prefetch=prefetch,
    with_payload=True,
    limit=limit,
    using="all-MiniLM-L6-v2",
)
Score: 0.7146 | Product: Nike Women's Flex Experience Run 11 Next Nature Running Sneakers From Finish Line  
Score: 0.6983 | Product: Asics Women's Gel Kayano 30 Running Shoes In Blue Denim  
Score: 0.5131 | Product: Women's Glitter Strappy Wrapped Wedge Heel Platform Sandals

对于重排序,你需要使用ColBERT重排序模型对查询进行嵌入,并将其传递给query_points函数。

dense_vectors = list(dense_embedding_model.query_embed([query]))[0]
late_vectors = list(late_interaction_embedding_model.query_embed([query]))[0]

prefetch = [
    models.Prefetch(
        query=dense_vectors,
        using="all-MiniLM-L6-v2",
        limit=limit * 2,
    ),
    models.Prefetch(
        query=models.Document(
            text=query,
            model="Qdrant/minicoil-v1"
        ),
        using="minicoil",
        limit=limit * 2,
    ),
]

# Final reranking with late interaction
results = client.query_points(
    "shein_products",
    prefetch=prefetch,
    query=late_vectors,
    using="colbertv2.0",
    with_payload=True,
    limit=limit,
)
Score: 26.5242 | Product: Asics Women's Gel Kayano 30 Running Shoes In Blue Denim  
Score: 24.9610 | Product: Nike Women's Flex Experience Run 11 Next Nature Running Sneakers From Finish Line  
Score: 19.6362 | Product: Nike Jordan 13 Retro Low Bred GS 310811 027

如你所见,进行重排序已经消除了不相关的结果:例如,从“Running Shoes”的查询中移除了“Platform Sandals”。

同样,对于基于图像的查询,你可以使用:

query_image_path="/content/data/images/2/image_5.jpg"  
image_vectors = list(clip_embedding_model.embed([query_image_path]))[0]  
# Direct image similarity search (no prefetch needed)
results = client.query_points(
    "shein_products",
    query=image_vectors.tolist(),
    using="clip",
    with_payload=True,
    limit=limit,
)
Score: 0.8669 | Product: 1PC Plus Size Sexy Lingerie Body Stocking Hollow Out See Through Cover Bodystocking Without Underwear Valentine's Day Women's Swimwear & Clothing Swimsuit  
Score: 0.8095 | Product: 1PC Punk Women's Sexy Underwear Accessories Adjustable Belt Gothic Body Restraint Device Suitable For Halloween Party Costume Matching  
Score: 0.7985 | Product: 1pc Tulle Bow Headpiece Minimalist Wedding Veil Hair Accessory Witch

如果同时涉及文本和图像,你可以以同样的方式处理查询,但在预取步骤中包含图像嵌入。

query="blue shoes",  
query_image_path="/content/data/images/45/image_2.jpg"  
dense_vectors = list(dense_embedding_model.query_embed([query]))[0]  
image_vectors = list(clip_embedding_model.embed([query_image_path]))[0]  
prefetch = [  
    models.Prefetch(  
        query=dense_vectors,  
        using="all-MiniLM-L6-v2",  
        limit=limit * 2,  
    ),  
    models.Prefetch(  
        query=models.Document(  
            text=query,  
            model="Qdrant/minicoil-v1"  
        ),  
        using="minicoil",  
        limit=limit * 2,  
    ),  
    models.Prefetch(  
        query=image_vectors.tolist(),  
        using="clip",  
        limit=limit * 2,  
    ),  
]  

# Use late interaction embeddings for final reranking  
late_vectors = list(late_interaction_embedding_model.query_embed([query]))[0]  

results = client.query_points(  
    "shein_products",  
    prefetch=prefetch,  
    query=late_vectors,  
    using="colbertv2.0",  
    with_payload=True,  
    limit=limit,  
)
Score: 23.4887 | Product: Asics Women's Gel Kayano 30 Running Shoes In Blue Denim  
Score: 16.3749 | Product: Unbeatablesale Jacks 2128F-WH-L Ribbed Bell Boots With Fleece - White, Large  
Score: 14.3357 | Product: Nike Jordan 13 Retro Low Bred GS 310811 027

如你所见,排名最高的结果(蓝色牛仔布面的 Asics 跑鞋)同时契合了文本查询 “blue shoes” 和作为参考的图像。

4.8 带过滤器的查询

如前所述,我们可以使用查询来创建某些属性的过滤器,这些属性在嵌入中没有很好地表示。在Qdrant中,这是通过应用过滤器来完成的,其中在有效载荷和点的ID上设置了额外的条件。

例如,如果我们想筛选出价格低于 500 美元的红色连衣裙,可以这样设置过滤器:

from qdrant_client.models import Filter, FieldCondition, MatchValue, Range

filter = Filter(  
    must=[  
        FieldCondition(  
            key="category",  
            match=MatchValue(value="dress")  
        ),  
        FieldCondition(  
            key="color",  
            match=MatchValue(value="red")  
        ),  
        FieldCondition(  
            key="price",  
            range=Range(lt=500)  
        )  
    ]  
)

如你所见,我们使用了以下条款:

  • must: 等同于AND操作符,其中必须满足其中的所有条件。
  • match: 与查询中的值精确匹配。
  • lt: 表示“小于”。

你可以在 Qdrant 的官方文档中进一步了解各种过滤子句及其组合方式。
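除了 must,Qdrant 的过滤器还支持 should(至少满足其中一个条件)和 must_not(排除)子句,并且可以相互组合。下面是一个示意(沿用上文的字段):

filter = Filter(
    must=[
        FieldCondition(key="category", match=MatchValue(value="dress")),
        FieldCondition(key="price", range=Range(lt=500)),
    ],
    should=[
        # at least one of these colors should match
        FieldCondition(key="color", match=MatchValue(value="red")),
        FieldCondition(key="color", match=MatchValue(value="burgundy")),
    ],
    must_not=[
        FieldCondition(key="brand", match=MatchValue(value="Nike")),
    ],
)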

你可能会遇到的一个实际问题是静态过滤器并不适用于所有查询。相反,我们需要一种根据查询生成动态过滤器的方法。为此,我们可以使用LLM为给定查询生成适当的过滤器,然后将其传递给查询函数。

from qdrant_client.models import Filter, FieldCondition, MatchValue, Range
from openai import OpenAI

openai_client = OpenAI(api_key=userdata.get('OPENAI_APIKEY'))

user_query = "Show me SHEIN women's top handle bags in white under 15 USD"

def get_llm_filters(natural_language_query):
    system_prompt = "You are an assistant that extracts product filters from search queries. Output as JSON."
    user_prompt = f"""
Query: "{natural_language_query}"

Extract structured filters from this product search query. Return the filters in a JSON format.
Use only these allowed fields (all lowercase and exact spelling):
- product_name (string)
- final_price (numeric: supports lt, lte, gt, gte)
- currency (string)
- rating (numeric: supports lt, lte, gt, gte)
- category (string)
- brand (string)

Return a JSON object where:
- string fields are matched exactly (e.g., "category": "tops")
- numeric fields are expressed using comparison operators (e.g., "final_price": {{"lt": 500}})
- only include filters explicitly mentioned or implied in the query, you dont need to include all fields, 2 or 3 is fine

Example:
Input: "show me products from zara under 100 dollars with rating above 4"
Output:
{{
  "brand": "zara",
  "final_price": {{"lt": 100}},
  "rating": {{"gt": 4}}
}}
"""
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0,
        response_format={"type": "json_object"}  # ask for valid JSON so json.loads below doesn't fail
    )

    import json
    try:
        parsed = json.loads(response.choices[0].message.content)
        return parsed
    except Exception as e:
        print("Failed to parse filter JSON:", e)
        return {}


def convert_llm_filters_to_qdrant(llm_filters: dict) -> Filter:
    conditions = []
    for key, value in llm_filters.items():
        if isinstance(value, str):
            conditions.append(FieldCondition(
                key=key,
                match=MatchValue(value=value)
            ))
        elif isinstance(value, dict):
            range_args = {}
            if 'lt' in value:
                range_args['lt'] = value['lt']
            if 'gt' in value:
                range_args['gt'] = value['gt']
            if 'lte' in value:
                range_args['lte'] = value['lte']
            if 'gte' in value:
                range_args['gte'] = value['gte']

            conditions.append(FieldCondition(
                key=key,
                range=Range(**range_args)
            ))
    return Filter(must=conditions)

llm_filters = get_llm_filters(user_query)
qdrant_filter = convert_llm_filters_to_qdrant(llm_filters)
dense_vectors = list(dense_embedding_model.query_embed([user_query]))[0]

results = client.query_points(
    collection_name="shein_products",
    query=dense_vectors,
    limit=5,
    query_filter=qdrant_filter,
    with_payload=True,
    using="all-MiniLM-L6-v2"
)
得分:2.6087 | 产品:休闲纯色肩包 新款手提包 时尚简约方形包 - SHEIN 女士手提包。

你也可以让 LLM 直接构建这个过滤器代码,如果可能的话。根据你的设置选择最合适的方法即可。将所有内容整合成一个精美的用户界面,你就拥有一个可以部署的生产级搜索引擎了。

5、补充:使用微调的NER模型进行动态查询过滤

构建动态查询过滤的另一种方法是使用命名实体识别(NER)模型。通过将用户查询传递给微调的NER模型,你可以提取相关属性——如颜色、类别或价格——并自动生成过滤条件。

在这个例子中,我们将使用 spaCy 库中的NER模型。首先创建一个带有示例训练数据的自定义数据加载器类。

import random
import json
from typing import List, Dict, Tuple
import pandas as pd

class SHEINNERDataGenerator:
    """Generate fine-tuning data for NER model based on SHEIN payload fields"""
    def __init__(self):
        # Sample data using only relevant payload fields
        self.sample_products = [
            {
                "product_name": "Women's Casual Floral Print Midi Dress",
                "final_price": 25.99,
                "currency": "USD",
                "rating": 4.3,
                "category": "Dresses",
                "brand": "SHEIN"
            },
            {
                "product_name": "Men's Basic Cotton T-Shirt Black",
                "final_price": 8.50,
                "currency": "USD",
                "rating": 4.1,
                "category": "Tops",
                "brand": "SHEIN"
            },
            {
                "product_name": "Plus Size High Waist Skinny Jeans Blue",
                "final_price": 32.00,
                "currency": "USD",
                "rating": 4.5,
                "category": "Bottoms",
                "brand": "SHEIN"
            },
            {
                "product_name": "Women's Chunky Knit Oversized Sweater Pink",
                "final_price": 29.99,
                "currency": "USD",
                "rating": 4.2,
                "category": "Sweaters",
                "brand": "SHEIN"
            },
            {
                "product_name": "Summer Beach Sandals White Platform",
                "final_price": 18.75,
                "currency": "USD",
                "rating": 3.9,
                "category": "Shoes",
                "brand": "SHEIN"
            }
        ]

        self.colors = [
            "black", "white", "red", "blue", "green", "yellow", "pink", "purple",
            "orange", "brown", "gray", "grey", "navy", "beige", "khaki", "maroon",
            "coral", "mint", "turquoise", "burgundy", "olive", "cream", "ivory",
            "gold", "silver", "rose gold", "neon", "pastel", "dark blue", "light blue",
            "dark green", "light green", "bright red", "deep red", "hot pink", "lime",
            "lavender", "peach", "teal", "magenta", "charcoal", "nude", "camel", "tan"
        ]

        self.categories = [
            "dress", "dresses", "midi dress", "maxi dress", "mini dress", "bodycon dress",
            "shirt", "top", "blouse", "t-shirt", "tank top", "crop top", "tube top", "camisole",
            "pants", "jeans", "trousers", "leggings", "joggers", "sweatpants", "cargo pants",
            "skirt", "mini skirt", "midi skirt", "maxi skirt", "pleated skirt", "pencil skirt",
            "jacket", "blazer", "cardigan", "coat", "hoodie", "sweater", "pullover", "vest",
            "shoes", "boots", "sneakers", "heels", "flats", "sandals", "pumps", "loafers",
            "bag", "purse", "backpack", "tote", "clutch", "crossbody", "handbag",
            "accessories", "jewelry", "necklace", "earrings", "bracelet", "ring", "watch",
            "swimwear", "bikini", "swimsuit", "beachwear", "lingerie", "bra", "underwear"
        ]

        self.price_phrases = [
            "under", "below", "less than", "maximum", "max", "up to", "within",
            "over", "above", "more than", "minimum", "min", "starting from", "at least",
            "between", "from", "to", "around", "approximately", "roughly"
        ]

        self.rating_phrases = [
            "highly rated", "top rated", "best rated", "good reviews", "well reviewed",
            "above", "over", "more than", "at least", "minimum", "4+ stars", "5 star"
        ]

        self.sizes = ["XS", "S", "M", "L", "XL", "XXL", "plus size", "petite", "tall"]

        self.styles = [
            "casual", "formal", "business", "party", "evening", "summer", "winter",
            "vintage", "boho", "gothic", "street", "preppy", "minimalist", "trendy",
            "oversized", "fitted", "loose", "tight", "flowy", "structured"
        ]


    def generate_training_queries(self, num_samples: int = 100) -> List[Tuple[str, Dict]]:
        training_data = []
        templates = [
            "{color} {category} under ${price}",
            "{brand} {category} with {rating_phrase}",
            "{style} {color} {category} from {brand}",
            "Looking for {style} {category} in {color} with rating above {rating}",
            "{brand} {category} between ${price_min} and ${price_max}",
            "Top rated {color} {category} from {brand}",
            "{category} with rating {rating}+ stars",
            "Cheap {color} {category} under ${price}"
        ]
        for _ in range(num_samples):
            vars_used = {
                "color": random.choice(self.colors),
                "category": random.choice(self.categories),
                "brand": "SHEIN",
                "style": random.choice(self.styles),
                "price": round(random.uniform(10, 100), 2),
                "price_min": round(random.uniform(10, 40), 2),
                "price_max": round(random.uniform(41, 100), 2),
                "rating": round(random.uniform(3.0, 5.0), 1),
                "rating_phrase": random.choice(self.rating_phrases),
            }
            template = random.choice(templates)
            try:
                query = template.format(**vars_used)
                entities = self._extract_entities_from_query(query, vars_used)
                training_data.append((query, {"entities": entities}))
            except KeyError:
                continue

        return training_data


    def generate_realistic_queries_from_products(self, products_sample: List[Dict] = None) -> List[Tuple[str, Dict]]:
        if products_sample is None:
            products_sample = self.sample_products
        training_data = []
        for row in products_sample:
            product = {
                "product_name": str(row.get('product_name', '')),
                "final_price": float(row.get('final_price', 0)) if pd.notna(row.get('final_price')) else 0.0,
                "currency": str(row.get('currency', ''))[:10],
                "rating": float(row.get('rating', 0)) if pd.notna(row.get('rating')) else 0.0,
                "category": str(row.get('category', ''))[:100].lower(),
                "brand": str(row.get('brand', ''))[:100]
            }
            queries = [
                f"Looking for {product['category']} under ${product['final_price'] + 10}",
                f"{product['category']} with rating above {product['rating'] - 0.5}",
                f"{product['brand']} {product['category']} with good reviews"
            ]
            for color in self.colors:
                if color in product["product_name"].lower():
                    queries.append(f"{color} {product['category']} under ${product['final_price'] + 5}")
            for query in queries:
                entities = self._extract_entities_from_query(query, {})
                if entities:
                    training_data.append((query, {"entities": entities}))

        return training_data


    def _extract_entities_from_query(self, query: str, vars_used: Dict) -> List[Tuple[int, int, str]]:
        query_lower = query.lower()
        entities = []
        for color in self.colors:
            if color in query_lower:
                idx = query_lower.find(color)
                entities.append((idx, idx + len(color), "COLOR"))
        for category in self.categories:
            if category in query_lower:
                idx = query_lower.find(category)
                entities.append((idx, idx + len(category), "PRODUCT_CATEGORY"))

        if "shein" in query_lower:
            idx = query_lower.find("shein")
            entities.append((idx, idx + 5, "BRAND"))

        for style in self.styles:
            if style in query_lower:
                idx = query_lower.find(style)
                entities.append((idx, idx + len(style), "STYLE"))

        return self._remove_overlapping_entities(entities)


    def _remove_overlapping_entities(self, entities: List[Tuple[int, int, str]]) -> List[Tuple[int, int, str]]:
        entities.sort(key=lambda x: x[0])
        non_overlap = [entities[0]] if entities else []
        for entity in entities[1:]:
            if entity[0] >= non_overlap[-1][1]:
                non_overlap.append(entity)
        return non_overlap


    def save_training_data(self, training_data: List[Tuple[str, Dict]], filename: str):
        with open(filename, 'w') as f:
            json.dump(training_data, f, indent=2)
        print(f"Saved to {filename}")


    def print_sample_data(self, training_data: List[Tuple[str, Dict]], count: int = 10):
        for i, (text, annotations) in enumerate(training_data[:count]):
            print(f"\nExample {i+1}: {text}")
            for start, end, label in annotations["entities"]:
                print(f"  {label}: '{text[start:end]}'")

下面的例子展示了NER模型应当如何识别查询中的实体。

(
    "Show me red dresses from SHEIN under $500",
    {
        "entities": [
            (8, 11, "COLOR"),             # "red"
            (12, 19, "PRODUCT_CATEGORY"), # "dresses"
            (25, 30, "BRAND"),            # "SHEIN"
            (37, 41, "PRICE")             # "500"
        ]
    }
)

每个训练样本是一个包含查询字符串和具有spaCy NER格式实体注释的字典的元组。使用此脚本用提供的数据训练模型。

import spacy
from spacy.training import Example
from spacy.util import minibatch, compounding
import json
import random
from pathlib import Path
from typing import List, Dict, Tuple, Optional

class EnhancedNERTrainer:
    """NER trainer for SHEIN product queries using SHEINNERDataGenerator"""

    def __init__(self, model_name: str = "en_core_web_sm"):
        try:
            self.nlp = spacy.load(model_name)
        except OSError:
            print(f"Model {model_name} not found. Installing...")
            spacy.cli.download(model_name)
            self.nlp = spacy.load(model_name)

        self.training_nlp = spacy.blank("en")

        if "ner" not in self.training_nlp.pipe_names:
            self.ner = self.training_nlp.add_pipe("ner")
        else:
            self.ner = self.training_nlp.get_pipe("ner")

        self.entity_labels = [
            "COLOR",
            "PRODUCT_CATEGORY",
            "BRAND",
            "SIZE",
            "STYLE",
            "PRICE",
            "RATING",
        ]

        for label in self.entity_labels:
            self.ner.add_label(label)


    def load_training_data_from_generator(self, generator: 'SHEINNERDataGenerator', num_synthetic: int = 300) -> List[Tuple[str, Dict]]:
        synthetic_data = generator.generate_training_queries(num_synthetic)
        realistic_data = generator.generate_realistic_queries_from_products()
        all_data = list({q[0]: q for q in (synthetic_data + realistic_data)}.values())
        print(f"Generated {len(all_data)} unique training examples from generator")
        return all_data


    def train_model(self, training_data: List[Tuple[str, Dict]],
                    model_output_path: str = "custom_shein_ner",
                    iterations: int = 50,
                    dropout: float = 0.35) -> spacy.Language:

        print(f"Training NER model with {len(training_data)} examples...")
        examples = [Example.from_dict(self.training_nlp.make_doc(text), annotations) for text, annotations in training_data]
        self.training_nlp.initialize(lambda: examples)

        for iteration in range(iterations):
            random.shuffle(examples)
            losses = {}
            batches = minibatch(examples, size=compounding(4.0, 32.0, 1.001))
            for batch in batches:
                self.training_nlp.update(batch, losses=losses, drop=dropout)
            if iteration % 10 == 0:
                print(f"Iteration {iteration:3d} - Loss: {losses.get('ner', 0.0):.4f}")

        Path(model_output_path).mkdir(exist_ok=True)
        self.training_nlp.to_disk(model_output_path)
        print(f"Model saved to {model_output_path}")
        return self.training_nlp


    def evaluate_model(self, test_data: List[Tuple[str, Dict]], model_path: str = None) -> Dict:
        nlp = spacy.load(model_path) if model_path else self.training_nlp
        correct, total = 0, 0
        precision_scores, recall_scores = [], []
        for text, annotations in test_data:
            doc = nlp(text)
            predicted = {(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents}
            true = set(annotations["entities"])
            if predicted:
                precision_scores.append(len(predicted & true) / len(predicted))
            if true:
                recall_scores.append(len(predicted & true) / len(true))
            if predicted == true:
                correct += 1
            total += 1
        accuracy = correct / total if total else 0
        precision = sum(precision_scores) / len(precision_scores) if precision_scores else 0
        recall = sum(recall_scores) / len(recall_scores) if recall_scores else 0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0
        print("Evaluation Results:")
        print(f"Accuracy: {accuracy:.4f}\nPrecision: {precision:.4f}\nRecall: {recall:.4f}\nF1 Score: {f1:.4f}")
        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "total_examples": total
        }


    def test_model_on_queries(self, model_path: str, test_queries: List[str]):
        nlp = spacy.load(model_path)
        print("\nTesting model on sample queries:\n" + "=" * 80)
        for i, query in enumerate(test_queries, 1):
            doc = nlp(query)
            print(f"\nQuery {i}: {query}")
            if doc.ents:
                for ent in doc.ents:
                    print(f"  Entity: '{ent.text}' -> {ent.label_}")
            else:
                print("  No entities found")


def create_comprehensive_training_pipeline():
    trainer = EnhancedNERTrainer()
    generator = SHEINNERDataGenerator()
    training_data = trainer.load_training_data_from_generator(generator, num_synthetic=400)
    random.shuffle(training_data)
    split = int(0.8 * len(training_data))
    train_data, test_data = training_data[:split], training_data[split:]
    print(f"Train: {len(train_data)} | Test: {len(test_data)}")
    trainer.train_model(train_data, iterations=50)
    metrics = trainer.evaluate_model(test_data)
    queries = [
        "red summer dress under $40",
        "SHEIN black skinny jeans with 4+ stars",
        "casual white cotton t-shirt from SHEIN",
        "looking for plus-size formal dress below $60"
    ]
    trainer.test_model_on_queries("custom_shein_ner", queries)
    return trainer, metrics
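
运行整个流程只需调用上面定义的函数(训练若干轮可能需要几分钟):

trainer, metrics = create_comprehensive_training_pipeline()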

你现在可以在你的管道中部署这个训练好的模型来提取实体,构建过滤器,并在查询时应用它们。

from qdrant_client import models
import spacy
from typing import Dict, Optional
ner_model = spacy.load("custom_shein_ner")

# Helper to convert extracted entity list into a dict
def extract_ner_payload(query: str) -> Dict[str, str]:
    doc = ner_model(query)
    print("OUTPUT: ", doc)
    payload = {
        "product_name": None,
        "final_price": None,
        "currency": None,
        "rating": None,
        "category": None,
        "brand": None,
    }

    for ent in doc.ents:
        if ent.label_ == "PRICE":
            payload["final_price"] = ent.text.replace("$", "").strip()
        elif ent.label_ == "RATING":
            payload["rating"] = ent.text.replace("stars", "").strip()
        elif ent.label_ == "PRODUCT_CATEGORY":
            payload["category"] = ent.text.strip().lower()
        elif ent.label_ == "BRAND":
            payload["brand"] = ent.text.strip()
        elif ent.label_ == "STYLE":
            payload["product_name"] = ent.text.strip()
        elif ent.label_ == "CURRENCY":
            payload["currency"] = ent.text.strip().upper()

    return {k: v for k, v in payload.items() if v is not None}


# Convert NER payload to Qdrant filter
def ner_payload_to_qdrant_filter(payload: Dict[str, str]) -> Optional[models.Filter]:
    conditions = []
    if "final_price" in payload:
        try:
            price_value = float(payload["final_price"])
            conditions.append(models.FieldCondition(
                key="final_price",
                range=models.Range(lte=price_value)
            ))
        except ValueError:
            pass
    if "rating" in payload:
        try:
            rating_value = float(payload["rating"])
            conditions.append(models.FieldCondition(
                key="rating",
                range=models.Range(gte=rating_value)
            ))
        except ValueError:
            pass
    if "category" in payload:
        conditions.append(models.FieldCondition(
            key="category",
            match=models.MatchValue(value=payload["category"])
        ))
    if "brand" in payload:
        conditions.append(models.FieldCondition(
            key="brand",
            match=models.MatchValue(value=payload["brand"])
        ))
    if "currency" in payload:
        conditions.append(models.FieldCondition(
            key="currency",
            match=models.MatchValue(value=payload["currency"])
        ))

    return models.Filter(must=conditions) if conditions else None

# Example Usage
query = "Red Zara dress under $500 with 4.5 stars"
limit = 10

# 1. Extract dynamic filters
ner_payload = extract_ner_payload(query)
qdrant_filter = ner_payload_to_qdrant_filter(ner_payload)
print("FILTER: ", qdrant_filter)

dense_vectors = list(dense_embedding_model.query_embed([query]))[0]
prefetch = [
    models.Prefetch(
        query=dense_vectors,
        using="all-MiniLM-L6-v2",
        limit=limit * 2,
    ),
    models.Prefetch(
        query=models.Document(
            text=query,
            model="Qdrant/minicoil-v1"
        ),
        using="minicoil",
        limit=limit * 2,
    ),
]

results = client.query_points(
    collection_name="shein_products",
    query=dense_vectors,
    prefetch=prefetch,  # reuse the hybrid (dense + sparse) prefetch defined above
    with_payload=True,
    limit=limit,
    query_filter=qdrant_filter,
    using="all-MiniLM-L6-v2",
)

6、结束语

在本文中,你了解了多模态搜索引擎背后的工作原理,以及你最喜欢的购物应用程序如何通过智能搜索和排名系统微妙地引导你的选择。你还探索了如何使用现代向量数据库如Qdrant来构建一个。

完整的代码可在 GitHub 上找到。

多模态搜索弥合了用户思维与数据库响应之间的差距,结合了文本、图像和元数据搜索的优势。


原文链接:Designing Multimodal AI Search Engines for Smarter Online Retail

汇智网翻译整理,转载请标明出处