修复将记录添加到 Kinesis Stream 时的 AWS Lambda 超时问题

修复将记录添加到 Kinesis Stream 时的 AWS Lambda 超时问题
修复将记录添加到 Kinesis Stream 时的 AWS Lambda 超时问题

对 Kinesis Data Streams 的 AWS Lambda 超时进行故障排除

想象一下,您正在 AWS 上构建实时数据管道,并通过设置将消息从 SQS 传递到 Lambda 函数,并最终传递到 Kinesis Data Stream。 📨 这个流程在理论上是无缝的,但有时现实却有其他计划。正当您准备放松时,您的 Lambda 函数日志中突然出现 ETIMEDOUT 错误。

看到此错误可能会令人沮丧,尤其是当您多次验证权限并测试该函数时。事实上,Kinesis 流中的这种间歇性 ETIMEDOUT 问题通常会意外发生,从而导致您的进度停止。重新部署后,Lambda 可能会完美运行,但随后会再次失败,似乎没有任何原因。

在这种情况下,许多开发人员都被诸如“Runtime.UnhandledPromiseRejection”和“ERR_HTTP2_STREAM_CANCEL”之类的神秘消息所困扰。当您的代码依赖于可靠且即时的数据处理时,这些超时问题可能会让人感觉像是一个路障。

在这里,我们将讨论导致这些超时的原因、处理这些超时的实用方法,以及对 AWS 配置的调整,这可能是稳定流的关键。 🛠️ 最后,您将了解如何排查和解决 ETIMEDOUT 错误,并保持 Lambda 和 Kinesis 流程顺利运行。

命令 描述
KinesisClient 初始化新的客户端实例以与 AWS Kinesis 交互。该客户端管理特定于 AWS SDK for JavaScript 的区域、重试和超时等配置,确保请求正确发送到 Kinesis。
PutRecordCommand 表示将单个记录放入 Kinesis 流中的命令。此命令接受字节数据并需要分区键,这对于在流中跨分片分配记录至关重要。
TextEncoder().encode() 将字符串数据编码为 Uint8Array 格式,这是 Kinesis 中数据的预期格式。此转换对于确保将 JSON 数据发送到 Kinesis 流时的兼容性至关重要。
Promise.allSettled() 并行处理多个异步请求并提供每个 Promise 的状态(已完成或已拒绝)。即使某些请求失败,它对于单独记录或处理每个结果特别有用。
generatePartitionKey 根据消息属性生成动态分区键的辅助函数。它确保数据分布在 Kinesis 分片上,从而有可能减少热分片并优化数据吞吐量。
processEvent 一个自定义异步函数,用于处理 SQS 消息的解析、编码和发送到 Kinesis。这种模块化功能提高了可重用性并处理发送记录时的特定错误情况。
jest.mock() 模仿 Jest 测试中特定模块或功能的行为,在本例中,这有助于模拟 Kinesis 客户端行为,而无需实际的 AWS 基础设施。这对于依赖 AWS SDK 方法的单元测试代码至关重要。
await Promise.allSettled(promises) 执行一系列承诺,确保收集所有结果,无论单个承诺结果如何。此模式对于处理数据流操作中的部分成功场景非常有价值。
console.warn() 此处用于记录特定的警告消息,例如网络超时。这种方法可以轻松调试和监控,尤其是无服务器环境中的重试逻辑和瞬态错误。
process.env 访问环境变量,这些变量可以动态设置 AWS 区域等值或 Lambda 函数中的超时设置。这对于安全地处理主代码库之外的配置数据至关重要。

使用 Kinesis Stream 增强 AWS Lambda 可靠性

提供的 JavaScript 脚本旨在创建高效的 AWS Lambda 函数,该函数从 SQS 队列检索消息,然后将其发布到 Amazon Kinesis Data Stream。该解决方案的核心在于 Lambda 函数能够异步处理消息,同时解决经常导致的连接问题 超时时间 错误。该脚本的一个关键部分是初始化 Kinesis客户端,它配置区域、重试次数和连接超时等基本属性。这些配置在云设置中至关重要,因为它们控制应用程序的响应能力以及超时前尝试连接的时间。通过设置更高的 连接超时 或者调整重试次数,我们可以帮助该函数更有效地处理网络延迟。

在 Lambda 处理程序中,脚本利用 Promise.allSettled(),处理多个异步请求时的宝贵工具。当同时处理多条记录时,必须确保每条记录都完成,无论是成功还是出错。 Promise.allSettled() 确保函数在一个请求失败时不会停止处理;相反,它单独记录每个结果。这种方法在网络连接可能不可预测的情况下特别有用。例如,如果一条记录由于网络问题而失败,但其他记录成功,则该功能可以单独记录失败的记录,从而允许开发人员隔离问题实例,而不是使整批消息失败。 🛠️

处理事件 脚本内的功能是模块化的,处理主要的数据转换和发送过程。该函数接收 SQS 消息,对其进行解析,并将其编码为 Kinesis 所需的字节格式。在这里, TextEncoder().encode() 方法至关重要,因为 Kinesis 仅接受二进制数据; JSON 必须转换为兼容的格式。这部分功能确保 Lambda 正确发送数据,减少因数据格式不匹配而出现错误的可能性。该函数还使用自定义分区键生成器函数,该函数将记录分布在 Kinesis 流的分片中。通过使用动态分区键(例如随机键),脚本可以最大限度地减少重复命中同一分片的机会,这可以防止“热分片”导致瓶颈。

最后,为了确保此设置在各种情况下都能正确运行,脚本包含 单元测试 使用笑话。单元测试可以模拟 Kinesis 客户端的行为,而无需实时 AWS 资源,从而提供了一种可靠的方法来测试 Lambda 在受控环境中处理超时或数据转换问题的能力。例如,如果 Kinesis 客户端无法连接,Jest 模拟可以模拟超时错误,验证内部的错误处理是否正确 处理事件 按预期工作。该策略可以进行稳健的验证,确保 Lambda 在多种网络条件下都是可靠的。 🧪 结合这些元素,Lambda 函数可以高效处理从 SQS 到 Kinesis 的数据,同时最大限度地减少超时和其他常见的流错误。

排查 AWS Lambda for Kinesis 流处理中的超时问题

方法 1:使用具有优化重试和自定义错误处理功能的 AWS 开发工具包的 JavaScript 解决方案

import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
const KINESIS_CLIENT = new KinesisClient({
  region: 'us-west-2',
  maxRetries: 3,
  requestHandler: {
    connectionTimeout: 5000, // Lower timeout for quicker retries
  },
});
export const handler = async (event) => {
  const promises = event.Records.map(record => processEvent(record.body));
  const results = await Promise.allSettled(promises);
  results.forEach(result => {
    if (result.status !== "fulfilled") {
      console.error("Failed:", result.reason);
    }
  });
};
async function processEvent(body) {
  const data = JSON.parse(body);
  const partitionKey = generatePartitionKey(data);
  const encodedData = new TextEncoder().encode(JSON.stringify(data));
  try {
    await KINESIS_CLIENT.send(new PutRecordCommand({
      Data: encodedData,
      StreamName: 'InputEventStream',
      PartitionKey: partitionKey
    }));
  } catch (error) {
    console.error("Error putting record:", error);
    throw error;
  }
}
function generatePartitionKey(data) {
  return data.userId ? data.userId.toString() : Date.now().toString();
}

替代 Lambda 配置可提高网络调用的弹性

方法 2:具有可调整超时和重试机制的增强型 JavaScript 解决方案

import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
const KINESIS_CLIENT = new KinesisClient({
  region: 'us-west-2',
  maxRetries: 5,
  httpOptions: {
    connectTimeout: 15000, // Extended timeout
    timeout: 20000 // Total request timeout
  }
});
export const handler = async (event) => {
  const results = await Promise.allSettled(event.Records.map(async (record) => {
    await processEvent(record.body);
  }));
  results.forEach((result) => {
    if (result.status !== "fulfilled") {
      console.log("Unsuccessful attempt:", result.reason);
    }
  });
};
async function processEvent(body) {
  const parsedData = JSON.parse(body);
  const partitionKey = `pk-${Math.random()}`;
  try {
    await KINESIS_CLIENT.send(new PutRecordCommand({
      StreamName: "InputEventStream",
      Data: new TextEncoder().encode(JSON.stringify(parsedData)),
      PartitionKey: partitionKey
    }));
  } catch (err) {
    if (err.name === "TimeoutError") {
      console.warn("Retry on timeout:", err);
    }
    throw err;
  }
}

针对不同环境对 Lambda 函数进行单元测试

方法 3:使用 Jest 进行 JavaScript 单元测试来验证 Kinesis 流集成

import { handler, processEvent } from './your-lambda-file.js';
import { KinesisClient } from "@aws-sdk/client-kinesis";
jest.mock("@aws-sdk/client-kinesis");
describe('Lambda Handler and Kinesis Integration', () => {
  it('should call processEvent for each record in the event', async () => {
    const mockEvent = {
      Records: [{ body: '{"userId": 1, "data": "test"}' }]
    };
    await handler(mockEvent);
    expect(KinesisClient.prototype.send).toHaveBeenCalledTimes(1);
  });
  it('should handle timeout errors gracefully', async () => {
    KinesisClient.prototype.send.mockRejectedValueOnce(new Error('TimeoutError'));
    await expect(processEvent('{"userId": 2}')).rejects.toThrow('TimeoutError');
  });
});

了解 AWS Lambda-Kinesis 集成中的超时错误

超时错误,例如 超时时间 AWS Lambda 函数中的功能通常会令人沮丧,尤其是在涉及与 Amazon Kinesis 进行数据流的集成时。在大多数情况下,这些错误是由于 Lambda 函数超出网络连接时间限制而发生的,通常是在 KinesisClient 要求。 Lambda 中的默认设置可能并不总是能够满足此类网络请求,特别是在处理高吞吐量流或大量数据时。例如,调整 connectTimeout 或者 maxRetries 配置可以帮助缓解此问题,让 Lambda 有更多时间尝试成功连接到 Kinesis。在网络延迟变化或高需求的场景中,这种优化通常是必要的。 🛠️

减少超时错误的另一个关键方面是有效管理数据编码和分区。 AWS Kinesis 需要二进制格式的数据,这可以通过 TextEncoder().encode()。此转换确保了 Kinesis 数据传输的兼容性和简化。此外,深思熟虑的分区键管理也至关重要。使用一致或动态生成的分区键有助于在 Kinesis 分片之间均匀分布数据,避免出现“热分片”,即接收不成比例的记录数量的分片。在高频流场景中,动态密钥可以防止瓶颈并降低连接问题的可能性,在处理大型数据集时特别有用。

为了排除故障并提高这些 Lambda-Kinesis 交互的可靠性,添加单元测试至关重要。单元测试允许您模拟潜在的网络问题、验证数据编码并确保函数可以正确处理重试。例如,通过嘲笑 KinesisClient 在单元测试中,您可以模拟 Kinesis 的一系列响应,例如 暂停 错误或成功案例,这有助于微调 Lambda 代码中的错误处理和连接管理。在开发过程中对此类错误情况进行测试可以提高部署的弹性,减少生产中超时的可能性,并更轻松地识别配置中的薄弱环节。

有关 AWS Lambda 和 Kinesis 超时问题的常见问题解答

  1. 是什么原因造成的 ETIMEDOUT 连接到 Kinesis 时 AWS Lambda 出现错误?
  2. 这些错误通常在 Lambda 连接到 Kinesis 所需时间过长时发生,通常是由于网络问题、连接超时设置或 Kinesis 流上的高流量造成的。
  3. 怎样才能调整 connectTimeout 有助于防止超时错误?
  4. 设置更高 connectTimeout 允许 Lambda 等待响应的时间更长,这在网络延迟较高或数据流量较大的情况下很有帮助。
  5. 为什么是 TextEncoder().encode() 此 Lambda 函数中使用的方法?
  6. Kinesis 要求数据采用二进制格式。这 TextEncoder().encode() 方法将 JSON 数据转换为所需的格式,使其能够被 Kinesis 正确处理。
  7. 在 Kinesis 中使用动态分区键有何重要性?
  8. 动态密钥在分片之间更均匀地分配记录,避免瓶颈并减少“热分片”的机会,“热分片”可能导致流问题。
  9. 单元测试可以模拟超时错误吗?
  10. 是的,通过嘲笑 KinesisClient 在测试环境中,您可以模拟超时错误以验证 Lambda 函数中的错误处理是否正常工作。
  11. 为什么这样做 Promise.allSettled()Promise.all() 表现不同?
  12. Promise.allSettled() 无论结果如何,都会等待所有承诺,这使其成为处理部分失败的多个请求的理想选择,这与 Promise.all(),在第一次失败时停止。
  13. Lambda 中的重试次数是否有限制?
  14. 是的, maxRetries 设置控制 Lambda 重试失败请求的次数,这可以减少网络负载,但应谨慎设置。
  15. 区域选择在减少超时方面发挥什么作用?
  16. 选择距离数据源较近的区域可以减少延迟,使与 Kinesis 的连接更快并且不易出现超时错误。
  17. 怎么样 Promise.allSettled() 协助处理 Lambda 错误?
  18. 它允许函数单独处理每个 Promise 结果,因此如果一个请求失败,其余请求仍会继续。这种方法有利于管理批量记录处理。
  19. Lambda 可以处理流数据的部分成功吗?
  20. 是的,使用 Promise.allSettled() 记录失败的记录使 Lambda 能够继续处理,即使某些记录遇到错误。

使用 AWS Lambda 和 Kinesis 克服常见挑战

对 Lambda 和 Kinesis 超时进行有效故障排除需要分析连接和配置问题。调整设置如 连接超时最大重试次数以及周到的分区密钥管理,有助于维护可靠的连接并防止常见的超时。通过这些策略,处理高吞吐量数据流变得更加顺畅。 🚀

通过了解如何处理错误和优化配置,开发人员可以解决发布到 KinesisLambda 函数中持续出现的 ETIMEDOUT 错误。遵循网络设置、编码和分区的最佳实践有助于提高数据管道的弹性和效率,确保更少的中断和更好的性能。

进一步阅读和参考资料
  1. 本文基于 AWS 文档中有关 Lambda 超时故障排除的见解: AWS Lambda 故障排除
  2. 有关管理 Kinesis 流连接的详细信息改编自 AWS 关于 Kinesis 最佳实践的指南: Amazon Kinesis 数据流最佳实践
  3. 对于 JavaScript SDK 的使用,AWS 提供了全面的文档,其中介绍了此处使用的示例: 适用于 JavaScript 的 AWS 开发工具包
  4. Mozilla 关于 JavaScript Promise 处理的 Web 文档中回顾了其他错误处理策略和异步处理技​​巧: 使用 Promise - MDN Web 文档