Extensions

The extensions module provides OpenInference-compatible span instrumentation for custom pipeline steps -- retrieval, embedding, reranking, tool calls, and guardrails -- without depending on LlamaIndex or LangChain auto-instrumentors.

  • Python: coalex.ext module with decorator syntax (@retrieval_span, @tool_span, etc.)
  • TypeScript: @coalex-ai/sdk/ext with wrapper function syntax (retrievalSpan(), toolSpan(), etc.)

When to Use Extensions

Use extension decorators when:

  • You have custom retrieval, embedding, or reranking logic that is not covered by auto-instrumentation.
  • You want to instrument tool calls or guardrail checks as first-class spans.
  • You are building a RAG pipeline from scratch without LlamaIndex or LangChain.
  • You need fine-grained control over span attributes (via context managers).

Extensions complement auto-instrumentation

You can use coalex.auto_instrument() for LLM calls and extension decorators for your custom steps. They work together -- all spans appear in the same trace.


Available Extensions

Decorators (Python) / Wrapper Functions (TypeScript)

Each extension wraps a function and creates an OpenInference-compatible span with the appropriate openinference.span.kind.

Python Decorator | TypeScript Wrapper | Span Kind | Use Case
@retrieval_span | retrievalSpan() | RETRIEVER | Document retrieval from a knowledge base, vector store, or API
@embedding_span | embeddingSpan() | EMBEDDING | Text-to-vector embedding calls
@reranker_span | rerankerSpan() | RERANKER | Re-scoring and reordering retrieved documents
@tool_span | toolSpan() | TOOL | External tool or function calls (APIs, databases, calculators)
@guardrail_span | guardrailSpan() | GUARDRAIL | Input/output validation and safety checks

Context Managers

For cases where you need fine-grained control over when attributes are set (e.g., streaming, multi-step operations), three extensions provide context manager alternatives:

Context Manager | Decorator Equivalent | Reference
RetrievalSpan | @retrieval_span | API Reference, RetrievalSpan
EmbeddingSpan | @embedding_span | API Reference, EmbeddingSpan
RerankerSpan | @reranker_span | API Reference, RerankerSpan

No context managers for @tool_span and @guardrail_span

@tool_span and @guardrail_span are decorator-only. They do not have context manager alternatives because their attribute-setting logic is straightforward and does not benefit from deferred attribute assignment.

Data Classes

Class | Module | Description
Document | coalex.ext.retrieval | A retrieved document with content, id, score, and metadata fields. Used by @retrieval_span and @reranker_span.

Utility Functions

Function | Module | Description
encode_documents() | coalex.ext.retrieval | Serializes a list[Document] to an OpenInference-compatible JSON string.

Import Patterns

# Import individual decorators
from coalex.ext.retrieval import retrieval_span, Document
from coalex.ext.embedding import embedding_span
from coalex.ext.reranker import reranker_span
from coalex.ext.tool import tool_span
from coalex.ext.guardrail import guardrail_span

# Or import everything from the ext package
from coalex.ext import (
    retrieval_span, RetrievalSpan, Document, encode_documents,
    embedding_span, EmbeddingSpan,
    reranker_span, RerankerSpan,
    tool_span,
    guardrail_span,
)

// Import individual wrappers
import { retrievalSpan, type Document } from "@coalex-ai/sdk/ext";
import { embeddingSpan, EmbeddingSpan } from "@coalex-ai/sdk/ext";
import { rerankerSpan, RerankerSpan } from "@coalex-ai/sdk/ext";
import { toolSpan } from "@coalex-ai/sdk/ext";
import { guardrailSpan } from "@coalex-ai/sdk/ext";

// Or import everything from the ext entrypoint
import {
    retrievalSpan, RetrievalSpan, encodeDocuments,
    embeddingSpan, EmbeddingSpan,
    rerankerSpan, RerankerSpan,
    toolSpan,
    guardrailSpan,
} from "@coalex-ai/sdk/ext";

Full RAG Pipeline Example

import coalex
from coalex.ext.retrieval import retrieval_span, Document
from coalex.ext.embedding import embedding_span
from coalex.ext.reranker import reranker_span
from coalex.ext.guardrail import guardrail_span
from coalex.ext.tool import tool_span
from openai import OpenAI

coalex.register(api_key="your-key")
coalex.auto_instrument()

client = OpenAI()


@embedding_span(name="embed_query", model_name="text-embedding-3-small")
def embed(text: str) -> list[float]:
    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
    return resp.data[0].embedding


@retrieval_span(name="search_knowledge_base", query_arg="query")
def retrieve(query: str) -> list[Document]:
    vector = embed(text=query)
    # ... search your vector store with `vector` ...
    return [
        Document(content="...", id="doc-1", score=0.95),
        Document(content="...", id="doc-2", score=0.87),
    ]


@reranker_span(name="rerank_results", model_name="rerank-v3", top_k=3)
def rerank(query: str, documents: list[Document]) -> list[Document]:
    # ... call your reranker API ...
    return sorted(documents, key=lambda d: d.score or 0, reverse=True)[:3]


@guardrail_span(guardrail_name="pii_check")
def check_pii(text: str) -> float:
    # Return a confidence score; raise to signal "fail"
    if "SSN" in text:
        raise ValueError("PII detected: SSN")
    return 1.0


@tool_span(name="format_citation", description="Format a document as a citation")
def format_citation(doc_id: str, title: str) -> str:
    return f"[{doc_id}] {title}"


with coalex.coalex_context(agent_id="rag-agent", request_id="req-001"):
    docs = retrieve(query="What is the refund policy?")
    ranked = rerank(query="What is the refund policy?", documents=docs)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Context: {ranked[0].content}"},
            {"role": "user", "content": "What is the refund policy?"},
        ],
    )
    answer = response.choices[0].message.content or ""  # content may be None

    check_pii(answer)
    citation = format_citation(doc_id=ranked[0].id, title="Refund Policy")

import { register, coalexContext, autoInstrument } from "@coalex-ai/sdk";
import {
    retrievalSpan, embeddingSpan, rerankerSpan,
    guardrailSpan, toolSpan, type Document,
} from "@coalex-ai/sdk/ext";
import OpenAI from "openai";

register({ apiKey: "your-key" });
autoInstrument();

const client = new OpenAI();

const embed = embeddingSpan(
    { name: "embed_query", modelName: "text-embedding-3-small" },
    async (text: string): Promise<number[]> => {
        const resp = await client.embeddings.create({ model: "text-embedding-3-small", input: text });
        return resp.data[0].embedding;
    },
);

const retrieve = retrievalSpan(
    { name: "search_knowledge_base" },
    async (query: string): Promise<Document[]> => {
        const vector = await embed(query);
        // ... search your vector store with `vector` ...
        return [
            { content: "...", id: "doc-1", score: 0.95 },
            { content: "...", id: "doc-2", score: 0.87 },
        ];
    },
);

const rerank = rerankerSpan(
    { name: "rerank_results", modelName: "rerank-v3", topK: 3 },
    async (query: string, documents: Document[]): Promise<Document[]> => {
        // ... call your reranker API ...
        return [...documents].sort((a, b) => (b.score ?? 0) - (a.score ?? 0)).slice(0, 3);
    },
);

const checkPii = guardrailSpan(
    { guardrailName: "pii_check" },
    async (text: string): Promise<number> => {
        if (text.includes("SSN")) throw new Error("PII detected: SSN");
        return 1.0;
    },
);

const formatCitation = toolSpan(
    { name: "format_citation", description: "Format a document as a citation" },
    async (docId: string, title: string): Promise<string> => {
        return `[${docId}] ${title}`;
    },
);

await coalexContext({ agentId: "rag-agent", requestId: "req-001" }, async () => {
    const docs = await retrieve("What is the refund policy?");
    const ranked = await rerank("What is the refund policy?", docs);

    const response = await client.chat.completions.create({
        model: "gpt-4o",
        messages: [
            { role: "system", content: `Context: ${ranked[0].content}` },
            { role: "user", content: "What is the refund policy?" },
        ],
    });
    const answer = response.choices[0].message.content!;

    await checkPii(answer);
    const citation = await formatCitation(ranked[0].id!, "Refund Policy");
});

Async Support

In Python, all decorators support both synchronous and asynchronous functions. The decorator detects whether the wrapped function is a coroutine function (defined with async def) and awaits it inside the span:

@retrieval_span(name="async_search")
async def search(query: str) -> list[Document]:
    results = await vector_store.asearch(query)
    return [Document(content=r.text, id=r.id, score=r.score) for r in results]
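
The branching can be sketched without the SDK. The block below is a minimal illustration, not the library's code: traced, RECORDED, search_sync, and search_async are hypothetical names, and it uses inspect.iscoroutinefunction, a closely related standard-library check to the asyncio.iscoroutinefunction call shown in the source listings further down.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable

# Stand-in for a tracing backend: each call appends one entry here.
RECORDED: list[str] = []


def traced(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator that mirrors the sync/async branching pattern."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                # A real decorator would keep its span open across this await.
                result = await fn(*args, **kwargs)
                RECORDED.append(f"{name}:async")
                return result

            return async_wrapper

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = fn(*args, **kwargs)
            RECORDED.append(f"{name}:sync")
            return result

        return wrapper

    return decorator


@traced("sync_search")
def search_sync(query: str) -> list[str]:
    return [query.upper()]


@traced("async_search")
async def search_async(query: str) -> list[str]:
    await asyncio.sleep(0)
    return [query.lower()]


sync_result = search_sync("Refund")
async_result = asyncio.run(search_async("Refund"))
```

The check happens once, at decoration time, and is based on how the function is defined. A plain def that merely returns an awaitable takes the synchronous branch.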

In TypeScript, all wrapper functions support both synchronous and asynchronous functions. The wrapped function always returns a Promise:

const search = retrievalSpan(
    { name: "async_search" },
    async (query: string): Promise<Document[]> => {
        const results = await vectorStore.search(query);
        return results.map(r => ({ content: r.text, id: r.id, score: r.score }));
    },
);

API Reference

coalex.ext

coalex.ext — OpenInference-compatible span decorators for RAG pipelines.

Use these decorators to instrument custom retrieval, embedding, reranking, tool, and guardrail logic without depending on LlamaIndex or LangChain auto-instrumentors.

Example::

from coalex.ext.retrieval import retrieval_span, Document

@retrieval_span(name="pubmed_retrieval", query_arg="query")
def retrieve(query: str) -> list[Document]:
    ...
    return [Document(content=abstract, id=pmid)]

Classes

EmbeddingSpan

Context manager for instrumenting an embedding call as an EMBEDDING span.

Example::

with EmbeddingSpan("embed", model_name="text-embedding-3-small") as e:
    e.set_text(text)
    vector = embed(text)
    e.set_vector(vector, text)

Source code in coalex/ext/embedding.py
class EmbeddingSpan:
    """Context manager for instrumenting an embedding call as an EMBEDDING span.

    Example::

        with EmbeddingSpan("embed", model_name="text-embedding-3-small") as e:
            e.set_text(text)
            vector = embed(text)
            e.set_vector(vector, text)
    """

    def __init__(self, name: str = "embedding", model_name: str | None = None) -> None:
        self._name = name
        self._model_name = model_name
        self._span: Any = None
        self._ctx: Any = None

    def __enter__(self) -> EmbeddingSpan:
        tracer = _get_tracer()
        parent_ctx = _get_parent_context()
        self._ctx = tracer.start_as_current_span(self._name, context=parent_ctx)
        self._span = self._ctx.__enter__()
        self._span.set_attribute("openinference.span.kind", "EMBEDDING")
        if self._model_name:
            self._span.set_attribute("embedding.model_name", self._model_name)
        return self

    def set_text(self, text: str) -> None:
        self._span.set_attribute("input.value", text)

    def set_vector(self, vector: list[float], text: str = "") -> None:
        encoded = json.dumps([{"embedding.vector": vector, "embedding.text": text}])
        self._span.set_attribute("embedding.embeddings", encoded)

    def __exit__(self, *exc_info: Any) -> bool | None:
        return self._ctx.__exit__(*exc_info)

RerankerSpan

Context manager for instrumenting a reranking step as a RERANKER span.

Example::

with RerankerSpan("rerank", model_name="rerank-v3", top_k=5) as r:
    r.set_query(query)
    r.set_input_documents(docs)
    ranked = reranker.rerank(query, docs, top_k=5)
    r.set_output_documents(ranked)

Source code in coalex/ext/reranker.py
class RerankerSpan:
    """Context manager for instrumenting a reranking step as a RERANKER span.

    Example::

        with RerankerSpan("rerank", model_name="rerank-v3", top_k=5) as r:
            r.set_query(query)
            r.set_input_documents(docs)
            ranked = reranker.rerank(query, docs, top_k=5)
            r.set_output_documents(ranked)
    """

    def __init__(self, name: str = "reranker", model_name: str | None = None, top_k: int | None = None) -> None:
        self._name = name
        self._model_name = model_name
        self._top_k = top_k
        self._span: Any = None
        self._ctx: Any = None

    def __enter__(self) -> RerankerSpan:
        tracer = _get_tracer()
        parent_ctx = _get_parent_context()
        self._ctx = tracer.start_as_current_span(self._name, context=parent_ctx)
        self._span = self._ctx.__enter__()
        self._span.set_attribute("openinference.span.kind", "RERANKER")
        if self._model_name:
            self._span.set_attribute("reranker.model_name", self._model_name)
        if self._top_k is not None:
            self._span.set_attribute("reranker.top_k", self._top_k)
        return self

    def set_query(self, query: str) -> None:
        self._span.set_attribute("reranker.query", query)

    def set_input_documents(self, docs: list[Document]) -> None:
        self._span.set_attribute("reranker.input_documents", encode_documents(docs))

    def set_output_documents(self, docs: list[Document]) -> None:
        self._span.set_attribute("reranker.output_documents", encode_documents(docs))

    def __exit__(self, *exc_info: Any) -> bool | None:
        return self._ctx.__exit__(*exc_info)

Document dataclass

A retrieved document with OpenInference-compatible fields.

Source code in coalex/ext/retrieval.py
@dataclasses.dataclass
class Document:
    """A retrieved document with OpenInference-compatible fields."""

    content: str
    id: str | None = None
    score: float | None = None
    metadata: dict[str, Any] = dataclasses.field(default_factory=dict)

    def to_openinference_dict(self) -> dict[str, Any]:
        d: dict[str, Any] = {"document.content": self.content}
        if self.id is not None:
            d["document.id"] = self.id
        if self.score is not None:
            d["document.score"] = self.score
        if self.metadata:
            d["document.metadata"] = json.dumps(self.metadata)
        return d

RetrievalSpan

Context manager for instrumenting retrieval logic as a RETRIEVER span.

Use when you need fine-grained control over when documents and query are attached (e.g. streaming or multi-step retrieval).

Example::

with RetrievalSpan("my_retrieval") as r:
    r.set_query(query)
    docs = fetch_documents(query)
    r.set_documents(docs)

Source code in coalex/ext/retrieval.py
class RetrievalSpan:
    """Context manager for instrumenting retrieval logic as a RETRIEVER span.

    Use when you need fine-grained control over when documents and query
    are attached (e.g. streaming or multi-step retrieval).

    Example::

        with RetrievalSpan("my_retrieval") as r:
            r.set_query(query)
            docs = fetch_documents(query)
            r.set_documents(docs)
    """

    def __init__(self, name: str = "retrieval") -> None:
        self._name = name
        self._span: Any = None
        self._ctx: Any = None

    def __enter__(self) -> RetrievalSpan:
        tracer = _get_tracer()
        parent_ctx = _get_parent_context()
        self._ctx = tracer.start_as_current_span(self._name, context=parent_ctx)
        self._span = self._ctx.__enter__()
        self._span.set_attribute("openinference.span.kind", "RETRIEVER")
        return self

    def set_query(self, query: str) -> None:
        self._span.set_attribute("input.value", query)

    def set_documents(self, docs: list[Document]) -> None:
        self._span.set_attribute("retrieval.documents", encode_documents(docs))
        self._span.set_attribute("output.value", _document_summary(docs))

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool | None:
        if exc_val is None:
            self._span.set_status(Status(StatusCode.OK))
        return self._ctx.__exit__(exc_type, exc_val, exc_tb)
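
For multi-step retrieval, one workable pattern is to collect documents across steps and attach them once at the end, since setting the same span attribute again replaces the earlier value. The sketch below runs without the SDK. FakeRetrievalSpan and Doc are hypothetical stand-ins that store attributes in a dict, their serialization is simplified, and keyword_search and vector_search are placeholder functions.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for coalex.ext.retrieval.Document."""

    content: str
    id: Optional[str] = None


class FakeRetrievalSpan:
    """Hypothetical stand-in that stores attributes in a dict instead of a span."""

    def __init__(self, name: str = "retrieval") -> None:
        self.name = name
        self.attributes: dict[str, Any] = {}

    def __enter__(self) -> "FakeRetrievalSpan":
        self.attributes["openinference.span.kind"] = "RETRIEVER"
        return self

    def set_query(self, query: str) -> None:
        self.attributes["input.value"] = query

    def set_documents(self, docs: list[Doc]) -> None:
        # A second call overwrites the value written by the first call.
        self.attributes["retrieval.documents"] = json.dumps(
            [{"document.content": d.content, "document.id": d.id} for d in docs]
        )

    def __exit__(self, *exc_info: Any) -> None:
        return None  # never swallow exceptions


def keyword_search(query: str) -> list[Doc]:
    return [Doc(content="Refunds within 30 days", id="kw-1")]


def vector_search(query: str) -> list[Doc]:
    return [Doc(content="Refund policy overview", id="vec-1")]


with FakeRetrievalSpan("hybrid_retrieval") as r:
    r.set_query("refund policy")
    merged: list[Doc] = []
    merged.extend(keyword_search("refund policy"))  # step 1
    merged.extend(vector_search("refund policy"))   # step 2
    r.set_documents(merged)                         # attach once, after both steps

recorded_ids = [d["document.id"] for d in json.loads(r.attributes["retrieval.documents"])]
```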

Functions

embedding_span

embedding_span(
    name: str | None = None,
    model_name: str | None = None,
    query_arg: str = "text",
) -> Callable[[F], F]

Decorator that wraps an embedding function with an OpenInference EMBEDDING span.

Supports both sync and async functions. The span captures:

  • openinference.span.kind = EMBEDDING
  • embedding.model_name: recorded if provided
  • input.value: the text being embedded
  • embedding.embeddings: the JSON-encoded vector, recorded if the function returns list[float]

Parameters:

  • name (str | None, default None): Span name. Defaults to the function name.
  • model_name (str | None, default None): Embedding model name to record.
  • query_arg (str, default 'text'): Name of the kwarg containing the text to embed.

Source code in coalex/ext/embedding.py
def embedding_span(
    name: str | None = None,
    model_name: str | None = None,
    query_arg: str = "text",
) -> Callable[[F], F]:
    """Decorator that wraps an embedding function with an OpenInference EMBEDDING span.

    Supports both sync and async functions. The span captures:
    - ``openinference.span.kind`` = ``EMBEDDING``
    - ``embedding.model_name`` — if provided
    - ``input.value`` — text being embedded
    - ``embedding.embeddings`` — JSON-encoded vector if function returns ``list[float]``

    Args:
        name: Span name. Defaults to the function name.
        model_name: Embedding model name to record.
        query_arg: Name of the kwarg containing the text to embed.
    """

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                tracer = _get_tracer()
                parent_ctx = _get_parent_context()
                with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                    span.set_attribute("openinference.span.kind", "EMBEDDING")
                    if model_name:
                        span.set_attribute("embedding.model_name", model_name)
                    text = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
                    if text:
                        span.set_attribute("input.value", str(text))
                    result = await fn(*args, **kwargs)
                    if isinstance(result, list) and result and isinstance(result[0], float):
                        encoded = json.dumps([{"embedding.vector": result, "embedding.text": str(text or "")}])
                        span.set_attribute("embedding.embeddings", encoded)
                    return result

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracer = _get_tracer()
            parent_ctx = _get_parent_context()
            with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                span.set_attribute("openinference.span.kind", "EMBEDDING")
                if model_name:
                    span.set_attribute("embedding.model_name", model_name)
                text = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
                if text:
                    span.set_attribute("input.value", str(text))
                result = fn(*args, **kwargs)
                if isinstance(result, list) and result and isinstance(result[0], float):
                    encoded = json.dumps([{"embedding.vector": result, "embedding.text": str(text or "")}])
                    span.set_attribute("embedding.embeddings", encoded)
                return result

        return wrapper  # type: ignore[return-value]

    return decorator
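
The return-value check in the listing above is strict about types, which is easy to miss. The sketch below isolates that check in a standalone helper. encode_embedding is a hypothetical name, and the condition is copied from the wrapper.

```python
import json
from typing import Any, Optional


def encode_embedding(result: Any, text: Optional[str]) -> Optional[str]:
    """Mirror the check embedding_span applies to the wrapped function's return value."""
    if isinstance(result, list) and result and isinstance(result[0], float):
        return json.dumps([{"embedding.vector": result, "embedding.text": str(text or "")}])
    return None  # nothing would be attached to the span


recorded = encode_embedding([0.25, -0.5], "refund policy")
skipped_ints = encode_embedding([1, 0, 1], "refund policy")         # first element is an int
skipped_tuple = encode_embedding((0.25, -0.5), "refund policy")     # not a list
skipped_batch = encode_embedding([[0.25], [0.5]], "refund policy")  # list of lists
```

Under this check, a function that returns a tuple, a batch of vectors, or an array type other than list still gets a span, but the span carries no embedding.embeddings attribute. Converting the result to a flat list of floats before returning avoids that.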

guardrail_span

guardrail_span(
    name: str | None = None,
    guardrail_name: str | None = None,
) -> Callable[[F], F]

Decorator that wraps a guardrail function with an OpenInference GUARDRAIL span.

Supports both sync and async functions. The span captures:

  • openinference.span.kind = GUARDRAIL
  • guardrail.name: from guardrail_name, falling back to the span name
  • guardrail.result: "pass" if no exception is raised, "fail" otherwise
  • guardrail.score: the return value, recorded only if it is a float

Parameters:

  • name (str | None, default None): Span name. Defaults to the function name.
  • guardrail_name (str | None, default None): Logical guardrail name recorded as an attribute.

Source code in coalex/ext/guardrail.py
def guardrail_span(
    name: str | None = None,
    guardrail_name: str | None = None,
) -> Callable[[F], F]:
    """Decorator that wraps a guardrail function with an OpenInference GUARDRAIL span.

    Supports both sync and async functions. The span captures:
    - ``openinference.span.kind`` = ``GUARDRAIL``
    - ``guardrail.name`` — from ``guardrail_name`` or the function name
    - ``guardrail.result`` — ``"pass"`` if no exception raised, ``"fail"`` otherwise
    - ``guardrail.score`` — set this via the return value if it is a ``float``

    Args:
        name: Span name. Defaults to the function name.
        guardrail_name: Logical guardrail name recorded as an attribute.
    """

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__
        logical_name = guardrail_name or span_name

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                tracer = _get_tracer()
                parent_ctx = _get_parent_context()
                with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                    span.set_attribute("openinference.span.kind", "GUARDRAIL")
                    span.set_attribute("guardrail.name", logical_name)
                    try:
                        result = await fn(*args, **kwargs)
                        span.set_attribute("guardrail.result", "pass")
                        if isinstance(result, float):
                            span.set_attribute("guardrail.score", result)
                        span.set_status(Status(StatusCode.OK))
                        return result
                    except Exception:
                        span.set_attribute("guardrail.result", "fail")
                        raise

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracer = _get_tracer()
            parent_ctx = _get_parent_context()
            with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                span.set_attribute("openinference.span.kind", "GUARDRAIL")
                span.set_attribute("guardrail.name", logical_name)
                try:
                    result = fn(*args, **kwargs)
                    span.set_attribute("guardrail.result", "pass")
                    if isinstance(result, float):
                        span.set_attribute("guardrail.score", result)
                    span.set_status(Status(StatusCode.OK))
                    return result
                except Exception:
                    span.set_attribute("guardrail.result", "fail")
                    raise

        return wrapper  # type: ignore[return-value]

    return decorator
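
Because the result is derived only from whether an exception escapes, a guardrail that returns False is still recorded as "pass". The sketch below reduces the synchronous wrapper to that logic and writes into a plain dict. guardrail, ATTRS, returns_false, and raises_on_pii are hypothetical names used for illustration.

```python
import functools
from typing import Any, Callable

# Stand-in for span attributes; the real decorator calls span.set_attribute(...).
ATTRS: dict[str, Any] = {}


def guardrail(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical reduction of guardrail_span's sync wrapper to its pass/fail logic."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        ATTRS.clear()
        try:
            result = fn(*args, **kwargs)
            ATTRS["guardrail.result"] = "pass"
            if isinstance(result, float):
                ATTRS["guardrail.score"] = result
            return result
        except Exception:
            ATTRS["guardrail.result"] = "fail"
            raise

    return wrapper


@guardrail
def returns_false(text: str) -> bool:
    return "SSN" not in text  # False for PII, but no exception is raised


@guardrail
def raises_on_pii(text: str) -> float:
    if "SSN" in text:
        raise ValueError("PII detected: SSN")
    return 1.0


returns_false("my SSN is ...")
after_false = dict(ATTRS)

raises_on_pii("hello")
after_clean = dict(ATTRS)

try:
    raises_on_pii("my SSN is ...")
except ValueError:
    pass
after_raise = dict(ATTRS)
```

A boolean or int return value records no guardrail.score, so a check that should show up as a failure needs to raise, and a check that should report a score needs to return a float.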

reranker_span

reranker_span(
    name: str | None = None,
    model_name: str | None = None,
    query_arg: str = "query",
    docs_arg: str = "documents",
    top_k: int | None = None,
) -> Callable[[F], F]

Decorator that wraps a reranker function with an OpenInference RERANKER span.

Supports both sync and async functions. The decorated function must accept a list[Document] as input and return list[Document] as output. The span captures:

  • openinference.span.kind = RERANKER
  • reranker.model_name: recorded if provided
  • reranker.query: from the query_arg kwarg or the first str positional argument
  • reranker.top_k: recorded if provided
  • reranker.input_documents: JSON-encoded input docs
  • reranker.output_documents: JSON-encoded output docs

Parameters:

  • name (str | None, default None): Span name. Defaults to the function name.
  • model_name (str | None, default None): Reranker model name to record.
  • query_arg (str, default 'query'): Name of the kwarg containing the query string.
  • docs_arg (str, default 'documents'): Name of the kwarg containing the input documents list.
  • top_k (int | None, default None): Number of top documents to return (recorded as attribute).

Source code in coalex/ext/reranker.py
def reranker_span(
    name: str | None = None,
    model_name: str | None = None,
    query_arg: str = "query",
    docs_arg: str = "documents",
    top_k: int | None = None,
) -> Callable[[F], F]:
    """Decorator that wraps a reranker function with an OpenInference RERANKER span.

    Supports both sync and async functions. The decorated function must accept a
    ``list[Document]`` as input and return ``list[Document]`` as output. The span captures:
    - ``openinference.span.kind`` = ``RERANKER``
    - ``reranker.model_name`` — if provided
    - ``reranker.query`` — from ``query_arg`` kwarg or first ``str`` positional
    - ``reranker.top_k`` — if provided
    - ``reranker.input_documents`` — JSON-encoded input docs
    - ``reranker.output_documents`` — JSON-encoded output docs

    Args:
        name: Span name. Defaults to the function name.
        model_name: Reranker model name to record.
        query_arg: Name of the kwarg containing the query string.
        docs_arg: Name of the kwarg containing the input documents list.
        top_k: Number of top documents to return (recorded as attribute).
    """

    def _set_span_attrs(span: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> None:
        span.set_attribute("openinference.span.kind", "RERANKER")
        if model_name:
            span.set_attribute("reranker.model_name", model_name)
        if top_k is not None:
            span.set_attribute("reranker.top_k", top_k)
        q = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
        if q:
            span.set_attribute("reranker.query", str(q))
        input_docs = kwargs.get(docs_arg) or next((a for a in args if isinstance(a, list)), None)
        if input_docs and isinstance(input_docs[0], Document):
            span.set_attribute("reranker.input_documents", encode_documents(input_docs))

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                tracer = _get_tracer()
                parent_ctx = _get_parent_context()
                with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                    _set_span_attrs(span, args, kwargs)
                    result = await fn(*args, **kwargs)
                    if isinstance(result, list) and result and isinstance(result[0], Document):
                        span.set_attribute("reranker.output_documents", encode_documents(result))
                    return result

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracer = _get_tracer()
            parent_ctx = _get_parent_context()
            with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                _set_span_attrs(span, args, kwargs)
                result = fn(*args, **kwargs)
                if isinstance(result, list) and result and isinstance(result[0], Document):
                    span.set_attribute("reranker.output_documents", encode_documents(result))
                return result

        return wrapper  # type: ignore[return-value]

    return decorator
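
The lookup order for the query and documents is worth seeing in isolation: the named kwarg wins, then the first positional argument of the right type. The helper below is a hypothetical extraction of the two lookup lines from _set_span_attrs. It omits the additional isinstance(..., Document) check on the first list element.

```python
from typing import Any, Optional


def resolve(
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    query_arg: str = "query",
    docs_arg: str = "documents",
) -> tuple[Optional[str], Optional[list]]:
    """Keyword first, then the first positional argument of the matching type."""
    q = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
    docs = kwargs.get(docs_arg) or next((a for a in args if isinstance(a, list)), None)
    return q, docs


by_keyword = resolve((), {"query": "refund policy", "documents": ["d1"]})
by_position = resolve(("refund policy", ["d1"]), {})

# Differently named kwargs are not found unless query_arg / docs_arg say so.
renamed = resolve((), {"question": "refund policy", "docs": ["d1"]})
renamed_fixed = resolve(
    (), {"question": "refund policy", "docs": ["d1"]},
    query_arg="question", docs_arg="docs",
)

# With several positional strings, the first one is taken as the query.
two_strings = resolve(("tenant-42", "refund policy", ["d1"]), {})
```

Calling the decorated function with keyword arguments, or setting query_arg and docs_arg to match its signature, keeps the recorded query and documents unambiguous.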

encode_documents

encode_documents(docs: list[Document]) -> str

Serialize documents to an OpenInference retrieval.documents JSON string.

Source code in coalex/ext/retrieval.py
def encode_documents(docs: list[Document]) -> str:
    """Serialize documents to OpenInference retrieval.documents JSON string."""
    return json.dumps([d.to_openinference_dict() for d in docs])
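
To inspect the serialized shape without installing the SDK, the sketch below copies Document and encode_documents from the listings on this page into a standalone script. The only change is Optional[...] in place of the X | None annotations, and the sample documents are made up.

```python
import dataclasses
import json
from typing import Any, Optional


@dataclasses.dataclass
class Document:
    """Copy of the Document listing, with Optional[...] in place of `X | None`."""

    content: str
    id: Optional[str] = None
    score: Optional[float] = None
    metadata: dict[str, Any] = dataclasses.field(default_factory=dict)

    def to_openinference_dict(self) -> dict[str, Any]:
        d: dict[str, Any] = {"document.content": self.content}
        if self.id is not None:
            d["document.id"] = self.id
        if self.score is not None:
            d["document.score"] = self.score
        if self.metadata:
            d["document.metadata"] = json.dumps(self.metadata)
        return d


def encode_documents(docs: list[Document]) -> str:
    return json.dumps([d.to_openinference_dict() for d in docs])


payload = json.loads(encode_documents([
    Document(content="Refunds within 30 days", id="doc-1", score=0.95,
             metadata={"source": "faq"}),
    Document(content="No id or score"),
]))
```

Two details follow from the code: unset fields are omitted rather than written as null, and document.metadata is itself a JSON string nested inside the outer JSON, so a consumer has to decode it a second time.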

retrieval_span

retrieval_span(
    name: str | None = None, query_arg: str = "query"
) -> Callable[[F], F]

Decorator that wraps a retrieval function with an OpenInference RETRIEVER span.

Supports both sync and async functions. The decorated function must return list[Document]. The span captures:

  • openinference.span.kind = RETRIEVER
  • input.value: query string (from the query_arg kwarg or the first str positional argument)
  • retrieval.documents: JSON-encoded list of documents

Parameters:

  • name (str | None, default None): Span name. Defaults to the function name.
  • query_arg (str, default 'query'): Name of the kwarg containing the query string.

Source code in coalex/ext/retrieval.py
def retrieval_span(name: str | None = None, query_arg: str = "query") -> Callable[[F], F]:
    """Decorator that wraps a retrieval function with an OpenInference RETRIEVER span.

    Supports both sync and async functions. The decorated function must return
    ``list[Document]``. The span captures:
    - ``openinference.span.kind`` = ``RETRIEVER``
    - ``input.value`` — query string (from ``query_arg`` kwarg or first ``str`` positional)
    - ``retrieval.documents`` — JSON-encoded list of documents

    Args:
        name: Span name. Defaults to the function name.
        query_arg: Name of the kwarg containing the query string.
    """

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                tracer = _get_tracer()
                parent_ctx = _get_parent_context()
                with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                    span.set_attribute("openinference.span.kind", "RETRIEVER")
                    q = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
                    if q:
                        span.set_attribute("input.value", str(q))
                    result = await fn(*args, **kwargs)
                    if isinstance(result, list) and result and isinstance(result[0], Document):
                        span.set_attribute("retrieval.documents", encode_documents(result))
                        span.set_attribute("output.value", _document_summary(result))
                    span.set_status(Status(StatusCode.OK))
                    return result

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracer = _get_tracer()
            parent_ctx = _get_parent_context()
            with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                span.set_attribute("openinference.span.kind", "RETRIEVER")
                q = kwargs.get(query_arg) or next((a for a in args if isinstance(a, str)), None)
                if q:
                    span.set_attribute("input.value", str(q))
                result = fn(*args, **kwargs)
                if isinstance(result, list) and result and isinstance(result[0], Document):
                    span.set_attribute("retrieval.documents", encode_documents(result))
                    span.set_attribute("output.value", _document_summary(result))
                span.set_status(Status(StatusCode.OK))
                return result

        return wrapper  # type: ignore[return-value]

    return decorator

tool_span

tool_span(
    name: str | None = None, description: str | None = None
) -> Callable[[F], F]

Decorator that wraps a tool function with an OpenInference TOOL span.

Supports both sync and async functions. The span captures:

  • openinference.span.kind = TOOL
  • tool.name: the span name
  • tool.description: recorded if provided
  • tool.parameters: JSON-encoded kwargs passed to the function

Parameters:

  • name (str | None, default None): Tool name and span name. Defaults to the function name.
  • description (str | None, default None): Human-readable tool description.

Source code in coalex/ext/tool.py
def tool_span(
    name: str | None = None,
    description: str | None = None,
) -> Callable[[F], F]:
    """Decorator that wraps a tool function with an OpenInference TOOL span.

    Supports both sync and async functions. The span captures:
    - ``openinference.span.kind`` = ``TOOL``
    - ``tool.name`` — span name
    - ``tool.description`` — if provided
    - ``tool.parameters`` — JSON-encoded kwargs passed to the function

    Args:
        name: Tool name and span name. Defaults to the function name.
        description: Human-readable tool description.
    """

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                tracer = _get_tracer()
                parent_ctx = _get_parent_context()
                with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                    span.set_attribute("openinference.span.kind", "TOOL")
                    span.set_attribute("tool.name", span_name)
                    if description:
                        span.set_attribute("tool.description", description)
                    if kwargs:
                        with contextlib.suppress(TypeError, ValueError):
                            span.set_attribute("tool.parameters", json.dumps(kwargs))
                    return await fn(*args, **kwargs)

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            tracer = _get_tracer()
            parent_ctx = _get_parent_context()
            with tracer.start_as_current_span(span_name, context=parent_ctx) as span:
                span.set_attribute("openinference.span.kind", "TOOL")
                span.set_attribute("tool.name", span_name)
                if description:
                    span.set_attribute("tool.description", description)
                if kwargs:
                    with contextlib.suppress(TypeError, ValueError):
                        span.set_attribute("tool.parameters", json.dumps(kwargs))
                return fn(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator
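
In the listing above, tool.parameters is built from kwargs only, and serialization errors are suppressed. The sketch below isolates that behavior in a standalone helper. tool_parameters is a hypothetical name, and the logic is copied from the wrapper.

```python
import contextlib
import json
from typing import Any, Optional


def tool_parameters(args: tuple[Any, ...], kwargs: dict[str, Any]) -> Optional[str]:
    """Mirror how tool_span derives tool.parameters: kwargs only, skipped on failure."""
    encoded: Optional[str] = None
    if kwargs:
        with contextlib.suppress(TypeError, ValueError):
            encoded = json.dumps(kwargs)
    return encoded


keyword_call = tool_parameters((), {"doc_id": "doc-1", "title": "Refund Policy"})
positional_call = tool_parameters(("doc-1", "Refund Policy"), {})
unserializable = tool_parameters((), {"tags": {"a", "b"}})  # a set is not JSON-serializable
```

Positional arguments never reach the span, and a kwarg that json.dumps cannot encode drops the whole attribute without raising. Calling tools with JSON-friendly keyword arguments keeps the parameters visible in the trace.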