Building Reliable Tool Calling in AI Agents with Message Queues

Learn how to implement reliable distributed tool calling for AI agents using message queues. Ensure at-least-once execution with no duplicates.

Nadeesha Cabral on 27-12-2024

Building Reliable Tool Calling in AI Agents with Message Queues

In this post, we'll explore how to implement at-least-once execution guarantees for AI agent tools using message queues, focusing on distributed tool calling patterns that prevent duplicate operation, making your agents more resilient and reliable.

The Challenge of Reliable AI Agent Tool Execution

When building AI agents that interact with real-world systems through tool calls, reliability and idempotency are important if you ever hope to deploy them in production. You want the tool calls to execute reliably, and sometimes idempotency is important to prevent duplicate operations. A common pitfall in distributed AI systems is implementing tool calls directly in the request-response cycle, which can lead to issues like:

  • Process crashes causing failed or partial executions
  • Duplicate executions of destructive operations
  • Lost requests due to timeouts
  • Resource leaks in distributed environments
  • Race conditions in concurrent tool execution

Let's explore why message queues are essential for reliable distributed tool calling in AI agent architectures.

The Problem with Direct Tool Execution in AI Agents

Here's a typical example of implementing an OpenAI tool call directly in process.

import OpenAI from 'openai';
import { raiseExpensivePurchaseOrder } from './purchase-service';

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

async function handlePurchaseRequest(userId: string, userMessage: string) {
  const completion = await openai.chat.completions.create({
    messages: [
      { role: "user", content: userMessage }
    ],
    model: "gpt-4-turbo-preview",
    tools: [{
      type: "function",
      function: {
        name: "raise_purchase_order",
        description: "Raise an expensive purchase order that needs approval",
        parameters: {
          type: "object",
          properties: {
            purchaseOrderId: { type: "string" },
            amount: { type: "number" },
            description: { type: "string" }
          },
          required: ["purchaseOrderId", "amount"]
        }
      }
    }]
  });

  const toolCall = completion.choices[0].message.tool_calls?.[0];
  
  if (toolCall?.type === 'function' && toolCall.function.name === 'raise_purchase_order') {
    const params = JSON.parse(toolCall.function.arguments);
    
    // This is dangerous in distributed systems:
    // - No at-most-once execution guarantee - could raise duplicate POs!
    // - No distributed locking
    // - No handling of partial failures
    await raiseExpensivePurchaseOrder(params.purchaseOrderId, params.amount, params.description);
  }
}

Distributed System Architecture for AI Agent Tool Calling

The above code is fine as long as you're running your entire application in a single process, in a single machine. But, if you're running your application in a distributed environment, you're going to run into issues.

Here's how a proper message queue-based system should work in a distributed environment:

sequenceDiagram
    participant Requester
    participant LLM
    participant Tool Call
    participant Message Queue
    participant Database

    Requester->>LLM: Send request
    LLM->>Tool Call: Identify tool to execute
    Tool Call->>Message Queue: Queue PO with unique ID
    Message Queue-->>Tool Call: Acknowledge message
    Tool Call->>Database: Record attempt with unique ID
    Note over Database: Essential for preventing<br/>duplicate executions
    Tool Call-->>LLM: Return status
    LLM-->>Requester: Return response

Implementing At-Most-Once Execution with SQS MessageId

One of the key advantages of using AWS SQS is that each message comes with a guaranteed-unique MessageId. We can use this MessageId as our deduplication key in our database, eliminating the need to generate our own identifiers. The database is crucial here - without it, we have no way to prevent duplicate purchase orders from being raised in a distributed system.

NOTE: Although we're using SQS in this example, the same pattern can be applied to any message queue system including any AMQP-based message queue system.

import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb';

const sqs = new SQSClient({ region: 'us-east-1' });
const dynamodb = DynamoDBDocument.from(new DynamoDBClient({ region: 'us-east-1' }));

// Worker process handling distributed tool execution
async function processToolExecutionQueue() {
  while (true) {
    try {
      const messages = await sqs.receiveMessage({
        QueueUrl: process.env.TOOL_EXECUTION_QUEUE_URL,
        MaxNumberOfMessages: 1,
        WaitTimeSeconds: 20,
      });

      for (const message of messages.Messages || []) {
        const { tool, params, userId } = JSON.parse(message.Body);

        if (tool === 'raise_purchase_order') {
          try {
            // Check if this PO has already been raised
            const executionRecord = await dynamodb.get({
              TableName: 'ToolExecutions',
              Key: {
                purchaseOrderId: params.purchaseOrderId,
              },
            });

            if (executionRecord.Item) {
              console.log(`Skipping duplicate PO ${params.purchaseOrderId}`);
              await sqs.deleteMessage({
                QueueUrl: process.env.TOOL_EXECUTION_QUEUE_URL,
                ReceiptHandle: message.ReceiptHandle,
              });
              continue;
            }

            await raiseExpensivePurchaseOrder(params.purchaseOrderId, params.amount, params.description);
            
            // Record successful execution using purchaseOrderId as the key
            await dynamodb.put({
              TableName: 'ToolExecutions',
              Item: {
                purchaseOrderId: params.purchaseOrderId,
                tool,
                params,
                userId,
                executedAt: new Date().toISOString(),
              },
              // Ensure we never raise the same PO twice
              ConditionExpression: 'attribute_not_exists(purchaseOrderId)',
            });
            
            await sqs.deleteMessage({
              QueueUrl: process.env.TOOL_EXECUTION_QUEUE_URL,
              ReceiptHandle: message.ReceiptHandle,
            });
          } catch (error) {
            if (error.name === 'ConditionalCheckFailedException') {
              // Another worker processed this MessageId first
              console.log(`Concurrent execution detected for MessageId ${message.MessageId}`);
            } else {
              console.error('Tool execution failed:', error);
            }
          }
        }
      }
    } catch (error) {
      console.error('Queue processing error:', error);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

With this pattern, you can let the LLM call the tool multiple times, and the tool will only be executed once. In distributed systems, this is a common pattern to ensure idempotency via combining at-least-once delivery with at-most-once execution. At the end of the day, the subscription will only be cancelled once.

Best Practices for Distributed AI Agent Tool Execution

  1. Use Business Identifiers for Deduplication: When possible, use natural business identifiers (like subscriptionId) as your deduplication key rather than generating new IDs.

  2. Implement Conditional Writes: Use DynamoDB's conditional expressions to prevent race conditions in distributed environments.

  3. Handle Partial Failures: Ensure your system can recover from partial failures without leaving inconsistent state.

  4. Monitor Distributed System Metrics: Track message age, queue depth, and execution success rates across your distributed workers.

  5. Implement Dead Letter Queues: Handle persistent failures in your distributed system by moving problematic messages to a DLQ.

Inferable's Approach to Distributed Tool Calling

Inferable handles this complexity for you with its built-in distributed message queue system. When you register functions with Inferable, they are automatically executed through a durable message queue that ensures at-most-once execution across your distributed system. You can fine-tune the behaviour at the function level through the options parameter. See Inferable functions for more details.

import { Inferable } from 'inferable';
import { z } from 'zod';

const client = new Inferable();

// Register the function with Inferable's distributed execution system
client.default.register({
  name: "raise_purchase_order",
  description: "Raise an expensive purchase order that needs approval",
  schema: {
    input: z.object({
      purchaseOrderId: z.string(),
      amount: z.number(),
      description: z.string()
    })
  },
  func: async (input) => {
    // Inferable handles distributed execution, deduplication, and retries
    const result = await raiseExpensivePurchaseOrder(input.purchaseOrderId, input.amount, input.description);
    return { success: true, ...result };
  }
});

// Start the service to begin processing distributed messages
client.default.start();

Inferable's Control Plane manages the entire distributed execution environment:

  • Reliable message routing across distributed workers
  • At-most-once execution guarantees using internal message IDs
  • Automatic retries with exponential backoff in distributed scenarios
  • Comprehensive observability for distributed tool execution

This allows you to focus on implementing your AI agent's business logic while Inferable ensures reliable, distributed execution of your agent's tools across your entire system.

Subscribe to our newsletter for high signal updates from the cross section of AI agents, LLMs, and distributed systems.

Maximum one email per week.

Subscribe to Newsletter