
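Pipeline API

Base path: /api/v1/pipelines

Overview

The Pipeline API orchestrates two LangGraph workflows: keyword search (search → dedup → crawl → OCR → index) and PDF upload (extract → dedup → OCR → index). Pipelines run asynchronously and support HITL (human-in-the-loop) interrupts for resolving deduplication conflicts. Use the `thread_id` to poll status, resume after a HITL interrupt, or cancel the pipeline.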

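Endpoints

| Method | Path | Description |
| --- | --- | --- |
| POST | /pipelines/search | Run the search pipeline (supports HITL) |
| POST | /pipelines/upload | Run the upload pipeline |
| GET | /pipelines/{thread_id}/status | Get pipeline status |
| POST | /pipelines/{thread_id}/resume | Resume a HITL-interrupted pipeline |
| POST | /pipelines/{thread_id}/cancel | Cancel a pipeline |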
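
As a rough illustration of how the five paths above combine with the base path, here is a minimal URL-building sketch. The helper name `pipeline_url` is hypothetical, and the host and port are simply those used in the curl examples on this page; adjust them for your deployment.

```python
from typing import Optional
from urllib.parse import quote

# Host and port are taken from the curl examples; they are an assumption
# about your deployment, not part of the API contract.
API_ROOT = "http://localhost:8000/api/v1"


def pipeline_url(action: str, thread_id: Optional[str] = None) -> str:
    """Build the full URL for one of the five pipeline endpoints."""
    if action in ("search", "upload"):
        # Start endpoints take no thread_id.
        return f"{API_ROOT}/pipelines/{action}"
    if action in ("status", "resume", "cancel"):
        if not thread_id:
            raise ValueError(f"{action!r} requires a thread_id")
        # Percent-encode the thread_id so it is safe as a path segment.
        return f"{API_ROOT}/pipelines/{quote(thread_id, safe='')}/{action}"
    raise ValueError(f"unknown action: {action!r}")


print(pipeline_url("status", "search_a1b2c3d4e5f6"))
```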

POST /api/v1/pipelines/search

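Description: Starts the keyword search pipeline: search → dedup → crawl → OCR → index. If deduplication conflicts are found, the pipeline may be interrupted to wait for HITL resolution.

Request body: `SearchPipelineRequest`

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `project_id` | int | | Project ID |
| `query` | string | | Search query (default: `""`) |
| `sources` | string[] | | Search sources (e.g. `["semantic_scholar", "openalex"]`) |
| `max_results` | int | | Maximum number of results (1–200, default: 50) |

Response: `ApiResponse[dict]`

| Field | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Pipeline thread ID (e.g. `search_a1b2c3d4e5f6`) |
| `status` | string | `running` |
| `project_id` | int | Project ID |

Search pipeline example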

```bash
curl -X POST "http://localhost:8000/api/v1/pipelines/search" \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": 1,
    "query": "transformer attention",
    "sources": ["semantic_scholar"],
    "max_results": 30
  }'
```

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "running",
    "project_id": 1
  }
}
```
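
A client may want to check the documented `max_results` range before sending the request. The sketch below builds the request body under that constraint; `build_search_request` is a hypothetical helper, and omitting `sources` when none is given is an assumption, since this page does not state a default for that field.

```python
from typing import Any, Dict, List, Optional


def build_search_request(
    project_id: int,
    query: str = "",                      # documented default
    sources: Optional[List[str]] = None,  # no documented default
    max_results: int = 50,                # documented default
) -> Dict[str, Any]:
    """Build a SearchPipelineRequest body, enforcing the 1-200 range."""
    if not 1 <= max_results <= 200:
        raise ValueError("max_results must be between 1 and 200")
    payload: Dict[str, Any] = {
        "project_id": project_id,
        "query": query,
        "max_results": max_results,
    }
    # Assumption: leave `sources` out entirely when the caller gives none.
    if sources is not None:
        payload["sources"] = list(sources)
    return payload


body = build_search_request(1, "transformer attention", ["semantic_scholar"], 30)
print(body)
```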

POST /api/v1/pipelines/upload

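Description: Starts the PDF upload pipeline: extract metadata → dedup → OCR → index. Accepts local file paths located inside the allowed directory.

Request body: `UploadPipelineRequest`

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `project_id` | int | | Project ID |
| `pdf_paths` | string[] | | Absolute paths to PDF files (must be under the configured `pdf_dir`) |

Response: `ApiResponse[dict]`

| Field | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Pipeline thread ID (e.g. `upload_x1y2z3a4b5c6`) |
| `status` | string | `running` |
| `project_id` | int | Project ID |

Upload pipeline example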

```bash
curl -X POST "http://localhost:8000/api/v1/pipelines/upload" \
  -H "Content-Type: application/json" \
  -d '{
    "project_id": 1,
    "pdf_paths": [
      "/data0/djx/omelette/pdfs/paper1.pdf",
      "/data0/djx/omelette/pdfs/paper2.pdf"
    ]
  }'
```

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "upload_x1y2z3a4b5c6",
    "status": "running",
    "project_id": 1
  }
}
```
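
Because paths outside the allowed directory are rejected, it can help to pre-check them on the client. The following is a minimal sketch, not the server's actual validation: `is_allowed_pdf_path` is a hypothetical helper, and the `pdf_dir` value is only inferred from the example paths above. It compares paths lexically and does not resolve symlinks.

```python
from pathlib import PurePosixPath


def is_allowed_pdf_path(path: str, pdf_dir: str) -> bool:
    """Return True if `path` is absolute and lexically inside `pdf_dir`."""
    candidate = PurePosixPath(path)
    base = PurePosixPath(pdf_dir)
    # Relative paths and paths containing ".." are rejected outright,
    # because this check never touches the filesystem to resolve them.
    if not candidate.is_absolute() or ".." in candidate.parts:
        return False
    return base in candidate.parents


# Assumed value, inferred from the example request on this page.
PDF_DIR = "/data0/djx/omelette/pdfs"

print(is_allowed_pdf_path("/data0/djx/omelette/pdfs/paper1.pdf", PDF_DIR))
```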

GET /api/v1/pipelines/{thread_id}/status

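Description: Returns the pipeline's execution status. When `status` is `interrupted`, the response includes `conflicts` for HITL resolution.

Path parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Pipeline thread ID |

Response: `ApiResponse[dict]`

| Field | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Thread ID |
| `status` | string | `running`, `interrupted`, `completed`, `failed`, or `cancelled` |
| `stage` | string | Current stage (if available) |
| `progress` | int | Progress, 0–100 |
| `conflicts` | object[] | Deduplication conflicts (when `interrupted`) |
| `interrupted_at` | string[] | IDs of the nodes where execution was interrupted (when `interrupted`) |
| `result` | object | Final result (when `completed`) |
| `error` | string | Error message (when `failed`) |

Status query example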

```bash
curl -X GET "http://localhost:8000/api/v1/pipelines/search_a1b2c3d4e5f6/status"
```

Running:

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "running"
  }
}
```

HITL interrupt:

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "interrupted",
    "conflicts": [
      {
        "existing": {"id": 1, "title": "Paper A", "doi": "10.1234/abc"},
        "new": {"title": "Paper A (preprint)", "doi": "10.1234/abc"}
      }
    ],
    "stage": "dedup",
    "progress": 45,
    "interrupted_at": ["dedup_resolve"]
  }
}
```

Completed:

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "completed",
    "stage": "completed",
    "progress": 100,
    "result": {"papers_imported": 12}
  }
}
```
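
Since pipelines run asynchronously, a client typically polls this endpoint until the status leaves `running`. The loop below is one possible sketch: `wait_for_pipeline` and its `fetch_status` callable are hypothetical (the callable is assumed to perform the GET request and return the `data` object), and the interval and poll limit are arbitrary choices.

```python
import time
from typing import Any, Callable, Dict

# Statuses after which no further progress is expected.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_pipeline(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 2.0,
    max_polls: int = 150,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the pipeline finishes or needs HITL input.

    Returns the last `data` object. A status of `interrupted` is also
    returned to the caller, because the pipeline will not continue until
    it is resumed or cancelled.
    """
    for _ in range(max_polls):
        data = fetch_status()
        status = data["status"]
        if status in TERMINAL_STATUSES or status == "interrupted":
            return data
        sleep(interval)
    raise TimeoutError(f"pipeline still running after {max_polls} polls")
```

When the returned status is `interrupted`, the caller can inspect `conflicts` and then call the resume endpoint; for `failed`, the `error` field carries the message.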

POST /api/v1/pipelines/{thread_id}/resume

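Description: Resumes a HITL-interrupted pipeline using the resolved conflicts. Valid only when `status` is `interrupted`.

Path parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Pipeline thread ID |

Request body: `ResumeRequest`

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `resolved_conflicts` | object[] | | Resolved conflict decisions (default: `[]`) |

Response: `ApiResponse[dict]`

| Field | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Thread ID |
| `status` | string | `running` |

Resume pipeline example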

```bash
curl -X POST "http://localhost:8000/api/v1/pipelines/search_a1b2c3d4e5f6/resume" \
  -H "Content-Type: application/json" \
  -d '{
    "resolved_conflicts": [
      {"conflict_id": 0, "action": "keep_existing"},
      {"conflict_id": 1, "action": "import_new"}
    ]
  }'
```

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "running"
  }
}
```
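
To show how the `conflicts` array from the status response might be turned into a `ResumeRequest` body, here is a hedged sketch. It assumes that `conflict_id` is the zero-based position of the conflict in that array and that `keep_existing` and `import_new` are the available actions; both are inferred from the example above rather than stated by this page. `build_resume_request` and `prefer_existing_on_same_doi` are hypothetical names.

```python
from typing import Any, Callable, Dict, List

# Only these two actions appear in the example request; others may exist.
KNOWN_ACTIONS = {"keep_existing", "import_new"}


def build_resume_request(
    conflicts: List[Dict[str, Any]],
    choose: Callable[[Dict[str, Any]], str],
) -> Dict[str, Any]:
    """Apply `choose` to each conflict and build the resume body."""
    resolved = []
    for index, conflict in enumerate(conflicts):
        action = choose(conflict)
        if action not in KNOWN_ACTIONS:
            raise ValueError(f"unexpected action: {action!r}")
        # Assumption: conflict_id is the index within `conflicts`.
        resolved.append({"conflict_id": index, "action": action})
    return {"resolved_conflicts": resolved}


def prefer_existing_on_same_doi(conflict: Dict[str, Any]) -> str:
    """Example policy: keep the stored paper when both DOIs match."""
    existing_doi = conflict["existing"].get("doi")
    new_doi = conflict["new"].get("doi")
    if existing_doi and existing_doi == new_doi:
        return "keep_existing"
    return "import_new"
```

In an interactive client, `choose` would usually present each conflict to a person instead of applying a fixed policy.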

POST /api/v1/pipelines/{thread_id}/cancel

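Description: Cancels a running or interrupted pipeline.

Path parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Pipeline thread ID |

Response: `ApiResponse[dict]`

| Field | Type | Description |
| --- | --- | --- |
| `thread_id` | string | Thread ID |
| `status` | string | `cancelled` |

Cancel pipeline example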

```bash
curl -X POST "http://localhost:8000/api/v1/pipelines/search_a1b2c3d4e5f6/cancel"
```

```json
{
  "code": 200,
  "message": "success",
  "data": {
    "thread_id": "search_a1b2c3d4e5f6",
    "status": "cancelled"
  }
}
```

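Error codes

| Code | Description |
| --- | --- |
| 200 | Success |
| 400 | Bad request (e.g. a path outside the allowed directory, or the pipeline is not in the interrupted state) |
| 404 | Pipeline not found (`thread_id` is unknown, or the pipeline completed and has been cleaned up) |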
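
A client can map these codes onto exceptions when unwrapping a response. The sketch below assumes that error responses use the same `code` / `message` envelope as the success examples on this page, which is not confirmed here; `PipelineApiError` and `unwrap` are hypothetical names.

```python
from typing import Any, Dict


class PipelineApiError(Exception):
    """Raised when the response envelope carries a non-200 code."""

    def __init__(self, code: Any, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def unwrap(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """Return the `data` object on success, raise otherwise."""
    code = envelope.get("code")
    if code == 200:
        return envelope.get("data", {})
    # Assumption: 400 and 404 responses also carry `code` and `message`.
    raise PipelineApiError(code, envelope.get("message", ""))


ok = {"code": 200, "message": "success",
      "data": {"thread_id": "search_a1b2c3d4e5f6", "status": "cancelled"}}
print(unwrap(ok)["status"])
```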

Released under the MIT License.