Fixing Async Timing In LLM Observability

· Technical · 12 min read

Most LLM observability tools get timing all wrong; let's fix that.

In the LLM landscape, most inference providers have started to standardize on the OpenAI API specification. I personally think this is great: it makes it easy to adapt existing code to swap between inference providers, and as long as we stick to the more stable APIs, things will more or less work the same. However, there is an unfortunate feature (or lack thereof) in the OpenAI SDK.

Take a look at the following code:

from datetime import datetime
from openai import OpenAI

openai_client = OpenAI()  # assumes your API key is configured in the environment

def example_request() -> float:
    start_time = datetime.now()
    response = openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"}, 
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time

example_request()

Is there anything wrong with the above code for capturing the elapsed time of the OpenAI call, other than the fact that we are timing with datetime.now() instead of a monotonic clock like time.perf_counter()?

Well, like most things, it depends. If you want to approximately capture the wall-clock time it takes to get your response back from OpenAI, then it's probably sufficient. Considering that LLM calls pretty much always take longer than a second, we aren't concerned with the time it takes to construct the list and the two dictionaries within messages.
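
For completeness, here is a sketch of the same measurement using time.perf_counter(), which is monotonic and unaffected by system clock adjustments (the client and model string are the same placeholders as above):

import time

def example_request_perf() -> float:
    start = time.perf_counter()
    response = openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    return time.perf_counter() - start  # monotonic clock, so wall-clock adjustments can't skew it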

However, what if instead of using this code in a synchronous context we actually wanted to make a bunch of these calls asynchronously, like so:

async def example_request() -> float:
    start_time = datetime.now()
    response = openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"}, 
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time

await asyncio.gather(*(example_request() for _ in range(100)))

Would you still agree that the timing there is valid?

Well, I would, but only because the code is not efficient. Even though we marked our function as async, there is no await anywhere inside it that yields back to the event loop. So really we are doing something pretty similar to a for loop with a sprinkle of unnecessary magic.
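
A quick way to convince yourself of this: the coroutines below are marked async but never await anything, so gather ends up running them strictly one after another. This is a self-contained sketch, no OpenAI involved:

import asyncio
import time

async def no_yield(i: int) -> None:
    # No await in the body, so this coroutine runs start to finish
    # without ever handing control back to the event loop.
    time.sleep(0.5)
    print(f"task {i} done")

async def main():
    start = time.perf_counter()
    await asyncio.gather(*(no_yield(i) for i in range(4)))
    print(f"total: {time.perf_counter() - start:.1f}s")  # ~2.0s, i.e. fully sequential

asyncio.run(main())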

We are going to need some basic understanding of how the event loop works. Honestly, I am no expert, but let's walk through the following example to build a working mental model.

The async event loop is just a scheduler: it doesn't run tasks all at once, it just decides who gets to run next. Every time an async function hits an await, it yields control back to the event loop, saying, “hey, I'm waiting on something, feel free to run someone else for now.” The event loop then scans through its list of tasks and resumes another one that's ready. It keeps doing this in a loop, one task at a time, bouncing between them as they give up control.

Async Event Loop Timeline

So what’s actually happening here?

In the timeline above, you can see that in action. Task 1 starts running, hits an await, and the event loop switches to Task 2. That one yields, so Task 1 gets resumed, then we kick off Task 3, and so on. The yellow sections are where tasks are suspended (they've yielded but aren't done), and gray means they haven't even started yet. That's the real magic of async: cooperative concurrency, where tasks play nice with each other by yielding explicitly.

A simple example that could produce this sort of event loop behavior is constructed here.

import asyncio
import httpx
from datetime import datetime

async def task_1():
    """Simulates a task that runs, waits for I/O, then runs again"""
    print(f"Task 1 started at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    
    # Initial work (running - green in diagram)
    await asyncio.sleep(0.1)  # Simulate some computation
    
    # Long I/O operation (suspended - yellow in diagram)
    print(f"Task 1 making API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/2')
        data = response.json()
    
    # Final work (running - green in diagram)
    print(f"Task 1 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 1 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

async def task_2():
    """Simulates a task that waits, runs, gets suspended, then runs"""
    # Initial wait (waiting - gray in diagram)
    await asyncio.sleep(0.5)
    
    print(f"Task 2 started work at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    
    # Some work (running - green)
    await asyncio.sleep(0.2)
    
    # Shorter I/O operation (suspended - yellow)
    print(f"Task 2 making API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/1')
        data = response.json()
    
    # Final work (running - green)
    print(f"Task 2 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 2 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

async def task_3():
    """Simulates a task that waits longer, runs, then has a long suspension"""
    # Longer initial wait (waiting - gray in diagram)
    await asyncio.sleep(1.0)
    
    print(f"Task 3 started work at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    
    # Some work (running - green)
    await asyncio.sleep(0.3)
    
    # Very long I/O operation (long suspended - yellow)
    print(f"Task 3 making long API call at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    async with httpx.AsyncClient() as client:
        response = await client.get('https://httpbin.org/delay/3')
        data = response.json()
    
    # Final work (running - green)
    print(f"Task 3 resumed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    await asyncio.sleep(0.1)
    print(f"Task 3 completed at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

async def main():
    """Run all tasks concurrently"""
    print(f"Event loop started at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
    
    # Start all tasks concurrently
    await asyncio.gather(
        task_1(),
        task_2(), 
        task_3()
    )
    
    print(f"Event loop finished at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")

if __name__ == "__main__":
    asyncio.run(main())

It's kind of like multi-threading, but a bit more lightweight, as the scheduling stays fully in user space rather than being handled by the OS in kernel space. If that sounds like gibberish to you, don't worry about it.

Ok, so now imagine we update our code example to actually yield by using the AsyncOpenAI client and a corresponding await on the completions create call. This means that after we fire off the network request, we yield back to the event loop.

async def example_request() -> float:
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"}, 
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    return elapsed_time

await asyncio.gather(*(example_request() for _ in range(100)))

Now let's apply our mental model: what if, after firing off the first OpenAI call, we start the next one, and while the second is being started the first one finishes? When the second task yields back to the event loop, the loop has two options: does it start executing task 3, or does it go back to finish off task 1? Are you still feeling confident in our timing calculation?

Okay okay, you might be saying, “but we will eventually get back to recording end_time, and these calls are firing off pretty quickly, so how much error is there really in our timing calculation?” That's true, but here is the last piece that really shows why this doesn't work: what if we had some long blocking code at the end of the task?

import time

async def example_request() -> float:
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"}, 
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
    )
    end_time = datetime.now()
    elapsed_time = (end_time - start_time).total_seconds()
    """
    This is meant to simulate blocking code this could be 
    anything that doesn't yield back to the event loop 
    (this is why asyncio.sleep exists, but that wouldn't prove the point here)
    """
    time.sleep(0.5) 
    return elapsed_time

await asyncio.gather(*(example_request() for _ in range(100)))

In this last scenario, our calculated end time can, in theory, be off by a substantial amount. Hopefully I have described the problem well enough that you can see why.
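
If you want to see the effect without burning API credits, here is a self-contained sketch: asyncio.sleep(1) stands in for the awaited network call and time.sleep(0.2) stands in for the blocking post-processing. Every "request" takes one second, yet the measured values climb far past that because each resumed task has to wait behind every earlier task's blocking work:

import asyncio
import time
from datetime import datetime

async def fake_llm_call() -> float:
    start_time = datetime.now()
    await asyncio.sleep(1)   # stand-in for the awaited OpenAI request
    end_time = datetime.now()
    time.sleep(0.2)          # blocking work that hogs the event loop
    return (end_time - start_time).total_seconds()

async def main():
    measured = await asyncio.gather(*(fake_llm_call() for _ in range(20)))
    # The later a task gets resumed, the more inflated its measurement is.
    print([round(m, 2) for m in sorted(measured)])

asyncio.run(main())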

Most of the LLM observability tools that I am familiar with do some variant of the above to time LLM calls. The question to ask yourself is: do I care?

Maybe you really only care about monitoring the wall-clock time experienced by your user, and this sort of minutiae is meaningless, in which case you can stop here.

However, in my case, knowing the exact time it took for a request to go out to our LLM inference provider and come back was important. At least from my code's perspective, it should be invariant to the amount of work being done concurrently; maybe it varies on the inference provider's side, but that is exactly what I wanted to understand.

So I am not just going to leave you here. I will offer a solution, albeit not one I love, but it does work and was sufficient to fix this problem for me.

OpenAI, Anthropic, and a few other inference providers use httpx as the underlying library for making requests to their APIs. According to the httpx documentation, the elapsed property on the response lets us retrieve the true time the request took from start to finish.
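
To see what that property looks like in isolation, here is a bare httpx example (reusing the httpbin endpoint from the earlier demo):

import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/delay/1")
        # elapsed is a timedelta measured by httpx itself, from sending the
        # request until the response has been fully read.
        print(response.elapsed.total_seconds())

asyncio.run(main())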

Now, my first thought wasn't to spin this wheel myself; we actually reached out to OpenAI to see if they could offer a solution. The crux of their response was that if we used with_raw_response, we would be able to access this elapsed value and get the true time. This does work for OpenAI, but the catch is that it changes the behavior of the API a bit, and I wanted to keep looking for a solution that would work across more providers, especially since at the time I was trying to figure out how to get this feature into litellm.
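
For reference, the OpenAI-specific route they pointed us to looks roughly like this. This is a sketch, and I'm assuming the raw response object exposes the underlying httpx.Response via http_response:

raw = await openai_client.chat.completions.with_raw_response.create(
    messages=[
        {"role": "system", "content": "You are a friendly assistant!"},
        {"role": "user", "content": "Say Hello!"}
    ],
    model="latest-and-greatest-model-string-here",
)
completion = raw.parse()  # the usual ChatCompletion object
# Assumption: the wrapped httpx.Response is reachable here, exposing its elapsed.
elapsed_seconds = raw.http_response.elapsed.total_seconds()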

So ideally we would have had a more generic solution, but where I landed does come with two constraints (I'm sure more tricks could make this work for clients that don't meet them):

  • The client must use httpx as its underlying request library
  • The client must allow us to pass extra headers into the request

With that in mind, let's get into the solution. The first part is to wrap the httpx client and override the low-level send method. In it we inspect the request for an extra header; here we use call-id, but it can be anything really, since we control both sides of this interaction.

from httpx import Request
from openai import DefaultAsyncHttpxClient

"""
OpenAI changes some properties of the httpx client around timeouts and other things, so use their changes implicitly 
by inheriting from their client and not the standard httpx client
"""
class WrappedHttpxAsync(DefaultAsyncHttpxClient): 

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__response_metadata = {} # This could be a TTL or max size dictionary to prevent any leaks
        self.pop_response_metadata = self.__response_metadata.pop # We bind this method to give external access to retrieve data

    async def send(self, request: Request, **kwargs):
        response = await super().send(request, **kwargs)
        if call_id := request.headers.get("call-id", None):
            self.__response_metadata[call_id] = {"elapsed_seconds": response.elapsed.total_seconds()}
        return response

Then, when we create our OpenAI client, we have to make sure we instantiate this wrapped httpx client and pass it into the OpenAI client's constructor.

from openai import AsyncOpenAI

httpx_client = WrappedHttpxAsync()
openai_client = AsyncOpenAI(http_client=httpx_client)

Now the last two parts of this trick are simple. When we make requests, we pass in an extra header so that we can correlate the request with its corresponding response metadata.

from uuid import uuid4

call_id = str(uuid4()).lower()
response = await openai_client.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a friendly assistant!"}, 
        {"role": "user", "content": "Say Hello!"}
    ],
    model="latest-and-greatest-model-string-here",
    extra_headers={"call-id": call_id},
)

Now let’s tie it all together and test the difference in the captured time it takes for the requests to succeed.

from datetime import datetime
from uuid import uuid4
import numpy as np
from openai import AsyncOpenAI, DefaultAsyncHttpxClient
from httpx import Request
from tqdm.auto import tqdm

import asyncio
from time import sleep
from typing import Tuple


"""
OpenAI changes some properties of the httpx client around timeouts and other things, so use their changes implicitly 
by inheriting from their client and not the standard httpx client
"""
class WrappedHttpxAsync(DefaultAsyncHttpxClient): 

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__response_metadata = {}
        self.pop_response_metadata = self.__response_metadata.pop

    async def send(self, request: Request, **kwargs):
        response = await super().send(request, **kwargs)
        if call_id := request.headers.get("call-id", None):
            self.__response_metadata[call_id] = {"elapsed_seconds": response.elapsed.total_seconds()}
        return response


async def call_openai(progress: tqdm, httpx_client: WrappedHttpxAsync, openai_client: AsyncOpenAI) -> Tuple[float, float]:
    call_id = str(uuid4()).lower()
    start_time = datetime.now()
    response = await openai_client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"}, 
            {"role": "user", "content": "Say Hello!"}
        ],
        model="latest-and-greatest-model-string-here",
        extra_headers={"call-id": call_id},
    )
    end_time = datetime.now()
    sleep(0.5) # This is a blocking call, meant to represent computation that does not yield control back to the event loop
    progress.update(1)
    return httpx_client.pop_response_metadata(call_id)["elapsed_seconds"], (end_time - start_time).total_seconds()
        

async def main():
    total_requests = 300
    progress = tqdm(desc="Processing Requests", total=total_requests)
    async with (WrappedHttpxAsync() as httpx_client, 
            AsyncOpenAI(http_client=httpx_client) as openai_client):
        response_times = await asyncio.gather(*(call_openai(progress, httpx_client, openai_client) for _ in range(total_requests)))
    diffs = np.array([approximate_request_latency - true_request_latency for true_request_latency, approximate_request_latency in response_times])
    print(f"Mean: {diffs.mean()}, Median: {np.median(diffs)}, Std: {diffs.std()}")
    print(f"95th percentile: {np.percentile(diffs, 95)} ")
    print(f"99th percentile: {np.percentile(diffs, 99)} ")
    print(f"Max: {diffs.max()}, Min: {diffs.min()}")
    
    
if __name__ == "__main__":
    asyncio.run(main())

    """
    Example output of the above:

    Processing Requests: 100%
    Difference in seconds between outer elapsed calculation and using httpx.Response elapsed property
    Mean: 46.60197113666666, Median: 40.34159100000001, Std: 36.355706944280044
    95th percentile: 108.8644228 
    99th percentile: 114.96621911999998 
    Max: 116.49648100000002, Min: 0.08809099999999859
    """

In our minimal example we were able to show that the average difference from the naive method of retrieving the response time for an LLM call in an async context was 46 seconds, with the worst offenders differing by almost two minutes.

The differences are exacerbated by a few things but the main ones are:

  • The number of concurrent requests processed
  • The amount of blocking code after a response has been received and before the next await (the next time the event loop has a chance to choose a different task).

I'd be interested to learn of other solutions in this space. I'm certain there is a way to avoid the clunkiness of having to pass around both the httpx client and the inference provider client. I don't love the need to maintain a dictionary, nor the need to keep track of a randomly generated string per request. I have not tested it, but I also believe my solution would not work with streaming responses. Ideally we could somehow bind the elapsed value to the response itself, but I find that unlikely without monkeypatching.

Though imperfect, this approach gets us real timing data where it matters most. I hope this sparks discussion about better patterns for async observability in the LLM ecosystem.
