
Mercedes Coyle – Build Serverless Realtime Data Pipelines with Python and AWS Lambda – PyCon 2016

December 5, 2019


[applause] So, hi everybody. I hope you can
see me behind this gigantic podium. I am a vertically
challenged individual, so I will try and pop out
from time to time. Again, my name is Mercedes. For those of you who don’t know me
in real life or on Twitter, I’m also known as
the crazy chicken lady because I own a farm
and I raise chickens. So if you like tweets
about chickens and servers and architecture,
you should follow me on Twitter. So when I’m not doing that,
I’m wrangling data and servers for Scripps Lifestyle Networks, and today I’m going to talk
to you about my experience building a data pipeline
with serverless architecture. So, here’s what I’m
going to cover today. I’m going to give you
a high level overview of our existing data pipeline, or the old data pipeline at SLN, and that’s going to go over
the system architecture, what we learned from it, and what the requirements
for building the new system are. Then I’ll go a bit more in depth
on the serverless components we chose to build our new system with. We used API Gateway, which is a
fully managed API service from AWS; Kinesis Streams, which is
basically a big queuing system or document store;
Lambda, with functions written in Python; and EMR, which is
a managed Hadoop platform. Maybe my slides
will advance correctly. Oh no. So, our use case. We’re an online
video syndication platform. We have about
two to three million streams or events generated per day. Our data pipeline
supports analytics reporting, video recommendations,
and development and testing, and ad targeting,
so we have a lot of data needs. Some of this reporting
is needed daily, while other insights are needed
hourly or immediately. So, what does “realtime” mean? A system is considered realtime
if it ingests and processes individual events
as soon as it receives them. Batching or holding events for
later processing isn’t realtime, even if the batches are frequent. So essentially, a realtime system
has the ability to receive, process, and respond to a request
or a piece of data on demand. Data can only go so fast
over the wire, and more components
in the system increase latency. So this is a big, ugly diagram of what our old data system
looked like. It’s actually not terrible,
but there are some things that didn’t work so great about it, and it supported about
a hundred million events per day. So, websites host our video player, and we provide content
and advertising. Behavioral data is generated
when an event happens in the player, whether it be an ad impression,
a video start, pause, etc., and that data is sent to us. The edge, which is the NGINX web
server on the very left-hand side, is basically just a web
server that receives the events. It’s a JSON payload
in an HTTP POST request. It then logs that to a file;
that file is then rotated out and uploaded to S3 every hour. At the same time,
data is sent to a socket, which is read by our syslog, and that forwards it
to our realtime system, which is basically an ELK cluster, so it’s Elasticsearch,
Logstash, and Kibana. We don’t do a lot of reporting
on that, but that’s what we use for ad hoc analytics,
and the developers use it to look at data
as it comes in in real time. So, after the current hour’s data
from every web server is uploaded to S3,
it’s then ingested into Hadoop via an Oozie job in a batch. And it takes about
30 to 45 minutes to process and do reporting
based on that data, so it can take up to two hours
from the time an event was generated to the time it actually
updates a metric or a report. So, what did we learn
from this system? Besides being a little bit
of a swamp of sadness in terms of maintaining it
and getting answers out of it, it didn’t always give us
the correct information. So we would like to be able to run
realtime continuous reporting, and we can’t easily do that
with this system, being that it’s basically a batch system. As for the ELK component — it wasn’t as easy
to transform our Hadoop queries into things we could run
on the Elasticsearch cluster, and it was just a little
too much load for what we had, and we didn’t have
the bandwidth to scale that up. So we are looking at running
realtime continuous reporting in a different manner. The other problem we ran into
is scheduled jobs aren’t always aware
of the data source. So they might run
and the data might not be there and they succeed and they’re like,
“Great, we have reporting,” and we look at it and we’re like,
“That doesn’t look right, that’s not accurate at all,”
and we’re actually missing data, so that’s great. Relying on logging to disk
as a means of data collection is really prone to failure.
We’re thrashing disk IO
every time we receive an event. We’re also thrashing it
every time we’re uploading and rotating that file out to S3 because we’re making a copy of it
and it’s many gigs of data, so that was affecting latency
in our system with NGINX. So we want to move away from that. We also have to do
a lot of data cleanup. NGINX has a security feature
that substitutes hex encoding for quotes in the JSON payload,
which breaks the JSON, and so we have to go back
and parse and clean that up later, along the lines of the sketch below.
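Here’s a hedged sketch of that cleanup step — not our exact code, and the payload is illustrative — assuming the quotes come back as the hex sequence \x22:

```python
import json

def clean_nginx_payload(raw_line):
    # NGINX's log escaping substitutes the hex sequence \x22 for
    # double quotes, which leaves the logged JSON unparseable as-is.
    # Undo that substitution before parsing. (Other escapes, such as
    # \x5C for backslashes, may need the same treatment.)
    cleaned = raw_line.replace('\\x22', '"')
    return json.loads(cleaned)

# Example: a logged event with hex-escaped quotes
raw = '{\\x22event\\x22: \\x22video_start\\x22, \\x22player_id\\x22: 42}'
print(clean_nginx_payload(raw))  # {'event': 'video_start', 'player_id': 42}
```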
So there are a lot of steps to getting data in the right format, and also getting it
to the right place, and this kind of
complex chain of events was costing us a lot of overhead
in terms of maintenance and also engineering time
to be able to develop new reporting
based on this system. So what do we need to do
to go serverless? Or really, the question is:
what do we need for a new system? Our new system needs to allow
for streaming analytics, and we need to reduce
the system complexity. So, any new system should reduce
the number of services the data has to go through
in order to reach its destination, and those services need to be
independent of one another. So, the old system
was also running several services
off one type of box, which was the NGINX web server, and then routing it to a chain
of load balancers, Logstash, our syslog, Elasticsearch,
Kibana, etc., so we would get latency delays
in the system. Often we would get JVM “out of memory”
issues from Elasticsearch, and that would cause a backup
in the entire chain. Our biggest requirement
is that we need to log data and store it in S3. That was kind of the easiest
thing to build around since it would
allow us to port what we had, and also it’s a pretty solid
design anyway that worked for us, so the one requirement
we had to carry over was that the new system had to store data in S3. So this is what we came up with. This is a bit simpler
than our previous implementation. Instead of a bank of web servers, I’m using API Gateway
for the request payload. So this is — instead of having
a load balancer and NGINX, I would just use the API Gateway and our JavaScript player
does a POST request to that. So, that then feeds a Lambda that takes that data
and puts it into Kinesis, so it basically just transforms
a POST to a PUT. Once that data
is in a Kinesis stream, it triggers another Lambda function
every 100 events to process those events
and store them as a file in S3. From there, we’ll have several
more Lambdas that are triggered on S3 events to do Redshift
and Elasticsearch insertion, and our Oozie pipeline,
which is left over from our Hadoop implementation, runs every hour on EMR now
instead of Hadoop. So we’re able to use all of the
existing reporting jobs we had. We just transferred it
to a new system. So, now I’m going to get
to the meat of the presentation: how and why we went serverless
using AWS offerings. So I’m going to get
more into detail from the “API Gateway to S3”
portion of that previous diagram. So, why API Gateway? We have video players embedded in
pages that we don’t own or control, and for ease of data collection
and ability to secure credentials, we’re not requiring
the client to authenticate. You can of course do so
if that’s something you need to do. You can create an API
using AWS Console GUI tools or by using Swagger. I haven’t
actually personally used it, but in doing research
on automation tools, that was one of the ones
that came up, and it looked pretty
straightforward and easy to use and supports
infrastructure automation. So if you have clients
that can also authenticate using an IAM role or some other
type of AWS authentication, you can also go
straight to Kinesis, as it essentially
just takes a PUT request. So you don’t necessarily have to
have an API in front of it, we just chose to do so
so that we didn’t have to make any changes in our player. Kinesis is basically
a multi-subscriber scalable queueing system. So, you use an AWS client
or an authenticated HTTP request to send data to Kinesis, either one record at a time
or in batches
via the PutRecords command — so you can send
multiple events at a time if you’re using
the Kinesis client library, as in the sketch below.
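As a rough sketch — the stream name and payload are placeholders, not from the talk — a single put and a batched put with boto3 look something like this:

```python
import json
import boto3

kinesis = boto3.client('kinesis')
event = {'event': 'video_start', 'player_id': 42}

# One record at a time; the partition key determines the shard it lands on.
kinesis.put_record(
    StreamName='video-events',
    Data=json.dumps(event),
    PartitionKey=str(event['player_id']),
)

# Or batch up to 500 records in a single PutRecords call.
kinesis.put_records(
    StreamName='video-events',
    Records=[
        {'Data': json.dumps(event), 'PartitionKey': str(event['player_id'])}
        for _ in range(10)
    ],
)
```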
which is a really nice buffer. So if have something
going on downstream and you can’t resolve it
right away, or if you want to do a new data source or do some
new processing on top of that and you want to pull
from your history of Kinesis, you can pull from that
seven-day window of data. Lambdas or other consumers
can subscribe to the same stream, so it’s multi-subscriber. And Kinesis will
keep a transactional record for each consumer to maintain
the state of the data. This is kept track of in what’s
known as a shard iterator, and I’ll explain shards
momentarily. So, each Kinesis shard
is a unit of throughput. And what that means is
each shard supports a thousand records put per second and one megabyte per second
of input throughput — that was kind of a mouthful,
I apologize — and two megabytes
per second of output. So when you put data
into a Kinesis shard, you’re able to draw it off faster,
so you don’t get delays and backups. So, to increase the number of records
that you can put into Kinesis, or the amount of data
that you can put or pull off of Kinesis at one time,
you increase the number of shards, and then to decrease that,
you decrease the number of shards.
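Resharding is an API call away. As a hedged aside — UpdateShardCount shipped after this talk, and older code split or merged shards individually — scaling with boto3 can look like:

```python
import boto3

kinesis = boto3.client('kinesis')

# Scale the stream (placeholder name) to four shards: roughly
# 4,000 records and 4 MB per second in, 8 MB per second out.
kinesis.update_shard_count(
    StreamName='video-events',
    TargetShardCount=4,
    ScalingType='UNIFORM_SCALING',
)
```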
S3, Simple Storage Service, stores file objects, and it’s not a traditional file system. We’ve been using S3
as a central file storage repository for all of our important log files. So anything that gets
processed through us generally makes its way through S3,
unless it’s from a third party that we don’t necessarily
have control over. But then we could still put it
in there if we choose to. It’s inexpensive, it’s very stable,
and it’s well-supported. You can’t interact with it
in the way you would a traditional file system. You need to use a client library
or the AWS command-line interface to do that. Buckets are basically
the directories, the keys are subdirectories, and file objects
are basically the files.
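To make that mapping concrete, here’s a minimal hedged sketch — the bucket and key are placeholders — of writing and reading an object with boto3:

```python
import boto3

s3 = boto3.client('s3')

# 'my-log-bucket' plays the role of the directory, the key prefix
# 'events/2016/05/31/' the subdirectory, and the object the file.
s3.put_object(
    Bucket='my-log-bucket',
    Key='events/2016/05/31/events-00001.json',
    Body=b'{"event": "video_start"}',
)

obj = s3.get_object(Bucket='my-log-bucket',
                    Key='events/2016/05/31/events-00001.json')
print(obj['Body'].read())
```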
And EMR — we’ve been working on moving all of our data processing over to EMR from a 20-node
Hortonworks Hadoop cluster. Instead of the scheduled
hourly analytics jobs, we’re writing — we want to go
the route of writing Lambdas that stream data constantly and then update reports
as that data comes in. So moving towards
an event-based realtime system. We can then choose to spin up
the EMR cluster for daily batch processing
for doing other types of reporting that’s more intensive, and for
processing all of that data at once. So you don’t have to have
a cluster that’s up and running. I don’t have to maintain that.
I just bring it up, do my processing,
and then tear it down when I’m done. Hadoop has been a frequent source
of operational troubleshooting and maintenance,
and EMR is going to help us reduce the costs
associated with that. So, what is AWS Lambda? It’s basically an event-driven
processing model. It’s push/pull,
so you can feed data to it, and as that data comes in,
it does something with it, sends it to another Lambda. It can basically call another
Lambda by pushing data to that, or another —
any other type of system that operates
on the same principle. Its processing happens on demand
as the data becomes available. More Lambdas will spin up as the number of events
to be processed increases. It’s essentially running
on a very large herd of EC2 Amazon Linux instances, but you only pay
for the time your functions run, and you don’t need to worry about
the underlying infrastructure. You can also do scheduled Lambdas
for specific times and days, or intervals just like cron, and Lambdas can feed
and trigger other Lambdas. So it’s pretty flexible. It’s a
microservices-based architecture. So, here’s a basic Lambda function. All this does is take events
and put them into Kinesis. And this isn’t necessarily
from API Gateway, this could be from anything
that I choose to trigger it.
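A minimal sketch of such a handler — not the exact slide code, and the stream name is a placeholder:

```python
import json
import boto3

kinesis = boto3.client('kinesis')

def handler(event, context):
    # Take whatever event triggered us and put it onto a Kinesis stream.
    kinesis.put_record(
        StreamName='video-events',
        Data=json.dumps(event),
        PartitionKey=context.aws_request_id,  # any well-distributed key works
    )
```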
Lambda is a compute platform, and this function is standard Python code. So that’s kind of
an important distinction is that I can run this code anywhere,
I just don’t get to take advantage of the compute platform
unless I use it with AWS. Boto3 is the AWS
official Python library, and it’s included
in the Python runtime for Lambda, so you don’t have to worry about
importing that, it’s just there. I mean, you do have to import it
into your code, but you don’t have to worry
about setting it up in the environment when you run it,
and I’ll get to that in a little bit. So the heart of every Lambda
function is the event handler, which takes two arguments:
an event, and context. The event is the data
that triggers the function, and the context is metadata
about the function’s runtime. So here’s an example event, and it’s a record or set of records;
in this case it’s a dictionary. It contains metadata
about the event itself, and then the data,
in which case this is a base64-encoded string
with the key “data.” Pretty straightforward.
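A hedged reconstruction of that kind of Kinesis event record — the field values here are illustrative, not from the slide:

```python
event = {
    "Records": [
        {
            "eventSource": "aws:kinesis",
            "eventName": "aws:kinesis:record",
            "awsRegion": "us-east-1",
            "kinesis": {
                # The payload is a base64-encoded string under "data";
                # this one decodes to '{"event": "video_start"}'.
                "data": "eyJldmVudCI6ICJ2aWRlb19zdGFydCJ9",
                "partitionKey": "42",
                "sequenceNumber": "49545115243490985018280858787",
            },
        }
    ]
}
```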
The type of record you will process will be different depending on the source —
it could be a string, it could be some other
type of event. A lot of times it’s just
either JSON or it’s a dictionary, in the case of Python. So, when you’re writing Lambdas, you should write them
to only process one type of event. They’re not going to process
and be triggered by multiple different types
of information. If you need another data source,
you add another Lambda. So, the context object
has a few pieces of data about the Lambda function
that are useful data points to add for logging or for getting
more performance or debug data
out of your functions. The remaining time is how long
the Lambda has left to run, and the request ID
is unique for every run, except in the case
of a retry after a failure, in which case it’s the same.
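In the Python runtime those surface as an attribute and a method on the context object; a hedged logging sketch:

```python
def handler(event, context):
    # aws_request_id is unique per invocation (and repeated on retries);
    # get_remaining_time_in_millis() is how long the function has left.
    print("request_id=%s remaining_ms=%d" % (
        context.aws_request_id,
        context.get_remaining_time_in_millis(),
    ))
```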
So, Lambda is designed with scalable processing in mind. It’s stateless. Data goes in from one source
and leaves for another, potentially transformed,
potentially the same and just in a different
data storage area. Previous data has no impact
on the processing of current data. This is to allow Lambda to run
as many copies of itself as is needed
without clobbering records. If you’re storing anything
such as file names in S3, make sure you use
a unique identifier. If an exception is raised, the Lambda function will retry
on that event data until it succeeds
or until the record expires. So for Kinesis, that could be
seven days of retries. In practice, you should monitor
your retries and failures, and if you’re seeing a lot of them
or if you’re seeing the same request_id or seeing
the same repeated error, you should take a look at your Lambda,
the type of data that’s coming in, and the entire system to see
where you can optimize that. So here’s another example
of a Lambda function. This is the one that loads
our Kinesis data into S3. And this is what actually replaces
six NGINX logging servers, a load balancer, and a cron job, so as an ops person I love this
because it’s 50 lines of code and I don’t have to worry about
the underlying infrastructure. So I would normally write out
a series of events into a file, and then copy that file
over to S3 and then clean up after myself
when I’m done. However, I don’t have
access to disk here, so I have to take records from
Kinesis in batches of a hundred. I can take that number
and scale it up or down as needed. I’m concatenating that
into a string, and then compressing
and encoding it using bzip2, and then uploading
that compressed string as a file to S3,
and that function ends with logging the success
or failure of that request.
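A condensed, hedged sketch of that function — not the original 50 lines; the bucket name is a placeholder, and this assumes the Python 2.7 runtime the talk used:

```python
import bz2
import time
import logging
import boto3

logger = logging.getLogger()
s3 = boto3.client('s3')

def handler(event, context):
    # Concatenate the batch of Kinesis records into one newline-delimited
    # string; Lambda delivers each payload base64-encoded under 'data'.
    lines = [record['kinesis']['data'].decode('base64')  # Python 2.7 idiom
             for record in event['Records']]
    compressed = bz2.compress('\n'.join(lines))

    # Build the "file" in memory and upload it straight to S3; a timestamp
    # plus the unique request ID keeps keys from clobbering each other.
    key = 'events/%d-%s.json.bz2' % (int(time.time()), context.aws_request_id)
    try:
        s3.put_object(Bucket='my-log-bucket', Key=key, Body=compressed)
        logger.info('wrote %d records to s3://my-log-bucket/%s', len(lines), key)
    except Exception:
        logger.exception('failed to write %s', key)
        raise  # raising lets Lambda retry the batch
```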
I’m using a small batch here because I want to process data quickly, and I want to optimize for
memory and speed of execution. I can adjust that number
up and down by monitoring how long my Lambda runs
with the context object, and also by looking at logging,
which I’ll get to, and how much memory it uses. So, logging and monitoring. Logs are visible in CloudWatch
immediately after a Lambda function runs. This is where you’ll
get more information, and this is an example of
the information that you’ll see after your Lambda
finishes running. The last line here kind of lists
the actual duration of the function, the billed duration, and the
billed and used memory footprint. So for example, here
it took 308 milliseconds.
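The report line for a run like that looks roughly like this — the request ID is made up, the numbers are the ones from the talk:

```
REPORT RequestId: 4ab1-example  Duration: 308.14 ms  Billed Duration: 400 ms
Memory Size: 128 MB  Max Memory Used: 28 MB
```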
So we’re billed in 100-millisecond increments. That actually billed us for 400,
so I can optimize there by trying to get
as close to 400 as possible with the number of records
or the size of the records as I can without going over so that I know I’m getting
exactly what I’m paying for. Same thing with memory size. I set this as a 128-meg
max memory and it used 28. So this is kind of a —
it’s an over-provisioned function, but this is about the minimum
provisioning that you can do with AWS. You can scale up memory
and you can scale up the maximum duration
as you need to. So monitoring these numbers
is how you’re going to tune your function to get
better performance and more effectively
utilize memory and time. So — and you also get some nice
basic metrics dashboards. These aren’t super detailed
or fine grained, but it’s good
at a high level overview. And again, if I failed to mention it,
this is in the AWS console. So this is all just stuff
you can click around and look at. The invocation count here is
how many times the function has been called
in a given period. The duration is of course
run time in milliseconds. An important note is
there’s a limit of a hundred concurrent
Lambda function executions per account,
so if your Lambda functions pass this number,
some will be throttled, and there are graphs for invocation errors and
throttled invocations. I don’t have any currently
to show you, but that’s where they would be
if they showed up. So what this means is that
if you have five Lambda functions and each of them has to run
20 copies to process your data, any additional Lambdas
that spin up to run won’t run until
one of those hundred currently running functions
has finished. So, testing this. Since our code can run standalone,
I can run unit tests against it. I can test it with whatever
testing environment I prefer. I think I used unittest2. You’ll need to mock out the event
and the context object and any external dependencies. Well, you don’t have to,
but that’s good practice to do so.
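A hedged sketch of such a test — `my_lambda` is a hypothetical module holding the handler from earlier, and this uses the standard mock library rather than anything specific from the talk:

```python
import unittest
import mock  # unittest.mock on Python 3

import my_lambda  # hypothetical module containing handler()

class HandlerTest(unittest.TestCase):
    def test_puts_record_to_kinesis(self):
        event = {'event': 'video_start'}
        context = mock.Mock(aws_request_id='test-request-id')
        # Patch out the boto3 client so no AWS call is made.
        with mock.patch.object(my_lambda, 'kinesis') as fake_kinesis:
            my_lambda.handler(event, context)
            self.assertEqual(fake_kinesis.put_record.call_count, 1)

if __name__ == '__main__':
    unittest.main()
```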
You can also invoke the Lambda functions using AWS command-line tools,
so you can actually see how they operate
in kind of real time. You can give it a payload, and also run it as a dry run,
so it doesn’t run any of its effects.
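For example — the function name and payload file are placeholders — something like:

```
aws lambda invoke --function-name kinesis-to-s3 \
    --invocation-type DryRun \
    --payload file://sample_event.json output.json
```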
If you don’t actually return anything in your function — none of my functions
return any data once they run, they just dump files to a bucket,
so I don’t actually get any confirmation,
it just finishes running. If it finishes without an error,
I know it’s good. So, packaging. All of the built-in functions
for Python, of course, as well as boto3, are included
in the Python Lambda runtime. If you’re using
any external libraries, you’ll need to add those libraries
to a zip deployment package. So this is pretty straightforward
and simple to do, just making sure that you’re
zipping your file directories recursively so you get the contents
in addition to the directories. I totally missed that
the first time around. That was entertaining. So you can either install
the libraries directly into your project directory
by doing pip install -t, and then zip the contents
of that entire directory, or you can zip up
your project file and then go to your site packages
in your virtualenv and zip up the contents
of that directory.
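Both routes boil down to a couple of shell commands; a hedged sketch with placeholder names:

```
# Option 1: install dependencies straight into the project, then zip it all.
pip install -t my_lambda/ requests
cd my_lambda && zip -r ../my_lambda.zip .

# Option 2: zip your code, then append your virtualenv's site-packages.
zip my_lambda.zip my_lambda.py
cd $VIRTUAL_ENV/lib/python2.7/site-packages && zip -r ~/my_lambda.zip .
```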
So basically you just have to put all of the stuff you’re using into the cloud, which is what
we’re trying to do anyway. So, we just started
adopting Travis CI, and it’s great
because we use a VPC, so a lot of the Travis stuff
being a third-party tool, it’s outside of our VPC,
it can’t access it, but it can access
deployment tools for Lambda because that doesn’t run
inside of a VPC necessarily. So what it does is
it spins up a Docker container, I install some tooling via pip, and then I script
some shell commands to do basically an update-function-code call. It was a little too much
to show in this slide, but there’s some, like,
demonstrations on how to do this in the AWS documentation
if you’re curious. The AWS command line is a Python package.
It’s pretty easy to use.
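The deployment step itself is basically one call — the function and file names here are placeholders:

```
pip install awscli
aws lambda update-function-code \
    --function-name kinesis-to-s3 \
    --zip-file fileb://my_lambda.zip
```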
Also, I haven’t personally done this, but you can also use CloudFormation templates
to deploy Lambda functions. We probably will go that route,
although we use a packaging layer on top of that to do
more composable infrastructure, which is another talk in and of itself,
so I won’t go into that here. It’s also possible to upload
the zipped functions to S3, and then do the deploy from S3, so if you’re doing particular
versioning or staging and whatnot, you can trigger CodeDeploy
or Lambda functions from S3. So, summary. This is Python 2.7 only,
if I forgot to mention that. So no Python 3.
If you’re sad about that, I’m sorry. Vendors have to go with completely
stable and tested technologies, which often means
they’re several versions behind. It’s fantastic
for prototyping something that we’re not sure
how it will prove out or what we need to do
in order to get it working. We can spin up functions
faster than we can servers. So this was pretty great for — a lot of this work
that I’m presenting here was pretty much done
in a prototype and we’re in the process
of putting it in production. But it took us about —
took one person maybe like a few weeks of working on and off,
working on other projects. So API Gateway ended up
being prohibitively expensive for us to use as a long-term
data ingestion endpoint. It wasn’t really
designed for that, especially with this
cost structure in mind. So what we ended up going back to is
just using those NGINX web servers but making them as efficient
as possible by pulling out all of the logging, and only
making them a pass-through, so it’s really just doing
a proxy, which is efficient, and that will work for us
in the long term, and it costs us
several orders of magnitude less. So it makes sense to go serverless
in some cases, but not in all cases. We have a small team
and a small amount of resources, and we need to be strategic
about how we allocate that time and cost. One other thing is vendor lock-in. So this kind of ties us to AWS. For us that’s not
as much of a problem because everything else we have
is bought into AWS. We’re fully
on the EC2 infrastructure. Everything we have runs on AWS and we’re not planning
on changing that. But if you’re considering this, think about
how difficult it will be to move away from this
if your needs change. So I’m going to stand
on my soapbox. Maybe out here. Why is an ops person advocating
for serverless architecture? Serverless is a tool
in your Swiss army knife that you can pull out
to solve problems. Going serverless will not solve
all of your infrastructure problems. It just pushes them to
a different part of your stack. If anything, going serverless
will allow you to focus on your products
and systems architecture in a more critical light. The server is now a black box
that you don’t own and you can’t easily debug. It’s helpful to have operations-
and systems-minded folks in your engineering team
to help you design and build automated, repeatable,
and scalable infrastructure. Serverless components
are one part of that. So, I hope I have intrigued you
to learn more about serverless architecture
and components. Here are some resources
to get started. And these slides will be posted
later on as well. The first two links
are information about AWS and a couple other vendor
serverless cloud offerings. We used AWS, but there are other
vendors doing this as well. The third link
is an AWS blog post about working with Kinesis, and the next two
are documentation on AWS Lambda core components and limits.
If you’re thinking about trying this stuff out,
read about the limits. That’s going to give you
a lot of information. If you already have
an implementation in mind you want to move over to, this will give you an idea
of if it’s possible or not. And the last link is a library
for mocking boto3 when testing. So, thank you for listening. There was a lot of information
in this talk. We might have time for — no,
we’re out of time for questions, so you can find me
in the hallways. And also, I’d like to mention,
if you’re looking for someone to solve infrastructure
or data problems in your organization,
also come talk to me as I’m looking for
some new opportunities. So, thank you. [applause]
