Fixing Async Timing In LLM Observability
· Technical · 12 min read
Most LLM observability tools get timing wrong. Let's fix that.
In the LLM landscape, most inference providers have started to standardize on the OpenAI API specification. I personally think this is great. It makes it easy to adapt existing code to swap between inference providers, and as long as we stick to the more stable APIs, things more or less work the same. However, there is an unfortunate feature (or lack thereof) in the OpenAI SDK.
Take a look at the following code:
from datetime import datetime

from openai import OpenAI

openai_client = OpenAI()


def example_request() -> float:
    start_time = datetime.now()
    response = openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time


example_request()
Is there anything wrong with the above code for capturing the elapsed time of the OpenAI call, other than the fact that we are capturing wall-clock differences with datetime.now() instead of using a monotonic clock like time.perf_counter()?
Well, like most things, it depends. If you want to approximately capture the wall-clock time of how long it takes to get your response back from OpenAI, then I would say it's probably sufficient. Considering that LLM calls pretty much always take more than a second, we aren't concerned with the time it takes to construct that list and the two dictionaries within the messages.
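For completeness, the monotonic-clock version of the same measurement would look something like this. This is just a minor variation on the snippet above; nothing later in the post depends on it.
import time

from openai import OpenAI

openai_client = OpenAI()  # same client as in the snippet above


def example_request_perf() -> float:
    # perf_counter() is a monotonic clock, so it is immune to system clock adjustments
    start = time.perf_counter()
    openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    return time.perf_counter() - start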
However, what if instead of using this code in a synchronous context we actually wanted to make a bunch of these calls asynchronously, like so:
async def example_request() -> float:
    start_time = datetime.now()
    response = openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time


await asyncio.gather(*(example_request() for _ in range(100)))
Would you still agree that the timing there is valid?
Well, I would, but only because the code is not efficient. Even though we marked our function as async, we don't actually have an await anywhere within the function that yields back to the event loop. So really we are doing something pretty similar to a for loop, with a sprinkle of unnecessary magic.
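You can convince yourself of this with a minimal, self-contained sketch. No OpenAI here; a blocking time.sleep stands in for the request:
import asyncio
import time


async def fake_request() -> float:
    start = time.perf_counter()
    time.sleep(0.2)  # blocking call and no await: nothing ever yields to the event loop
    return time.perf_counter() - start


async def main():
    start = time.perf_counter()
    await asyncio.gather(*(fake_request() for _ in range(5)))
    # Prints roughly 1.0s: the five "async" calls ran one after another, not concurrently
    print(f"total: {time.perf_counter() - start:.2f}s")


asyncio.run(main())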
We are going to need some basic understanding of how the event loop works. Honestly, I am no expert, but let's walk through the following example to give you a working mental model.
The async event loop is just a scheduler; it doesn't run tasks all at once, it just decides who gets to run next. Every time an async function hits an await, it yields control back to the event loop, saying, "hey, I'm waiting on something, feel free to run someone else for now." The event loop then scans through its list of tasks and resumes another one that's ready. It keeps doing this in a loop, one task at a time, bouncing between them as they give up control.
So what’s actually happening here?
In the timeline above, you can see that in action. Task 1 starts running, hits an await, and the event loop switches to Task 2. That one yields, so Task 1 gets resumed, then we kick off Task 3, and so on. The yellow sections are where tasks are suspended (they've yielded but aren't done), and gray means they haven't even started yet. That's the real magic of async: cooperative concurrency, where tasks play nice with each other by yielding explicitly.
A simple example that could plausibly produce this sort of event loop behavior is constructed below.
import asyncio
import httpx
import time
from datetime import datetime


async def task_1():
    """Simulates a task that runs, waits for I/O, then runs again"""
    print(f"Task 1 started at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    # Initial work (running - green in diagram)
    await asyncio.sleep(0.1)  # Simulate some computation

    # Long I/O operation (suspended - yellow in diagram)
    print(f"Task 1 making API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/2')
        data = response.json()

    # Final work (running - green in diagram)
    print(f"Task 1 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 1 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")


async def task_2():
    """Simulates a task that waits, runs, gets suspended, then runs"""
    # Initial wait (waiting - gray in diagram)
    await asyncio.sleep(0.5)
    print(f"Task 2 started work at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    # Some work (running - green)
    await asyncio.sleep(0.2)

    # Shorter I/O operation (suspended - yellow)
    print(f"Task 2 making API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/1')
        data = response.json()

    # Final work (running - green)
    print(f"Task 2 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 2 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")


async def task_3():
    """Simulates a task that waits longer, runs, then has a long suspension"""
    # Longer initial wait (waiting - gray in diagram)
    await asyncio.sleep(1.0)
    print(f"Task 3 started work at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    # Some work (running - green)
    await asyncio.sleep(0.3)

    # Very long I/O operation (long suspended - yellow)
    print(f"Task 3 making long API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/3')
        data = response.json()

    # Final work (running - green)
    print(f"Task 3 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 3 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")


async def main():
    """Run all tasks concurrently"""
    print(f"Event loop started at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

    # Start all tasks concurrently
    await asyncio.gather(
        task_1(),
        task_2(),
        task_3()
    )

    print(f"Event loop finished at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")


if __name__ == "__main__":
    asyncio.run(main())
It's kind of like multi-threading, but a bit more lightweight, since the scheduling stays fully in user space rather than being handled by the OS in kernel space. If that sounds like gibberish to you, don't worry about it.
Okay, so now imagine we update our code example to actually yield after we make our network request to OpenAI, by using the AsyncOpenAI client and a corresponding await on the chat.completions.create call. This means that once we fire off the network request, we yield back to the event loop.
async def example_request() -> float:
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time


await asyncio.gather(*(example_request() for _ in range(100)))
Now let's apply our mental model. After the first OpenAI call fires off its request and yields, we start the next one. While the next one is being started, the original one finishes. When the second task yields back to the event loop, the loop has two options: does it start executing task 3, or does it go back to finish off task 1? Are you still feeling confident in our timing calculation?
Okay, okay, you might be saying: "but we will eventually get back to taking down end_time, and these calls fire off pretty quickly, so how much error is there really in our timing calculation?" That's true, but here is the last piece that really shows why this doesn't work. What if we had some long blocking code at the end of the task?
import time


async def example_request() -> float:
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()

    # This is meant to simulate blocking code; it could be anything that doesn't
    # yield back to the event loop (this is why asyncio.sleep exists, but that
    # wouldn't prove the point here).
    time.sleep(0.5)

    return elapsed_time


await asyncio.gather(*(example_request() for _ in range(100)))
In that last scenario, our end-time calculation can, in theory, be off by a substantial amount. Hopefully I have described the problem well enough that you can see it.
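You can reproduce the effect without spending API credits. Here is a minimal, self-contained sketch where asyncio.sleep stands in for the awaited network call and time.sleep stands in for blocking post-processing; the exact numbers will vary by machine:
import asyncio
import time


async def fake_llm_call() -> float:
    start = time.perf_counter()
    await asyncio.sleep(1.0)   # stand-in for the awaited network request
    measured = time.perf_counter() - start
    time.sleep(0.2)            # stand-in for blocking work that starves the event loop
    return measured


async def main():
    measured_times = await asyncio.gather(*(fake_llm_call() for _ in range(20)))
    # Every "request" takes ~1s, but later tasks report much larger numbers because
    # their end timestamps were only taken after other tasks' blocking work had run.
    print(f"min: {min(measured_times):.2f}s, max: {max(measured_times):.2f}s")


asyncio.run(main())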
Most of the LLM observability tools that I am familiar with do some variant of the above to time LLM calls. So the question to ask yourself is: do I care?
Maybe you really only care about monitoring the wall-clock time experienced by your user, and this sort of minutiae is meaningless, in which case you can stop here.
In my case, however, knowing the exact time it took for a request to go out to our LLM inference provider and come back was important. From my code's perspective, that time should be invariant to the amount of work being done concurrently; maybe it varies on the inference provider's side, but that is exactly what I wanted to understand.
So I am not just going to leave you here; I will provide a solution. It is not one I love, but it does work and was sufficient to fix this problem for me.
OpenAI, Anthropic, and a few other inference providers use httpx as the underlying library for making requests to their APIs. According to the httpx documentation, the elapsed property on the Response object gives us the true time that the request took from start to finish.
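As a quick illustration of what httpx exposes on its own, here is a minimal sketch that hits a public test endpoint, independent of the OpenAI SDK:
import asyncio

import httpx


async def demo():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/delay/1")
        # elapsed is a datetime.timedelta measured by httpx itself,
        # covering the time from sending the request until the response was read
        print(f"request took {response.elapsed.total_seconds():.3f}s")


asyncio.run(demo())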
My first thought wasn't to spin this wheel myself; we actually reached out to OpenAI to see if they could offer a solution. The crux of their response was that if we use with_raw_response, we can access this elapsed property and get the true time. This does work for OpenAI, but the first issue is that it changes the behavior of the API a bit, and I wanted to keep looking for a solution that would work across more providers, especially since at the time I was trying to figure out how to get this feature into litellm.
So ideally we would have a more generic solution, but where I landed does have two constraints (though I'm sure more tricks could make this work for clients that don't meet them):
- The client must use httpx as its underlying request library
- The client must allow us to pass extra headers into the request
With that in mind, let's get into the solution. The first part is to wrap the httpx client and override the low-level send method. In it, we look at the request for an extra header; here we call it call-id, but it can be anything really, since we control both sides of this interaction.
from httpx import Request
from openai import DefaultAsyncHttpxClient


class WrappedHttpxAsync(DefaultAsyncHttpxClient):
    """
    OpenAI changes some properties of the httpx client around timeouts and other things,
    so we use their changes implicitly by inheriting from their client and not the
    standard httpx client.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__response_metadata = {}  # This could be a TTL or max-size dictionary to prevent any leaks
        self.pop_response_metadata = self.__response_metadata.pop  # Bound method that gives external callers access to the data

    async def send(self, request: Request, **kwargs):
        response = await super().send(request, **kwargs)
        if call_id := request.headers.get("call-id", None):
            self.__response_metadata[call_id] = {"elapsed_seconds": response.elapsed.total_seconds()}
        return response
Then, when we create our OpenAI client, we have to make sure we create an instance of this wrapped httpx client and pass it into the OpenAI client's constructor.
httpx_client = WrappedHttpxAsync()
openai_client = AsyncOpenAI(http_client=httpx_client)
Now the last two parts of this trick are simple. When we make a request, we pass in an extra header so that we can correlate the request with its corresponding response metadata.
from uuid import uuid4

call_id = str(uuid4()).lower()
response = await openai_client.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a friendly assistant!"},
        {"role": "user", "content": "Say Hello!"}
    ],
    model="latest-and-greatest-model-string-here",
    extra_headers={"call-id": call_id},
)
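The second part is retrieving the timing once the call returns, which is just a dictionary pop keyed by the same call_id, using the pop_response_metadata binding we exposed on the wrapped client:
# Pops the entry so the metadata dictionary doesn't grow without bound
true_elapsed_seconds = httpx_client.pop_response_metadata(call_id)["elapsed_seconds"]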
Now let’s tie it all together and test the difference in the captured time it takes for the requests to succeed.
from datetime import datetime
from time import sleep
from typing import Tuple
from uuid import uuid4
import asyncio

import numpy as np
from httpx import Request
from openai import AsyncOpenAI, DefaultAsyncHttpxClient
from tqdm.auto import tqdm


class WrappedHttpxAsync(DefaultAsyncHttpxClient):
    """
    OpenAI changes some properties of the httpx client around timeouts and other things,
    so we use their changes implicitly by inheriting from their client and not the
    standard httpx client.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__response_metadata = {}
        self.pop_response_metadata = self.__response_metadata.pop

    async def send(self, request: Request, **kwargs):
        response = await super().send(request, **kwargs)
        if call_id := request.headers.get("call-id", None):
            self.__response_metadata[call_id] = {"elapsed_seconds": response.elapsed.total_seconds()}
        return response


async def call_openai(progress: tqdm, httpx_client: WrappedHttpxAsync, openai_client: AsyncOpenAI) -> Tuple[float, float]:
    call_id = str(uuid4()).lower()
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
        extra_headers={"call-id": call_id},
    )
    end_time = datetime.now()
    sleep(0.5)  # A blocking call, meant to represent computation that does not yield control back to the event loop
    progress.update(1)
    return httpx_client.pop_response_metadata(call_id)["elapsed_seconds"], (end_time - start_time).total_seconds()


async def main():
    total_requests = 300
    progress = tqdm(desc="Processing Requests", total=total_requests)
    async with (WrappedHttpxAsync() as httpx_client,
                AsyncOpenAI(http_client=httpx_client) as openai_client):
        response_times = await asyncio.gather(*(call_openai(progress, httpx_client, openai_client) for _ in range(total_requests)))

    diffs = np.array([approximate_request_latency - true_request_latency
                      for true_request_latency, approximate_request_latency in response_times])
    print(f"Mean: {diffs.mean()}, Median: {np.median(diffs)}, Std: {diffs.std()}")
    print(f"95th percentile: {np.percentile(diffs, 95)}")
    print(f"99th percentile: {np.percentile(diffs, 99)}")
    print(f"Max: {diffs.max()}, Min: {diffs.min()}")


if __name__ == "__main__":
    asyncio.run(main())
"""
Example output of the above:
Processing Requests: 100%
Difference in seconds between outer elapsed calculation and using httpx.Response elapsed property
Mean: 46.60197113666666, Median: 40.34159100000001, Std: 36.355706944280044
95th percentile: 108.8644228
99th percentile: 114.96621911999998
Max: 116.49648100000002, Min: 0.08809099999999859
"""
In our minimal example, the naive method of retrieving the response time for an LLM call in an async context was off by 46 seconds on average, with the worst offenders off by almost two minutes.
The differences are exacerbated by a few things, but the main ones are:
- The number of concurrent requests being processed
- The amount of blocking code between receiving a response and the next await (the next time the event loop gets a chance to choose a different task); one common mitigation for this second point is sketched below
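On that second point, if the post-response work is genuinely blocking, a common pattern (orthogonal to the timing fix above, and not something my setup does) is to push it off the event loop with asyncio.to_thread so other tasks can record their end times promptly:
import asyncio
import time


def blocking_post_processing(result: str) -> str:
    time.sleep(0.5)  # stand-in for CPU-bound or otherwise blocking work
    return result.upper()


async def handle_response(result: str) -> str:
    # Runs the blocking function in a worker thread, keeping the event loop free
    return await asyncio.to_thread(blocking_post_processing, result)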
I'd be interested to learn of other solutions in this space. I'm certain there is a way to avoid the clunkiness of having to pass around both the httpx client and the inference provider client. I don't love the need to maintain a dictionary, nor the need to keep track of a randomly generated string per request. I haven't tested it, but I also believe this solution would not work with streaming responses. Ideally, we could somehow bind the elapsed time to the response object itself, but I don't see how to do that without monkeypatching.
Though imperfect, this approach gets us real timing data where it matters most. I hope this sparks discussion about better patterns for async observability in the LLM ecosystem.