Modern AI applications often require integration with multiple large language model (LLM) APIs to leverage the different offerings of each provider. Closed-source LLMs offer APIs that can readily be used for many real-world business problems. However, for large enterprises where data volumes are immense and key metrics such as reduced latency and improved throughput are crucial, it becomes essential to make parallel API calls to LLMs. Moreover, these calls must be secure, especially when dealing with sensitive enterprise information. This article explores approaches to making secure parallel API calls using Apache Beam and the Langchain framework on Dataflow.
What is Langchain?
Langchain is an open-source framework designed to help developers build applications powered by large language models (LLMs). It simplifies the integration of LLMs like GPT into workflows by providing tools for prompt management, chaining of multiple model calls, and working with external data sources such as databases and APIs.
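As a quick, minimal sketch (assuming the langchain-openai package is installed and an OpenAI key is available in the OPENAI_API_KEY environment variable), a single chat call through Langchain looks like this:
from langchain_openai.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

# Minimal sketch: one chat completion through Langchain.
# Assumes OPENAI_API_KEY is set in the environment.
chat = ChatOpenAI(model="gpt-4")
response = chat([HumanMessage(content="Summarize Apache Beam in one line.")])
print(response.content)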
What is Apache Beam?
Apache Beam is an open-source, unified programming model used for defining both batch and stream data processing pipelines. It allows developers to write processing logic once and execute it across multiple processing engines, such as Google Cloud Dataflow, Apache Spark, and Flink. Beam simplifies large-scale data processing by offering flexible APIs for transforming, aggregating, and analyzing data in real-time or through scheduled batch jobs, making it useful for ETL (Extract, Transform and Load operations), event processing, and machine learning pipelines.
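To make this concrete, here is a minimal sketch of a Beam pipeline that runs locally with the default DirectRunner; the element values are illustrative:
import apache_beam as beam

# Minimal sketch: the same pipeline code runs unchanged on the
# DirectRunner locally or on Dataflow at scale.
with beam.Pipeline() as pipeline:
    (pipeline
     | 'CreateInput' >> beam.Create(['beam', 'spark', 'flink'])
     | 'Uppercase' >> beam.Map(str.upper)
     | 'Print' >> beam.Map(print))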
What is Dataflow?
Google Cloud Dataflow is a fully managed cloud service that offers a robust platform for data processing. Its distributed architecture enables asynchronous API requests to be executed in parallel across multiple workers, automatically scaling to accommodate demand. Dataflow manages retries, failure handling, and resource optimization, ensuring high throughput and reliability for tasks, making it suitable for use cases that require concurrency at scale.
How to Make Secure API Calls?
One simple approach to making API calls is to embed secrets in the code. However, this can lead to unauthorized access by malicious users, potentially incurring significant usage charges and breaching quota limits. To avoid these consequences, there are several secret management solutions (e.g., Google Secret Manager, Kubernetes Secrets, Azure Key Vault), each with its own strengths and ideal use cases. For this blog, we shall limit our focus to the following two approaches for managing secrets in a Beam pipeline that uses the Langchain framework to process LLM API requests.
- Environment Variables: Storing secrets in environment variables ensures that the secrets are not hard coded in your application. This is a recommended approach for managing secrets securely.
- Google Cloud Secret Manager: It is a service provided by Google Cloud Platform (GCP) to securely store and manage sensitive information (e.g., API keys, passwords, certificates, and other secrets). It offers a more advanced and secure way to store and manage secrets, with tight access controls and auditing capabilities.
Let us look at the details of each approach and the implementation.
Using Environment Variables
This approach is suitable for simpler use cases where security and scalability requirements are minimal. It provides direct access to secrets without additional API calls, hence reducing latency. However, it poses challenges in ensuring consistent secret distribution and updates across distributed systems.
This approach requires the following two libraries:
- Setuptools: It is a Python library used for packaging and distributing Python projects, providing tools to define dependencies, create source distributions, and install packages.
- dotenv: This library is used to load environment variables from a .env file into the environment and can be installed using the following command.
pip install python-dotenv
Following are the steps when the environment variables approach is used:
- Create a .env file in the working project directory and set the environment variables.
A dotenv file (typically named .env) is a straightforward way to manage environment variables for a Python application. It helps in configuring environment-specific settings, which can be especially useful in development and production environments. It is not only easy to use but also an effective way to avoid hardcoding sensitive information in the codebase. For example, the API keys used to make LLM calls can be set in the file as follows.
CHATGPT_API_KEY=your_chatgpt_api_key
AZURE_END_POINT=your_api_end_point
- Pass the .env file as the value of the “package_data” argument in setup.py so that it is shipped to the Dataflow workers.
from setuptools import find_packages
from setuptools import setup

setup(
    name="secure-llm-call",
    version="0.1",
    packages=find_packages(),
    include_package_data=True,
    package_data={'': ['.env']},
    description="Dataflow Pipeline for making closed-source LLMs API calls",
)
- Retrieve the environment variables in the Dataflow pipeline. Use the `python-dotenv` library to load the environment variables from the .env file into your application. Modify your Beam pipeline code to load these environment variables.
def llm_model_inference(element):
    # Imports are local so that they are available on every Dataflow worker.
    import os
    from langchain_openai.chat_models import AzureChatOpenAI
    from langchain.schema import HumanMessage
    from dotenv import load_dotenv

    # Load the variables from the packaged .env file into the environment.
    load_dotenv()
    api_key = os.getenv('CHATGPT_API_KEY')
    end_point = os.getenv('AZURE_END_POINT')

    chat = AzureChatOpenAI(openai_api_key=api_key,
                           model="gpt-4",
                           api_version="2023-05-15",
                           azure_endpoint=end_point)
    messages = [HumanMessage(content=f'''{element['prompt']}''')]
    response = chat(messages)
    return response
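For completeness, here is a minimal sketch of wiring this function into a Beam pipeline; the input path is a placeholder, and each input line is assumed to be a JSON object containing a prompt field:
import json
import apache_beam as beam

# Minimal sketch: each input line is assumed to be a JSON object
# with a 'prompt' key, e.g. {"prompt": "Summarize ..."}.
with beam.Pipeline() as pipeline:
    (pipeline
     | 'ReadData' >> beam.io.ReadFromText('gs://your-bucket/data/input.txt')
     | 'ParseJson' >> beam.Map(json.loads)
     | 'RunInference' >> beam.Map(llm_model_inference))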
Using Google Secret Manager
This approach is better suited for complex, distributed systems with higher security and scalability demands. Secrets are not only encrypted at rest but also provide fine-grained access control with IAM (Identity and Access Management) policies. It provides robust security features, centralized management, and better audit capabilities but requires additional integration effort and may introduce some latency due to network calls.
To access secrets (environment variables and files) in a distributed manner using Google Secret Manager, follow these steps:
- Set Up Google Secret Manager:
Enable the Secret Manager API in your Google Cloud project, then create a new secret and add the secret data.
gcloud services enable secretmanager.googleapis.com
echo -n "your-api-key-value" | gcloud secrets create your-chatgpt-api-key-secret --data-file=-
- Grant Permissions:
Assign appropriate permissions to the service accounts used by your applications or worker nodes to access the secrets. You can use roles like Secret Manager Secret Accessor.
gcloud secrets add-iam-policy-binding your-chatgpt-api-key-secret --member=serviceAccount:[email protected] --role=roles/secretmanager.secretAccessor
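To confirm that the binding works, you can optionally read the secret back while impersonating the service account; the secret name below is the one created earlier.
gcloud secrets versions access latest --secret=your-chatgpt-api-key-secret --impersonate-service-account=[email protected]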
- Access Secrets in Your Application:
Use the Google Cloud Secret Manager client library to access secrets programmatically.
from google.cloud import secretmanager
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import json

def get_secret(secret_id, version_id="latest"):
    # Fetch a secret payload from Google Secret Manager.
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/your_project/secrets/{secret_id}/versions/{version_id}"
    response = client.access_secret_version(name=name)
    return response.payload.data.decode("UTF-8")

class CreatePrompts(beam.DoFn):
    def process(self, element):
        prompt = f"""
        Summarize the content of the provided details of the product in one line.
        {element['product_details']}
        """
        yield {**element, "prompt": prompt}

def llm_model_inference(element, secrets):
    # Imports are local so that they are available on every Dataflow worker.
    from langchain_openai.chat_models import AzureChatOpenAI
    from langchain.schema import HumanMessage

    api_key = secrets['your-chatgpt-api-key-secret']
    end_point = secrets['your-azure-end-point-secret']
    chat = AzureChatOpenAI(openai_api_key=api_key,
                           model="gpt-4",
                           api_version="2023-05-15",
                           azure_endpoint=end_point)
    messages = [HumanMessage(content=f'''{element['prompt']}''')]
    response = chat(messages)
    # Return a dict so the result can be written to BigQuery downstream.
    return {'prompt': element['prompt'], 'inference': response.content}

class GetInferences(beam.DoFn):
    def __init__(self, secrets):
        # Secrets are resolved once at pipeline-construction time and are
        # serialized with the DoFn, so workers need no Secret Manager access.
        self.secrets = secrets

    def process(self, element):
        # Get inference from the LLM model
        yield llm_model_inference(element, self.secrets)

def run():
    options = PipelineOptions()
    pipeline = beam.Pipeline(options=options)
    output_table_spec = bigquery.TableReference(
        projectId='your-gcp-project',
        datasetId='your-dataset',
        tableId='your-table')

    # Resolve all required secrets up front, on the machine that
    # launches the pipeline.
    secrets = {}
    secret_ids = ['your-chatgpt-api-key-secret', 'your-azure-end-point-secret']
    for secret_id in secret_ids:
        secrets[secret_id] = get_secret(secret_id)

    (pipeline
     | 'ReadData' >> beam.io.ReadFromText('gs://your-bucket/data/input.txt')
     # Each input line is assumed to be a JSON object with a
     # 'product_details' field.
     | 'ParseJson' >> beam.Map(json.loads)
     | 'CreatePrompts' >> beam.ParDo(CreatePrompts())
     | 'GetInferences' >> beam.ParDo(GetInferences(secrets))
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           output_table_spec,
           # A schema is needed so CREATE_IF_NEEDED can create the table.
           schema='prompt:STRING,inference:STRING',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           custom_gcs_temp_location="gs://your-bucket/tmp/")
    )
    pipeline_results = pipeline.run()
    pipeline_results.wait_until_finish()

if __name__ == '__main__':
    run()
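To launch the pipeline on Dataflow rather than locally, pass the standard runner options on the command line. The following is a sketch, assuming the code above lives in main.py; all values are placeholders.
python main.py \
  --runner=DataflowRunner \
  --project=your-gcp-project \
  --region=us-central1 \
  --temp_location=gs://your-bucket/tmp/ \
  --setup_file=./setup.py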
Conclusion
By leveraging the methods described above, you can make API requests in distributed systems both secure and efficient. Selecting the right method depends on your specific use case and security requirements. If ease of use is the priority, you may choose the environment variables approach. If the need is for enhanced security, finer access control, or auditing, Google Secret Manager can be used. By adopting the right practice, you can ensure that your sensitive information is protected and minimize the risks of unauthorized access and potential breaches.
For a deeper understanding of the tools and technologies discussed in this blog, you can explore the official documentation for Apache Beam, Langchain, Google Cloud Dataflow, and Secret Manager.
Tags
Google Secret Manager, Dataflow, Apache Beam, LLM, Environment Variables, Security, Distributed Systems, GenAI