Salesforce Data Query AWS Lambda Function

Hey there! Today, I’m going to walk you through a pretty neat AWS Lambda function that connects to Salesforce and queries object data. This function is particularly cool because it uses dynamic field mapping stored in S3 and keeps all the sensitive stuff secure in AWS Secrets Manager. Let’s break it down!

Overview

This Lambda function does four main things:

  1. Grabs credentials from AWS Secrets Manager
  2. Connects to Salesforce using those credentials
  3. Reads field mapping configurations from an S3 bucket
  4. Queries Salesforce records based on the object API name and record ID

The Important Bits

Creating a Salesforce Connected App (Quick Setup)

1. Create Connected App

    Setup → App Manager → New Connected App

Basic Info:
- Connected App Name: Lambda Integration
- API Name: Lambda_Integration
- Contact Email: [Your email]

API Settings:
✓ Enable OAuth Settings
✓ Enable for Device Flow
- Callback URL: http://localhost:8080/callback
- Selected OAuth Scopes:
• Access and manage your data (api)
• Perform requests at any time (refresh_token, offline_access)

2. Configure OAuth & Profiles

    Setup → App Manager → Find your app → Manage → Edit Policies

OAuth Policies:
- Permitted Users: Admin approved users are pre-authorized
- IP Relaxation: Relax IP restrictions
- Refresh Token Policy: Valid until revoked

Profiles:
- Click "Manage Profiles"
- Add "System Administrator"
- Save

3. Get Credentials & Store in AWS Secrets

Copy the Consumer Key and Consumer Secret from the Connected App page, then store them in AWS Secrets Manager as:

{
    "SECRET_STRING_auth_url": "https://login.salesforce.com/services/oauth2/token",
    "SECRET_STRING_client_id": "YOUR_CONSUMER_KEY",
    "SECRET_STRING_client_secret": "YOUR_CONSUMER_SECRET",
    "SECRET_STRING_username": "YOUR_SALESFORCE_USERNAME",
    "SECRET_STRING_password": "YOUR_PASSWORD_WITH_SECURITY_TOKEN"
}
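
If you’d rather script this step than click through the console, here’s a minimal sketch. It assumes the secret name tso/alpha and region us-west-2 that the function uses later on; build_secret_payload and store_secret are hypothetical helper names, and the only AWS call involved is the boto3 Secrets Manager client’s create_secret.

```python
import json


def build_secret_payload(auth_url, client_id, client_secret, username, password):
    """Return the JSON string holding the keys shown in the secret above."""
    return json.dumps({
        "SECRET_STRING_auth_url": auth_url,
        "SECRET_STRING_client_id": client_id,
        "SECRET_STRING_client_secret": client_secret,
        "SECRET_STRING_username": username,
        # Password with the security token appended, as in the template above
        "SECRET_STRING_password": password,
    })


def store_secret(client, secret_name, payload):
    """Create the secret. `client` is a boto3 Secrets Manager client."""
    return client.create_secret(Name=secret_name, SecretString=payload)


# Example usage (assumed names; needs AWS credentials, so it is left commented out):
#   import boto3
#   client = boto3.client("secretsmanager", region_name="us-west-2")
#   payload = build_secret_payload(
#       "https://login.salesforce.com/services/oauth2/token",
#       "YOUR_CONSUMER_KEY", "YOUR_CONSUMER_SECRET",
#       "YOUR_SALESFORCE_USERNAME", "YOUR_PASSWORD_WITH_SECURITY_TOKEN",
#   )
#   store_secret(client, "tso/alpha", payload)
```

Passing the client in as a parameter keeps the helpers easy to test without touching a real AWS account.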

Required Lambda Layer

The code uses the requests library, which isn’t included in the Lambda Python runtime, so you’ll need to add it as a Lambda Layer (or bundle it into your deployment package).

IAM Role Configuration

First, here are the IAM role permissions you’ll need to make this work:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": "arn:aws:secretsmanager:us-west-2:*:secret:tso/alpha-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::tp-query-records-salesforce-field-map/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

Key Components Explained

  1. Secrets Management: The get_secret() function pulls Salesforce credentials from AWS Secrets Manager. Super secure way to handle sensitive data!
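  2. Salesforce Connection: connect_to_salesforce() handles the OAuth flow with Salesforce. It posts the credentials to the token endpoint using the username-password grant (grant_type=password), gets back an access token and instance URL, and sets up a requests session for subsequent calls.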
  3. Field Mapping: get_field_mapping_from_s3() reads a CSV file from S3 that contains the field mapping configuration. This makes the function super flexible – you can change what fields to query just by updating the CSV!
  4. Query Building: query_salesforce_record() constructs and executes the SOQL query using the field mapping and filters.
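
To make the field-mapping step concrete, here’s a small sketch of what a mapping file might look like and how one row turns into a SOQL query. The column names are the ones the function reads (OBJECT_API_NAME, FIELD_API_NAME, WHERE_CLAUSE); the sample row, the record ID, and the helper names find_mapping and build_query are assumptions for illustration. Note that the WHERE_CLAUSE value has to end right before the quoted record ID, because the function appends the ID in quotes.

```python
import csv
import io

# Hypothetical mapping file: one row per object, fields as a comma-separated list
SAMPLE_CSV = (
    "OBJECT_API_NAME,FIELD_API_NAME,WHERE_CLAUSE\n"
    'Account,"Id, Name, Industry",WHERE Id = \n'
)


def find_mapping(csv_text, object_api_name):
    """Return the fields and WHERE clause configured for one object."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        if row["OBJECT_API_NAME"] == object_api_name:
            return {
                # Strip spaces so "Id, Name" becomes "Id,Name"
                "fields": row["FIELD_API_NAME"].replace(" ", ""),
                "where_clause": row["WHERE_CLAUSE"],
            }
    raise ValueError(f"No mapping found for object: {object_api_name}")


def build_query(mapping, object_api_name, record_id):
    """Assemble the SOQL string the same way the Lambda function does."""
    return (
        f"SELECT {mapping['fields']} "
        f"FROM {object_api_name} "
        f"{mapping['where_clause']}'{record_id}'"
    )


mapping = find_mapping(SAMPLE_CSV, "Account")
query = build_query(mapping, "Account", "001000000000001AAA")  # made-up ID
print(query)
# prints: SELECT Id,Name,Industry FROM Account WHERE Id = '001000000000001AAA'
```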

Cool Features

  • Error Handling: Every step logs what it’s doing, and failures come back as structured error responses instead of unhandled exceptions
  • Dynamic Field Selection: Fields to query are configured via S3, so no code changes needed to modify queries
  • Clean Response Format: Returns well-structured JSON responses with appropriate HTTP status codes

The Complete Code

import csv
import json
import requests
import os
import boto3
import io
import logging
from botocore.exceptions import ClientError

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_secret():
    """
    Retrieve secrets from AWS Secrets Manager
    """
    secret_name = "tso/alpha"
    region_name = "us-west-2"
    
    logger.info(f"Retrieving secret {secret_name} from {region_name}")
    try:
        session = boto3.session.Session()
        client = session.client(
            service_name='secretsmanager',
            region_name=region_name
        )
        
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        
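        if 'SecretString' in get_secret_value_response:
            secret = json.loads(get_secret_value_response['SecretString'])
            logger.info("Successfully retrieved secret")
            return secret

        # Binary-only secrets are not handled; fail loudly instead of returning None
        raise ValueError(f"Secret {secret_name} does not contain a SecretString")

    except ClientError as e:
        logger.error(f"Error retrieving secret: {str(e)}")
        raise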

def connect_to_salesforce(credentials):
    """
    Establish connection to Salesforce using provided credentials
    """
    # Key name must match the one stored in Secrets Manager (SECRET_STRING_auth_url)
    auth_url = credentials['SECRET_STRING_auth_url']
    logger.info("Attempting to connect to Salesforce")
    
    params = {
        'grant_type': 'password',
        'client_id': credentials['SECRET_STRING_client_id'],
        'client_secret': credentials['SECRET_STRING_client_secret'],
        'username': credentials['SECRET_STRING_username'],
        'password': credentials['SECRET_STRING_password']
    }
    
    try:
        # Set a timeout so a stalled connection fails fast instead of hanging
        response = requests.post(auth_url, data=params, timeout=10)
        response.raise_for_status()
        auth_data = response.json()
        access_token = auth_data['access_token']
        instance_url = auth_data['instance_url']
        
        session = requests.Session()
        session.headers.update({
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        })
        
        logger.info("Successfully connected to Salesforce")
        return {
            'success': True,
            'session': session,
            'instance_url': instance_url
        }
        
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to connect to Salesforce: {str(e)}")
        return {
            'success': False,
            'error': str(e)
        }

def get_field_mapping_from_s3(object_api_name: str) -> dict:
    """
    Reads field mapping from S3 CSV file for given object
    """
    bucket_name = 'BUCKET_NAME_HERE'
    file_key = 'FILE_NAME_HERE'
    
    logger.info(f"Retrieving field mapping for {object_api_name} from S3")
    try:
        s3_client = boto3.client('s3')
        response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
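        # Decode with utf-8-sig so a leading byte-order mark (BOM) is stripped
        # and the first header reads as plain OBJECT_API_NAME
        csv_content = response['Body'].read().decode('utf-8-sig')
        csv_reader = csv.DictReader(io.StringIO(csv_content))
        
        for row in csv_reader:
            if row['OBJECT_API_NAME'] == object_api_name: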
                logger.info(f"Found mapping for {object_api_name}")
                return {
                    'fields': row['FIELD_API_NAME'].replace(' ', ''),
                    'where_clause': row['WHERE_CLAUSE']
                }
        
        logger.error(f"No mapping found for object: {object_api_name}")
        raise ValueError(f"No mapping found for object: {object_api_name}")
        
    except Exception as e:
        logger.error(f"Error reading S3 file: {str(e)}")
        raise

def query_salesforce_record(session, instance_url, object_api_name: str, record_id: str) -> dict:
    """
    Query Salesforce record based on object API name and record ID
    """
    logger.info(f"Querying Salesforce record: {record_id} from {object_api_name}")
    try:
        field_mapping = get_field_mapping_from_s3(object_api_name)
        
        query = f"""
            SELECT {field_mapping['fields']}
            FROM {object_api_name}
            {field_mapping['where_clause']}'{record_id}'
        """
        logger.debug(f"SOQL Query: {query}")
        url = f"{instance_url}/services/data/v57.0/query"
        response = session.get(url, params={'q': query}, timeout=10)
        response.raise_for_status()
        
        logger.info("Successfully queried Salesforce record")
        return {
            'success': True,
            'data': response.json()
        }
        
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to query Salesforce record: {str(e)}")
        return {
            'success': False,
            'error': str(e)
        }

def lambda_handler(event, context):
    logger.info(f"Lambda invocation started with event: {json.dumps(event)}")
    try:
        # Validate input parameters
        if 'SalesforceRecordId' not in event or 'ObjectAPIName' not in event:
            logger.error("Missing required parameters")
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': 'Missing required parameters: SalesforceRecordId or ObjectAPIName'
                })
            }
            
        record_id = event['SalesforceRecordId']
        object_api_name = event['ObjectAPIName']
        
        # Connect to Salesforce
        credentials = get_secret()
        sf_connection = connect_to_salesforce(credentials)
        
        if not sf_connection['success']:
            logger.error("Failed to connect to Salesforce")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Failed to connect to Salesforce',
                    'details': sf_connection['error']
                })
            }
        
        # Query the specific record
        record_result = query_salesforce_record(
            sf_connection['session'],
            sf_connection['instance_url'],
            object_api_name,
            record_id
        )
        
        if not record_result['success']:
            logger.error("Failed to query record")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Failed to query record',
                    'details': record_result['error']
                })
            }
        
        # Process the results
        records = record_result['data']['records']
        logger.info(f"Successfully retrieved {len(records)} records")
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Successfully retrieved record',
                'record_count': len(records),
                'records': records
            })
        }
        
    except Exception as e:
        logger.error(f"Internal server error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': 'Internal server error',
                'details': str(e)
            })
        }
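
One thing worth adding before this goes anywhere near production: the record ID and object name from the event are dropped straight into the SOQL string, so it’s a good idea to validate them first. Here’s a minimal sketch, assuming record IDs are the usual 15- or 18-character alphanumeric Salesforce IDs and object API names contain only letters, digits, and underscores. is_valid_record_id and is_valid_object_name are hypothetical helpers, not part of the function above.

```python
import re

# Assumption: IDs are 15 characters, or 18 with the 3-character suffix
_RECORD_ID_RE = re.compile(r"[A-Za-z0-9]{15}(?:[A-Za-z0-9]{3})?")
# Assumption: API names start with a letter, then letters, digits, underscores
_OBJECT_NAME_RE = re.compile(r"[A-Za-z][A-Za-z0-9_]*")


def is_valid_record_id(record_id):
    """True only for strings that look like a 15- or 18-character record ID."""
    return isinstance(record_id, str) and _RECORD_ID_RE.fullmatch(record_id) is not None


def is_valid_object_name(name):
    """True only for strings that look like an object API name."""
    return isinstance(name, str) and _OBJECT_NAME_RE.fullmatch(name) is not None
```

In lambda_handler you could call both checks right after reading the event and return a 400 response when either fails. Keep in mind that the placeholder ID in the usage example is shorter than a real ID, so it would not pass this check.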

Usage Example

{
    "SalesforceRecordId": "a0b1c2d3e4f5",
    "ObjectAPIName": "Apttus__APTS_Agreement__c"
}

This will query the specified Salesforce Agreement record and return all the fields configured in the mapping file.
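
If you want to call the function from another Python script instead of the console’s test tab, something like this should work. It’s a sketch: invoke_query is a hypothetical helper and the function name salesforce-query in the usage comment is a placeholder. It uses the boto3 Lambda client’s invoke call and unpacks the statusCode/body shape that the handler returns.

```python
import json


def invoke_query(lambda_client, function_name, record_id, object_api_name):
    """Invoke the Lambda synchronously and return (status_code, parsed_body)."""
    event = {
        "SalesforceRecordId": record_id,
        "ObjectAPIName": object_api_name,
    }
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the result
        Payload=json.dumps(event).encode("utf-8"),
    )
    # The handler's return value arrives as a JSON document in the Payload stream
    result = json.loads(response["Payload"].read())
    # The handler stores its body as a JSON string, so decode it a second time
    return result["statusCode"], json.loads(result["body"])


# Example usage (assumed function name; needs AWS credentials):
#   import boto3
#   client = boto3.client("lambda", region_name="us-west-2")
#   status, body = invoke_query(
#       client, "salesforce-query", "a0b1c2d3e4f5", "Apttus__APTS_Agreement__c"
#   )
```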

That’s it! A pretty slick way to handle Salesforce queries while keeping everything secure and maintainable. The S3-based field mapping is particularly neat as it allows business users to modify what fields get queried without touching any code.

Remember to set up your AWS resources (Secrets Manager and S3) before deploying this function, and make sure your Salesforce connected app is configured correctly!

Troubleshooting Tips

If you encounter an error like “Unable to import module 'lambda_function': No module named 'requests'”, check that:

  1. The Lambda layer is properly attached to your function
  2. The layer contains the correct directory structure (python/requests or python/lib/python3.9/site-packages/requests)
  3. The Python runtime version matches the one used in the layer
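
A quick way to check point 2 is to list the contents of the layer’s zip file and look for the package under one of the directories Lambda adds to the Python path. This is only a sketch: layer_contains_package is a hypothetical helper, and it inspects paths only, so it won’t tell you whether the packaged version is compatible with your runtime.

```python
import re
import zipfile


def layer_contains_package(zip_source, package):
    """Check whether a layer zip has `package` under python/ or
    python/lib/python3.X/site-packages/.

    `zip_source` can be a file path or a file-like object.
    """
    pattern = re.compile(
        r"^python/(?:lib/python3\.\d+/site-packages/)?" + re.escape(package) + r"/"
    )
    with zipfile.ZipFile(zip_source) as archive:
        return any(pattern.match(name) for name in archive.namelist())


# Example usage (assumed file name):
#   layer_contains_package("requests-layer.zip", "requests")
```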
