Accelerating Machine Learning Pipelines with AWS Step Functions

8 minute read

August 2022

by cybergrx

“Work it harder, make it better. Do it faster, makes us stronger. More than ever, hour after hour. Work is never over.” – Daft Punk from “Harder, Better, Faster, Stronger.”

Daft Punk must’ve had CyberGRX in mind when writing that song. Let me explain. 

CyberGRX was founded in 2015 to facilitate sharing Third-Party Cyber Risk Management (TPCRM) data between companies. Part of that journey has been launching our predictive assessment capability, where we can ingest a vendor onto our platform and predict, with a high degree of confidence, how they will answer a risk assessment.

Our goal is to provide our customers with timely data on third-party vendors, and a crucial part of our model is processing every company on our platform through the predictive analytics system. After several iterations, we selected AWS Step Functions with AWS Lambda as the technologies to accomplish that processing.

Challenges to Overcome

As we approached this project, we had a few obstacles to overcome. Namely:

  1. How do we convert our existing Docker-, Kubernetes-, and Argo Workflows-based process to AWS Step Functions and Lambda with minimal changes to the code?
  2. How do we solve the singular case, the subset case, and the full Exchange case for processing?
  3. Where are the scaling bottlenecks that need to be overcome? There are currently over 150,000 companies in our Exchange, and that number grows constantly.

Challenge 1: Building the Containers

Our team had an existing build process based on AWS CodeBuild for generating Docker artifacts that are then orchestrated on EKS. The first iteration of this process ran on that platform, but we quickly ran up against the limits of what Kubernetes and Argo Workflows could handle effectively: we were limited in the number of items that could be processed, had to deal with many errors, and the process required a lot of babysitting to finish. We wanted to move off that platform while making minimal changes to our existing build process, and it turns out that adjusting the code was relatively straightforward. Our first implementation was geared towards Argo Workflows and used input/output files on disk.
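For context, here is a minimal sketch of what that original file-based entry point looked like conceptually; the module, function, and argument names are illustrative, not our actual code:

    # generate_sample_block.py -- illustrative sketch of the original
    # Argo Workflows-style entry point: inputs and outputs are files on disk.
    import argparse
    import json

    def generate_sample_block(companies):
        # Placeholder for the real predictive logic.
        return [{"company_id": c["id"], "predicted": True} for c in companies]

    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument("--input", required=True, help="Path to the input JSON file")
        parser.add_argument("--output", required=True, help="Path to write the output JSON file")
        args = parser.parse_args()

        with open(args.input) as f:
            companies = json.load(f)

        results = generate_sample_block(companies)

        with open(args.output, "w") as f:
            json.dump(results, f)

    if __name__ == "__main__":
        main()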


Lambda receives an event as its input, so we decided the cleanest approach was to pass the parameters directly in that event. Because some of the inputs are quite large and exceed the 256 KB payload limit for Step Functions, we store them in S3 and pass references instead, so each step can pull the data from S3 and process it. This also gives us a way to review the inputs and provides persistence if the process fails. Pulling the data this way required minimal code changes, since we are still reading files and only changing their location. Setting this up was pretty straightforward.
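Here is a minimal sketch of the Lambda flavor of that same step, assuming an event that carries S3 bucket and key references rather than the data itself; the handler and field names are illustrative:

    # lambda_handler.py -- illustrative sketch: same logic as before, but the
    # event carries S3 references instead of file paths, so payloads never hit
    # the 256 KB Step Functions limit.
    import json
    import boto3

    s3 = boto3.client("s3")

    def generate_sample_block(companies):
        # Placeholder for the real predictive logic.
        return [{"company_id": c["id"], "predicted": True} for c in companies]

    def generate_sample_block_handler(event, context):
        # Pull the input object referenced in the event.
        obj = s3.get_object(Bucket=event["input_bucket"], Key=event["input_key"])
        companies = json.loads(obj["Body"].read())

        results = generate_sample_block(companies)

        # Persist the output and hand the next step a reference, not the data.
        output_key = event["input_key"].replace("input/", "output/")
        s3.put_object(
            Bucket=event["input_bucket"],
            Key=output_key,
            Body=json.dumps(results).encode("utf-8"),
        )
        return {"output_bucket": event["input_bucket"], "output_key": output_key}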


Now that we could handle a Lambda using an almost identical code path, we needed to update the build process to generate a container that Lambda understands. Again, this was pretty easy to do: we were already using Docker to generate our artifacts and just needed to add another build step that produces a Lambda-compatible image from the same artifacts.


When you run a container image on Lambda, you can override the image’s COMMAND parameter. We just passed in “predictive-service.api.lambda.generate_sample_block_handler” and it worked. Rinse and repeat for the remaining services we need in this process.
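For anyone wiring this up themselves, the override lives in the function’s image configuration. A rough boto3 sketch, with placeholder ARNs, image URI, memory, and function name rather than our actual values, might look like this:

    # create_lambda_from_image.py -- illustrative sketch of registering one of
    # the pipeline functions from the shared container image, overriding the
    # image COMMAND so Lambda invokes the right handler.
    import boto3

    lambda_client = boto3.client("lambda")

    lambda_client.create_function(
        FunctionName="generate-sample-block",  # placeholder name
        PackageType="Image",
        Code={"ImageUri": "123456789012.dkr.ecr.us-east-1.amazonaws.com/predictive-service:latest"},
        Role="arn:aws:iam::123456789012:role/predictive-service-lambda",  # placeholder role
        ImageConfig={
            # Same image for every function; only the handler changes.
            "Command": ["predictive-service.api.lambda.generate_sample_block_handler"]
        },
        MemorySize=2048,
        Timeout=900,
    )

The same container image is reused for each service in the pipeline; only the COMMAND override differs from function to function.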

Challenge 2: Creating a Cohesive Input

Creating a cohesive input was more of a conceptual problem. We needed this system to compute predictive data both when a company was added to or changed in the Exchange and when we updated the model. The former is a singular case or a small batch; the latter is every company currently on our platform. These events are distinct, and we settled on using SQS to drive the input to the pipeline. This lets us add each company to the system as a discrete event and acts as a buffer so the pipeline only processes as much as it can handle at once. It also gives the predictive engine a single, cohesive input: rather than having separate invocation paths, we now have one place to drop inputs to start the process. We initially attempted to use Kinesis but realized that its semantics, with shards, streams, and ordering guarantees, were overhead we did not really need. That led to the final challenge: how do we get this thing to scale?
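Conceptually, every trigger reduces to dropping one message per company onto the same queue. A rough sketch of that idea, with an illustrative queue URL and message shape rather than our actual ones, looks like this:

    # enqueue_companies.py -- illustrative sketch: every code path that needs
    # predictions drops one message per company onto the same SQS queue.
    import json
    import boto3

    sqs = boto3.client("sqs")
    QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/predictive-input"  # placeholder

    def enqueue(company_ids):
        for company_id in company_ids:
            sqs.send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps({"company_id": company_id}),
            )

    # A single company added to the Exchange...
    enqueue(["company-123"])
    # ...or a full-model refresh: the same call with every company ID.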

Challenge 3: Scalability

Tuning all the moving parts for scalability was the final and most interesting hurdle. At this point, everything is wired up and running: submitting a job fires the pipeline and runs it to completion. Data traverses the pipeline as follows.


Each step generates an S3 object and passes a reference to that object into the next step. Finally, the objects are loaded, processed, and the results are pushed to S3 for further processing. This runs pretty well for a single unit of work; it is when you want to run thousands at a time that it starts to fall apart.

When you open an AWS account, you are given a default limit of 1,000 concurrent Lambda executions. For the whole use case of 150,000 companies, even if we could run 1,000 at a time, that is 150 sequential waves of work, or around 24 hours to process the entire pipeline. That does not seem so bad, except that the Exchange is growing every day, and as we add companies, that runtime would get longer and longer. We want this to finish in a much shorter window to give us room to grow.

The first knob we turned was the concurrency limit itself: we asked for, and received, a maximum concurrency of 20,000. To test this, we ran 21,000 jobs through the system and immediately discovered several problems. First, your account is limited to a burst concurrency of 3,000, after which Lambda adds capacity at 500 per minute. This had side effects on the rest of our system: other Lambda functions struggled to start while the pipeline ran, which is definitely not a good thing when critical business functions need to run. The other problem we discovered was that once jobs passed the Fanout Transform function, they were not re-queued on failure; a failed job was dropped entirely and lost.

To solve the problem of side effects, we went through the other functions in the system and gave them reserved concurrency and at least one provisioned instance. That ensured they could still run while this system processed a large job. One problem solved; the next: how do we ensure messages are not lost, and how do we keep the pipe full while the system ramps up concurrency?
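As a sketch of that guardrail (the function names, alias, and numbers here are illustrative), reserved and provisioned concurrency are set per function:

    # protect_other_lambdas.py -- illustrative sketch: carve out reserved
    # concurrency (and one provisioned instance) for unrelated business-critical
    # functions so a large pipeline run cannot starve them.
    import boto3

    lambda_client = boto3.client("lambda")

    for function_name in ["billing-webhook", "customer-api"]:  # placeholder names
        # Reserve a slice of account concurrency for this function alone.
        lambda_client.put_function_concurrency(
            FunctionName=function_name,
            ReservedConcurrentExecutions=25,
        )
        # Keep at least one warm, pre-initialized instance ready to serve.
        lambda_client.put_provisioned_concurrency_config(
            FunctionName=function_name,
            Qualifier="live",  # provisioned concurrency targets an alias or version
            ProvisionedConcurrentExecutions=1,
        )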

It turns out that solving the concurrency issues takes attacking them from multiple angles. The first thing we realized is that the internal state of the step function makes it almost impossible to re-queue jobs once they get that far. We had initially run the first step as a parallel map of 10 (the maximum) separate pipes; we reduced that to three to lower the parallelism of each individual execution, which lets the system keep more executions in flight while still using as much total parallelism as we can. After adjusting the parallelism, we updated the Step Functions pipeline itself to catch all Lambda-related errors and retry failed steps, with very generous retry settings.
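The retry policy is just part of the state machine definition. The fragment below is an illustrative sketch, expressed as the Python dictionary we would serialize into the definition, with placeholder ARNs, state names, and limits rather than our exact values:

    # retry_policy.py -- illustrative fragment of the state machine definition:
    # every Lambda task retries throttling and transient errors with generous
    # back-off instead of failing the whole execution.
    generous_retry = [
        {
            "ErrorEquals": [
                "Lambda.TooManyRequestsException",  # throttled while concurrency ramps up
                "Lambda.ServiceException",
                "Lambda.SdkClientException",
                "States.TaskFailed",
            ],
            "IntervalSeconds": 30,
            "MaxAttempts": 20,
            "BackoffRate": 1.5,
        }
    ]

    sample_block_state = {
        "Type": "Task",
        "Resource": "arn:aws:lambda:us-east-1:123456789012:function:generate-sample-block",  # placeholder
        "Retry": generous_retry,
        "Catch": [
            # Anything that still fails after the retries is routed to a
            # failure-handling state rather than silently dropping the job.
            {"ErrorEquals": ["States.ALL"], "Next": "RecordFailure"}
        ],
        "Next": "NextStep",
    }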


At this point, we could be confident that once a step execution started, it would finish. We now needed to figure out how to create executions in a way that keeps about 19,000 Lambdas running at a time (something we still have not actually achieved, but we are getting closer). We chose 19,000 rather than 20,000 so that the original account limit of 1,000 stayed reserved for other processes.

Now we are close to having it work at the scale we need; there is just one more piece to put in place. We need logic in the Fanout function that understands what is in flight and what the limits are, so that the Step Functions executions stay full while no messages are lost. It turned out to be a relatively simple bit of code: query the Step Functions API for how many executions are currently running, multiply that by 3 for the maximum potential concurrency, and subtract the result from the cap of 19,000. If the remaining headroom is greater than 3, we have room to start another execution; otherwise, re-queue the message with a 30-second delay before retrying.
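The following is a simplified sketch of that gatekeeping logic; the ARNs, queue URL, and names are placeholders, and the branch factor of 3 and cap of 19,000 follow the description above:

    # fanout_handler.py -- illustrative sketch of the Fanout Lambda: only start
    # a new Step Functions execution when there is concurrency headroom,
    # otherwise put the message back on the queue with a short delay.
    import boto3

    sfn = boto3.client("stepfunctions")
    sqs = boto3.client("sqs")

    STATE_MACHINE_ARN = "arn:aws:states:us-east-1:123456789012:stateMachine:predictive-pipeline"  # placeholder
    QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/predictive-input"               # placeholder
    MAX_LAMBDAS = 19_000          # leave the original 1,000 for everything else
    BRANCHES_PER_EXECUTION = 3    # the parallel map width chosen above

    def count_running_executions():
        running = 0
        paginator = sfn.get_paginator("list_executions")
        for page in paginator.paginate(stateMachineArn=STATE_MACHINE_ARN, statusFilter="RUNNING"):
            running += len(page["executions"])
        return running

    def fanout_handler(event, context):
        for record in event["Records"]:  # SQS-triggered Lambda event shape
            in_flight = count_running_executions() * BRANCHES_PER_EXECUTION
            headroom = MAX_LAMBDAS - in_flight

            if headroom > BRANCHES_PER_EXECUTION:
                sfn.start_execution(
                    stateMachineArn=STATE_MACHINE_ARN,
                    input=record["body"],
                )
            else:
                # No room yet: re-queue the message and try again in 30 seconds.
                sqs.send_message(
                    QueueUrl=QUEUE_URL,
                    MessageBody=record["body"],
                    DelaySeconds=30,
                )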


All of this looks good so far, but how does it behave in practice? Are we getting the concurrency we want? It turns out we get pretty close out of the gate. In our concurrency monitoring we initially hit a bottleneck around the 13:30 mark; after tuning the producer Lambda with more reserved concurrent executions, the system took off around the 14:10 mark.


Conclusion

Once all of this was tied together, it worked very well. There is still some tuning to do to keep the pipe full, such as better leveraging provisioned concurrency and using target tracking policies to preemptively scale up the functions, but we are happy with the results. The first full run took about 95 minutes, which is far faster than the five days our old system needed. That run only used about 80% of the total capacity, so we expect even better numbers with a little more parameter tuning. Best of all, it solves every use case we have and runs without babysitting.

Updating the model is now as simple as filling the SQS queue with everything we need to process and letting the pipeline work. The door is also open for our team to build a parallel pipe to speed up the development of new models. Overall, we are pleased with this first iteration of our new machine learning pipeline.

Charles Burton is the Linux/Unix wizard in residence at CyberGRX. In his role, he designs and builds most of the infrastructure for the CyberGRX analytics team. He is also a strong advocate for Infrastructure as Code, building systems that look to the horizon of what needs to be accomplished now and in the future.


If you’d like to see the CyberGRX Exchange in action and how it can improve your TPCRM, we invite you to book a demo now.


