Invocar a la función AWS Lambda
Puede utilizar Operaciones por lotes de Amazon S3 para realizar operaciones por lotes a gran escala en objetos de Amazon S3. Las operación Invocar función de AWS Lambda de Operaciones por lotes inicia las funciones de AWS Lambda para que realicen acciones personalizadas en objetos que aparecen en un manifiesto. En esta sección, se describe cómo crear una función Lambda para usarla con Operaciones por lotes de S3 y cómo crear un trabajo para invocar la función. El trabajo de Operaciones por lotes de S3 utiliza la operación LambdaInvoke
para ejecutar una función Lambda en cada objeto que aparece en un manifiesto.
Puede trabajar con Operaciones por lotes de S3 mediante la consola de Amazon S3, la AWS Command Line Interface (AWS CLI), los AWS SDK o la API de REST de Amazon S3. Para obtener más información acerca del uso de Lambda, consulte Introducción a AWS Lambda en la Guía para desarrolladores de AWS Lambda.
En las siguientes secciones, se explica cómo puede comenzar a usar Operaciones por lotes de S3 con Lambda.
Temas
Uso de Lambda con Operaciones por lotes
Cuando utilice la herramienta de operaciones por lotes de S3 con AWS Lambda, debe crear nuevas funciones de Lambda específicamente para utilizarlas con la herramienta de operaciones por lotes de S3. No puede reutilizar funciones basadas en eventos de Amazon S3 existentes con Operaciones por lotes de S3. Las funciones de eventos solo pueden recibir mensajes; no devuelven mensajes. Las funciones Lambda que se utilizan con Operaciones por lotes de S3 deben aceptar y devolver mensajes. Para obtener más información acerca del uso de Lambda con los eventos de Amazon S3, consulte Uso de AWS Lambda con Amazon S3 en la Guía para desarrolladores de AWS Lambda.
Debe crear un trabajo de Operaciones por lotes de S3 que invoque a su función Lambda. El trabajo ejecuta la misma función Lambda en todos los objetos que aparecen en el manifiesto. Puede controlar qué versiones de su función Lambda se deben usar mientras procesa los objetos de su manifiesto. Operaciones por lotes de S3 admite nombres de recursos de Amazon (ARN) no calificados, alias y versiones específicas. Para obtener más información, consulte Introducción al control de versiones de AWS Lambda en la Guía para desarrolladores de AWS Lambda.
Si proporciona el trabajo de Operaciones por lotes de S3 con un ARN de función que utiliza un alias o el calificador $LATEST
, y actualiza la versión a la que apunta cualquiera de ellos, Operaciones por lotes de S3 comenzará a llamar a la nueva versión de su función Lambda. Esto puede resultar útil cuando se desea actualizar la funcionalidad en medio de un trabajo grande. Si no quiere que Operaciones por lotes de S3 cambie la versión que se utiliza, facilite la versión específica en el parámetro FunctionARN
al crear el trabajo.
Un solo trabajo de copia de AWS Lambda con Operaciones por lotes de S3 puede admitir un manifiesto con hasta 20 000 millones de objetos.
Uso de Lambda y Operaciones por lotes con buckets de directorio
Los buckets de directorio son un tipo de bucket de Amazon S3 que está diseñado para cargas de trabajo o aplicaciones de rendimiento crítico que requieren una latencia constante de milisegundos de un solo dígito. Para obtener más información, consulte Buckets de directorio.
Existen requisitos especiales para utilizar las operaciones por lotes para invocar funciones de Lambda que actúan en los buckets de directorio. Por ejemplo, debe estructurar la solicitud de Lambda mediante un esquema JSON actualizado y especificar InvocationSchemaVersion 2.0 (no 1.0) cuando se crea el trabajo. Este esquema actualizado le permite especificar pares clave-valor opcionales para UserArguments, lo que puede modificar determinados parámetros de las funciones de Lambda existentes. Para obtener más información, consulte Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and AWS Lambda
Códigos de respuesta y de resultados
Operaciones por lotes de S3 invoca la función de Lambda con una o más claves, cada una de las cuales tiene un TaskID
asociado. Operaciones por lotes de S3 espera un código de resultados por clave de las funciones de Lambda. Cualquier ID de tarea enviado en la solicitud que no se devuelva con un código de resultado por clave recibirá el código de resultado del campo treatMissingKeysAs
. treatMissingKeysAs
es un campo de solicitud opcional y su valor predeterminado es TemporaryFailure
. La siguiente tabla contiene los demás códigos de resultado y valores posibles para el campo treatMissingKeysAs
.
Código de respuesta | Descripción |
---|---|
Succeeded |
La tarea se completó normalmente. Si solicitó un informe de finalización de trabajos, la cadena de resultados de la tarea se incluye en el informe. |
TemporaryFailure |
Se detectó un error temporal en la tarea y esta se redirigirá antes de que se complete el trabajo. La cadena de resultados se pasa por alto. Si este es el último redireccionamiento, el mensaje de error se incluye en el informe final. |
PermanentFailure |
Se detectó un error permanente en la tarea. Si solicitó un informe de finalización de trabajos, la tarea se marca como Failed e incluye la cadena del mensaje de error. Las cadenas de resultados de tareas con error se pasan por alto. |
Creación de una función Lambda para utilizarla con Operaciones por lotes de S3
En esta sección, se proporcionan ejemplos de permisos de AWS Identity and Access Management (IAM) que debe utilizar con su función de Lambda. También contiene una función Lambda de ejemplo que se puede utilizar con Operaciones por lotes de S3. Si nunca ha creado una función de Lambda, consulte el Tutorial: uso de AWS Lambda con Amazon S3 en la Guía para desarrolladores de AWS Lambda.
Debe crear funciones Lambda para utilizarlas específicamente con Operaciones por lotes de S3. No puede reutilizar funciones de Lambda basadas en eventos de Amazon S3 existentes porque las funciones de Lambda que se utilizan para Operaciones por lotes de S3 deben aceptar y devolver campos de datos especiales.
importante
Las funciones de AWS Lambda escritas en Java aceptan interfaces de controlador RequestHandlerRequestStreamHandler
para la serialización y la deserialización personalizadas de una solicitud y su respuesta. Esta interfaz permite a Lambda pasar un InputStream y OutputStream al método handleRequest
de Java.
Asegúrese de usar la interfaz RequestStreamHandler
cuando utilice funciones Lambda con Operaciones por lotes de S3. Si utiliza una interfaz RequestHandler
, el trabajo por lotes producirá un error y, en el informe de finalización, aparecerá un mensaje que indica que se ha devuelto un JSON no válido en la carga útil de Lambda.
Para obtener más información, consulte Interfaces de controlador en la Guía del usuario de AWS Lambda.
Permisos de IAM de ejemplo
A continuación, se muestran ejemplos de los permisos de IAM necesarios para utilizar una función Lambda con Operaciones por lotes de S3.
ejemplo - Política de confianza de Operaciones por lotes de S3
Este es un ejemplo de la política de confianza que puede usar para el rol de IAM de Operaciones por lotes. Este rol de IAM se especifica al crear el trabajo y ofrece a Operaciones por lotes permiso para asumirlo.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
ejemplo - Política de IAM de Lambda
A continuación, se muestra un ejemplo de una política de IAM que da permiso a Operaciones por lotes de S3 para invocar a la función Lambda y leer el manifiesto de entrada.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }
Solicitud y respuesta de ejemplo
Esta sección contiene ejemplos de solicitudes y respuestas para la función Lambda.
ejemplo Solicitud
A continuación, se muestra un ejemplo de JSON de una solicitud para la función Lambda.
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1" } ] }
ejemplo Respuesta
A continuación, se muestra un ejemplo de JSON de una respuesta para la función Lambda.
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }
Ejemplo de una función Lambda para Operaciones por lotes de S3
El siguiente ejemplo Python Lambda elimina un marcador de eliminación de un objeto versionado.
Como se muestra en el ejemplo, las claves de Operaciones por lotes de S3 están codificadas por URL. Para utilizar Amazon S3 con otros servicios de AWS, es importante que decodifique mediante la URL la clave que se pasa desde la herramienta de operaciones por lotes de S3.
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
Creación de un trabajo de Operaciones por lotes de S3 que invoca a una función Lambda
Al crear un trabajo de Operaciones por lotes de S3 para invocar a una función Lambda, debe proporcionar lo siguiente:
-
El ARN de la función Lambda (que puede incluir el alias de la función o un número de versión específico)
-
Un rol de IAM con permiso para invocar la función
-
El parámetro de acción
LambdaInvokeFunction
Para obtener más información acerca de cómo crear un trabajo de Operaciones por lotes de S3, consulte Creación de trabajos de operaciones por lotes de S3 y Operaciones compatibles con las operaciones por lotes de S3.
En el siguiente ejemplo, se crea un trabajo de Operaciones por lotes de S3 que invoca una función de Lambda mediante la AWS CLI. Para utilizar este ejemplo, reemplace los
con su propia información.user input
placeholders
aws s3control create-job --account-id
account-id
--operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region
:account-id
:function:LambdaFunctionName
" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket
","ETag":"ManifestETag
"}}' --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket
","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix
","ReportScope":"AllTasks"}' --priority2
--role-arn arn:aws:iam::account-id
:role/BatchOperationsRole
--regionregion
--description "Lambda Function
"
Proporcionar información de tareas en manifiestos de Lambda
Cuando utiliza funciones de AWS Lambda con Operaciones por lotes de S3, es posible que desee que se adjunten datos adicionales a cada tarea o clave en la que se opera. Por ejemplo, es posible que desee tener una clave de objeto de origen y una nueva clave de objeto. Su función Lambda podría copiar la clave de origen en un nuevo bucket de S3 con un nuevo nombre. De forma predeterminada, Operaciones por lotes le permite especificar solo el bucket de destino y una lista de claves de origen en el manifiesto de entrada para su trabajo. En los siguientes ejemplos, se describe cómo puede incluir datos adicionales en su manifiesto para ejecutar funciones de Lambda más complejas.
Para especificar parámetros por clave en el manifiesto de Operaciones por lotes de S3 para utilizarlos en el código de la función Lambda, use el siguiente formato JSON codificado mediante URL. El campo key
se pasa a su función Lambda como si fuera una clave de objeto de Amazon S3. Pero la función de Lambda puede interpretar que contiene otros valores o claves múltiples, como se muestra en los siguientes ejemplos.
nota
El número máximo de caracteres para el campo key
del manifiesto es 1024.
ejemplo - Manifiesto que sustituye las “claves de Amazon S3” por cadenas JSON
La versión codificada mediante URL debe facilitarse a Operaciones por lotes de S3.
amzn-s3-demo-bucket
,{"origKey": "object1key
", "newKey": "newObject1Key
"}amzn-s3-demo-bucket
,{"origKey": "object2key
", "newKey": "newObject2Key
"}amzn-s3-demo-bucket
,{"origKey": "object3key
", "newKey": "newObject3Key
"}
ejemplo - Manifiesto codificado mediante URL
Esta versión codificada mediante URL debe facilitarse a Operaciones por lotes de S3. La versión no codificada en URL no funciona.
amzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object1key
%22%2C%20%22newKey%22%3A%20%22newObject1Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object2key
%22%2C%20%22newKey%22%3A%20%22newObject2Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object3key
%22%2C%20%22newKey%22%3A%20%22newObject3Key
%22%7D
ejemplo - Función Lambda con formato de manifiesto que escribe resultados en el informe del trabajo
Este ejemplo de manifiesto codificado en URL contiene claves de objeto delimitadas por barra vertical para que las analice la siguiente función de Lambda.
amzn-s3-demo-bucket
,object1key
%7Cloweramzn-s3-demo-bucket
,object2key
%7Cupperamzn-s3-demo-bucket
,object3key
%7Creverseamzn-s3-demo-bucket
,object4key
%7Cdelete
Esta función de Lambda muestra cómo analizar una tarea delimitada por barra vertical que está codificada en el manifiesto de Operaciones por lotes de S3. La tarea indica qué operación de revisión se aplica al objeto especificado.
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
Tutorial operaciones por lotes de S3
En el siguiente tutorial se presentan procedimientos integrales completos para algunas tareas de operaciones por lotes con Lambda. En este tutorial, aprenderá a configurar Operaciones por lotes para invocar una función de Lambda para la transcodificación por lotes de vídeos almacenados en un bucket de origen de S3. La función de Lambda llama a AWS Elemental MediaConvert para transcodificar los vídeos.