Pipelines 深入 (Pipelines Deep Dive)

重點總覽

cuda::pipeline 是一個用來「分階段(staging)暫存工作」並協調 多緩衝區 producer–consumer 模式的機制,最常見的用途是把 計算非同步資料拷貝 重疊(overlap)。它建構於 Advanced Synchronization Primitives(非同步 barrier)之上,但提供更高階、更簡潔的 API。

項目 重點
用途 分階段暫存工作、協調 multi-buffer producer–consumer、重疊 compute 與 async copy
Scope threadblock/更大範圍;非 thread scope 需 pipeline_shared_state<scope, count>
count(stages) shared state 中可同時處理的並行 stage 數(緩衝深度)
Unified vs Partitioned unified:每個 thread 同時是 producer 與 consumer;partitioned:固定角色不可變
Submit(生產) producer_acquire() → 提交 memcpy_asyncproducer_commit()
Consume(消費) consumer_wait()(等 tail/最舊 stage)→ consumer_release()
資源耗盡 producer_acquire() 會 block 直到 consumer 釋放下一 stage 的資源
Warp Entanglement 同 warp 共用 pipeline,commit 會被合併,diverge 可能造成 over-wait
Early Exit 提前離開的 thread 必須先 pipeline::quit() 退出參與
對比 primitives __pipeline_memcpy_async / __pipeline_commit / __pipeline_wait_prior(N)
兩套 API

本章的 cuda::pipeline<cuda/pipeline>)是 C++ 高階介面;底層另有 C primitives(<cuda_pipeline.h>)如 __pipeline_commit()__pipeline_wait_prior(N)。兩者語意對應,適用情境不同。

Initialization(初始化與 scope)

cuda::pipeline 可建立於不同的 thread scope。對於 cuda::thread_scope_thread 的 scope,需要一個 cuda::pipeline_shared_state<scope, count> 物件來協調參與的 threads;此 state 封裝了讓 pipeline 最多能處理 count 個並行 stage 的有限資源。

// thread scope:thread-local pipeline,不需 shared_state
constexpr auto scope = cuda::thread_scope_thread;
cuda::pipeline<scope> pipeline = cuda::make_pipeline();

// block scope:需要 pipeline_shared_state
constexpr auto scope = cuda::thread_scope_block;
constexpr auto stages_count = 2;
__shared__ cuda::pipeline_shared_state<scope, stages_count> shared_state;
auto pipeline = cuda::make_pipeline(group, &shared_state);

thread scope 用 make_pipeline() 即可;block(或更大)scope 需傳入 group 與 __shared__pipeline_shared_state

Unified vs Partitioned

// 在 block scope 建立 partitioned pipeline,只有 thread 0 是 producer
constexpr auto scope = cuda::thread_scope_block;
constexpr auto stages_count = 2;
__shared__ cuda::pipeline_shared_state<scope, stages_count> shared_state;
auto thread_role = (group.thread_rank() == 0)
                   ? cuda::pipeline_role::producer
                   : cuda::pipeline_role::consumer;
auto pipeline = cuda::make_pipeline(group, &shared_state, thread_role);

第三個參數可給 producer 數量或 pipeline_role,藉此切換到 partitioned 模式。

partitioned 的額外成本

為了支援 partition,共享的 cuda::pipeline 會產生額外開銷,包括 每個 stage 一組 shared memory barriers 來做同步。即使 pipeline 是 unified(其實可改用 __syncthreads())也會付出這些成本。因此 能用 thread-local pipeline 時就優先用它,以避開這些開銷。

Submitting Work(提交工作 / 生產端)

把工作 commit 到一個 pipeline stage 包含三步:

  1. 由一組 producer threads 集體pipeline.producer_acquire() 取得 pipeline head。
  2. 對該 head 提交非同步操作(例如 memcpy_async)。
  3. 集體用 pipeline.producer_commit() commit(推進) pipeline head。
acquire 會 block

若所有資源都在使用中,producer_acquire()block 住 producer threads,直到 consumer threads 釋放下一個 pipeline stage 的資源為止。這正是 pipeline 形成「背壓(back-pressure)」與緩衝深度限制(count)的關鍵。

Consuming Work(消費工作 / 消費端)

從先前 commit 的 stage 消費工作包含兩步:

  1. 由一組 consumer threads 集體 等待該 stage 完成,例如用 pipeline.consumer_wait() 等待 tail(最舊) stage。
  2. 集體用 pipeline.consumer_release() 釋放 該 stage。

cuda::pipeline<cuda::thread_scope_thread>,還可用 friend function cuda::pipeline_consumer_wait_prior<N>() 等待「除了最後 N 個 stage 之外」的全部 stage 完成,語意對應 primitives API 的 __pipeline_wait_prior(N)

Pipeline(環狀緩衝,count = 3 為例)
 producer 端                          consumer 端
 acquire → memcpy_async → commit      wait(tail) → release
      │                                    ▲
      ▼                                    │
   head ──► [stage0][stage1][stage2] ──► tail
            (最新)              (最舊)
   資源用盡時 acquire 阻塞,等 release 騰出 stage

Warp Entanglement(warp 糾纏)

pipeline 機制在 同一 warp 的 CUDA threads 之間共用。這種共用會讓 warp 內已提交的操作序列「糾纏」在一起,在某些情況下會影響效能。

Commit 的合併行為:commit 操作會被 coalesce,使得 pipeline 的序列對「所有 converged 且呼叫 commit 的 thread」只遞增一次,且它們提交的操作會被 batch 在一起

定義兩個序列:

producer_commit() 的回傳值來自 thread 感知 的 batch 序列。thread 感知序列的某個 index 一定對齊到「實際序列中相等或更大的 index」:BTn ≡ BPm,其中 n <= m;只有當所有 commit 都來自完全 converged 的 thread 時兩者才相等。

完全 diverged 的 warp:
  實際序列  PB = {0,1,2,3, ... ,31}  (PL = 31)
  感知序列  Thread0: TB={0} (TL=0)
            Thread1: TB={0} (TL=0)
              ...
            Thread31:TB={0} (TL=0)
  → 每個 thread 都以為只有 1 個 batch,實際卻有 32 個

Wait 的 over-wait 問題:thread 呼叫 consumer_wait()wait_prior<N>() 是要等 感知序列 TB 中的 batch 完成;而 consumer_wait() 等價於 wait_prior<N>()N = PL。wait_prior 變體實際上會等「實際序列 中至少到 PL-N 為止(含)」的 batch。由於 TL <= PL,等到 PL-N 也涵蓋了等待 TL-N;因此 TL < PL 時,thread 會非預期地等到更多、更新的 batch。在上面完全 diverged 的極端例子中,每個 thread 可能要等全部 32 個 batch。

避免 over-wait:先 re-converge

建議讓 commit 由 converged threads 發出,使各 thread 的感知序列與實際序列對齊,避免 over-wait。若 commit 前的程式碼讓 threads diverge,應在呼叫 commit 操作前用 __syncwarp 重新收斂 warp。

Early Exit(提前離開)

當一個參與 pipeline 的 thread 必須提前離開時,該 thread 必須 在離開前用 cuda::pipeline::quit() 顯式退出參與。其餘仍在參與的 threads 即可正常進行後續操作。

Warning

若不 quit() 就離開,剩餘 threads 在集體操作(acquire/commit/wait/release)上可能永久等待已消失的 thread。early exit 路徑務必補上 pipeline::quit()

Tracking Asynchronous Memory Operations(追蹤非同步記憶體操作)

下例示範如何用 pipeline 集體把資料從 global 拷貝到 shared memory,並用 pipeline 追蹤這些拷貝。每個 thread 用 自己的 pipeline 獨立提交 memory copy,再等待完成並消費資料。

#include <cuda/pipeline>
__global__ void example_kernel(const float *in) {
  constexpr int block_size = 128;
  __shared__ __align__(sizeof(float)) float buffer[4 * block_size];
  // 每個 thread 建立一個 unified、thread-scope 的 pipeline
  cuda::pipeline<cuda::thread_scope_thread> pipeline = cuda::make_pipeline();

  // Stage 1
  pipeline.producer_acquire();
  cuda::memcpy_async(buffer, in, sizeof(float), pipeline);
  pipeline.producer_commit();
  // Stage 2(提交兩筆拷貝)
  pipeline.producer_acquire();
  cuda::memcpy_async(buffer + block_size,     in + block_size,     sizeof(float), pipeline);
  cuda::memcpy_async(buffer + 2 * block_size, in + 2 * block_size, sizeof(float), pipeline);
  pipeline.producer_commit();
  // Stage 3
  pipeline.producer_acquire();
  cuda::memcpy_async(buffer + 3 * block_size, in + 3 * block_size, sizeof(float), pipeline);
  pipeline.producer_commit();

  pipeline.consumer_wait();    // 等最舊 stage(=第一 stage)
  pipeline.consumer_release(); // 用完第一 stage 的資料 ...
  pipeline.consumer_wait();    // 等第二 stage
  pipeline.consumer_release();
  pipeline.consumer_wait();    // 等第三 stage
  pipeline.consumer_release();
}

consumer_wait() 永遠等 最舊(tail) 的 stage;連續呼叫即依序消費各 stage。每段資料消費完通常會配合 __syncthreads()(若資料要跨 thread 共享)。

對應的 C primitives 版本以 batch 思維表達同一流程:

#include <cuda_pipeline.h>
__pipeline_memcpy_async(buffer, in, sizeof(float));
__pipeline_commit();                                   // batch 0
__pipeline_memcpy_async(buffer + block_size,     in + block_size,     sizeof(float));
__pipeline_memcpy_async(buffer + 2 * block_size, in + 2 * block_size, sizeof(float));
__pipeline_commit();                                   // batch 1
__pipeline_memcpy_async(buffer + 3 * block_size, in + 3 * block_size, sizeof(float));
__pipeline_commit();                                   // batch 2

__pipeline_wait_prior(2);  // 等到只剩最後 2 個 batch 未完成(=等第一 batch)
// 使用第一 batch 資料 ...
__pipeline_wait_prior(1);  // 等到只剩最後 1 個 batch(=等第二 batch)
__pipeline_wait_prior(0);  // 等所有 batch(=等第三 batch)

__pipeline_wait_prior(N) = 等待「除了最後 N 個 batch 之外」全部完成,與 cuda::pipeline_consumer_wait_prior<N>() 對應。更多 async copy 細節見 Section 3.2.5(本章 4.11)。

Producer-Consumer Pattern using Pipelines(用 pipeline 實作 producer-consumer)

在 4.9.7(04-CUDA-Features/11-Asynchronous-Barriers-Deep-Dive)中,producer-consumer 是用「每個 buffer 兩個 async barrier」、把 thread block 做空間切分來實作。改用 cuda::pipeline 可大幅簡化:用單一 partitioned pipeline、每個資料 buffer 對應一個 stage,取代每 buffer 兩個 async barrier。

#include <cuda/pipeline>
#include <cooperative_groups.h>
using pipeline = cuda::pipeline<cuda::thread_scope_block>;

__device__ void produce(pipeline &pipe, int /*num_stages*/, int stage,
                        int num_batches, int batch, float *buffer, int buffer_len,
                        float *in, int N) {
  if (batch < num_batches) {
    pipe.producer_acquire();
    /* 用 async copy 把 in(batch) 拷到 buffer(stage) */
    pipe.producer_commit();
  }
}
__device__ void consume(pipeline &pipe, /*...*/ int stage, int batch,
                        float *buffer, int buffer_len, float *out, int N) {
  pipe.consumer_wait();
  /* 消費 buffer(stage),更新 out(batch) */
  pipe.consumer_release();
}

__global__ void producer_consumer_pattern(float *in, float *out, int N, int buffer_len) {
  auto block = cooperative_groups::this_thread_block();
  __shared__ extern float buffer[];           // 大小 2*buffer_len,交替使用兩個 buffer
  const int num_batches = N / buffer_len;

  // partitioned,2 stages:一半 threads 當 producer,另一半當 consumer
  constexpr auto scope = cuda::thread_scope_block;
  constexpr int num_stages = 2;
  cuda::std::size_t producer_count = block.size() / 2;
  __shared__ cuda::pipeline_shared_state<scope, num_stages> shared_state;
  pipeline pipe = cuda::make_pipeline(block, &shared_state, producer_count);

  // Fill the pipeline:producer 先把前 num_stages 個 batch 灌入(資料 in-flight)
  if (block.thread_rank() < producer_count) {
    for (int s = 0; s < num_stages; ++s)
      produce(pipe, num_stages, s, num_batches, s, buffer, buffer_len, in, N);
  }
  // 接著主迴圈:依角色 produce 未來 batch 或 consume 當前 batch ...
}

要點:

1 個 partitioned pipeline(num_stages = 2,雙緩衝)
 producer 半邊                         consumer 半邊
 acquire→async copy(in→buf)→commit     wait→consume(buf→out)→release
        │  stage 0 / stage 1               ▲
        └──────►  back-pressure  ◄─────────┘
 取代「每 buffer 兩個 async barrier」的舊寫法(見 4.9.7)

考試/測驗重點

主題 必記重點 何時需要 shared_state 非 thread_scope_thread(如 block scope)才需要 pipeline_shared_state<scope, count> count 意義 shared state 能並行處理的 stage 數(緩衝深度);非「總 batch 數」 unified vs partitioned unified:thread 同時 producer+consumer;partitioned:固定角色、不可變、thread-local 不能 partition partitioned 開銷 每 stage 一組 shared memory barriers;unified 也會付(其實可改用 __syncthreads);優先 thread-local submit 三部曲 producer_acquire → memcpy_async → producer_commit consume 兩部曲 consumer_wait(等 tail / 最舊 stage)→ consumer_release acquire 阻塞條件 資源用盡時 producer_acquire 會 block,直到 consumer_release 釋放下一 stage wait_prior 語意 wait_prior:等到「除最後 N 個 stage 外」全部完成;consumer_wait == wait_prior<N=PL> primitives 對應 **pipeline_memcpy_async /**pipeline_commit / __pipeline_wait_prior(N) commit 合併 同 warp converged:序列 +1 同 stage;fully diverged:序列 +32 散到不同 stage 感知 vs 實際序列 producer_commit 回傳「感知序列」TB;BTn ≡ BPm 且 n <= m;僅全 converged 時相等 over-wait 成因 TL < PL 時 wait 會多等更新的 batch;極端 diverge 每 thread 等 32 batch 避免 over-wait commit 由 converged threads 發出;diverge 後先 __syncwarp 再 commit early exit 提前離開的 thread 必須先 cuda::pipeline::quit(),其餘 threads 才能正常續行 取代 async barrier producer-consumer:1 個 partitioned pipeline / 每 buffer 1 stage,取代每 buffer 2 個 async barrier