Select a result to preview
cuda::pipeline 是一個用來「分階段(staging)暫存工作」並協調 多緩衝區 producer–consumer 模式的機制,最常見的用途是把 計算 與 非同步資料拷貝 重疊(overlap)。它建構於 Advanced Synchronization Primitives(非同步 barrier)之上,但提供更高階、更簡潔的 API。
| 項目 | 重點 |
|---|---|
| 用途 | 分階段暫存工作、協調 multi-buffer producer–consumer、重疊 compute 與 async copy |
| Scope | thread/block/更大範圍;非 thread scope 需 pipeline_shared_state<scope, count> |
| count(stages) | shared state 中可同時處理的並行 stage 數(緩衝深度) |
| Unified vs Partitioned | unified:每個 thread 同時是 producer 與 consumer;partitioned:固定角色不可變 |
| Submit(生產) | producer_acquire() → 提交 memcpy_async → producer_commit() |
| Consume(消費) | consumer_wait()(等 tail/最舊 stage)→ consumer_release() |
| 資源耗盡 | producer_acquire() 會 block 直到 consumer 釋放下一 stage 的資源 |
| Warp Entanglement | 同 warp 共用 pipeline,commit 會被合併,diverge 可能造成 over-wait |
| Early Exit | 提前離開的 thread 必須先 pipeline::quit() 退出參與 |
| 對比 primitives | __pipeline_memcpy_async / __pipeline_commit / __pipeline_wait_prior(N) |
本章的 cuda::pipeline(<cuda/pipeline>)是 C++ 高階介面;底層另有 C primitives(<cuda_pipeline.h>)如 __pipeline_commit()、__pipeline_wait_prior(N)。兩者語意對應,適用情境不同。
cuda::pipeline 可建立於不同的 thread scope。對於 非 cuda::thread_scope_thread 的 scope,需要一個 cuda::pipeline_shared_state<scope, count> 物件來協調參與的 threads;此 state 封裝了讓 pipeline 最多能處理 count 個並行 stage 的有限資源。
// thread scope:thread-local pipeline,不需 shared_state
constexpr auto scope = cuda::thread_scope_thread;
cuda::pipeline<scope> pipeline = cuda::make_pipeline();
// block scope:需要 pipeline_shared_state
constexpr auto scope = cuda::thread_scope_block;
constexpr auto stages_count = 2;
__shared__ cuda::pipeline_shared_state<scope, stages_count> shared_state;
auto pipeline = cuda::make_pipeline(group, &shared_state);
thread scope 用 make_pipeline() 即可;block(或更大)scope 需傳入 group 與 __shared__ 的 pipeline_shared_state。
Unified vs Partitioned:
make_pipeline() 提供 producer 數量或該 thread 的角色。// 在 block scope 建立 partitioned pipeline,只有 thread 0 是 producer
constexpr auto scope = cuda::thread_scope_block;
constexpr auto stages_count = 2;
__shared__ cuda::pipeline_shared_state<scope, stages_count> shared_state;
auto thread_role = (group.thread_rank() == 0)
? cuda::pipeline_role::producer
: cuda::pipeline_role::consumer;
auto pipeline = cuda::make_pipeline(group, &shared_state, thread_role);
第三個參數可給 producer 數量或 pipeline_role,藉此切換到 partitioned 模式。
為了支援 partition,共享的 cuda::pipeline 會產生額外開銷,包括 每個 stage 一組 shared memory barriers 來做同步。即使 pipeline 是 unified(其實可改用 __syncthreads())也會付出這些成本。因此 能用 thread-local pipeline 時就優先用它,以避開這些開銷。
把工作 commit 到一個 pipeline stage 包含三步:
pipeline.producer_acquire() 取得 pipeline head。memcpy_async)。pipeline.producer_commit() commit(推進) pipeline head。
若所有資源都在使用中,producer_acquire() 會 block 住 producer threads,直到 consumer threads 釋放下一個 pipeline stage 的資源為止。這正是 pipeline 形成「背壓(back-pressure)」與緩衝深度限制(count)的關鍵。
從先前 commit 的 stage 消費工作包含兩步:
pipeline.consumer_wait() 等待 tail(最舊) stage。pipeline.consumer_release() 釋放 該 stage。對 cuda::pipeline<cuda::thread_scope_thread>,還可用 friend function cuda::pipeline_consumer_wait_prior<N>() 等待「除了最後 N 個 stage 之外」的全部 stage 完成,語意對應 primitives API 的 __pipeline_wait_prior(N)。
Pipeline(環狀緩衝,count = 3 為例)
producer 端 consumer 端
acquire → memcpy_async → commit wait(tail) → release
│ ▲
▼ │
head ──► [stage0][stage1][stage2] ──► tail
(最新) (最舊)
資源用盡時 acquire 阻塞,等 release 騰出 stage
pipeline 機制在 同一 warp 的 CUDA threads 之間共用。這種共用會讓 warp 內已提交的操作序列「糾纏」在一起,在某些情況下會影響效能。
Commit 的合併行為:commit 操作會被 coalesce,使得 pipeline 的序列對「所有 converged 且呼叫 commit 的 thread」只遞增一次,且它們提交的操作會被 batch 在一起。
定義兩個序列:
PB:warp 共享 pipeline 的 實際 操作序列 {BP0, BP1, …, BPL}。TB:某個 thread 感知 的序列(彷彿序列只因「該 thread 自己的 commit」而遞增){BT0, BT1, …, BTL}。producer_commit() 的回傳值來自 thread 感知 的 batch 序列。thread 感知序列的某個 index 一定對齊到「實際序列中相等或更大的 index」:BTn ≡ BPm,其中 n <= m;只有當所有 commit 都來自完全 converged 的 thread 時兩者才相等。
完全 diverged 的 warp:
實際序列 PB = {0,1,2,3, ... ,31} (PL = 31)
感知序列 Thread0: TB={0} (TL=0)
Thread1: TB={0} (TL=0)
...
Thread31:TB={0} (TL=0)
→ 每個 thread 都以為只有 1 個 batch,實際卻有 32 個
Wait 的 over-wait 問題:thread 呼叫 consumer_wait() 或 wait_prior<N>() 是要等 感知序列 TB 中的 batch 完成;而 consumer_wait() 等價於 wait_prior<N>() 且 N = PL。wait_prior 變體實際上會等「實際序列 中至少到 PL-N 為止(含)」的 batch。由於 TL <= PL,等到 PL-N 也涵蓋了等待 TL-N;因此 當 TL < PL 時,thread 會非預期地等到更多、更新的 batch。在上面完全 diverged 的極端例子中,每個 thread 可能要等全部 32 個 batch。
建議讓 commit 由 converged threads 發出,使各 thread 的感知序列與實際序列對齊,避免 over-wait。若 commit 前的程式碼讓 threads diverge,應在呼叫 commit 操作前用 __syncwarp 重新收斂 warp。
當一個參與 pipeline 的 thread 必須提前離開時,該 thread 必須 在離開前用 cuda::pipeline::quit() 顯式退出參與。其餘仍在參與的 threads 即可正常進行後續操作。
若不 quit() 就離開,剩餘 threads 在集體操作(acquire/commit/wait/release)上可能永久等待已消失的 thread。early exit 路徑務必補上 pipeline::quit()。
下例示範如何用 pipeline 集體把資料從 global 拷貝到 shared memory,並用 pipeline 追蹤這些拷貝。每個 thread 用 自己的 pipeline 獨立提交 memory copy,再等待完成並消費資料。
#include <cuda/pipeline>
__global__ void example_kernel(const float *in) {
constexpr int block_size = 128;
__shared__ __align__(sizeof(float)) float buffer[4 * block_size];
// 每個 thread 建立一個 unified、thread-scope 的 pipeline
cuda::pipeline<cuda::thread_scope_thread> pipeline = cuda::make_pipeline();
// Stage 1
pipeline.producer_acquire();
cuda::memcpy_async(buffer, in, sizeof(float), pipeline);
pipeline.producer_commit();
// Stage 2(提交兩筆拷貝)
pipeline.producer_acquire();
cuda::memcpy_async(buffer + block_size, in + block_size, sizeof(float), pipeline);
cuda::memcpy_async(buffer + 2 * block_size, in + 2 * block_size, sizeof(float), pipeline);
pipeline.producer_commit();
// Stage 3
pipeline.producer_acquire();
cuda::memcpy_async(buffer + 3 * block_size, in + 3 * block_size, sizeof(float), pipeline);
pipeline.producer_commit();
pipeline.consumer_wait(); // 等最舊 stage(=第一 stage)
pipeline.consumer_release(); // 用完第一 stage 的資料 ...
pipeline.consumer_wait(); // 等第二 stage
pipeline.consumer_release();
pipeline.consumer_wait(); // 等第三 stage
pipeline.consumer_release();
}
consumer_wait() 永遠等 最舊(tail) 的 stage;連續呼叫即依序消費各 stage。每段資料消費完通常會配合 __syncthreads()(若資料要跨 thread 共享)。
對應的 C primitives 版本以 batch 思維表達同一流程:
#include <cuda_pipeline.h>
__pipeline_memcpy_async(buffer, in, sizeof(float));
__pipeline_commit(); // batch 0
__pipeline_memcpy_async(buffer + block_size, in + block_size, sizeof(float));
__pipeline_memcpy_async(buffer + 2 * block_size, in + 2 * block_size, sizeof(float));
__pipeline_commit(); // batch 1
__pipeline_memcpy_async(buffer + 3 * block_size, in + 3 * block_size, sizeof(float));
__pipeline_commit(); // batch 2
__pipeline_wait_prior(2); // 等到只剩最後 2 個 batch 未完成(=等第一 batch)
// 使用第一 batch 資料 ...
__pipeline_wait_prior(1); // 等到只剩最後 1 個 batch(=等第二 batch)
__pipeline_wait_prior(0); // 等所有 batch(=等第三 batch)
__pipeline_wait_prior(N) = 等待「除了最後 N 個 batch 之外」全部完成,與 cuda::pipeline_consumer_wait_prior<N>() 對應。更多 async copy 細節見 Section 3.2.5(本章 4.11)。
在 4.9.7(04-CUDA-Features/11-Asynchronous-Barriers-Deep-Dive)中,producer-consumer 是用「每個 buffer 兩個 async barrier」、把 thread block 做空間切分來實作。改用 cuda::pipeline 可大幅簡化:用單一 partitioned pipeline、每個資料 buffer 對應一個 stage,取代每 buffer 兩個 async barrier。
#include <cuda/pipeline>
#include <cooperative_groups.h>
using pipeline = cuda::pipeline<cuda::thread_scope_block>;
__device__ void produce(pipeline &pipe, int /*num_stages*/, int stage,
int num_batches, int batch, float *buffer, int buffer_len,
float *in, int N) {
if (batch < num_batches) {
pipe.producer_acquire();
/* 用 async copy 把 in(batch) 拷到 buffer(stage) */
pipe.producer_commit();
}
}
__device__ void consume(pipeline &pipe, /*...*/ int stage, int batch,
float *buffer, int buffer_len, float *out, int N) {
pipe.consumer_wait();
/* 消費 buffer(stage),更新 out(batch) */
pipe.consumer_release();
}
__global__ void producer_consumer_pattern(float *in, float *out, int N, int buffer_len) {
auto block = cooperative_groups::this_thread_block();
__shared__ extern float buffer[]; // 大小 2*buffer_len,交替使用兩個 buffer
const int num_batches = N / buffer_len;
// partitioned,2 stages:一半 threads 當 producer,另一半當 consumer
constexpr auto scope = cuda::thread_scope_block;
constexpr int num_stages = 2;
cuda::std::size_t producer_count = block.size() / 2;
__shared__ cuda::pipeline_shared_state<scope, num_stages> shared_state;
pipeline pipe = cuda::make_pipeline(block, &shared_state, producer_count);
// Fill the pipeline:producer 先把前 num_stages 個 batch 灌入(資料 in-flight)
if (block.thread_rank() < producer_count) {
for (int s = 0; s < num_stages; ++s)
produce(pipe, num_stages, s, num_batches, s, buffer, buffer_len, in, N);
}
// 接著主迴圈:依角色 produce 未來 batch 或 consume 當前 batch ...
}
要點:
cuda::thread_scope_block,因為要區分 producer / consumer。pipeline_shared_state 來協調參與 threads;此處在 block scope 初始化 2-stage state 再 make_pipeline()。producer_count(= block.size() / 2)→ 前半 threads 為 producer,後半為 consumer。num_stages 個 batch 的 async copy,此時所有拷貝皆 in-flight。1 個 partitioned pipeline(num_stages = 2,雙緩衝)
producer 半邊 consumer 半邊
acquire→async copy(in→buf)→commit wait→consume(buf→out)→release
│ stage 0 / stage 1 ▲
└──────► back-pressure ◄─────────┘
取代「每 buffer 兩個 async barrier」的舊寫法(見 4.9.7)
pipeline_shared_state<scope, count>