Skip to content

@retrieval_span

Wraps a retrieval function with an OpenInference RETRIEVER span. Captures the query, retrieved documents, and their scores as span attributes.


Decorator / Wrapper

Signature

def retrieval_span(
    name: str | None = None,
    query_arg: str = "query",
) -> Callable
interface RetrievalSpanOptions {
    name?: string;
    queryArg?: string;
}

function retrievalSpan<A extends unknown[], R>(
    options: RetrievalSpanOptions,
    fn: (...args: A) => R | Promise<R>,
): (...args: A) => Promise<R>

Parameters

Parameter Type Default Description
name str \| None None Span name. Defaults to the decorated function's name.
query_arg str "query" Name of the keyword argument containing the query string. Falls back to the first positional str argument.

Span Attributes

Attribute Value
openinference.span.kind "RETRIEVER"
input.value The query string (from query_arg kwarg or first str positional argument)
retrieval.documents JSON-encoded list of documents (set when the function returns a non-empty list[Document]; the Python implementation also sets output.value alongside it)

Example

from coalex.ext.retrieval import retrieval_span, Document

@retrieval_span(name="pubmed_search", query_arg="query")
def search_pubmed(query: str, max_results: int = 10) -> list[Document]:
    """Search PubMed for relevant medical abstracts."""
    results = pubmed_client.search(query, max_results=max_results)
    return [
        Document(
            content=r.abstract,
            id=r.pmid,
            score=r.relevance_score,
            metadata={"title": r.title, "year": r.year},
        )
        for r in results
    ]

# Usage inside a coalex_context
with coalex.coalex_context(agent_id="medical-bot"):
    docs = search_pubmed(query="side effects of metformin", max_results=5)
    # Span "pubmed_search" is emitted with:
    #   openinference.span.kind = "RETRIEVER"
    #   input.value = "side effects of metformin"
    #   retrieval.documents = '[{"document.content": "...", "document.id": "12345", ...}]'
import { retrievalSpan, type Document } from "@coalex-ai/sdk/ext";
import { coalexContext } from "@coalex-ai/sdk";

const searchPubmed = retrievalSpan(
    { name: "pubmed_search" },
    async (query: string, maxResults = 10): Promise<Document[]> => {
        const results = await pubmedClient.search(query, { maxResults });
        return results.map(r => ({
            content: r.abstract,
            id: r.pmid,
            score: r.relevanceScore,
            metadata: { title: r.title, year: r.year },
        }));
    },
);

// Usage inside a coalexContext
await coalexContext({ agentId: "medical-bot" }, async () => {
    const docs = await searchPubmed("side effects of metformin", 5);
    // Span "pubmed_search" is emitted with:
    //   openinference.span.kind = "RETRIEVER"
    //   input.value = "side effects of metformin"
    //   retrieval.documents = '[{"document.content": "...", "document.id": "12345", ...}]'
});

Async Support

The decorator automatically detects async functions:

@retrieval_span(name="async_vector_search")
async def search_vectors(query: str) -> list[Document]:
    results = await vector_store.asearch(query, top_k=10)
    return [Document(content=r.text, id=r.id, score=r.score) for r in results]

In TypeScript, retrievalSpan accepts both synchronous and async functions, and the wrapper it returns always returns a Promise:

const searchVectors = retrievalSpan(
    { name: "async_vector_search" },
    async (query: string): Promise<Document[]> => {
        const results = await vectorStore.search(query, { topK: 10 });
        return results.map(r => ({ content: r.text, id: r.id, score: r.score }));
    },
);

RetrievalSpan Context Manager / Class

For fine-grained control over when the query and documents are set on the span (e.g., streaming, multi-step retrieval), use the RetrievalSpan context manager (Python) or class (TypeScript).

Signature

class RetrievalSpan:
    def __init__(self, name: str = "retrieval") -> None: ...
    def set_query(self, query: str) -> None: ...
    def set_documents(self, docs: list[Document]) -> None: ...
class RetrievalSpan {
    constructor(name?: string);
    start(): void;
    setQuery(query: string): void;
    setDocuments(docs: Document[]): void;
    end(): void;
}

Methods

Python TypeScript Description
set_query(query) setQuery(query) Set the input.value attribute on the span.
set_documents(docs) setDocuments(docs) Set the retrieval.documents attribute (JSON-encoded).
-- start() Start the span (TypeScript only; Python uses with statement).
-- end() End the span (TypeScript only; Python uses with statement).

Example

from coalex.ext.retrieval import RetrievalSpan, Document

with RetrievalSpan("multi_step_retrieval") as r:
    # Step 1: Set the query
    r.set_query("cardiovascular risk factors")

    # Step 2: Retrieve from multiple sources
    pubmed_docs = search_pubmed("cardiovascular risk factors")
    internal_docs = search_internal_kb("cardiovascular risk factors")

    # Step 3: Combine and set documents
    all_docs = pubmed_docs + internal_docs
    r.set_documents(all_docs)
import { RetrievalSpan } from "@coalex-ai/sdk/ext";

const r = new RetrievalSpan("multi_step_retrieval");
r.start();
try {
    // Step 1: Set the query
    r.setQuery("cardiovascular risk factors");

    // Step 2: Retrieve from multiple sources
    const pubmedDocs = await searchPubmed("cardiovascular risk factors");
    const internalDocs = await searchInternalKb("cardiovascular risk factors");

    // Step 3: Combine and set documents
    const allDocs = [...pubmedDocs, ...internalDocs];
    r.setDocuments(allDocs);
} finally {
    r.end();
}

Document

A dataclass (Python) or interface (TypeScript) representing a retrieved document with OpenInference-compatible fields.

Signature

@dataclass
class Document:
    content: str
    id: str | None = None
    score: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
interface Document {
    content: string;
    id?: string;
    score?: number;
    metadata?: Record<string, unknown>;
}

Fields

Field Type Default Description
content str required The document text content.
id str \| None None Document identifier (e.g., PubMed ID, vector store ID).
score float \| None None Relevance score (e.g., cosine similarity, BM25 score).
metadata dict[str, Any] {} Arbitrary metadata (e.g., title, source, date). Serialized as JSON.

Methods

Method Description
to_openinference_dict() Convert to an OpenInference-compatible dictionary (Python only; the TypeScript Document is a plain interface with no methods).

Example

from coalex.ext.retrieval import Document

doc = Document(
    content="Metformin is a first-line treatment for type 2 diabetes.",
    id="pmid-28849587",
    score=0.94,
    metadata={"title": "Metformin Review", "year": 2023, "source": "PubMed"},
)

# Convert to OpenInference format
oi_dict = doc.to_openinference_dict()
# {
#     "document.content": "Metformin is a first-line treatment...",
#     "document.id": "pmid-28849587",
#     "document.score": 0.94,
#     "document.metadata": '{"title": "Metformin Review", "year": 2023, "source": "PubMed"}'
# }
import type { Document } from "@coalex-ai/sdk/ext";

const doc: Document = {
    content: "Metformin is a first-line treatment for type 2 diabetes.",
    id: "pmid-28849587",
    score: 0.94,
    metadata: { title: "Metformin Review", year: 2023, source: "PubMed" },
};

encode_documents() / encodeDocuments()

Serialize a list of Document objects to an OpenInference-compatible JSON string.

Signature

def encode_documents(docs: list[Document]) -> str
function encodeDocuments(docs: Document[]): string

Example

from coalex.ext.retrieval import Document, encode_documents

docs = [
    Document(content="First result", id="doc-1", score=0.95),
    Document(content="Second result", id="doc-2", score=0.88),
]

json_str = encode_documents(docs)
# '[{"document.content": "First result", "document.id": "doc-1", "document.score": 0.95}, ...]'
import { encodeDocuments, type Document } from "@coalex-ai/sdk/ext";

const docs: Document[] = [
    { content: "First result", id: "doc-1", score: 0.95 },
    { content: "Second result", id: "doc-2", score: 0.88 },
];

const jsonStr = encodeDocuments(docs);
// '[{"document.content":"First result","document.id":"doc-1","document.score":0.95},...]'

Full PubMed Retrieval Example

import coalex
from coalex.ext.retrieval import retrieval_span, Document
from openai import OpenAI

coalex.register(api_key="your-key")
coalex.auto_instrument()

client = OpenAI()


@retrieval_span(name="pubmed_retrieval", query_arg="query")
def retrieve_pubmed(query: str, max_results: int = 5) -> list[Document]:
    """Search PubMed and return relevant abstracts as Documents."""
    from Bio import Entrez

    Entrez.email = "agent@example.com"
    handle = Entrez.esearch(db="pubmed", term=query, retmax=max_results)
    record = Entrez.read(handle)
    ids = record["IdList"]

    handle = Entrez.efetch(db="pubmed", id=ids, rettype="abstract", retmode="xml")
    articles = Entrez.read(handle)["PubmedArticle"]

    return [
        Document(
            content=str(a["MedlineCitation"]["Article"].get("Abstract", {}).get("AbstractText", [""])[0]),
            id=str(a["MedlineCitation"]["PMID"]),
            metadata={"title": str(a["MedlineCitation"]["Article"]["ArticleTitle"])},
        )
        for a in articles
    ]


with coalex.coalex_context(agent_id="medical-qa", request_id="req-001"):
    docs = retrieve_pubmed(query="metformin side effects in elderly patients")

    context = "\n\n".join(d.content for d in docs)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Answer based on these abstracts:\n{context}"},
            {"role": "user", "content": "What are the side effects of metformin in elderly patients?"},
        ],
    )
    print(response.choices[0].message.content)

API Reference

coalex.ext.retrieval

Retrieval span decorator and context manager for RAG pipelines.

Classes

Document dataclass

A retrieved document with OpenInference-compatible fields.

Source code in coalex/ext/retrieval.py
@dataclasses.dataclass
class Document:
    """A retrieved document with OpenInference-compatible fields.

    Only ``content`` is required; the remaining fields are optional and are
    left out of the OpenInference mapping when they carry no value.
    """

    # The document text content.
    content: str
    # Optional document identifier.
    id: str | None = None
    # Optional relevance score.
    score: float | None = None
    # Arbitrary metadata; every instance gets its own fresh dict.
    metadata: dict[str, Any] = dataclasses.field(default_factory=dict)

    def to_openinference_dict(self) -> dict[str, Any]:
        """Build the ``document.*`` mapping for this document.

        ``document.content`` is always present. ``document.id`` and
        ``document.score`` are included only when they are not ``None``, and
        ``document.metadata`` only when the metadata dict is non-empty.

        Returns:
            A dict keyed by ``document.*`` names. The metadata entry is a JSON
            string, not a nested dict.
        """
        d: dict[str, Any] = {"document.content": self.content}
        if self.id is not None:
            d["document.id"] = self.id
        if self.score is not None:
            d["document.score"] = self.score
        if self.metadata:
            # ``default=str`` stringifies values json cannot encode natively
            # (dates, Decimals, ...) instead of raising TypeError, so odd
            # metadata values cannot break span recording. Values that already
            # serialized are encoded exactly as before.
            d["document.metadata"] = json.dumps(self.metadata, default=str)
        return d

RetrievalSpan

Context manager for instrumenting retrieval logic as a RETRIEVER span.

Use when you need fine-grained control over when documents and query are attached (e.g. streaming or multi-step retrieval).

Example:

with RetrievalSpan("my_retrieval") as r:
    r.set_query(query)
    docs = fetch_documents(query)
    r.set_documents(docs)
Source code in coalex/ext/retrieval.py
class RetrievalSpan:
    """Context manager that records a block of retrieval logic as a RETRIEVER span.

    Reach for this instead of the decorator when the query and the documents
    become available at different points (e.g. streaming or multi-step
    retrieval) and must be attached to the span by hand.

    Example::

        with RetrievalSpan("my_retrieval") as r:
            r.set_query(query)
            docs = fetch_documents(query)
            r.set_documents(docs)
    """

    def __init__(self, name: str = "retrieval") -> None:
        """Remember the span name; no span is started until ``__enter__``."""
        self._name = name
        # Both are populated in ``__enter__``.
        self._span: Any = None
        self._ctx: Any = None

    def __enter__(self) -> RetrievalSpan:
        """Start the span under the current parent context and tag it as RETRIEVER."""
        active_tracer = _get_tracer()
        parent = _get_parent_context()
        span_cm = active_tracer.start_as_current_span(self._name, context=parent)
        self._ctx = span_cm
        self._span = span_cm.__enter__()
        self._span.set_attribute("openinference.span.kind", "RETRIEVER")
        return self

    def set_query(self, query: str) -> None:
        """Record ``query`` as the span's ``input.value`` attribute."""
        self._span.set_attribute("input.value", query)

    def set_documents(self, docs: list[Document]) -> None:
        """Record ``docs`` as ``retrieval.documents`` and their summary as ``output.value``."""
        span = self._span
        span.set_attribute("retrieval.documents", encode_documents(docs))
        span.set_attribute("output.value", _document_summary(docs))

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool | None:
        """Close the span, marking it OK only when no exception is propagating.

        Returns whatever the underlying span context manager's ``__exit__``
        returns.
        """
        finished_cleanly = exc_val is None
        if finished_cleanly:
            self._span.set_status(Status(StatusCode.OK))
        return self._ctx.__exit__(exc_type, exc_val, exc_tb)

Functions

encode_documents

encode_documents(docs: list[Document]) -> str

Serialize documents to OpenInference retrieval.documents JSON string.

Source code in coalex/ext/retrieval.py
def encode_documents(docs: list[Document]) -> str:
    """Encode ``docs`` as the JSON string stored in ``retrieval.documents``.

    Each document contributes the mapping returned by its
    ``to_openinference_dict()`` method; the mappings are emitted as a JSON
    array in input order.
    """
    payload = []
    for doc in docs:
        payload.append(doc.to_openinference_dict())
    return json.dumps(payload)

retrieval_span

retrieval_span(
    name: str | None = None, query_arg: str = "query"
) -> Callable[[F], F]

Decorator that wraps a retrieval function with an OpenInference RETRIEVER span.

Supports both sync and async functions. The decorated function must return list[Document]. The span captures:

- openinference.span.kind = RETRIEVER
- input.value — query string (from query_arg kwarg or first str positional)
- retrieval.documents — JSON-encoded list of documents

Parameters:

Name Type Description Default
name str | None

Span name. Defaults to the function name.

None
query_arg str

Name of the kwarg containing the query string.

'query'
Source code in coalex/ext/retrieval.py
def retrieval_span(name: str | None = None, query_arg: str = "query") -> Callable[[F], F]:
    """Decorator that wraps a retrieval function with an OpenInference RETRIEVER span.

    Supports both sync and async functions. The decorated function is expected
    to return ``list[Document]``. The span captures:

    - ``openinference.span.kind`` = ``RETRIEVER``
    - ``input.value`` — query string (from the ``query_arg`` kwarg, otherwise
      the first ``str`` positional argument); omitted when neither yields a
      truthy value
    - ``retrieval.documents`` and ``output.value`` — set only when the result
      is a non-empty list whose first element is a ``Document``

    The span status is set to OK when the wrapped function returns normally,
    and the wrapped function's return value is passed through unchanged.

    Args:
        name: Span name. Defaults to the function name.
        query_arg: Name of the kwarg containing the query string.

    Returns:
        A decorator producing a wrapper of the same kind (sync or async) as
        the function it decorates.
    """
    # Function-scope import: ``asyncio.iscoroutinefunction`` is deprecated in
    # Python 3.14 in favour of ``inspect.iscoroutinefunction``.
    import inspect

    def _start_span(span_name: str) -> Any:
        """Open the span as a child of the current parent context."""
        tracer = _get_tracer()
        parent_ctx = _get_parent_context()
        return tracer.start_as_current_span(span_name, context=parent_ctx)

    def _record_input(span: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> None:
        """Tag the span as RETRIEVER and attach the query, if one can be found."""
        span.set_attribute("openinference.span.kind", "RETRIEVER")
        # A falsy kwarg value (e.g. "") falls through to the positional scan.
        q = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
        if q:
            span.set_attribute("input.value", str(q))

    def _record_output(span: Any, result: Any) -> None:
        """Attach the returned documents (when recognisable) and mark the span OK."""
        # Only the first element is inspected to decide whether this is a document list.
        if isinstance(result, list) and result and isinstance(result[0], Document):
            span.set_attribute("retrieval.documents", encode_documents(result))
            span.set_attribute("output.value", _document_summary(result))
        span.set_status(Status(StatusCode.OK))

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                with _start_span(span_name) as span:
                    _record_input(span, args, kwargs)
                    result = await fn(*args, **kwargs)
                    _record_output(span, result)
                    return result

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with _start_span(span_name) as span:
                _record_input(span, args, kwargs)
                result = fn(*args, **kwargs)
                _record_output(span, result)
                return result

        return wrapper  # type: ignore[return-value]

    return decorator