Parser Module¶
The parser module provides streaming-aware data types and parsing capabilities for processing structured LLM outputs in real-time.
Core Types¶
langdiff.Object
¶
Bases: StreamingValue[dict]
Represents a JSON object that is streamed.
The keys of the object are determined from the type annotations of the class. It is assumed that the keys will be received in the order they are defined. When a new key is encountered in the stream, the previous key's value is considered complete.
on_update
¶
Register a callback that is called whenever the object is updated.
langdiff.List
¶
Bases: Generic[T], StreamingValue[list]
Represents a JSON array that is streamed.
This class can handle a list of items that are themselves StreamingValues
(like langdiff.Object or langdiff.String) or complete values. It provides
an on_append callback that is fired when a new item is added to the list.
langdiff.String
¶
Bases: StreamingValue[str | None]
Represents a string that is streamed incrementally.
This class assumes that the string value is built up by concatenating chunks.
It provides an on_append callback that is fired with each new chunk of the
string.
langdiff.Atom
¶
Bases: Generic[T], StreamingValue[T]
Represents a value that is not streamed incrementally but received whole.
This is useful for types like numbers, booleans, or even entire objects/lists
that are not streamed part-by-part but are present completely once available.
The on_complete callback is triggered when the parent langdiff.Object or
langdiff.List determines that this value is complete.
value: T | None
property
¶
Returns the complete value.
Parser¶
Field Configuration¶
Usage Examples¶
Basic Object Streaming¶
import langdiff as ld
class BlogPost(ld.Object):
title: ld.String
content: ld.String
tags: ld.List[ld.String]
post = BlogPost()
# Set up event handlers
@post.title.on_append
def on_title_chunk(chunk: str):
print(f"Title chunk: {chunk}")
@post.tags.on_append
def on_tag_append(tag: ld.String, index: int):
@tag.on_complete
def on_tag_complete(final_tag: str):
print(f"New tag: {final_tag}")
# Parse streaming JSON
with ld.Parser(post) as parser:
for token in json_stream:
parser.push(token)
Nested Structures¶
class Comment(ld.Object):
author: ld.String
text: ld.String
class Article(ld.Object):
title: ld.String
comments: ld.List[Comment]
article = Article()
@article.comments.on_append
def on_comment_append(comment: Comment, index: int):
@comment.author.on_complete
def on_author_complete(author: str):
print(f"Comment {index} by {author}")
@comment.text.on_append
def on_text_chunk(chunk: str):
print(f"Comment {index} text: {chunk}")
OpenAI Integration¶
import openai
# Convert to Pydantic for OpenAI SDK
response_format = BlogPost.to_pydantic()
client = openai.OpenAI()
with client.chat.completions.stream(
model="gpt-5-mini",
messages=[{"role": "user", "content": "Write a blog post"}],
response_format=response_format,
) as stream:
post = BlogPost()
with ld.Parser(post) as parser:
for event in stream:
if event.type == "content.delta":
parser.push(event.delta)
Event System¶
All streaming types support common events, with type-specific additional events:
on_start()¶
Called when streaming begins for a value:
on_append()¶
Called as new data is appended (supported by String and List types):
@response.content.on_append # String type
def on_content_chunk(chunk: str):
print(f"New content: {chunk}")
@response.items.on_append # List type
def on_item_append(item: ld.String, index: int):
print(f"New item at index {index}")
on_update()¶
Called when an object is updated (supported by Object type only):
@response.on_update # Object type
def on_object_update(data: dict):
print(f"Object updated: {data}")
on_complete()¶
Called when a value is fully received: