Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Hey there! Today, I’m going to walk you through a pretty neat AWS Lambda function that connects to Salesforce and queries object data. This function is particularly cool because it uses dynamic field mapping stored in S3 and keeps all the sensitive stuff secure in AWS Secrets Manager. Let’s break it down!
This Lambda function does four main things:
Setup → App Manager → New Connected App
Basic Info:
- Connected App Name: Lambda Integration
- API Name: Lambda_Integration
- Contact Email: [Your email]
API Settings:
✓ Enable OAuth Settings
✓ Enable for Device Flow
- Callback URL: http://localhost:8080/callback
- Selected OAuth Scopes:
• Access and manage your data (api)
• Perform requests at any time (refresh_token, offline_access)
Setup → App Manager → Find your app → Manage → Edit Policies
OAuth Policies:
- Permitted Users: Admin approved users are pre-authorized
- IP Relaxation: Relax IP restrictions
- Refresh Token Policy: Valid until revoked
Profiles:
- Click "Manage Profiles"
- Add "System Administrator"
- Save
// Copy Consumer Key & Secret from Connected App page
// Store in AWS Secrets Manager as:
{
"SECRET_STRING_auth_url": "https://login.salesforce.com/services/oauth2/token",
"SECRET_STRING_client_id": "YOUR_CONSUMER_KEY",
"SECRET_STRING_client_secret": "YOUR_CONSUMER_SECRET",
"SECRET_STRING_username": "YOUR_SALESFORCE_USERNAME",
"SECRET_STRING_password": "YOUR_PASSWORD_WITH_SECURITY_TOKEN"
}
The code uses the `requests` library, which needs to be added as a Lambda Layer.
First, here’s the IAM role permissions you’ll need to make this work:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:us-west-2:*:secret:tso/alpha-*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": "arn:aws:s3:::tp-query-records-salesforce-field-map/*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
The get_secret() function pulls Salesforce credentials from AWS Secrets Manager — a secure way to handle sensitive data. connect_to_salesforce() handles the OAuth flow with Salesforce: it uses the credentials to get an access token and sets up a session for subsequent requests. get_field_mapping_from_s3() reads a CSV file from S3 that contains the field mapping configuration; this makes the function flexible, because you can change which fields are queried just by updating the CSV. query_salesforce_record() constructs and executes the SOQL query using the field mapping and filters.
import csv
import json
import requests
import os
import boto3
import io
import logging
from botocore.exceptions import ClientError
# Configure logging on the root logger; the Lambda runtime attaches its own
# handler, so setting the level here is sufficient for CloudWatch output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_secret():
    """
    Retrieve Salesforce credentials from AWS Secrets Manager.

    Returns:
        dict: The parsed JSON secret payload (the SECRET_STRING_* keys).

    Raises:
        ClientError: If the secret cannot be retrieved from Secrets Manager.
        ValueError: If the secret exists but carries no string payload
            (e.g. it was stored as binary).
    """
    secret_name = "tso/alpha"
    region_name = "us-west-2"
    logger.info(f"Retrieving secret {secret_name} from {region_name}")
    try:
        session = boto3.session.Session()
        client = session.client(
            service_name='secretsmanager',
            region_name=region_name
        )
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        # The previous version fell through and implicitly returned None when
        # 'SecretString' was absent, which surfaced later as a confusing
        # KeyError in the caller. Fail loudly at the source instead.
        if 'SecretString' not in get_secret_value_response:
            raise ValueError(
                f"Secret {secret_name} has no SecretString payload"
            )
        secret = json.loads(get_secret_value_response['SecretString'])
        logger.info("Successfully retrieved secret")
        return secret
    except ClientError as e:
        logger.error(f"Error retrieving secret: {str(e)}")
        raise e
def connect_to_salesforce(credentials):
    """
    Establish a connection to Salesforce via the OAuth 2.0 password grant.

    Args:
        credentials (dict): Secret payload from get_secret(). Must contain
            SECRET_STRING_auth_url, SECRET_STRING_client_id,
            SECRET_STRING_client_secret, SECRET_STRING_username and
            SECRET_STRING_password.

    Returns:
        dict: {'success': True, 'session': requests.Session,
               'instance_url': str} on success, or
              {'success': False, 'error': str} when the token request fails.
    """
    # BUG FIX: the secret documented for this function stores the token
    # endpoint under 'SECRET_STRING_auth_url', not 'SECRET_STRING_token_url'.
    # The old key raised KeyError on every invocation.
    auth_url = credentials['SECRET_STRING_auth_url']
    logger.info("Attempting to connect to Salesforce")
    params = {
        'grant_type': 'password',
        'client_id': credentials['SECRET_STRING_client_id'],
        'client_secret': credentials['SECRET_STRING_client_secret'],
        'username': credentials['SECRET_STRING_username'],
        # Per Salesforce's password flow, this is the account password with
        # the security token appended.
        'password': credentials['SECRET_STRING_password']
    }
    try:
        response = requests.post(auth_url, data=params)
        response.raise_for_status()
        auth_data = response.json()
        access_token = auth_data['access_token']
        instance_url = auth_data['instance_url']
        # Reusable session carrying the bearer token for subsequent calls.
        session = requests.Session()
        session.headers.update({
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        })
        logger.info("Successfully connected to Salesforce")
        return {
            'success': True,
            'session': session,
            'instance_url': instance_url
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to connect to Salesforce: {str(e)}")
        return {
            'success': False,
            'error': str(e)
        }
def get_field_mapping_from_s3(object_api_name: str) -> dict:
    """
    Read the field-mapping row for a Salesforce object from the S3 CSV file.

    Args:
        object_api_name: Salesforce object API name to look up
            (matched against the CSV's OBJECT_API_NAME column).

    Returns:
        dict: {'fields': comma-separated field list with spaces removed,
               'where_clause': the WHERE_CLAUSE column value}.

    Raises:
        ValueError: If no row matches object_api_name.
        Exception: Any boto3/S3 error is logged and re-raised.
    """
    bucket_name = 'BUCKET_NAME_HERE'
    file_key = 'FILE_NAME_HERE'
    logger.info(f"Retrieving field mapping for {object_api_name} from S3")
    try:
        s3_client = boto3.client('s3')
        response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
        # BUG FIX: decode with 'utf-8-sig' so a leading BOM (common in CSVs
        # exported from Excel) is stripped. The old code hard-coded the BOM
        # into the column name ('\ufeffOBJECT_API_NAME'), which broke for
        # files saved without one; this handles both cases.
        csv_content = response['Body'].read().decode('utf-8-sig')
        csv_reader = csv.DictReader(io.StringIO(csv_content))
        for row in csv_reader:
            if row['OBJECT_API_NAME'] == object_api_name:
                logger.info(f"Found mapping for {object_api_name}")
                return {
                    # Strip spaces so the list drops straight into a SELECT.
                    'fields': row['FIELD_API_NAME'].replace(' ', ''),
                    'where_clause': row['WHERE_CLAUSE']
                }
        logger.error(f"No mapping found for object: {object_api_name}")
        raise ValueError(f"No mapping found for object: {object_api_name}")
    except Exception as e:
        logger.error(f"Error reading S3 file: {str(e)}")
        raise
def query_salesforce_record(session, instance_url, object_api_name: str, record_id: str) -> dict:
    """
    Query a Salesforce record using the S3-configured field mapping.

    Args:
        session: Authenticated requests.Session from connect_to_salesforce().
        instance_url: Salesforce instance base URL.
        object_api_name: Object API name (used as the SOQL FROM target).
        record_id: Record identifier appended to the mapping's WHERE clause.

    Returns:
        dict: {'success': True, 'data': <query JSON>} on success, or
              {'success': False, 'error': str} on a request failure.
              ValueError from a missing mapping propagates to the caller.
    """
    logger.info(f"Querying Salesforce record: {record_id} from {object_api_name}")
    try:
        field_mapping = get_field_mapping_from_s3(object_api_name)
        # SECURITY: record_id is interpolated into SOQL, so escape the two
        # characters SOQL treats specially inside a quoted literal. This
        # prevents a crafted ID from breaking out of the string (injection).
        safe_record_id = record_id.replace('\\', '\\\\').replace("'", "\\'")
        query = f"""
            SELECT {field_mapping['fields']}
            FROM {object_api_name}
            {field_mapping['where_clause']}'{safe_record_id}'
        """
        logger.debug(f"SOQL Query: {query}")
        url = f"{instance_url}/services/data/v57.0/query"
        response = session.get(url, params={'q': query})
        response.raise_for_status()
        logger.info("Successfully queried Salesforce record")
        return {
            'success': True,
            'data': response.json()
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to query Salesforce record: {str(e)}")
        return {
            'success': False,
            'error': str(e)
        }
def lambda_handler(event, context):
    """
    Lambda entry point: validate the event, connect to Salesforce, and
    return the queried record(s) as an API-Gateway-style response.
    """
    logger.info(f"Lambda invocation started with event: {json.dumps(event)}")

    def _response(status, payload):
        # Local helper so each exit point stays a one-liner.
        return {'statusCode': status, 'body': json.dumps(payload)}

    try:
        # Guard clause: both identifiers are mandatory.
        if 'SalesforceRecordId' not in event or 'ObjectAPIName' not in event:
            logger.error("Missing required parameters")
            return _response(400, {
                'error': 'Missing required parameters: SalesforceRecordId or ObjectAPIName'
            })

        sf_record_id = event['SalesforceRecordId']
        sf_object = event['ObjectAPIName']

        # Authenticate using credentials held in Secrets Manager.
        connection = connect_to_salesforce(get_secret())
        if not connection['success']:
            logger.error("Failed to connect to Salesforce")
            return _response(500, {
                'error': 'Failed to connect to Salesforce',
                'details': connection['error']
            })

        # Run the mapped SOQL query for the requested record.
        result = query_salesforce_record(
            connection['session'],
            connection['instance_url'],
            sf_object,
            sf_record_id
        )
        if not result['success']:
            logger.error("Failed to query record")
            return _response(500, {
                'error': 'Failed to query record',
                'details': result['error']
            })

        records = result['data']['records']
        logger.info(f"Successfully retrieved {len(records)} records")
        return _response(200, {
            'message': 'Successfully retrieved record',
            'record_count': len(records),
            'records': records
        })
    except Exception as e:
        # Catch-all boundary: any unexpected failure becomes a clean 500.
        logger.error(f"Internal server error: {str(e)}")
        return _response(500, {
            'error': 'Internal server error',
            'details': str(e)
        })
{
"SalesforceRecordId": "a0b1c2d3e4f5",
"ObjectAPIName": "Apttus__APTS_Agreement__c"
}
This will query the specified Salesforce Agreement record and return all the fields configured in the mapping file.
That’s it! A pretty slick way to handle Salesforce queries while keeping everything secure and maintainable. The S3-based field mapping is particularly neat as it allows business users to modify what fields get queried without touching any code.
Remember to set up your AWS resources (Secrets Manager and S3) before deploying this function, and make sure your Salesforce connected app is configured correctly!
If you encounter the error “Unable to import module ‘requests’”, check that your Lambda Layer packages the library under the expected directory structure (python/lib/python3.9/site-packages/requests).