I worry about inference a lot. Inference is the term for the process of making predictions from one or more machine learning models. While working on customer projects, I come across several types of inference:
- High throughput and low latency (real time)
- Low throughput and low latency (real time)
- Periodic processing of high volumes (batch)
- Periodic processing of small volumes (micro batch)
In 2019, I wrote a blog (machine learning performance is more than skin deep) about the high-throughput “real time” option. It consistently gets thousands of views a month and is normally the top hit on Google after the AWS content in this area, which I am very proud of. However, for most projects I work on, batch or micro-batch inference is far more common. These processes require an orchestrator; I normally go serverless and use AWS Step Functions. My love for AWS Step Functions can be seen here (synchronous express workflows in real-life!) and here (a model is for life, not just for Christmas!).
However, some of our customers prefer alternatives, and one very popular choice is the open-source orchestrator Apache Airflow. In this blog, I will look at Amazon Managed Workflows for Apache Airflow, the AWS managed Airflow offering, and how we used it on an extreme use case that I was recently involved in.
Background:
The use case required the execution of nearly 5000 models, an extreme number by any standard. Each model was a simple linear regressor forecasting product demand: it had to be trained on historical data and then predict the next month’s demand. The overall process needed to run at the end of each month and complete in around 3 hours.
Analysis indicated that most models took around 10 minutes to train and predict, but some product lines took up to 1.5 hours. To meet the time and reliability requirements, we needed to run as many product combinations in parallel as possible, which meant running thousands of models at once.
The Architecture:
The client had already chosen Apache Airflow as their preferred orchestrator and had selected Amazon Managed Workflows for Apache Airflow (let’s call it MWAA for short) to host the workload. MWAA is like Amazon RDS for MySQL, Managed Streaming for Apache Kafka or OpenSearch Service: AWS takes an open-source offering and commercialises it for you. In this case, that includes maintaining, patching and scaling Airflow, managing the underlying EC2 instances and providing commercial support.
The trade-off is that there is a lot less configuration you can customise, and MWAA only supports certain Airflow versions: at the time of writing, 2.2, 2.0 and 1.10. For this post, I’ll focus on version 2.0.
So, let’s come back to the customer use case. The architecture of the solution, shown in Diagram 1, required Airflow to orchestrate three components:
- Run a PySpark job for data preparation in Databricks (hosted on EC2) for the 5000 models.
- Queue 5000 jobs in AWS Batch. Each AWS Batch job was an execution of a Docker container, which would train the model and then predict the next month’s sales.
- Run another PySpark job for data reporting in Databricks to merge the results of the 5000 models into a consolidated report.

I will focus on Airflow and AWS Batch. To run containers, AWS Batch uses an ECS cluster; in our case, the ECS cluster used EC2 instances (not Fargate). Capacity was managed by AWS Batch as part of its compute environment, which scaled the number of instances based on the number of jobs in the queue. For this use case, that meant running 5000 containers, with each container being a job.
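Under the hood, each task submission becomes a SubmitJob call against the AWS Batch job queue and job definition, and it is the depth of that queue that drives the instance scaling described above. As a rough, hedged sketch (the queue and definition names are reused from the examples later in this post; the job name and environment variable are hypothetical), a single submission via boto3 looks something like this:

import boto3

batch = boto3.client("batch")

# Submit one container run as a job; AWS Batch scales the compute
# environment's EC2 instances based on the depth of this queue.
response = batch.submit_job(
    jobName="product-combination-0001",        # hypothetical job name
    jobQueue="iaw-airflow-test-queue",
    jobDefinition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
    containerOverrides={
        "environment": [
            {"name": "PRODUCT_ID", "value": "0001"},  # hypothetical variable
        ]
    },
)
print(response["jobId"])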
For Airflow to be able to submit and track jobs in AWS Batch, we needed to use a Provider. A Provider allows Airflow to invoke the APIs of another component and track their progress. Apache offers official Providers for AWS that cover all its services (apache-airflow-providers-amazon), as well as for Databricks (apache-airflow-providers-databricks). Providers are Python packages that need to be listed in a requirements.txt file, which MWAA loads from S3 at start-up so that Airflow installs the associated packages.
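For example, a minimal requirements.txt for this setup might look like the following (the version numbers here are illustrative – pick the ones that match your Airflow version):

apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-databricks==2.0.2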
The DAG
In Airflow, workflows are called directed acyclic graphs, or DAGs – ‘directed’ meaning each edge between vertices in the graph has a defined direction, and ‘acyclic’ meaning there are no cycles (i.e. loops) in the graph. In our case, the DAG defines the relationships between tasks (each task being a unit of work). Normally a DAG contains a small number of tasks, something like this:

The DAG and its tasks are defined in a Python file, like this:
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator
from airflow.utils.dates import days_ago


@dag(
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["Inawisdom"],
)
def simple_dag():
    start_task = DummyOperator(task_id="Start")
    end_task = DummyOperator(task_id="End")

    task_a = AwsBatchOperator(
        task_id="Task_A",
        job_name="Task_A",
        job_queue="iaw-airflow-test-queue",
        job_definition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
        overrides={},
    )
    task_b = AwsBatchOperator(
        task_id="Task_B",
        job_name="Task_B",
        job_queue="iaw-airflow-test-queue",
        job_definition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
        overrides={},
    )
    task_c = AwsBatchOperator(
        task_id="Task_C",
        job_name="Task_C",
        job_queue="iaw-airflow-test-queue",
        job_definition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
        overrides={},
    )

    start_task >> task_a >> task_b >> end_task
    start_task >> task_a >> task_c >> end_task


# This assignment is needed for the DAG to be picked up correctly – do not remove it
dag_flow = simple_dag()
In our case, however, we needed to define 5000 tasks in a DAG. To do this we had to use a Dynamic DAG, a term for a DAG whose shape changes based on its input before it is executed. To create a Dynamic DAG, we first had to define each product combination – including its hyperparameters and the location of its dataset in S3 – in a JSON file.
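A couple of entries in such a file might look like this (the field names and values below are illustrative, not the exact schema we used):

[
  {
    "product_id": "UK-widgets-0001",
    "geolocation": "UK",
    "product_group": "widgets",
    "dataset_s3_uri": "s3://example-bucket/prepared/UK/widgets/0001.parquet",
    "hyperparameters": {"fit_intercept": true}
  },
  {
    "product_id": "DE-gadgets-0002",
    "geolocation": "DE",
    "product_group": "gadgets",
    "dataset_s3_uri": "s3://example-bucket/prepared/DE/gadgets/0002.parquet",
    "hyperparameters": {"fit_intercept": false}
  }
]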
Next, from within the DAG, we read this JSON file, created a TaskGroup and, for each product combination, passed its configuration to the AWS Batch Operator. This resulted in a task being created for each product combination and a DAG that looked like this:

And a Python file that looked a little like this:
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def job(task_id):
    return AwsBatchOperator(
        task_id=task_id,
        job_name=task_id,
        job_queue="iaw-airflow-test-queue",
        job_definition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
        overrides={},
    )


@dag(
    # default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["Inawisdom"],
)
def test_batch_dag():
    # job_def_revision = args['params']['job_def_revision']
    start_task = DummyOperator(task_id="Start")
    end_task = DummyOperator(task_id="End")

    with TaskGroup("GroupA") as group_a:
        _ = [job(task_id=f"GroupA-{idx}") for idx in range(100)]

    with TaskGroup("GroupB") as group_b:
        _ = [job(task_id=f"GroupB-{idx}") for idx in range(100)]

    with TaskGroup("GroupC") as group_c:
        _ = [job(task_id=f"GroupC-{idx}") for idx in range(100)]

    start_task >> group_a >> group_b >> end_task
    start_task >> group_a >> group_c >> end_task


# This assignment is needed for the DAG to be picked up correctly – do not remove it
dag_flow = test_batch_dag()
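In the real DAG, the tasks came from the JSON configuration rather than range(100). A simplified sketch of that pattern – assuming the hypothetical JSON schema shown earlier and passing each combination to the container as environment variables – would sit inside the @dag-decorated function and look something like this:

import json
import os

# Hypothetical path: the JSON config is deployed alongside the DAG files.
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "product_combinations.json")

with open(CONFIG_PATH) as config_file:
    combinations = json.load(config_file)

with TaskGroup("Products") as product_group:
    for combo in combinations:
        # One AWS Batch task per product combination, configured from the JSON entry.
        AwsBatchOperator(
            task_id=combo["product_id"],
            job_name=combo["product_id"],
            job_queue="iaw-airflow-test-queue",
            job_definition="IAWBatchJobDefinitionCA-49f5321cd86b5a1",
            overrides={
                "environment": [
                    {"name": "DATASET_S3_URI", "value": combo["dataset_s3_uri"]},
                    {"name": "HYPERPARAMETERS", "value": json.dumps(combo["hyperparameters"])},
                ]
            },
        )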
We then ran the DAG and watched as the jobs got queued in AWS Batch. AWS Batch started several EC2 instances to cope with the jobs; once the instances came online, the jobs started running as containers on the ECS cluster and were removed from the queue in AWS Batch… and then, bang! Strange things started to happen.
The main issue we encountered was that Airflow would say that it had scheduled a task, but no job would appear in the AWS Batch queue and no entries would be written to Airflow’s CloudWatch logs for the task. Therefore, we looked at the scheduler’s logs – the scheduler in Airflow controls and distributes tasks across worker nodes. For the tasks that failed, we would see the following entries in the scheduler logs:
*** Reading remote log from Cloudwatch log_group: airflow-iaw-airflow-test2y-env-Task log_stream: test_batch_dag/GroupA.GroupA-84/2022-08-24T10_18_08.423660+00_00/1.log.
Could not read remote logs from log_group: airflow-iaw-airflow-test2y-env-Task log_stream: test_batch_dag/GroupA.GroupA-84/2022-08-24T10_18_08.423660+00_00/1.log.
And:
[2022-08-24 10:23:22,077] {scheduler_job.py:1239} ERROR – Executor reports task instance <TaskInstance: test_batch_dag.GroupA.GroupA-84 2022-08-24 10:18:08.423660+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
We ran the DAG a few times and every time we would see a different number of instances of the same error, and it would affect different product combinations. So, we contacted AWS Support, and they advised us to do three things:
- Split the DAG up into multiple DAGs, as there is a limit of 255 tasks per DAG
- Make sure the requirements file where the Providers are defined contains versions that Airflow supports and uses a constraints file (see the sketch after this list)
- Make sure the latest maintenance was applied to MWAA
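Following that advice, the requirements.txt ends up with pinned provider versions and a reference to the official Airflow constraints file. A hedged example, with illustrative versions that should be matched to your Airflow and Python versions, looks like this:

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt"
apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-databricks==2.0.2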
The revised DAG
Luckily, the product combinations spanned 5 geolocations and 5 product groups, which gave us 25 DAGs with around 200 tasks each. We therefore took some additional steps:
- We created an overall DAG that triggered the 25 sub-DAGs; each sub-DAG was triggered in parallel, with wait_for_completion=True set on the operator in the triggering DAG. This made sure the triggering DAG waited until all tasks were complete (see the sketch after this list).
- We made a set of centralised Python files containing all the shared logic that each DAG used. This simplified each DAG to just the parameters needed for its set of tasks.
- We then wrote a generator that took the 5 geolocations and 5 product groups and created the 25 DAGs from a template. We just had to be careful around the @dag decorator: if it appeared in the template itself, Airflow would think the template was a DAG and fail to deploy it.
- We modified our CI/CD process to run the generator as the code was “built”
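A minimal sketch of the triggering pattern is shown below. The sub-DAG names and the geolocation/product group values are illustrative, and it assumes an Airflow version in which TriggerDagRunOperator supports wait_for_completion:

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

# Illustrative naming: one generated DAG per geolocation/product group pair.
GEOLOCATIONS = ["uk", "de", "fr", "es", "it"]
PRODUCT_GROUPS = ["a", "b", "c", "d", "e"]


@dag(schedule_interval=None, start_date=days_ago(2), tags=["Inawisdom"])
def overall_dag():
    for geo in GEOLOCATIONS:
        for group in PRODUCT_GROUPS:
            TriggerDagRunOperator(
                task_id=f"trigger_{geo}_{group}",
                trigger_dag_id=f"forecast_{geo}_{group}",  # the generated sub-DAG
                wait_for_completion=True,  # block until the sub-DAG finishes
                poke_interval=60,          # check for completion every minute
            )


dag_flow = overall_dag()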
Once these changes had been completed, we ran the revised DAGs a few times. However, little changed – again, every time we would see a different number of instances of the same error. We did make a few interesting observations, though. Firstly, the number of running tasks never came close to the number of slots in the execution pool (the number of tasks Airflow can handle at once). In fact, it barely touched it, using just 200-300 of the 9000 slots in the pool. Here is the CloudWatch graph showing the number of slots used vs the number available, as it appeared in our QA environment:

The other thing we observed is that AWS Batch would also only process 200-300 jobs at a time – as the CloudWatch chart shows, the number of submissions to AWS Batch was limited to 200-300.
This meant something was limiting us, so we looked at the service limits for AWS Batch (https://docs.aws.amazon.com/batch/latest/userguide/service_limits.html). The two limits most likely to affect us were:
| Limit | Default value |
| --- | --- |
| Maximum number of jobs in SUBMITTED state | 1,000,000 |
| Maximum number of transactions per second (TPS) for each account for SubmitJob operations | 50 |
However, 200-300 jobs a minute is a lot less than both. Then we found another limit: celery.worker_autoscale (see https://docs.aws.amazon.com/mwaa/latest/userguide/best-practices-tuning.html for more info). The defaults here are also the maximum limits, so we had a limit of 20 tasks per worker for the large instance type, and we had a maximum of 12 workers configured (AWS supports up to 25 workers). That meant we had a maximum of 240 concurrently running tasks for the cluster. This is very wasteful, as we are not really using the resources that Airflow has available – the heavy processing is done in AWS Batch. It would be good if AWS offered a higher maximum for worker_autoscale for scenarios such as this use case.
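The worker count (and hence the 12 × 20 = 240 ceiling) is an environment-level setting in MWAA. As a hedged sketch – the environment name is assumed from the log output above, and celery.worker_autoscale itself cannot be raised above the default for the environment class – raising MaxWorkers towards the supported limit of 25 via the MWAA API would look something like this:

import boto3

mwaa = boto3.client("mwaa")

# Raise the worker ceiling; the per-worker task limit (celery.worker_autoscale)
# is still capped by the environment class, so total concurrency becomes
# 25 workers x 20 tasks = 500 rather than 12 x 20 = 240.
mwaa.update_environment(
    Name="iaw-airflow-test2y-env",  # environment name assumed from the logs above
    MaxWorkers=25,
)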
The Final DAG
At this point we were running out of time, so we took some more steps:
- We modified the overall DAG to trigger the sub-DAGs in batches of 5, so that we only tried to start around 1000 tasks simultaneously.
- For the reporting stage, we added trigger_rule="all_done" to the operator within the overall DAG that called the reporting DAG. This meant reporting would always run, even if any of the 5000 tasks failed (see the sketch after this list).
- We optimised the batches in the overall DAG, so that tasks which took a long time were started first. This meant we could still complete the overall process within the time allowed.
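Building on the earlier triggering sketch, the final arrangement looked roughly like the following: the 25 trigger tasks are chained in batches of 5, and the reporting trigger carries trigger_rule="all_done" so it runs regardless of upstream failures. The DAG and task names are again illustrative:

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

# Illustrative list of the 25 generated sub-DAG ids.
SUB_DAG_IDS = [f"forecast_{geo}_{group}"
               for geo in ["uk", "de", "fr", "es", "it"]
               for group in ["a", "b", "c", "d", "e"]]


@dag(schedule_interval=None, start_date=days_ago(2), tags=["Inawisdom"])
def final_overall_dag():
    reporting = TriggerDagRunOperator(
        task_id="trigger_reporting",
        trigger_dag_id="reporting_dag",     # illustrative reporting DAG id
        wait_for_completion=True,
        trigger_rule=TriggerRule.ALL_DONE,  # always run, even after failures
    )

    previous_batch = None
    # 5 batches of 5 sub-DAGs: each batch runs in parallel, batches run in
    # sequence, so at most 5 x 200 = 1000 tasks are started at once.
    for batch_start in range(0, len(SUB_DAG_IDS), 5):
        batch = [
            TriggerDagRunOperator(
                task_id=f"trigger_{dag_id}",
                trigger_dag_id=dag_id,
                wait_for_completion=True,
                trigger_rule=TriggerRule.ALL_DONE,  # keep going if an earlier batch failed
            )
            for dag_id in SUB_DAG_IDS[batch_start:batch_start + 5]
        ]
        if previous_batch:
            for upstream_task in previous_batch:
                upstream_task >> batch
        previous_batch = batch

    previous_batch >> reporting


dag_flow = final_overall_dag()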
As a result of taking these steps, we were able to run the process successfully and reduce the number of failures caused by the underlying issue. Here you can see the final successful run:

Conclusion
This use case shows that when it comes to delivering business value from Machine Learning, there is a lot more to it than producing a pilot model. When you need to operate regularly and at scale, a lot of attention is needed on how to realise that value.
An orchestrator is a critical tool in the box for achieving this, and detailed analysis and optimisation within the orchestrator will be required to tune it to your needs. Our view is that both Apache Airflow and AWS Step Functions work well for complex batch processes on AWS, and both are very good orchestrators. However, there are some key differences:
- Airflow has lots of providers from lots of technology vendors. Step Functions does not have this range, but it does have better native AWS integration.
- Managed Airflow is billed per unit of CPU time; it will scale instances up and down to suit your load, but there will be idle time that you pay for. Step Functions has a pay-per-execution pricing model and no idle time.
- Airflow uses a directed acyclic graph, so tasks and their dependencies must be defined before they run. This makes them harder to change at runtime compared to Step Functions.
- Managed Airflow requires maintenance and needs dependencies to be installed. Step Functions does not.
I hope you found this useful and if you have any questions or feedback, then please do reach out in the comments.
And lastly, a quick shout-out. I mention ‘we’ a lot in this blog – in fact, it was my colleague John Root and myself, who were heads down on this problem for a good number of weeks. I would like to thank John for his contribution, expertise, and hard work – thanks John!