Astronomer Blog

Why We Built Our Data Platform on AWS, and Why We Rebuilt It with Open Source

Astronomer is a modern platform built to outfit organizations with a solid data infrastructure to support machine learning and analytical workloads. In other words, we help you organize, centralize and clean your data through a personalized data engineering experience. We exist because we believe the internet age is just a precursor to something much larger, something with the potential to push the world forward in the same ways the Agricultural and Industrial Revolutions did: the Data Revolution.

As you can imagine, preparing for a Revolution doesn’t happen overnight. As Astronomer’s CTO, I’m going to chronicle our journey so far, from a technical perspective, as we grow our platform and home in on how to meet our users’ real needs.

Switching Gears

Astronomer was born as a pivot away from another product called USERcycle, which gave customers visual insights into how their users behaved while interacting with their applications, over time. The largest obstacle we found ourselves fighting was actually getting our customers’ user data. That was a big problem because, without it, the product was useless.

We dug in and brainstormed ways to smooth this process out. While we did that, we uncovered new problems, and more importantly, found new opportunities to bring value to our customers. We became Astronomer to focus on solving our customers’ data problems, but at that point, we were still a work in progress.

Houston, We Have Liftoff

One of the first problems we tackled was getting clickstream data out of applications and into third-party tools or a data warehouse in near real-time. This would allow our users to build live dashboards however they liked, using the tools they already had or experimenting with multiple tools before deciding which one they wanted.

It turns out a nice chunk of the code required to put something like this together has already been open sourced. As an early stage startup with very limited resources, leveraging projects like analytics.js and similar open source libraries for other platforms was a no-brainer. These projects solved the issue of getting data out of applications, but we still had to implement the backend machinery to actually do something with it.

As we set out to build this first system in September 2015, we were simultaneously participating in the AngelPad accelerator Fall 2015 class. AngelPad is the top ranked accelerator in the world and comes with some pretty nice perks, like credits on Amazon and Google cloud platforms. Amazon and Google’s cloud platforms are known as Infrastructure as a Service (IaaS), and these services take a lot of low-level management out of the picture and let users focus on their specific applications. We were a bit strapped on humans at the time, so it was important for us to leverage the resources we did have to prove we could provide real value by the time Demo Day rolled around in November.

To do that, we had to start iterating, and quickly. We threw together an embarrassingly ugly web application using Meteor and MongoDB as the user interface to our SaaS offering. This web app allowed users to register their own applications and configure the various integrations they wanted to use. For example, users could set their application up to forward events to Google Analytics, Mixpanel, MailChimp, Keen.io, and so on.

Now that we had a way for our users to set things up, we needed a reliable endpoint to get all this data into our system. Basically, we needed an HTTP API that could handle high velocity event ingestion, and a way to process these events. To reliably support this, we needed to ensure that every single event sent to us from our customers made it into the system. This means that our API had to be extremely stable.

For this, we leveraged Amazon’s API Gateway and AWS Lambda services. API Gateway is a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. It stands in front of an API and can delegate requests to several types of backends, like AWS Lambda, which lets developers deploy code without thinking about the servers that it runs on, aka “serverless.” We defined our endpoints in API Gateway and mapped them to small JavaScript functions that ran on AWS Lambda. AWS Lambda has first class support for Node.js, so we were able to maintain a single language across our entire system, which was important for us at this stage.

Buffering...

Still, we needed to do something with all these events. In addition to broadcasting them to our users’ third-party integrations, we wanted to archive every event to Amazon S3, in case our users wanted to load their historical data into new tools down the road. We also wanted to create direct integrations into data warehouses, like Amazon Redshift. These are very different types of work and introduce a lot of potentially unreliable third parties into the system. We decided to decompose all of this work into Node.js microservices, each operating independently of each other. This would allow us to tailor each application to its specific needs, and give us the freedom to rapidly experiment, deploy, and iterate.

To enable something like this, we needed to introduce a buffer between our event ingestion API and the microservices that processed the events. Amazon Kinesis Streams fit this requirement perfectly. It allows users to build custom applications that process or analyze streaming data for specialized needs. Our API could simply validate the request and dump the payload onto an Amazon Kinesis Stream. Then our microservices could sit behind the stream and consume the ordered events independently at their own pace, checkpointing their place in the stream as they go. One application would microbatch events and dump them into S3 while another would broadcast data to our users' configured integrations, including a separate stream where our Amazon Redshift consumer application would feed.
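
To make the buffering idea concrete, here is a minimal in-memory Python sketch. It is not our Node.js code and it does not call the Kinesis API; the `Stream`, `Consumer`, and `ingest` names and the validation rule are all hypothetical stand-ins. It only shows the shape of the pattern: the API validates and appends, and each consumer reads the same ordered log at its own pace while tracking its own checkpoint.

```python
from dataclasses import dataclass, field


@dataclass
class Stream:
    """Append-only, ordered event log standing in for a managed stream."""
    records: list = field(default_factory=list)

    def put(self, event: dict) -> int:
        """Append an event and return its sequence number."""
        self.records.append(event)
        return len(self.records) - 1


@dataclass
class Consumer:
    """Reads the stream at its own pace and remembers where it left off."""
    name: str
    checkpoint: int = 0  # index of the next record to read
    seen: list = field(default_factory=list)

    def poll(self, stream: Stream, max_records: int) -> list:
        batch = stream.records[self.checkpoint:self.checkpoint + max_records]
        self.seen.extend(batch)        # "process" the batch
        self.checkpoint += len(batch)  # advance only after processing
        return batch


def ingest(stream: Stream, payload: dict) -> bool:
    """Hypothetical API handler: validate the request, then hand it off."""
    if "event" not in payload or "userId" not in payload:
        return False
    stream.put(payload)
    return True


stream = Stream()
for i in range(5):
    ingest(stream, {"event": "page_view", "userId": f"u{i}"})
ingest(stream, {"bogus": True})  # rejected by validation, never buffered

archiver = Consumer("s3-archiver")
broadcaster = Consumer("integration-broadcaster")
archiver.poll(stream, max_records=5)     # micro-batches everything at once
broadcaster.poll(stream, max_records=2)  # slower consumer falls behind safely
```

Because each consumer owns its checkpoint, a slow or failing integration never blocks the archiver, and a restarted consumer can pick up from its last recorded position.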

When dealing with such high velocity data, milliseconds add up and you can run into scaling issues pretty quickly. Since our applications were all decoupled, we could deploy fixes and upgrades independently and keep moving. We added in-memory caches, plus a Redis cache via Amazon ElastiCache, between our applications and databases to keep things moving smoothly. By leveraging Node.js microservices and a slew of Amazon-managed services, we were able to build a scalable system to ingest and route clickstream data extremely quickly and securely. As we began to bring on new clients, we could iterate on new features and fixes with relative ease.

The Expansion of Our Universe

We soon learned, however, that while our customers loved getting clickstream data into their data warehouse, they needed more from us. Organizations typically want to combine that clickstream data with their existing data to get a more complete picture of how their organization is operating. This data could live in databases like Postgres, MySQL, or MongoDB, to name a few. Useful data could also exist in various SaaS products that the organization uses, like Salesforce, MailChimp, Slack, and thousands more. It became clear to us that we needed to build an ETL (Extract, Transform, Load) platform to support this kind of data movement. Again, we turned to our good friends—Amazon, open source, and JavaScript—to get this project going.

As we dug in, we discovered a few projects tackling this problem from some big names in the industry. We checked out Luigi from Spotify, Azkaban from LinkedIn, Airflow from Airbnb, and a few others. At the time, none of these projects perfectly fit our use case, and each raised questions about deployment and orchestration. We needed a system that could scale up to support thousands of organizations, each with hundreds of data flows that we could programmatically control and monitor.

After going back and forth, we eventually settled on building a small JavaScript wrapper library to let us run workflows on Amazon SWF (Simple Workflow Service). This library abstracted away all the back and forth communication involved with running workflows on SWF, as well as streaming datasets to and from S3. It wasn’t perfect but it got us moving, and we didn’t have time to waste. We figured that as long as we had a well-defined interface to run the tasks, we could swap SWF out for something else down the road.
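
The "well-defined interface" idea can be sketched in a few lines of Python. This is an illustration only, not the JavaScript wrapper itself: `WorkflowBackend`, `LocalBackend`, and the three toy tasks are hypothetical names. The point is that the tasks only ever see a plain payload, so the thing that runs them (SWF then, something else later) can be swapped without touching task code.

```python
from abc import ABC, abstractmethod
from typing import Callable

Task = Callable[[dict], dict]  # a task takes a context dict and returns a new one


class WorkflowBackend(ABC):
    """The only contract task authors depend on."""

    @abstractmethod
    def run(self, tasks: list[Task], payload: dict) -> dict:
        ...


class LocalBackend(WorkflowBackend):
    """Runs a linear workflow in-process, one task after another."""

    def run(self, tasks: list[Task], payload: dict) -> dict:
        for task in tasks:
            payload = task(payload)
        return payload


def extract(ctx: dict) -> dict:
    return {**ctx, "rows": [{"email": "A@Example.com"}]}


def transform(ctx: dict) -> dict:
    return {**ctx, "rows": [{"email": r["email"].lower()} for r in ctx["rows"]]}


def load(ctx: dict) -> dict:
    return {**ctx, "loaded": len(ctx["rows"])}


def run_workflow(backend: WorkflowBackend, tasks: list[Task], payload: dict) -> dict:
    """Callers pick a backend; the tasks never know which one ran them."""
    return backend.run(tasks, payload)


result = run_workflow(LocalBackend(), [extract, transform, load], {"source": "demo"})
```

A second backend that hands each task to a remote workflow service would implement the same `run` method, and `extract`, `transform`, and `load` would not change.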

This let us focus on writing JavaScript plugins to the system to take care of the various ETL tasks and opened the door for our community of users to write up custom sources, destinations or transform tasks using modern JavaScript, with very little effort. On the other hand, it also hardwired our platform to yet another Amazon service, which was becoming more and more of a concern as we talked about the idea of having an enterprise version of our platform that could run anywhere, even Mars.

Going Our Own Way

While we were starting to bring customers onto this new system, we were also in talks with another company about a possible acquisition. The acquisition didn’t end up happening, but we learned a ton. For example, Astronomer as a SaaS product is great, but we needed to be able to service enterprise clients by hosting the platform in their private clouds. Now that this was clear, we had no reasonable option but to reduce our IaaS reliance down to raw compute and storage on whichever cloud it was running on. By replacing Amazon services with open source alternatives, we could make our platform portable to any cloud.

We finally had a good reason to really dig in and think about what our ideal unified system would look like: it would be cross-infrastructure, secure, efficient, highly available, and self-healing. It needed to be able to execute long running processes, as well as spin up one-off processes, which could possibly even spin up specialized clusters of machines on the side. It needed to fit into our development team workflows, as well as the workflows of the wider community. Not only did it need to support developers but also provide an intuitive interface on top of all this for business users. Instead of looking at a list of Amazon services, and forcing our platform into their mold, we were able to freely think about what our system should look like.

Replacing Amazon infrastructure while migrating a live system sounded like a pretty big project, so we needed to break it into something smaller and achievable with this larger plan in mind as the ultimate goal. Our clickstream system was still running smoothly, but our ETL platform started to encounter some scaling issues. The infrastructure for running this type of work was less than ideal, because it had several single points of failure and provided very limited telemetry for debugging and performance monitoring. The system was running on Amazon’s Elastic Beanstalk at the time, which is an easy way to deploy applications to EC2 virtual machines. Elastic Beanstalk takes care of load balancing, which is great, but it was a bit too coarse-grained for the type of workloads we were dealing with. We needed to swap out SWF and Elastic Beanstalk for something a bit more tailored to this kind of workload, while requiring little to no changes to the code that actually executed the tasks.

Rediscovering Airflow

At this point we saw that Airflow, Airbnb’s workflow management platform, had become an incubating project in the Apache Software Foundation. After looking back into the project, we discovered that now, a few months after we initially stumbled upon it, Airflow was looking pretty good! It was more stable, included a plugin system, and was actively being developed by its creators and a bunch of other smart people.

We continued down this path and started thinking about how we could run Airflow in production. Airflow can run in a development environment using SQLite and execute tasks locally, but is also designed to run in a distributed environment for production workloads. The three main processes involved in an Airflow system are the webserver for the UI, the scheduler, and the log server. Airflow also needs a MySQL or Postgres database to store its metadata. The scheduler sits at the heart of the system and is regularly querying the database, checking task dependencies, and scheduling tasks to execute… somewhere. The Airflow scheduler is great because it would allow us to execute a Directed Acyclic Graph (DAG) of tasks, rather than our current exclusively linear task workflows. This means we could construct much more complex and useful workflows than we could previously handle.
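
To show what a DAG buys you over a linear chain, here is a small sketch using Python's standard-library `graphlib` (Python 3.9+). It is not Airflow's scheduler, and the task names are made up; it just shows how dependency information lets independent tasks become runnable together, while downstream tasks wait for everything they depend on.

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its value set (hypothetical task names).
dag = {
    "extract_salesforce": set(),
    "extract_postgres": set(),
    "join_datasets": {"extract_salesforce", "extract_postgres"},
    "load_redshift": {"join_datasets"},
}

sorter = TopologicalSorter(dag)
sorter.prepare()  # also raises CycleError if the graph is not acyclic

waves = []  # each wave holds tasks whose dependencies are all satisfied
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)  # mark the wave finished so dependents unlock
```

Here the two extracts land in the first wave and could run in parallel, the join runs once both finish, and the load runs last. A purely linear workflow would have forced the two extracts to run one after the other.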

Airflow has a core concept called an “executor,” which is just a Python class that knows what to do with a command issued by the scheduler. The scheduler knows when it’s time to do something, and delegates an `airflow run` command to the executor module, which is responsible for actually “executing” the command. Airflow comes with several core executors and a few community-contributed executors, and allows users to plug in their own custom executors. One community-contributed executor was particularly interesting to us: the Mesos executor.
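
The executor idea is easy to picture with a toy example. The class below is not Airflow's executor base class and does not follow its method signatures; `LocalSubprocessExecutor` and its `execute` method are hypothetical. It only illustrates the division of labor: the scheduler decides when, and the executor decides where and how a command actually runs (here, as a local subprocess).

```python
import subprocess
import sys


class LocalSubprocessExecutor:
    """Toy executor: receives a command from a scheduler and runs it locally."""

    def __init__(self) -> None:
        self.results: dict[str, str] = {}  # task key -> "success" or "failed"

    def execute(self, key: str, command: list[str]) -> None:
        completed = subprocess.run(command, capture_output=True, text=True)
        self.results[key] = "success" if completed.returncode == 0 else "failed"


executor = LocalSubprocessExecutor()
# Stand-ins for the commands a scheduler would hand over.
executor.execute("ok_task", [sys.executable, "-c", "print('hello')"])
executor.execute("bad_task", [sys.executable, "-c", "raise SystemExit(1)"])
```

Swapping in a different executor means replacing the body of `execute` with something that ships the same command to a queue or a cluster, which is the hook a Mesos-backed executor uses.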

Apache Mesos is an open source project born out of UC Berkeley’s AMPLab as a “platform for fine-grained resource sharing in the data center.” Mesos can act as a distributed kernel, providing a system for applications, or “frameworks” in Mesos lingo, to distribute work across a cluster of machines. A highly available production system will typically have a quorum of 3 or 5 master nodes, and any number of agent nodes, where actual work takes place. Mesos agent nodes are constantly surveying their available resources, and forwarding that information to the leader node. The leader takes these “resource offers” and passes them on to registered frameworks.

A Mesos framework is just a little program that accepts resource offers and decides if it can schedule some amount of work out to one of these agent nodes. If the framework gets a resource offer with enough CPU and memory available for the task, it can schedule a command to run on the agent. We ended up needing to fork Airflow’s built-in Mesos executor to execute Docker commands on our CoreOS agents, but I’ll save those details for a future post.
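
The offer-matching decision can be sketched without any Mesos libraries. Everything here is a simplification with hypothetical names (`Offer`, `TaskSpec`, `schedule`): real frameworks receive offers through the Mesos scheduler API and explicitly accept or decline them, and offers carry more than CPU and memory. The sketch uses a greedy first-fit rule, which is one simple policy among many.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Offer:
    agent: str
    cpus: float
    mem_mb: int


@dataclass(frozen=True)
class TaskSpec:
    name: str
    cpus: float
    mem_mb: int


def schedule(offers: list[Offer], pending: list[TaskSpec]):
    """Greedy first-fit: place each task on the first offer with enough room."""
    remaining = {o.agent: [o.cpus, o.mem_mb] for o in offers}
    placements: dict[str, str] = {}
    unplaced: list[str] = []
    for task in pending:
        for agent, (cpus, mem) in remaining.items():
            if cpus >= task.cpus and mem >= task.mem_mb:
                # Deduct what the task consumes from this offer.
                remaining[agent] = [cpus - task.cpus, mem - task.mem_mb]
                placements[task.name] = agent
                break
        else:
            unplaced.append(task.name)  # wait for a bigger offer
    return placements, unplaced


offers = [Offer("agent-1", 1.0, 1024), Offer("agent-2", 4.0, 8192)]
tasks = [
    TaskSpec("small", 0.5, 512),
    TaskSpec("big", 2.0, 4096),
    TaskSpec("huge", 8.0, 1024),
]
placements, unplaced = schedule(offers, tasks)
```

The small task fits on the first agent, the big one only fits on the second, and the oversized one stays pending until a later round of offers.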

Let’s Live Forever

Mesos would allow us to build a cluster using a bunch of virtual machines living on any cloud and efficiently schedule our various tasks on these machines, wherever we had available resources. Airflow running on Mesos sounded like a pretty sweet deal, and checks a lot of boxes on our ideal system checklist, but there were still a few questions. Assuming we have a proper Mesos cluster to execute Airflow tasks on, we would need somewhere to run other tasks, like the Airflow webserver and the Airflow scheduler. We also needed to run a Postgres database for the Airflow system. It turns out the fine folks over at Mesosphere maintain an awesome project called Marathon that seemed like it could help us out here.

Marathon is a container orchestration platform that runs as a Mesos framework, which means it runs out on the cluster and accepts the resource offers that the Mesos leader passes along from the agent nodes. Using Marathon’s web interface or API, you can schedule long running processes to execute somewhere on the cluster. This means we could write up a JSON file describing all the programs we wanted to run, and continue to run them forever.

So we did. We wrote up a JSON file, which described an “application group” that we could fire off to the Marathon API running on our cluster. This was our way of telling Marathon that these tasks are all part of one system, and that dependencies exist between the different applications. For example, we don’t want to attempt to fire up the Airflow servers if the Postgres database is still starting up. Once all dependencies have been met, and all tasks are happily up and running, Marathon will make sure it stays that way.
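
For a rough idea of what such a file looks like, the Python sketch below builds a group definition and serializes it to JSON. The app IDs, image names, and resource numbers are all hypothetical, and the field names (`apps`, `dependencies`, `container.docker.image`, and so on) follow Marathon's group and app definitions as I understand them, so check them against the docs for the Marathon version you run.

```python
import json


def app(app_id, image, cpus, mem, dependencies=()):
    """Build one app entry for the group (all values are illustrative)."""
    return {
        "id": app_id,
        "instances": 1,
        "cpus": cpus,
        "mem": mem,
        "container": {"type": "DOCKER", "docker": {"image": image}},
        "dependencies": list(dependencies),
    }


group = {
    "id": "/airflow",
    "apps": [
        app("/airflow/postgres", "postgres:9.5", 1.0, 1024),
        # The Airflow processes wait for the database to be up first.
        app("/airflow/webserver", "example/airflow", 0.5, 512, ["/airflow/postgres"]),
        app("/airflow/scheduler", "example/airflow", 0.5, 512, ["/airflow/postgres"]),
    ],
}

group_json = json.dumps(group, indent=2)
```

The resulting `group_json` string is the kind of document you would send to Marathon's groups endpoint; the `dependencies` lists are what tell it to hold the Airflow apps back until Postgres is running.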

You can think of Marathon as a distributed init system, kind of like systemd or upstart. It will monitor your long running processes, and if they exit unexpectedly, it will automatically start them back up somewhere on the cluster. An entire agent node could go offline, and Marathon will ensure that all tasks it had running on that node are placed somewhere else on the cluster! This might sound familiar if you’ve heard anything about Google’s container management project, Kubernetes. Marathon and Kubernetes definitely have some overlap, but the small differences matter. It turns out that you can also run Kubernetes as a Mesos framework - awesome! In the future, when/if our needs change, we should be able to fairly easily swap Marathon out for Kubernetes while leaving our Mesos cluster and containerized applications unchanged. All these different layers of abstraction might seem daunting up front, but they really enable a scalable system with parts that can change independently of the rest of the system, saving us time and letting us iterate to meet our customers' needs.

Dude, Where’s My Database Server?

At this point, we had every application, including Postgres, packaged up in Docker images, ready to be placed somewhere on the cluster. Now we needed a way for these applications to talk to each other. If a container can be placed anywhere on the cluster, and be killed and replaced at any time, how do your applications keep track of where it is? For instance, if our database crashes and gets moved somewhere else, we need a way for the Airflow applications to reconnect.

In distributed systems, this is known as “service discovery,” and just means that we need a system for our applications to register themselves so other applications can look up where to find them. There are a lot of ways to implement service discovery, using completely different techniques, each with their own pros and cons. We decided to use another open source project, Mesos-DNS, for its deep integration with Marathon and Mesos. Mesos-DNS is basically a small DNS server for the cluster. It periodically polls the Mesos leader node for running tasks and generates DNS records for them: A records for the host each task is running on, and SRV records for the ports it was assigned. This system would allow us to pass environment variables with friendly host names to address the database. For example, our Airflow applications can be pointed to `postgres-airflow.marathon.mesos` using an environment variable, and that name will keep resolving to wherever the database is currently running. Our applications could now talk to each other, but we still had one more major unsolved problem.
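
Here is a small Python sketch of how an application might consume a name like that. The environment variable names, database user, and default port are hypothetical, and the naming rule in `mesos_dns_name` (path components of the Marathon app ID reversed and joined with hyphens, followed by the framework name and domain) reflects my understanding of the Mesos-DNS convention, so verify it against your own cluster.

```python
import os


def mesos_dns_name(app_id: str, framework: str = "marathon", domain: str = "mesos") -> str:
    """Derive the DNS name for a Marathon app ID (assumed naming convention)."""
    parts = [p for p in app_id.strip("/").split("/") if p]
    return "-".join(reversed(parts)) + f".{framework}.{domain}"


def database_url(env=os.environ) -> str:
    """Build a connection URL from environment variables, with DNS-based defaults."""
    host = env.get("AIRFLOW_DB_HOST", mesos_dns_name("/airflow/postgres"))
    port = env.get("AIRFLOW_DB_PORT", "5432")
    return f"postgresql://airflow@{host}:{port}/airflow"


default_url = database_url({})
override_url = database_url({"AIRFLOW_DB_HOST": "db.internal", "AIRFLOW_DB_PORT": "6543"})
```

The application code only ever sees a stable host name, so when the database container is rescheduled onto another agent, nothing in the application's configuration has to change.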

Finding a Permanent Home for Our Data

If our database container could be killed and moved to a different node at any moment, what would happen to its data? We could connect to it, but the data would be gone, because an entirely new container would be created. Although the container would be based on the same image, it would be a brand new environment.

One way to tackle this issue using Docker would be to map the container’s Postgres data directory to a directory on the host system. In this configuration, you could fire up a container, add some data to the database, kill it and recreate a new container with the same `docker run` command, and your data will persist. This is slightly better, but it doesn't really help us out since our container could end up being recreated on an entirely different host system.

Luckily, Docker 1.8 added support for Docker volume drivers, which allow developers to plug in custom drivers for mounting volumes within the container. As with everything, there were a lot of options. This is a pretty new way to do things, and there are differing schools of thought, with no clear “winner” yet. We ended up using REX-Ray from EMC {code}. REX-Ray seemed pretty stable compared to some other options and was relatively straightforward to set up and test. It also supported multiple cloud platforms, like Amazon EBS, for its backing block storage. We just needed to install REX-Ray and the Docker Volume Driver Isolator CLI onto our CoreOS Mesos agents.

With this infrastructure in place and a few more tweaks, we could modify our Docker configuration in our Marathon application definition to tell the Mesos Docker containerizer to mount volumes using REX-Ray. Now our container could be started, stopped, and moved around, and no matter where it’s placed on the cluster, it will always mount the EBS volume, and the data remains! This also means that the volume holding our database is automatically replicated within its Availability Zone for increased durability.
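
As a sketch of what that change might look like, the Python helper below adds Docker parameters to an app definition so the container is started with a named volume and a volume driver. The app ID, volume name, and mount path are hypothetical. The `parameters` list is Marathon's way of passing extra flags through to `docker run`, and the `volume-driver` and `volume` keys mirror the Docker flags of the same names; treat the exact keys as an assumption to verify against your Marathon and Docker versions.

```python
import json


def with_external_volume(app: dict, volume_name: str, container_path: str,
                         driver: str = "rexray") -> dict:
    """Return a copy of an app definition that mounts a driver-managed volume."""
    docker = dict(app["container"]["docker"])
    params = list(docker.get("parameters", []))
    params += [
        {"key": "volume-driver", "value": driver},
        {"key": "volume", "value": f"{volume_name}:{container_path}"},
    ]
    docker["parameters"] = params
    # Build new dicts rather than mutating the caller's definition.
    return {**app, "container": {**app["container"], "docker": docker}}


postgres_app = {
    "id": "/airflow/postgres",
    "container": {"type": "DOCKER", "docker": {"image": "postgres:9.5"}},
}
persistent_app = with_external_volume(
    postgres_app, "airflow-pgdata", "/var/lib/postgresql/data"
)
persistent_json = json.dumps(persistent_app, indent=2)
```

Because the volume is referenced by name rather than by a host path, whichever agent ends up running the container asks the driver for the same volume, and the driver takes care of attaching it.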

Just the Beginning

This new system gives us a very solid platform on which we can build our business. It gives us much more control over the types of things we can do, which will ultimately benefit our customers. We can focus our team on building more and better integrations, and even new types of integrations. We have begun the process of transferring our ETL workload over and will follow that up by migrating our clickstream microservices into this system. This is just the beginning, and we are pretty obsessed with finding new ways to help our customers.

I’ll be going into deeper details on the things touched on in this post in future posts so check back if these things sound interesting! If you’re really interested, let me know! We’re hiring!
