Documentation Index
Fetch the complete documentation index at: https://wb-21fd5541-weave-caching.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
W&B Weave の Threads を使用すると、LLM アプリケーションにおけるマルチターンの会話を追跡・分析できます。Threads は関連する呼び出しを共通の thread_id でグループ化し、一連のセッション全体を可視化したり、ターンをまたいだ会話レベルのメトリクスを追跡したりすることを可能にします。Threads はプログラムから作成でき、Weave UI で可視化できます。
Threads を使い始めるには、以下の手順を行ってください。
- Threads の基本を理解する
- コードサンプルを試す。一般的な使用パターンや実世界のユースケースを紹介しています。
ユースケース
Threads は、以下のようなものを整理・分析したい場合に役立ちます。
- マルチターンの会話
- セッションベースのワークフロー
- 関連する一連の操作シーケンス
Threads を使用すると、コンテキストごとに呼び出しをグループ化できるため、複数のステップにわたってシステムがどのように応答するかを理解しやすくなります。例えば、単一のユーザーセッション、エージェントの意思決定チェーン、またはインフラ層とビジネスロジック層をまたぐ複雑なリクエストなどを追跡できます。
アプリケーションを Threads とターンで構造化することで、Weave UI においてよりクリーンなメトリクスと高い視認性が得られます。すべての低レベルな操作を見る代わりに、重要な高レベルのステップに集中できます。
Thread
Thread は、共通の会話コンテキストを共有する、関連する呼び出しの論理的なグループです。Thread の特徴は以下の通りです。
- 一意の
thread_id を持つ
- 1 つ以上の ターン を含む
- 呼び出し間でコンテキストを維持する
- 完全なユーザーセッションやインタラクションフローを表す
ターン (Turn)
ターン は Thread 内のハイレベルな操作であり、UI ではスレッドビューの個々の行として表示されます。各ターンの特徴は以下の通りです。
- 会話やワークフローにおける 1 つの論理的なステップを表す
- ターンはスレッドコンテキストの直接の子であり、ネストされた下位レベルの呼び出し(スレッドレベルの統計には表示されない)を含む場合がある
呼び出し (Call)
Call は、アプリケーション内で @weave.op デコレータが付与された関数の実行を指します。
- ターン呼び出し は、新しいターンを開始するトップレベルの操作です
- ネストされた呼び出し は、ターン内の下位レベルの操作です
トレース (Trace)
トレース は、単一の操作の完全なコールスタックをキャプチャします。Threads は、同じ論理的な会話やセッションの一部であるトレースをグループ化します。言い換えれば、スレッドは複数のターンで構成され、各ターンが会話の 1 つのパートを表します。トレースの詳細については、Tracing の概要 を参照してください。
UI の概要
Weave のサイドバーで Threads を選択して、Threads リストビュー にアクセスします。
Threads リストビュー
- プロジェクト内の最近のスレッドを一覧表示します
- カラムにはターンの数、開始時間、最終更新時間が含まれます
- 行をクリックすると 詳細ドロワー が開きます
Threads 詳細ドロワー
- 任意の行をクリックすると、その行の詳細ドロワーが開きます
- スレッド内のすべてのターンを表示します
- ターンは開始時間順(期間や終了時間ではなく)にリストされます
- 呼び出しレベルのメタデータ(レイテンシ、入力、出力)が含まれます
- ログに記録されている場合は、メッセージ内容や構造化データもオプションで表示されます
- ターンの完全な実行内容を確認するには、スレッド詳細ドロワーからそのターンを開くことができます。これにより、その特定のターン中に発生したすべてのネストされた操作を詳細に調べることができます。
- ターンに LLM 呼び出しから抽出されたメッセージが含まれている場合、それらは右側のチャットペインに表示されます。これらのメッセージは通常、サポートされているインテグレーション(例:
openai.ChatCompletion.create)による呼び出しから取得され、表示には特定の基準を満たす必要があります。詳細については、チャットビューの振る舞い を参照してください。
チャットビューの振る舞い
チャットペインには、各ターン中に行われた LLM 呼び出しから抽出された構造化メッセージデータが表示されます。このビューにより、インタラクションを対話形式でレンダリングして確認できます。
何がメッセージとして扱われますか?
メッセージは、LLM プロバイダーとの直接のやり取り(プロンプトの送信とレスポンスの受信など)を表す、ターン内の呼び出しから抽出されます。他の呼び出しの中にさらにネストされていない呼び出しのみがメッセージとして表示されます。これにより、中間ステップや集約された内部ロジックの重複表示を避けます。
通常、メッセージは以下のような自動パッチが適用されたサードパーティ SDK から出力されます。
openai.ChatCompletion.create
anthropic.Anthropic.completion
メッセージが存在しない場合はどうなりますか?
ターンがメッセージを出力しない場合、チャットペインにはそのターンの空のメッセージセクションが表示されます。ただし、同じスレッド内の他のターンのメッセージは引き続き表示される場合があります。
ターンとチャットのインタラクション
- ターンをクリックすると、チャットペインがそのターンのメッセージ位置までスクロールします(ピン留め動作)。
- チャットペインをスクロールすると、左側のリストで対応するターンが強調表示されます。
トレースビューとの行き来
ターンをクリックして、そのターンの完全なトレースを開くことができます。
左上隅にスレッド詳細ビューに戻るための「戻る」ボタンが表示されます。スクロール位置などの UI 状態は遷移間で保持されません。
SDK の使用方法
このセクションの各例では、アプリケーションでターンとスレッドを整理するための異なる戦略を示します。ほとんどの例では、スタブ関数の中に独自の LLM 呼び出しやシステムの振る舞いを実装してください。
- セッションや会話を追跡するには、
weave.thread() コンテキストマネージャーを使用します。
- 論理的な操作に
@weave.op を付与して、ターンまたはネストされた呼び出しとして追跡します。
thread_id を渡すと、Weave はそのブロック内のすべての操作を同じスレッドにグループ化します。thread_id を省略すると、Weave は一意の ID を自動生成します。
weave.thread() からの戻り値は、thread_id プロパティを持つ ThreadContext オブジェクトです。これはログに記録したり、再利用したり、他のシステムに渡したりできます。
ネストされた weave.thread() コンテキストは、同じ thread_id が再利用されない限り、常に新しいスレッドを開始します。子のコンテキストが終了しても、親のコンテキストが中断されたり上書きされたりすることはありません。これにより、アプリのロジックに応じて、フォークされたスレッド構造やレイヤー化されたスレッドのオーケストレーションが可能になります。
基本的なスレッド作成
以下のコードサンプルは、weave.thread() を使用して 1 つ以上の操作を共通の thread_id でグループ化する方法を示しています。これは、アプリケーションで Threads を使い始める最も簡単な方法です。
import weave
@weave.op
def say_hello(name: str) -> str:
return f"Hello, {name}!"
# 新しいスレッドコンテキストを開始
with weave.thread() as thread_ctx:
print(f"Thread ID: {thread_ctx.thread_id}")
say_hello("Bill Nye the Science Guy")
手動でのエージェントループ実装
この例では、@weave.op デコレータと weave.thread() コンテキスト管理を使用して、対話型エージェントを手動で定義する方法を示します。process_user_message を呼び出すたびに、スレッド内に新しいターンが作成されます。独自のエージェントループを構築し、コンテキストとネストの処理を完全に制御したい場合にこのパターンを使用します。
短時間のインタラクションには自動生成されたスレッド ID を使用し、セッションをまたいでスレッドコンテキストを維持したい場合は、カスタムのセッション ID(例:user_session_123)を渡します。
import weave
class ConversationAgent:
@weave.op
def process_user_message(self, message: str) -> str:
"""
ターンレベルの操作: これが1つの会話ターンを表します。
この関数のみがスレッド統計にカウントされます。
"""
# ユーザーメッセージを保存
# ネストされた呼び出しを通じてAIレスポンスを生成
response = self._generate_response(message)
# アシスタントのレスポンスを保存
return response
@weave.op
def _generate_response(self, message: str) -> str:
"""ネストされた呼び出し: 実装の詳細であり、スレッド統計にはカウントされません。"""
context = self._retrieve_context(message) # 別のネストされた呼び出し
intent = self._classify_intent(message) # 別のネストされた呼び出し
response = self._call_llm(message, context) # LLM呼び出し(ネスト)
return self._format_response(response) # 最後のネストされた呼び出し
@weave.op
def _retrieve_context(self, message: str) -> str:
# ベクトルDB検索、ナレッジベースクエリなど
return "retrieved_context"
@weave.op
def _classify_intent(self, message: str) -> str:
# インテント分類ロジック
return "general_inquiry"
@weave.op
def _call_llm(self, message: str, context: str) -> str:
# OpenAI/AnthropicなどのAPI呼び出し
return "llm_response"
@weave.op
def _format_response(self, response: str) -> str:
# レスポンスのフォーマットロジック
return f"Formatted: {response}"
# 使用例: スレッドコンテキストは自動的に確立されます
agent = ConversationAgent()
# スレッドコンテキストを確立 - process_user_messageの各呼び出しがターンになります
with weave.thread() as thread_ctx: # thread_idを自動生成
print(f"Thread ID: {thread_ctx.thread_id}")
# process_user_messageの各呼び出しで1つのターンと複数のネストされた呼び出しが作成されます
agent.process_user_message("Hello, help with setup") # ターン 1
agent.process_user_message("What languages do you recommend?") # ターン 2
agent.process_user_message("Explain Python vs JavaScript") # ターン 3
# 結果: 3つのターンと約15-20の総呼び出し(ネストを含む)を持つスレッド
# 代替案: セッション追跡のために明示的なthread_idを使用する
session_id = "user_session_123"
with weave.thread(session_id) as thread_ctx:
print(f"Session Thread ID: {thread_ctx.thread_id}") # "user_session_123"
agent.process_user_message("Continue our previous conversation") # このセッションのターン 1
agent.process_user_message("Can you summarize what we discussed?") # このセッションのターン 2
呼び出しの深さが異なる手動エージェント
この例では、スレッドコンテキストが適用される場所に応じて、コールスタックの異なる深さでターンを定義できることを示します。このサンプルでは 2 つのプロバイダー(OpenAI と Anthropic)を使用しており、それぞれターン境界に到達するまでの呼び出しの深さが異なります。
すべてのターンは同じ thread_id を共有しますが、プロバイダーのロジックに応じて、ターンの境界がスタックの異なるレベルに現れます。これは、バックエンドごとにトレース方法を変えつつ、同じスレッドにグループ化したい場合に便利です。
import weave
import random
import asyncio
class OpenAIProvider:
"""OpenAIブランチ: ターン境界まで2レベルの深さのコールチェーン"""
@weave.op
def route_to_openai(self, user_input: str, thread_id: str) -> str:
"""レベル1: OpenAIリクエストのルーティングと準備"""
# 入力バリデーション、ルーティングロジック、基本的な前処理
print(f" L1: Routing to OpenAI for: {user_input}")
# ここがターンの境界 - スレッドコンテキストでラップ
with weave.thread(thread_id):
# レベル2を直接呼び出す - これがコールチェーンの深さを作る
return self.execute_openai_call(user_input)
@weave.op
def execute_openai_call(self, user_input: str) -> str:
"""レベル2: ターン境界 - OpenAI API呼び出しの実行"""
print(f" L2: Executing OpenAI API call")
response = f"OpenAI GPT-4 response: {user_input}"
return response
class AnthropicProvider:
"""Anthropicブランチ: ターン境界まで3レベルの深さのコールチェーン"""
@weave.op
def route_to_anthropic(self, user_input: str, thread_id: str) -> str:
"""レベル1: Anthropicリクエストのルーティングと準備"""
# 入力バリデーション、ルーティングロジック、プロバイダー選択
print(f" L1: Routing to Anthropic for: {user_input}")
# レベル2を呼び出す - これがコールチェーンの深さを作る
return self.authenticate_anthropic(user_input, thread_id)
@weave.op
def authenticate_anthropic(self, user_input: str, thread_id: str) -> str:
"""レベル2: Anthropicの認証とセットアップの処理"""
print(f" L2: Authenticating with Anthropic")
# 認証、レート制限、セッション管理
auth_token = "anthropic_key_xyz_authenticated"
# ここがターンの境界 - レベル3でスレッドコンテキストをラップ
with weave.thread(thread_id):
# レベル3を呼び出す - コールチェーンをさらにネスト
return self.execute_anthropic_call(user_input, auth_token)
@weave.op
def execute_anthropic_call(self, user_input: str, auth_token: str) -> str:
"""レベル3: ターン境界 - Anthropic API呼び出しの実行"""
print(f" L3: Executing Anthropic API call with auth")
response = f"Anthropic Claude response (auth: {auth_token[:15]}...): {user_input}"
return response
class MultiProviderAgent:
"""異なるコールチェーンの深さを持つプロバイダー間をルーティングするメインエージェント"""
def __init__(self):
self.openai_provider = OpenAIProvider()
self.anthropic_provider = AnthropicProvider()
def handle_conversation_turn(self, user_input: str, thread_id: str) -> str:
"""
コールチェーンの深さが異なるプロバイダーにルーティング。
スレッドコンテキストは各チェーンの異なるネストレベルで適用される。
"""
# デモ用にランダムにプロバイダーを選択
use_openai = random.choice([True, False])
if use_openai:
print(f"Choosing OpenAI (2-level call chain)")
# OpenAI: レベル1 → レベル2 (ターン境界)
response = self.openai_provider.route_to_openai(user_input, thread_id)
return f"[OpenAI Branch] {response}"
else:
print(f"Choosing Anthropic (3-level call chain)")
# Anthropic: レベル1 → レベル2 → レベル3 (ターン境界)
response = self.anthropic_provider.route_to_anthropic(user_input, thread_id)
return f"[Anthropic Branch] {response}"
async def main():
agent = MultiProviderAgent()
conversation_id = "nested_depth_conversation_999"
# 異なるコールチェーンの深さを持つマルチターン会話
conversation_turns = [
"What's deep learning?",
"Explain neural network backpropagation",
"How do attention mechanisms work?",
"What's the transformer architecture?",
"Compare CNNs vs RNNs"
]
print(f"Starting conversation: {conversation_id}")
for i, user_input in enumerate(conversation_turns, 1):
print(f"\\n--- Turn {i} ---")
print(f"User: {user_input}")
# 異なるコールチェーンの深さで同じthread_idを使用
response = agent.handle_conversation_turn(user_input, conversation_id)
print(f"Agent: {response}")
if __name__ == "__main__":
asyncio.run(main())
# 期待される結果: 5つのターンを持つ単一のスレッド
# - OpenAIのターン: コールチェーンのレベル2にスレッドコンテキストがある
# コールスタック: route_to_openai() → execute_openai_call() ← ここにスレッドコンテキスト
# - Anthropicのターン: コールチェーンのレベル3にスレッドコンテキストがある
# コールスタック: route_to_anthropic() → authenticate_anthropic() → execute_anthropic_call() ← ここにスレッドコンテキスト
# - すべてのターンが thread_id: "nested_depth_conversation_999" を共有
# - ターンの境界が異なるコールスタックの深さでマークされる
# - コールチェーン内の補助的な操作はターンではなくネストされた呼び出しとして追跡される
以前のセッションの再開
以前に開始したセッションを再開し、同じスレッドに呼び出しを追加し続けたい場合があります。また、既存のセッションを再開できず、代わりに新しいスレッドを開始しなければならない場合もあります。
オプションでスレッドを再開するように実装する場合、thread_id パラメータを None のままに しないでください。そうするとスレッドのグループ化が完全に無効になります。代わりに、常に有効なスレッド ID を指定してください。新しいスレッドを作成する必要がある場合は、generate_id() のような関数を使用して一意の識別子を生成してください。
thread_id が指定されていない場合、Weave の内部実装はランダムな UUID v7 を自動的に生成します。これと同様の動作を独自の generate_id() 関数で再現するか、任意のユニークな文字列値を使用できます。
import weave
import uuidv7
import argparse
def generate_id():
"""UUID v7を使用して一意のスレッドIDを生成する。"""
return str(uuidv7.uuidv7())
@weave.op
def load_history(session_id):
"""指定されたセッションの会話履歴を読み込む。"""
# ここに実装を記述
return []
# セッション再開のためのコマンドライン引数をパース
parser = argparse.ArgumentParser()
parser.add_argument("--session-id", help="再開する既存のセッションID")
args = parser.parse_args()
# スレッドIDの決定: 既存セッションの再開または新規作成
if args.session_id:
thread_id = args.session_id
print(f"Resuming session: {thread_id}")
else:
thread_id = generate_id()
print(f"Starting new session: {thread_id}")
# 呼び出しを追跡するためのスレッドコンテキストを確立
with weave.thread(thread_id) as thread_ctx:
# 会話履歴の読み込みまたは初期化
history = load_history(thread_id)
print(f"Active thread ID: {thread_ctx.thread_id}")
# アプリケーションロジックをここに記述...
ネストされたスレッド
この例では、複数の協調するスレッドを使用して複雑なアプリケーションを構造化する方法を示します。
各レイヤーは独自のスレッドコンテキストで実行され、関心事のクリーンな分離が可能になります。親アプリケーションのスレッドは、共有の ThreadContext を使用してスレッド ID を設定することで、これらのレイヤーを調整します。システムの異なる部分を独立して分析または監視しつつ、それらを共通のセッションに関連付けたい場合にこのパターンを使用します。
import weave
from contextlib import contextmanager
from typing import Dict
# ネストされたスレッドを調整するためのグローバルスレッドコンテキスト
class ThreadContext:
def __init__(self):
self.app_thread_id = None
self.infra_thread_id = None
self.logic_thread_id = None
def setup_for_request(self, request_id: str):
self.app_thread_id = f"app_{request_id}"
self.infra_thread_id = f"{self.app_thread_id}_infra"
self.logic_thread_id = f"{self.app_thread_id}_logic"
# グローバルインスタンス
thread_ctx = ThreadContext()
class InfrastructureLayer:
"""専用のスレッドですべてのインフラ操作を処理する"""
@weave.op
def authenticate_user(self, user_id: str) -> Dict:
# 認証ロジック...
return {"user_id": user_id, "authenticated": True}
@weave.op
def call_payment_gateway(self, amount: float) -> Dict:
# 支払い処理...
return {"status": "approved", "amount": amount}
@weave.op
def update_inventory(self, product_id: str, quantity: int) -> Dict:
# 在庫管理...
return {"product_id": product_id, "updated": True}
def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
"""専用のスレッドコンテキストですべてのインフラ操作を実行する"""
with weave.thread(thread_ctx.infra_thread_id):
auth_result = self.authenticate_user(user_id)
payment_result = self.call_payment_gateway(order_data["amount"])
inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])
return {
"auth": auth_result,
"payment": payment_result,
"inventory": inventory_result
}
class BusinessLogicLayer:
"""専用のスレッドでビジネスロジックを処理する"""
@weave.op
def validate_order(self, order_data: Dict) -> Dict:
# バリデーションロジック...
return {"valid": True}
@weave.op
def calculate_pricing(self, order_data: Dict) -> Dict:
# 価格計算...
return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}
@weave.op
def apply_business_rules(self, order_data: Dict) -> Dict:
# ビジネスルール...
return {"rules_applied": ["standard_processing"], "priority": "normal"}
def execute_logic(self, order_data: Dict) -> Dict:
"""専用のスレッドコンテキストですべてのビジネスロジックを実行する"""
with weave.thread(thread_ctx.logic_thread_id):
validation = self.validate_order(order_data)
pricing = self.calculate_pricing(order_data)
rules = self.apply_business_rules(order_data)
return {"validation": validation, "pricing": pricing, "rules": rules}
class OrderProcessingApp:
"""メインアプリケーションのオーケストレーター"""
def __init__(self):
self.infra = InfrastructureLayer()
self.business = BusinessLogicLayer()
@weave.op
def process_order(self, user_id: str, order_data: Dict) -> Dict:
"""メインの注文処理 - アプリスレッドのターンになる"""
# ネストされた操作をそれぞれの専用スレッドで実行
infra_results = self.infra.execute_operations(user_id, order_data)
logic_results = self.business.execute_logic(order_data)
# 最終的なオーケストレーション
return {
"order_id": f"order_12345",
"status": "completed",
"infra_results": infra_results,
"logic_results": logic_results
}
# グローバルなスレッドコンテキスト調整を伴う使用方法
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
# このリクエスト用のスレッドコンテキストを設定
thread_ctx.setup_for_request(request_id)
# アプリスレッドコンテキストで実行
with weave.thread(thread_ctx.app_thread_id):
app = OrderProcessingApp()
result = app.process_order(user_id, order_data)
return result
# 使用例
order_result = handle_order_request(
request_id="req_789",
user_id="user_001",
order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)
# 期待されるスレッド構造:
#
# App Thread: app_req_789
# └── Turn: process_order() ← メインオーケストレーション
#
# Infra Thread: app_req_789_infra
# ├── Turn: authenticate_user() ← インフラ操作 1
# ├── Turn: call_payment_gateway() ← インフラ操作 2
# └── Turn: update_inventory() ← インフラ操作 3
#
# Logic Thread: app_req_789_logic
# ├── Turn: validate_order() ← ビジネスロジック操作 1
# ├── Turn: calculate_pricing() ← ビジネスロジック操作 2
# └── Turn: apply_business_rules() ← ビジネスロジック操作 3
#
# メリット:
# - スレッド間での関心事のクリーンな分離
# - スレッドIDのパラメータ引き回しが不要
# - アプリ/インフラ/ロジック層の独立したモニタリング
# - スレッドコンテキストによるグローバルな調整
API 仕様
エンドポイント
エンドポイント: POST /threads/query
リクエストスキーマ
class ThreadsQueryReq:
project_id: str
limit: Optional[int] = None
offset: Optional[int] = None
sort_by: Optional[list[SortBy]] = None # サポートされているフィールド: thread_id, turn_count, start_time, last_updated
sortable_datetime_after: Optional[datetime] = None # 粒度最適化によるスレッドのフィルタリング
sortable_datetime_before: Optional[datetime] = None # 粒度最適化によるスレッドのフィルタリング
レスポンススキーマ
class ThreadSchema:
thread_id: str # スレッドの一意識別子
turn_count: int # このスレッド内のターン呼び出し数
start_time: datetime # このスレッド内のターン呼び出しの最も早い開始時間
last_updated: datetime # このスレッド内のターン呼び出しの最も遅い終了時間
class ThreadsQueryRes:
threads: List[ThreadSchema]
最近のアクティブなスレッドをクエリする
この例では、最近更新された 50 個のスレッドを取得します。my-project を実際のプロジェクト ID に置き換えてください。
# 最近アクティブだったスレッドを取得
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sort_by=[SortBy(field="last_updated", direction="desc")],
limit=50
))
for thread in response.threads:
print(f"Thread {thread.thread_id}: {thread.turn_count} turns, last active {thread.last_updated}")
アクティビティレベルでスレッドをクエリする
この例では、ターン数が多い順に上位 20 個のスレッドを取得します。
# アクティビティの多い(ターン数が多い)スレッドを取得
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sort_by=[SortBy(field="turn_count", direction="desc")],
limit=20
))
最近のスレッドのみをクエリする
この例では、過去 24 時間以内に開始されたスレッドを返します。timedelta の days の値を変更することで、時間枠を調整できます。
from datetime import datetime, timedelta
# 過去24時間以内に開始されたスレッドを取得
yesterday = datetime.now() - timedelta(days=1)
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sortable_datetime_after=yesterday,
sort_by=[SortBy(field="start_time", direction="desc")]
))