当前位置:  开发笔记 > 大数据 > 正文

如何使用lambda函数处理SQS队列(不是通过预定事件)?

如何解决《如何使用lambda函数处理SQS队列(不是通过预定事件)?》经验,为你挑选了5个好方法。

这是我正在尝试的简化方案:

http请求 - >(网关API + lambda A) - > SQS - >(lambda B ?????) - > DynamoDB

所以它应该如图所示:来自许多http请求的数据(例如,每秒高达500)被我的lambda函数A放入SQS队列.然后另一个函数B处理队列:最多读取10个项目(在某些期刊的基础上)并使用BatchWriteItem将它们写入DynamoDB.

问题是我无法弄清楚如何触发第二个lambda函数.它应该被频繁调用,每秒多次(或至少每秒一次),因为我需要队列中的所有数据进入DynamoDB ASAP(这就是为什么通过这里描述的预定事件调用lambda函数B 不是一个选项)


为什么我不想在没有SQS的情况下直接写入DynamoDB?

那对我来说完全避免使用SQS会很棒.我试图用SQS解决的问题是DynamoDB限制.甚至没有限制自己,而是在使用AWS SDK将数据写入DynamoDB时处理它的方式:当逐个编写记录并限制它们时,AWS SDK会静默重试写入,导致从http客户端的点开始增加请求处理时间视图.

所以我想暂时将数据存储在队列中,将响应"200 OK"发送回客户端,然后通过单独的函数处理队列,使用一个DynamoDB的BatchWriteItem调用写入多个记录(返回未处理的项目而不是自动重试)限制).我甚至宁愿丢失一些记录,而不是增加接收和存储在DynamoDB中的记录之间的延迟

UPD:如果有人感兴趣,我已经找到了如何在节流的情况下使aws-sdk跳过自动重试:有一个特殊参数maxRetries.无论如何,将按照以下建议使用Kinesis



1> Eric Hammond..:

[这并没有直接回答你的明确问题,所以根据我的经验,它会被贬低:)但是,我会回答你试图解决的根本问题.

我们采用大量传入请求并将其提供给AWS Lambda函数以便以节奏的方式写入DynamoDB的方式是使用Amazon Kinesis流替换建议架构中的SQS.

Kinesis流可以驱动AWS Lambda功能.

Kinesis流保证为任何给定密钥排序所传递的消息(对于有序数据库操作很好).

通过Kinesis流,您可以指定可以并行运行的AWS Lambda函数(每个分区一个),这可以与您的DynamoDB写入容量协调.

Kinesis流可以在一个AWS Lambda函数调用中传递多个可用消息,从而允许进一步优化.

注意:实际上从Amazon Kinesis流中读取的AWS Lambda服务然后调用该函数,而不是直接调用AWS Lambda的Kinesis流; 但有时可以更容易想象出Kinesis驾驶它.对用户的结果几乎相同.


问题是Kinesis流没有SQS每条消息的"至少一次"保证的高可用性.例如,在处理大数据时在消防站中丢失一些消息是可以接受的,但在某些应用程序中丢失单个请求是不可接受的.
这很棒.我一直遇到SNS限制从Spark群集到S3再到Lambda的巨大问题(因为AWS将我们的Lambda函数从100增加到6,000!)并且一直在尝试限制我们传递消息的速率.你使用Lambda和Kinesis的解决方案似乎正是我正在寻找的.
AWS定价总是在变化(向下),但我对上面的成本评论感到困惑.对于大多数用途,如讨论的那些,亚马逊Kinesis应该比亚马逊SQS明显便宜.

2> Chris Frankl..:

不幸的是,你无法直接集成SQS和Lambda.但是不要担心太多.有一个解决方案!您需要在混合中添加另一个亚马逊服务,您的所有问题都将得到解决.

http requests --> (Gateway API + lambda A) --> SQS + SNS --> lambda B --> DynamoDB

您可以触发第二个lambda服务的SNS通知以启动它.启动后,它可以排空队列并将所有结果写入DynamoDB.为了更好地理解Lambda的可能事件源,请查看这些文档.


我没有说它可以.我建议lambda触发SNS,同时将消息放入SQS.因为问题是如何使用lambda处理SQS中的事件我不认为我必须重申这一点.

3> Trenton..:

截至2018年6月28日,您现在可以使用SQS本机触发AWS Lambda函数.不再需要解决方法!

https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/


目前,Lambda触发器只能使用标准队列,而不能使用FIFO队列.

4> loopingz..:

另一种解决方案是将项添加到SQS,使用Event调用目标Lambda函数,使其成为异步.

然后,异步Lambda可以从SQS获取任意数量的项目并进行处理.

我还会向异步Lambda添加一个调度调用来处理队列中出错的任何项目.

[更新]您现在可以在队列上的新消息上设置Lambda触发器



5> Denis Mysenk..:

也许一个更具成本效益的解决方案是将所有内容保存在SQS中(原样),然后运行调度事件,该事件调用处理队列中项目的多线程Lambda函数?

这样,您的队列工作人员可以完全匹配您的限制.如果队列为空,则函数可以提前完成或在单线程中开始轮询.

对于这种情况,Kinesis听起来像是一种过度杀戮 - 例如,你不需要原始订单.另外同时运行多个Lambdas肯定比仅运行一个多线程Lambda更昂贵.

您的Lambda将全部关于I/O,对AWS服务进行外部调用,因此一个函数可能非常适合.

推荐阅读
郑谊099_448
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有