Building an edge computing platform

This is the last post in the series. Now we’re going to design an edge computing platform using Lua, Nginx, and a Redis cluster. Previously, we explained how to add code to nginx, but not dynamically.

Platformize the computations

The platform is meant to provide a way to attach Lua code dynamically into the edge servers. The starting point can be to take the authentication code and port it to this solution.

At the bare minimum, we need a Lua phase name, an identifier and the Lua code. Let’s call this abstraction the computing unit (CU).

If we plan to add a computing unit dynamically to the NOTT, we need to persist it somewhere. A fast data store for this could be Redis.

overview_architecture

We also need to find a way to encode the computing unit into one of the Redis data types. We can use a string to store the computing unit: the key will be the identity and the value will hold the Lua phase and the code, separated by the string “||”.

# run this in one tab
docker run --rm -it --name some-redis redis
# run this in another tab
docker exec -it some-redis redis-cli
127.0.0.1:6379> set authentication "access_by_lua_block||if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end"
OK
127.0.0.1:6379> get authentication
"access_by_lua_block||if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end"

The platform needs to know all of the computing units, therefore, we need to list them. We could use the keys command but it can be very slow depending on how much data we have.

A somewhat better solution would be to store all the identities in a set data type, providing an O(N) solution, where N is the number of CUs.

KEYS pattern is also O(N), but there N is the total number of keys in the data store.

# run this in one tab
docker run --rm -it --name some-redis redis
# run this in another tab
docker exec -it some-redis redis-cli
127.0.0.1:6379> set authentication "access_by_lua_block||if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end"
OK
127.0.0.1:6379> get authentication
"access_by_lua_block||if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end"
127.0.0.1:6379> sadd coding_units authentication
(integer) 1
127.0.0.1:6379> smembers coding_units
1) "authentication"
127.0.0.1:6379> sadd coding_units anewcomputingunit
(integer) 1
127.0.0.1:6379> smembers coding_units
1) "authentication"
2) "anewcomputingunit"

Now that we know how we’re going to encode the computing unit, we need to find a method to parse this string separated by || and also a process to evaluate this string as code in Lua.

To find a proper and safe delimiter is difficult, so we need to make sure that no Lua code or string will ever contain ||.

-- running luajit in repl will be useful for exploration
-- docker run -it --rm akorn/luajit:2.1-alpine
-- inspired by
-- https://stackoverflow.com/questions/1426954/split-string-in-lua
function mysplit (inputstr, sep)
  if sep == nil then
    sep = "%s"
  end
  local t={}
  for str in string.gmatch(inputstr, "([^"..sep.."]+)") do
    table.insert(t, str)
  end
  return t
end
codeparts = mysplit("luaphase||luacode", "||")
print(codeparts[1])
-- luaphase
print(codeparts[2])
-- luacode
luacode = "a = 0 \n a = a + 1 \n print(a) \n"
luacodefunction = loadstring(luacode)
luacodefunction()
-- 1

To split the string we’ll use a function taken from Stack Overflow, and for the code evaluation Lua offers the loadstring function.
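One caveat worth a sketch: when the separator "||" is placed inside a character class, as in the pattern "[^||]+", the string is split on every single "|" rather than on the two-character delimiter. A possible alternative, assuming we only ever need the phase and the code, is to split on the first literal occurrence of "||". The helper name split_cu is hypothetical and not part of the original code.

```lua
-- Hypothetical helper: split "phase||code" on the FIRST literal "||" only.
local function split_cu(raw, sep)
  sep = sep or "||"
  -- the 4th argument (plain = true) disables pattern matching,
  -- so "|" is treated as an ordinary character
  local first, last = string.find(raw, sep, 1, true)
  if not first then
    return nil, "separator not found"
  end
  return raw:sub(1, first - 1), raw:sub(last + 1)
end

local phase, code = split_cu("rewrite||print('a|b')")
print(phase) -- rewrite
print(code)  -- print('a|b')
```

With this approach the phase name still must not contain "||", but the Lua code after it may contain "|" freely.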

But now some new questions arise: what happens if the code is syntactically invalid, or when the CU raises an error? And how can we deal with these issues?

-- Error during evaluation: syntactically invalid
code, err = loadstring(" a = 1 \n a a \n pring(a) \n")
print(code)
-- nil
print(err)
-- [string " a = 1 …"]:2: '=' expected near 'a'
-- Error during runtime: syntactically valid
stringcode = " a = taketime.menu "
code, err = loadstring(stringcode)
print(err) -- no evaluation error
-- nil
print(code) -- a runtime error will happen once we execute this function
-- function: 0x40a068a0
function err_handler(err_msg)
  print("an err was raised", err_msg)
  return err_msg
end
status, ret = xpcall(code, err_handler)
print(status)
-- false
print(ret)
-- [string " a = taketime.menu "]:1: attempt to index global 'taketime' (a nil value)

To deal with syntax errors, we need to validate the values returned by loadstring: the first value is the compiled function (nil on failure) and the second is the error message.

And to protect against runtime error, Lua has a builtin function called xpcall (pcall meaning protected call) that receives a function to execute and a second argument which is an error handler function.
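The two checks can be combined into a single helper. This is a minimal sketch in plain Lua, not the platform's actual code; the name safe_run and the "syntax error:" / "runtime error:" prefixes are assumptions made for illustration.

```lua
-- loadstring exists in Lua 5.1/LuaJIT; newer Lua versions use load instead
local compile = loadstring or load

-- Hypothetical helper: returns true plus the code's first return value,
-- or false plus an error message.
local function safe_run(code)
  local fn, syntax_err = compile(code)
  if not fn then
    return false, "syntax error: " .. tostring(syntax_err)
  end
  -- xpcall runs fn in protected mode and routes any error to the handler
  local ok, result = xpcall(fn, function(msg)
    return "runtime error: " .. tostring(msg)
  end)
  return ok, result
end

print(safe_run("return 1 + 1"))              -- succeeds
print(safe_run("a a"))                       -- fails at compile time
print(safe_run("local t = nil; return t.x")) -- fails at run time
```

The caller never sees a raised error: a broken CU only produces a false status and a message that can be logged.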

With all this in mind, we can develop the core of our platform. Somehow we need to get all the computing units from Redis, parse them to something easier to consume and finally, we can execute the given function.

Before we start to code, we can create a prototype that will replicate the authorization token system we did before but now using Redis to add and fetch the computing unit as well as shielding the platform from broken code.

location /test {
  content_by_lua_block {
    local redis_client = redis_cluster:new(config)
    -- simulating a dynamic code being stored at Redis --
    redis_client:set('authentication', "access_by_lua_block||local token = ngx.var.arg_token or ngx.var.cookie_superstition \n if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end")
    redis_client:sadd('coding_units', 'authentication')
    -- simulating a dynamic code being stored at Redis --
    -- fetch --
    local resp, err = redis_client:smembers("coding_units")
    local raw_coding_units = {}
    for _, coding_unit_key in ipairs(resp) do
      local resp, err = redis_client:get(coding_unit_key)
      table.insert(raw_coding_units, resp)
    end
    -- fetch --
    -- parse --
    local coding_units = {}
    for _, raw_coding_unit in ipairs(raw_coding_units) do
      local parts = mysplit(raw_coding_unit, "||")
      local cu = {}
      cu['phase'] = parts[1]
      cu['code'] = parts[2]
      table.insert(coding_units, cu)
    end
    -- parse --
    -- execute --
    for _, coding_unit in ipairs(coding_units) do
      ngx.log(ngx.ERR, "phase ", coding_unit['phase'])
      ngx.log(ngx.ERR, "code ", coding_unit['code'])
      local function_code, err = loadstring(coding_unit['code'])
      ngx.log(ngx.ERR, "loadstring error ", err)
      local status, ret = xpcall(function_code, err_handler)
      ngx.log(ngx.ERR, "xpcall status ", status, " ret ", ret)
    end
    -- execute --
    ngx.say(" R$ 45,567,900,00 ") -- a random value
  }
}

To test these lines of code, we can go to the terminal and simulate calls to the proper nginx location, and then check whether the expected behavior shows up.


Since we’re comfortable with our experiment, we can start to brainstorm thoughts about the code design and performance trade-offs.

Querying Computing Units

The first decision we can take is about when we’re going to fetch all the computing units (CUs). For the sake of simplicity, we can gather all the CUs for every request but then we’re going to pay extra latency for every client’s request.

the step 2 can generate several round trips in Redis cluster

To overcome these challenges, we’ll rely on two known techniques: caching and background processing.

We’ll move the fetch and parse logic to run in the background. With that running periodically, we then store the CUs in shared memory, where the edge computing core can look them up without additional network connections.

OpenResty has a function called ngx.timer.every(delay, callback). It runs a callback function every delay seconds in a “light thread” completely detached from the original request. This background job will do the fetch/parse instead of doing so for every request.

Once we have the CUs, we need a buffer where our background fetcher can store them for later execution. OpenResty offers at least two ways to share data:

  • a declared shared memory zone (lua_shared_dict), shared by all the Nginx workers
  • encapsulate the shared data into a Lua module and use the require function to import the module

The nature of the first option requires software locking. To make it scalable, we need to try to avoid this lock contention.

The Lua module sharing model also requires some care:

“to share changeable data among all the concurrent requests of each Nginx worker, there is must be no nonblocking I/O operations (including ngx.sleep) in the middle of the calculations. As long as you do not give the control back to the Nginx event loop and ngx_lua’s light thread scheduler (even implicitly), there can never be any race conditions in between. “

source
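To make the second option more concrete, here is a minimal sketch of a per-worker cache that a background job refreshes. It is not the library's actual implementation. The names cache, refresh, start and units are hypothetical, and the scheduler is injected as timer_every so the sketch also runs outside nginx; inside OpenResty one would pass ngx.timer.every.

```lua
-- Hypothetical per-worker cache: a module-level table holding the parsed CUs.
local cache = { units = {} }

-- fetch: any function that returns a list of CUs (assumed for this sketch).
local function refresh(fetch)
  local ok, units = pcall(fetch)
  if ok and type(units) == "table" then
    -- a single assignment swaps the whole list, so readers never see
    -- a half-updated table
    cache.units = units
  end
  return ok
end

-- timer_every: any (delay, callback) scheduler, e.g. ngx.timer.every.
local function start(fetch, interval, timer_every)
  refresh(fetch) -- warm the cache before the first tick
  return timer_every(interval, function(premature)
    if premature then return end -- the worker is shutting down
    refresh(fetch)
  end)
end

local function units()
  return cache.units
end

-- simulate the scheduler and two refreshes
local ticks = {}
local fake_timer = function(delay, cb) ticks[#ticks + 1] = cb; return true end
local n = 0
start(function() n = n + 1; return { "cu" .. n } end, 20, fake_timer)
print(units()[1]) -- cu1
ticks[1](false)   -- simulate one timer tick
print(units()[1]) -- cu2
```

All the I/O happens inside fetch, and the cache is only touched after fetch has returned, which keeps the update free of yields in the middle, as the quoted note requires.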

Edge Computing Bootstrapping

The usage of this edge computing lua library requires you to start the background process and also to explicitly call an execution function for each location and lua phase you want to add it to.

http {
  init_by_lua_block {
    config = "redis-cluster-config"
    redis_cluster = require "resty-redis-cluster"
    edge_computing = require "resty-edge-computing"
  }
  server {
    listen 8080;
    location /platform {
      alias /usr/local/openresty/nginx/;
    }
    rewrite_by_lua_block {
      local redis_client = redis_cluster:new(config)
      local status, err = edge_computing.start(redis_client)
      if not status then
        ngx.log(ngx.ERR, " edge_computing.start error ", err)
      end
    }
    access_by_lua_block {
      local status, errs = edge_computing.execute()
      -- comparing against {} is always true in Lua (tables compare by
      -- reference), so we check the length instead
      if errs and #errs > 0 then
        for _, err in ipairs(errs) do
          ngx.log(ngx.ERR, " edge_computing.execute error ", err)
        end
      end
    }
  }
}

In the previous example, we start the platform on the first request, at the rewrite phase. This also initiates the background job that updates the CUs every X seconds.

On the access phase, we’re going to execute the available CUs. If a second request comes in, it’ll “skip” the start and just execute all the cached CUs.

no_extra_cost

While using these APIs solves our challenge of adding unnecessary latency, it comes at a price: now when we add a new CU to the data store, it might take up to X seconds to become available to the workers.

The rewrite_by_lua_block was used because it’s the first phase where we can start a background job that can access cosockets and works with lua-resty-lock (a library used by resty-redis-cluster).

A funny behavior, related to the eventually consistent nature of this solution, will happen: a client issues request1, which is handled by Worker1, and later the same user issues request2, which a different worker, Worker2, accepts. Each worker runs the update function at a different time, so the two workers may hold different sets of CUs.

This means that the effective deployment of your CUs will differ even within a single server. The practical consequence is that one worker might answer something different from another. It will eventually be consistent, given the X seconds delay declared as the update interval.

worker_memory_update

Nginx worker load balancing relies on the event-based model and OS-dependent mechanisms to efficiently distribute requests among worker processes.

How to use it

Adding the CU via the Redis cluster will make it work.

git checkout 1.0.4
# in tab1 – run the NOTT
make run
# in tab2 – run the tv show
make broadcast_tvshow
# in tab3 – test
http http://localhost:8080/hls/colorbar.m3u8
# in tab4 – let's add the token CU
# to act in access rewrite phase
# -- first we need to discover the redis cluster id
docker ps | grep redis
# -- then let's connect to the redis cluster
docker exec -it f44ed71b3056 redis-cli -c -p 7000
# inside redis-cluster let's add the CU
set authentication "rewrite||local token = ngx.var.arg_token or ngx.var.cookie_superstition \n if token ~= 'token' then \n return ngx.exit(ngx.HTTP_FORBIDDEN) \n else \n ngx.header['Set-Cookie'] = {'superstition=token'} \n end"
sadd coding_units authentication
# go back to tab3 – you should eventually (after max 20s)
# receive 403 as response
http http://localhost:8080/hls/colorbar.m3u8
# then add the token and it'll work again
http http://localhost:8080/hls/colorbar.m3u8?token=token

Computing Edge Use Cases

Let’s list some of the possible usages for this platform so we can think a little bit ahead of time.

  • access control – tokens, access control, origin
  • change response
  • decorate headers
  • generate content
  • traffic redirect
  • advanced caching

The options are endless, but let’s try to summarize the features that we didn’t add to the code yet.

When we implemented the request counter we used Redis as our data store, so it’s safe to assume that the CUs might use Redis to persist data. Another thing we could do is to offer sampling: instead of executing the task for all requests, we could run it for 3% of them.

Another feature we could do is to allow filtering by the host. In this case, we want a given CU to execute in a specific set of machines only, but we also can achieve that in the CU itself if we need to.

The persistence layer needs to be passed to the CU. We can achieve that by wrapping the provided raw string code in a function that receives an input, and passing this argument through the pcall call.

local wrapper_loadstring = function(str_code)
  local api_fun, err = loadstring("return function (edge_computing) " .. str_code .. " end")
  if api_fun then return api_fun() else return api_fun, err end
end
-- … compile
cu["code"] = wrapper_loadstring(raw_code)
local status, ret = pcall(cu["code"], {redis_client=edge_computing.redis_client})
-- CU in Redis --
-- set authentication "rewrite||local resp, err = edge_computing.redis_client:incr(\"key\") \n ngx.log(ngx.ERR, \" >>>> \", resp) \n"
-- the same CU, unescaped:
local resp, err = edge_computing.redis_client:incr("key")
ngx.log(ngx.ERR, " >>>> ", resp)

We’ll have access to the edge_computing in our CUs as if it was a global variable.
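The reason the name is visible is that the CU body becomes the body of a function whose parameter is called edge_computing. The sketch below shows this in plain Lua, outside nginx. The names wrap and fake_redis are hypothetical; fake_redis is an in-memory stand-in for the Redis client and only mimics an incr method.

```lua
local compile = loadstring or load -- loadstring on Lua 5.1/LuaJIT

-- Wrap the raw CU code in a function that takes edge_computing as a parameter.
local function wrap(str_code)
  local factory, err = compile("return function (edge_computing) " .. str_code .. " end")
  if not factory then return nil, err end
  return factory()
end

-- Hypothetical stand-in for the Redis client: counts in memory.
local fake_redis = { n = 0 }
function fake_redis:incr(key)
  self.n = self.n + 1
  return self.n
end

local cu = wrap("return edge_computing.redis_client:incr('key')")
print(pcall(cu, { redis_client = fake_redis })) -- prints true and 1
print(pcall(cu, { redis_client = fake_redis })) -- prints true and 2
print(edge_computing) -- nil: the name never leaks into the global scope
```

Since the value is passed as an argument, each CU receives exactly what the platform decides to expose, and nothing is written to the global table.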

And finally, we can mimic the sampling technique by using a random function. Let’s say we want a given CU to be executed 50% of the time.

We need to encode the desired state at the datastore level and before we run the CU, we check if the random number, ranging from 1 to 100, is smaller or equal to the desired sampling rate.

if math.random(100) <= sampling then
pcall(code)
end

Pure random sampling is not ideal; maybe in the future we can use an algorithm similar to the power of two choices.
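The sampling check can be isolated in a small helper, which makes its boundaries easy to verify. This is only a sketch: the name should_run and the injectable rng parameter are assumptions, and sampling is taken to be an integer percentage from 0 to 100.

```lua
-- Hypothetical helper; rng defaults to math.random and can be replaced in tests.
local function should_run(sampling, rng)
  rng = rng or math.random
  -- rng(100) yields an integer from 1 to 100
  return rng(100) <= sampling
end

-- count how often a CU sampled at 50% would run over 10000 simulated requests
local hits = 0
for _ = 1, 10000 do
  if should_run(50) then hits = hits + 1 end
end
print(hits / 10000) -- a ratio close to 0.5, varying from run to run
```

A sampling rate of 0 never runs the CU and a rate of 100 always does, because the random number is never below 1 or above 100.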

Future

Any platform is a complex piece of software that needs to be flexible, accommodate many kinds of usage, be easy to use, be safe and it still needs to be open for future innovation/changes such as:

  • a control plane where you can create CUs using a UI
  • if the execution order is important then change the current data types
  • add usage metrics per CU
  • wrap the execution with a timeout/circuit breaker mechanism

I hope you enjoyed these blog posts. They were meant mostly to show some trade-offs and also to deepen our nginx and Lua knowledge.

Empowering Nginx with Lua code

This is the second post in the series where we develop an edge computing platform. In this post, we’ll add some code/behavior to the front end servers. Here’s a link to the previous entry.

Add code inside the front end

The OTT service we built before doesn’t employ any kind of authentication, thus the users can watch the streams for free. To solve this authentication issue we can add Lua code embedded into nginx.

OpenResty – an Nginx with support for LuaJIT 2.0/2.1 code.

To run Lua code inside nginx you need to understand a little bit of the request phases within the server. The request will travel across different stages where you can intercept it using Nginx directives and add the code.


Just for the sake of learning, the authentication logic will be a straightforward token system. During the access phase, we’ll deny access to those with no proper authentication. Once a user has the required token, it’s going to be persisted in the form of a cookie.

A fixed token with no expiration is unsafe for production usage; you should look for something like JWT.

server {
location / {
proxy_cache my_cache;
proxy_cache_lock on;
proxy_pass http://backend;
access_by_lua_block {
local token = ngx.var.arg_token or ngx.var.cookie_superstition
if token ~= "token" then
return ngx.exit(ngx.HTTP_FORBIDDEN)
else
ngx.header['Set-Cookie'] = {'superstition=token'}
end
}
}
}

The edge server can run useful behavior/code. Now let’s lay out some examples that demonstrate the power we can have while executing functions at the front end.

Suppose a hacker, behind the IP 192.168.0.253, is exploiting a known issue that is going to be fixed soon. We can solve that by forbidding his/her IP. Adding Lua code to the same phase can fix this problem.

You can access all the nginx variables using the api ngx.var.VARIABLE.

if ngx.var.remote_addr == "192.168.0.253" then
return ngx.exit(ngx.HTTP_FORBIDDEN)
end

Nginx has the deny directive to solve this problem although it doesn’t allow a dynamic way to update the IP list. We would need to reload the server every time we want to update the IPs.
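One way to avoid the reload, sketched here under assumptions, is to treat the blocked addresses as data instead of configuration. The table blocked_ips and the function is_blocked are hypothetical names; in a real setup the table would have to be filled from some external store, which this sketch does not cover.

```lua
-- Hypothetical in-memory set of blocked addresses: keys are IPs, values are true.
local blocked_ips = { ["192.168.0.253"] = true }

local function is_blocked(ip)
  return blocked_ips[ip] == true
end

-- Updating the list is a plain table write, no server reload involved.
blocked_ips["10.0.0.7"] = true
blocked_ips["192.168.0.253"] = nil

print(is_blocked("10.0.0.7"))      -- true
print(is_blocked("192.168.0.253")) -- false
```

Inside an access phase the check would read is_blocked(ngx.var.remote_addr), followed by the same ngx.exit(ngx.HTTP_FORBIDDEN) call used above.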

We also want to prevent other domains from consuming our streams. To do that, we’re going to examine the referer header and reject all the requests that do not originate from our domain.

CORS and CSP will be safer to solve this issue.

local headers = ngx.req.get_headers()
local referer = headers["Referer"]
-- a missing Referer header is nil, and string.find(nil, ...) would raise an error
if not referer or not string.find(referer, "localhost", 1, true) then
  return ngx.exit(ngx.HTTP_FORBIDDEN)
end

To change the response from the backend on the fly, we’ll add a custom HLS tag to the playlist.

-- header_filter_by_lua_block
-- since we're going to change the content,
-- we need to reset the content length
ngx.header.content_length = nil
-- body_filter_by_lua_block
ngx.arg[1] = ngx.arg[1] .. "\n#COPYRIGHT: mysite.com"

To decorate the HTTP headers, we’ll attach new ones exposing some metrics from the server. For that, we can rely on the ngx.header['name'] API.

-- header_filter_by_lua_block
ngx.header['X-Metrics-upstream_response_time'] = ngx.var.upstream_response_time
ngx.header['X-Metrics-upstream_connect_time'] = ngx.var.upstream_connect_time
ngx.header['X-Metrics-request_time'] = ngx.var.request_time
ngx.header['X-Metrics-tcpinfo_rtt'] = ngx.var.tcpinfo_rtt
ngx.header['X-Metrics-time_iso8601'] = ngx.var.time_iso8601

Finally, we’ll count how many requests a given user (based on her/his IP) makes and expose it through a custom HTTP header. The counter is stored in Redis.

-- counting how many requests a given ip did
local redis_client = redis_cluster:new(config)
local resp, err = redis_client:incr(ngx.var.remote_addr)
ngx.header['X-Request-Counter'] = resp

All of this is working. If you want, you can test it yourself.

# make sure you have docker
git clone https://github.com/leandromoreira/nott.git
cd nott
git checkout 1.0.2
# in a tab
make run
# wait until the platform is up and running
# and in another tab run
make broadcast_tvshow
# ^ for linux users you use --network=host and your
# IP instead of this docker.for.mac.host.internal
# for windows user I dunno =(
# but you can use OBS and point to your own machine
# open your browser and point it to http://localhost:8080/app
# observe the metrics in the network console window
# or even our custom hls manifest
# try to play our stream in clappr.io/demo/ it won't do it.

Conclusion

Did you notice a pattern? Every time we want to add a new feature, we need to:

  • write a little bit of Lua code
  • attach it to a request phase directive in nginx
  • deploy and reload the edge servers

That’s why we need to build an edge computing platform, to put code faster into production and avoid server reload.

Building an open-source OTT platform

Creating software from “scratch” might not be a good idea at first, but it’s often a great way to study a specific technology or even to deepen your knowledge in a particular field of computer science.

In this three-post series, we’re going to build a simple video platform using open-source software, then add features to it so it handles computation on the front end (edge computing), and we’ll conclude by designing a platform that will enable us to add code/features dynamically to the servers.


An over-the-top (OTT) is a streaming media service offered directly to viewers via the Internet. OTT bypasses cable, broadcast, and satellite television platforms. Now you don’t need to spend your money that much.

Edge computing is the ability to put computation and storage closer to the place where it is demanded; in simpler terms, it is code running within your front end servers.

We’re going to design two distinct services: a simple video streaming solution and an edge computing platform for this video streaming service.

  1. Building NOTT – an open source OTT video platform
  2. Add edge computation to NOTT – empower nginx with lua code
    • token authentication code
    • IP acl
    • forbid other sites
    • add a custom HLS tag on the fly
    • expose metrics in HTTP headers
    • count user request per IP
  3. Platformize the edge computing – using lua + redis
  4. $profit$

The NOTT

The new OTT is a VERY SIMPLE open-source video platform that expects an input signal and produces an output stream. It was made mostly as an excuse to discuss and design an edge computing platform around it.

NOTT
NOTT is built using a simple html5 app
NOTT architecture

The UI app is a simple static html5 file served from nginx. We’re using Clappr (backed by hls.js and shaka) as the selected player. The front end works as a caching layer for the video streaming and it also hosts the NOTT app.

The live stream reaches the platform through FFmpeg, the broadcaster, which is also used to transcode the input, producing multiple renditions. The nginx-rtmp module acts as a packager, converting the RTMP input into the adaptive streaming output format known as HLS.

The main selling point of our OTT platform is that it has the popular TV channel color bar (60fps) and the legendary TV show big buck bunny (partner’s licensed content). :)

Compatibility: I didn’t test on all platforms (browsers, ios, android, CTVs), video is hard and NOTT won’t cover 100% of the devices but it should work in most places.

How does it work?

To broadcast the color bar TV show into the platform, we’ll use FFmpeg. It has some filters that are capable of creating synthetic color bar frames at a given rate. It also offers an audio source filter known as sine that can be used to create artificial sound.

This command creates color bar pictures at 60 frames per second and a 440 Hz sine wave sound sampled at 48000 Hz. It encodes them to the video codec h264 using libx264 and to the audio codec aac. Finally, we send them to the transcoder/packager using RTMP.

ffmpeg -f lavfi -i 'testsrc2=size=1280x720:rate=60,format=yuv420p' \
-f lavfi -i 'sine=frequency=440:sample_rate=48000:beep_factor=4' \
-c:v libx264 -preset ultrafast -tune zerolatency -profile:v high \
-b:v 1400k -bufsize 2800k -x264opts keyint=120:min-keyint=120:scenecut=-1 \
-c:a aac -b:a 32k -f flv rtmp://transcoder/encoder/colorbar

The ingest server runs nginx-rtmp and it acts as input service, receiving the FFmpeg synthetic stream. It also transcodes (spawning FFmpeg processes for that) and creates the HLS format in a given folder.

The front end servers will consume the streaming via HTTP backed by this ingest server.

rtmp {
  server {
    listen 1935;
    application encoder {
      live on;
      exec ffmpeg -i rtmp://localhost:1935/encoder/$name
        -c:v libx264 -b:v 750k -f flv -s 640x360 rtmp://localhost:1935/hls/$name_high
        -c:v libx264 -b:v 400k -f flv -s 426x240 rtmp://localhost:1935/hls/$name_mid
        -c:v libx264 -b:v 200k -f flv -s 426x240 rtmp://localhost:1935/hls/$name_low;
    }
    application hls {
      live on;
      hls on;
      hls_variant _high BANDWIDTH=878000,RESOLUTION=640x360;
      hls_variant _mid BANDWIDTH=528000,RESOLUTION=426x240;
      hls_variant _low BANDWIDTH=264000,RESOLUTION=426x240;
    }
  }
}

The front end server we chose was nginx, a scalable web server and reverse proxy. This will be the endpoint where the final users can access the html5 application to watch the stream. It will also work as a caching layer for scalability.

http {
upstream backend {
server ingest;
}
server {
listen 8080;
location / {
proxy_cache my_cache;
proxy_cache_lock on;
proxy_pass http://backend;
}
location /app {
alias /usr/local/openresty/nginx/;
}
}
}

Finally, the app is a simple HTML static file that instantiates the player.

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>NOTT – The New OTT</title>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/clappr@latest/dist/clappr.min.js"></script>
</head>
<body class="notstealenfromnetflix">
<ul class="flex-container">
<li class="flex-item">
<div id="player"></div>
</li>
</ul>
<script>
var player = new Clappr.Player(
{
source: "http://localhost:8080/hls/colorbar.m3u8",
parentId: "#player",
poster: 'https://www.bogotobogo.com/FFMpeg/images/test_patterns/testsrc010.png',
mute: true,
height: 360,
width: 640,
});
</script>
</body>
</html>

How to use it

The entire platform was conceived with Linux containers in mind so you just need to run make run and this is going to start it all. You also need to start the color bar in a different tab by running make broadcast_tvshow and point your browser to http://localhost:8080/app.

# make sure you have docker
git clone https://github.com/leandromoreira/nott.git
cd nott
git checkout 0.0.3
# in a tab
make run
# wait until the platform is up and running
# and in another tab run
make broadcast_tvshow
# ^ for linux users you use --network=host and your
# IP instead of this docker.for.mac.host.internal
# for windows user I dunno =(
# but you can use OBS and point to your own machine
# open your browser and point it to http://localhost:8080/app

Conclusion

The genuine reason we created this simplistic video platform is to have a piece of software where we can explore computation at the edge. The next post will empower the Nginx front end with Lua code to add features to NOTT, things like authentication and IP acl.

Good Code Design From Linux/Kernel

Learn how parts of the Linux and FFmpeg C codebases are organized to be extensible and to act as if they had “polymorphism”. Specifically, we’re going to briefly explore how the Linux concept of everything is a file works at the source code level, as well as how FFmpeg can quickly and easily add support for new formats and codecs.

diagram_components

 

Good software design – Introduction

To write useful and long term maintainable software we tend to look out for patterns and group them into abstractions and it seems that’s the case for devs behind Linux and FFmpeg too.

Software design

When we’re creating software, we’re building data structures and defining their behaviors and dependencies. The way we create and link them can be seen as the design/architecture of the software.

Let’s say we’re building a media framework that encodes/decodes video and audio. The codecs AV1, H264, HEVC, and AAC all do some common operations and if we can provide a generic abstraction that holds these common operations and data we can use this concept instead of relying on the concrete idea of what a specific codec does.

Through the years many developers noticed that good design pays off as software grows in complexity.

This is one of the ideas behind good software design: to rely on components that are weakly linked and that have clear boundaries around what each one should do.

Ruby

Maybe it’s easier to see all these concepts in practice. Let’s code a quick pseudo media stream framework that provides encoding and decoding for several codecs.

class AV1
  def encode(bytes)
  end
  def decode(bytes)
  end
end
class H264
  def encode(bytes)
  end
  def decode(bytes)
  end
end
# …
# a constant, so that it is visible inside the methods of MediaFramework
SUPPORTED_CODECS = [AV1.new, H264.new, HEVC.new]
class MediaFramework
  def encode(type, bytes)
    codec = SUPPORTED_CODECS.find {|c| c.class.name.downcase == type}
    codec.encode(bytes)
  end
end

This pseudo-code in ruby tries to recreate what we discussed above. There is an implicit concept here of what operations a codec must have; in this case, the operations are encode and decode. Since ruby is a dynamically typed language, any class can present these two operations and act as a codec for us.

Developers sometimes may use the words: contract, API, interface, behavior and operations as synonyms.

This design might be considered good because if we want to add a new codec we just need to provide an implementation and add it to the list. The list could even be built in a dynamic way, but the idea is that this code seems easy to extend and maintain because it tries to keep the link between the components weak (low coupling) and each component does only what it should do (high cohesion).

The Rails framework even enforces a way to organize the code: it adopts the model-view-controller (MVC) architecture.

Golang

When we go (no pun intended) to a statically typed language like golang we need to be more formal, describing the required types but it’s still doable.

type Codec interface {
	Encode(data []int) ([]int, error)
	Decode(data []int) ([]int, error)
}
type H264 struct {
}
func (H264) Encode(data []int) ([]int, error) {
	// … lots of code
	return data, nil
}
func (H264) Decode(data []int) ([]int, error) {
	// … lots of code
	return data, nil
}
// AV1 would implement Encode and Decode in the same way
var supportedCodecs = []Codec{H264{}}
func Encode(codec string, data []int) ([]int, error) {
	// here we can choose a codec and use it, for instance:
	return supportedCodecs[0].Encode(data)
}

The interface type in golang is much more powerful than Java’s similar construct because its definition is totally disconnected from the implementation and vice versa. We could even make each codec a ReadWriter and use it all around.

Clang

In the C language we still can create the same behavior but it’s a little bit different.

struct Codec
{
  int *(*encode)(int *);
  int *(*decode)(int *);
};
int *h264_encode(int *bytes)
{
  // codec-specific code goes here
  return bytes;
}
int *h264_decode(int *bytes)
{
  return bytes;
}
int *av1_encode(int *bytes)
{
  return bytes;
}
int *av1_decode(int *bytes)
{
  return bytes;
}
struct Codec av1 =
{
  .encode = av1_encode,
  .decode = av1_decode
};
struct Codec h264 =
{
  .encode = h264_encode,
  .decode = h264_decode
};
int main(int argc, char *argv[])
{
  int data[] = {1, 2, 3};
  h264.encode(data);
  return 0;
}

Code inspired by https://www.bottomupcs.com/abstration.xhtml

We first define the abstract operations (functions in this case) in a generic struct and then we fill it with the concrete code, like the av1 decoder and encoder real code.

Many other languages have somewhat similar mechanisms to dispatch methods or functions as if they were part of an agreed protocol, and then the system integration code can deal only with these high-level abstractions.

Linux Kernel – Everything is a file

Have you ever heard the expression everything is a file in Linux? The idea is to have a common interface for all kinds of resources in Linux. For instance, Linux handles network sockets, special files (like /proc/cpuinfo) or even USB devices as files.

This is a powerful idea that makes it easy to write or use programs for Linux, since we can rely on a set of well-known operations from this abstraction called file. Let’s see this in action:

# the first case is the easiest, we're just reading a plain text file
$ cat /etc/passwd
root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
# now here, we think we're reading a file but we are not! (technically yes.. anyway)
$ cat /proc/meminfo
MemTotal: 2046844 kB
MemFree: 546984 kB
MemAvailable: 1535688 kB
Buffers: 162676 kB
Cached: 892000 kB
# and finally we open a file (using fd=3) for read/write
# the "file" being a socket, we then send a request to this file >&3
# and we read from this same "file"
$ exec 3<> /dev/tcp/www.google.com/80
$ printf 'HEAD / HTTP/1.1\nHost: www.google.com\nConnection: close\n\n' >&3
$ cat <&3
HTTP/1.1 200 OK
Date: Wed, 21 Aug 2019 12:48:40 GMT
Expires: -1
Cache-Control: private, max-age=0
Content-Type: text/html; charset=ISO-8859-1
P3P: CP="This is not a P3P policy! See g.co/p3phelp for more info."
Server: gws
X-XSS-Protection: 0
X-Frame-Options: SAMEORIGIN
Set-Cookie: 1P_JAR=2019-08-21-12; expires=Fri, 20-Sep-2019 12:48:40 GMT; path=/; domain=.google.com
Set-Cookie: NID=188=K69nLKjqge87Ymv4h-gAW_lRfLCo7-KrTf01ULtY278lUUcaNxlEqXExDtVB104pdA8CLUZI8LMvJv26P_D8RMF3qCDzLTpjji96B9v_miGlZOIBro6pDreHP0yW7dz-9myBfOgdQjroAc0wWvOAkBu-zgFW_Of9VpK3IfIaBok; expires=Thu, 20-Feb-2020 12:48:40 GMT; path=/; domain=.google.com; HttpOnly
Accept-Ranges: none
Vary: Accept-Encoding
Connection: close

This is only possible because the concept of a file (data structure and operations) was designed to be one of the main ways for sub-systems to communicate. Here’s a gist of the file_operations API.

struct file_operations {
struct module *owner;
loff_t (*llseek) (struct file *, loff_t, int);
ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
    // ...
};

The struct file_operations defines what one should expect a file to be able to do.

const struct file_operations ext4_dir_operations = {
.llseek = ext4_dir_llseek,
.read = generic_read_dir,
//..
};

Here we can see the directory implementation of these operations for the ext4 file system.

static const struct file_operations proc_cpuinfo_operations = {
.open = cpuinfo_open,
.read = seq_read,
.llseek = seq_lseek,
.release = seq_release,
};

Even the cpuinfo proc file is built on this abstraction. When you’re operating on files under Linux you’re actually dealing with the VFS (virtual file system) layer, which delegates to the proper file implementation.

Screen Shot 2019-08-21 at 10.14.07 AM

Source: https://ops.tips/blog/what-is-that-proc-thing/
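
To see the shared interface from user space, here is a small sketch in Python, assuming a Linux (or other Unix-like) system. The helper names are hypothetical; the point is that one generic read_all function, built only on the read operation over a file descriptor, works unchanged for a regular file, a pipe, and a socket.

```python
import os
import socket
import tempfile


def read_all(fd: int) -> bytes:
    """Read a file descriptor until EOF using only the generic read operation."""
    chunks = []
    while True:
        chunk = os.read(fd, 4096)
        if not chunk:  # an empty read means EOF
            break
        chunks.append(chunk)
    return b"".join(chunks)


def from_regular_file(payload: bytes) -> bytes:
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, payload)
        os.lseek(fd, 0, os.SEEK_SET)  # rewind before reading back
        return read_all(fd)
    finally:
        os.close(fd)
        os.unlink(path)


def from_pipe(payload: bytes) -> bytes:
    read_fd, write_fd = os.pipe()
    try:
        os.write(write_fd, payload)  # small payload, fits in the pipe buffer
    finally:
        os.close(write_fd)  # closing the write end lets the reader see EOF
    try:
        return read_all(read_fd)
    finally:
        os.close(read_fd)


def from_socket(payload: bytes) -> bytes:
    left, right = socket.socketpair()
    try:
        left.sendall(payload)
        left.close()  # the peer now sees EOF after the payload
        return read_all(right.fileno())
    finally:
        left.close()
        right.close()
```

All three helpers hand a plain file descriptor to the same read_all; which kernel implementation serves the read is decided behind the abstraction.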

FFmpeg – Formats

Here’s an overview of the FFmpeg flow/architecture. It shows that the internal components are linked mostly to abstract concepts like AVCodec, not directly to their implementations such as H264 or AV1.

remuxing_libav_components
FFmpeg architecture view from transmuxing flow

For the input files, FFmpeg defines a struct called AVInputFormat that is implemented by any format (video container) that wants to be used as an input. The MKV format fills this structure with its own implementation, and so does the MP4 format.

typedef struct AVInputFormat {
const char *name;
const char *long_name;
const char *extensions;
const char *mime_type;
ff_const59 struct AVInputFormat *next;
int raw_codec_id;
int priv_data_size;
int (*read_probe)(const AVProbeData *);
int (*read_header)(struct AVFormatContext *);
    // ... (read_packet, read_close, read_seek and others)
} AVInputFormat;
// matroska
AVInputFormat ff_matroska_demuxer = {
.name = "matroska,webm",
.long_name = NULL_IF_CONFIG_SMALL("Matroska / WebM"),
.extensions = "mkv,mk3d,mka,mks",
.priv_data_size = sizeof(MatroskaDemuxContext),
.read_probe = matroska_probe,
.read_header = matroska_read_header,
.read_packet = matroska_read_packet,
.read_close = matroska_read_close,
.read_seek = matroska_read_seek,
.mime_type = "audio/webm,audio/x-matroska,video/webm,video/x-matroska"
};
// mov (mp4)
AVInputFormat ff_mov_demuxer = {
.name = "mov,mp4,m4a,3gp,3g2,mj2",
.long_name = NULL_IF_CONFIG_SMALL("QuickTime / MOV"),
.priv_class = &mov_class,
.priv_data_size = sizeof(MOVContext),
.extensions = "mov,mp4,m4a,3gp,3g2,mj2",
.read_probe = mov_probe,
.read_header = mov_read_header,
.read_packet = mov_read_packet,
.read_close = mov_read_close,
.read_seek = mov_read_seek,
.flags = AVFMT_NO_BYTE_SEEK | AVFMT_SEEK_TO_PTS,
};
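
As a rough illustration of how such a table of formats can be used, here is a much simplified sketch in Python. InputFormat, register and probe are hypothetical names, and the probes return a boolean, whereas the real FFmpeg probes return a score. The magic bytes are the EBML header that starts Matroska/WebM files and the 'ftyp' box type commonly found at bytes 4 to 8 of MP4/MOV files.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class InputFormat:
    """Hypothetical, much smaller cousin of AVInputFormat."""

    name: str
    read_probe: Callable[[bytes], bool]


def matroska_probe(head: bytes) -> bool:
    # Matroska/WebM files start with the EBML magic number.
    return head.startswith(b"\x1a\x45\xdf\xa3")


def mov_probe(head: bytes) -> bool:
    # MP4/MOV files usually carry an 'ftyp' box type at bytes 4..8.
    return head[4:8] == b"ftyp"


REGISTERED_FORMATS: list[InputFormat] = []


def register(fmt: InputFormat) -> None:
    """A format becomes available just by adding itself to the list."""
    REGISTERED_FORMATS.append(fmt)


register(InputFormat("matroska,webm", matroska_probe))
register(InputFormat("mov,mp4,m4a,3gp,3g2,mj2", mov_probe))


def probe(head: bytes) -> Optional[InputFormat]:
    """Return the first registered format that recognizes the first bytes."""
    for fmt in REGISTERED_FORMATS:
        if fmt.read_probe(head):
            return fmt
    return None
```

The probe function never mentions Matroska or MP4; supporting a new container means writing its probe and registering it.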

This design makes it easier to integrate and release new codecs, formats, and protocols. dav1d (an open-source AV1 implementation) was integrated into FFmpeg in May this year, and you can follow the commit diff to see how easy it was. In the end, it needs to register itself as an available codec and follow the expected operations.

+AVCodec ff_libdav1d_decoder = {
+ .name = "libdav1d",
+ .long_name = NULL_IF_CONFIG_SMALL("dav1d AV1 decoder by VideoLAN"),
+ .type = AVMEDIA_TYPE_VIDEO,
+ .id = AV_CODEC_ID_AV1,
+ .priv_data_size = sizeof(Libdav1dContext),
+ .init = libdav1d_init,
+ .close = libdav1d_close,
+ .flush = libdav1d_flush,
+ .receive_frame = libdav1d_receive_frame,
+ .capabilities = AV_CODEC_CAP_DELAY | AV_CODEC_CAP_AUTO_THREADS,
+ .caps_internal = FF_CODEC_CAP_INIT_THREADSAFE | FF_CODEC_CAP_INIT_CLEANUP |
+ FF_CODEC_CAP_SETS_PKT_DTS,
+ .priv_class = &libdav1d_class,
+ .wrapper_name = "libdav1d",
+};

No matter which language we use, we can (or at least can try to) build software with low coupling and high cohesion in mind. These two basic properties help you build software that is easier to maintain and extend.

How to build a distributed throttling system with Nginx + Lua + Redis

graph

At the last Globo.com hackathon, Lucas Costa and I built a simple Lua library that provides a distributed rate measurement system; it depends on Redis and runs embedded in Nginx. Before we explain what we did, let’s start by understanding the problem that a throttling system tries to solve and some possible solutions.

Suppose we just built an API but some users are making too many requests and abusing their request quota. How can we deal with them? Nginx has a rate limiting feature that is easy to use:

events {
worker_connections 1024;
}
error_log stderr;
http {
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/m;
server {
listen 8080;
location /api0 {
default_type 'text/plain';
limit_req zone=mylimit;
content_by_lua_block {
ngx.say("hello world")
}
}
}
}

This nginx configuration creates a zone called mylimit that limits each user, identified by IP address, to a single request per minute. To test it, save this config file as nginx.conf and run the command:

docker run --rm -p 8080:8080 \
-v $(pwd)/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
openresty/openresty:alpine

We can use curl to test its effectiveness:

screen shot 2019-01-25 at 9.51.19 pm

As you can see, our first request was just fine, right at the start of minute 50, but our next two requests failed because we were restricted by the nginx limit_req directive, which we set up to accept only 1 request per minute. In the next minute we received a successful response.

This approach has a problem though: a user could, for instance, use multiple cloud VMs and bypass the per-IP limit. Let’s use the user token argument instead:

events {
worker_connections 1024;
}
error_log stderr;
http {
limit_req_zone $arg_token zone=mylimit:10m rate=1r/m;
server {
listen 8080;
location /api0 {
default_type 'text/plain';
limit_req zone=mylimit;
content_by_lua_block {
ngx.say("hello world")
}
}
}
}

There is another good reason to avoid limiting by IP: many of your users can be behind a single IP, and by rate limiting them based on it you might be blocking legitimate users.

Now a user can’t bypass the limit by using multiple IPs, because their token is used as the key for the rate limit counter.

screen shot 2019-01-25 at 10.22.00 pm

You can even notice that when a different user, the one with token=0xCAFEE, requests the same API, the server replies with success.

Since our API is so useful, more and more users are becoming paid members and now we need to scale it out. What we can do is put a load balancer in front of two instances of our API. We can still use nginx to act as the LB; here’s a simple (workable) version of the required config.

events {
worker_connections 1024;
}
error_log stderr;
http {
upstream app {
server nginx1:8080;
server nginx2:8080;
}
server {
listen 8080;
location /api0 {
proxy_pass http://app;
}
}
}

Now, to simulate our scenario we need multiple containers. Let’s use docker-compose for this task; the config file just declares three services, two acting as our API and one as the LB.

version: '3'
services:
  nginxlb:
    image: openresty/openresty:alpine
    volumes:
      - "./lbnginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
    ports:
      - "8080:8080"
  nginx1:
    image: openresty/openresty:alpine
    volumes:
      - "./nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
    ports:
      - "8080"
  nginx2:
    image: openresty/openresty:alpine
    volumes:
      - "./nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
    ports:
      - "8080"

Run the command docker-compose up and then in another terminal tab simulate multiple requests.

When we request http://localhost:8080 we’re hitting the lb instance.

screen shot 2019-01-25 at 10.58.25 pm

Weird, right?! Now our limit system is not working, or at least not properly. The first request was a 200, as expected, but the next one was also a 200.

It turns out that the LB needs a way to forward each request to one of the two API instances. The default algorithm our LB uses is round-robin, which hands each new request to the next server in the list, going around the list like the hand of a clock.

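The Nginx limit_req stores its counters in each node’s own memory. Since the LB sent each of our first two requests to a different instance, each instance counted only one request, and that’s why both were successful.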
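
To make this failure mode concrete, here is a small simulation sketched in Python. The Node class, the simulate helper, and the plain dictionary standing in for a shared store are all hypothetical, and the time window is ignored for brevity; 503 is the status limit_req replies with by default.

```python
from itertools import cycle
from typing import Optional


class Node:
    """An API instance that allows `limit` requests per token."""

    def __init__(self, limit: int, counters: Optional[dict] = None):
        self.limit = limit
        # Each node keeps private counters unless a shared store is injected.
        self.counters = {} if counters is None else counters

    def handle(self, token: str) -> int:
        self.counters[token] = self.counters.get(token, 0) + 1
        return 200 if self.counters[token] <= self.limit else 503


def simulate(nodes: list, token: str, requests: int) -> list:
    """Send `requests` requests through a round-robin load balancer."""
    upstream = cycle(nodes)
    return [next(upstream).handle(token) for _ in range(requests)]


# Private counters: each node sees only every other request.
local = simulate([Node(1), Node(1)], "0xCAFEE", 3)

# Shared counters: both nodes read and write the same store.
store = {}
shared = simulate([Node(1, store), Node(1, store)], "0xCAFEE", 3)

print(local)   # [200, 200, 503]
print(shared)  # [200, 503, 503]
```

With private counters the second request slips through because it lands on a node that has not seen the token yet; with a shared store the limit holds no matter which node answers.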

What if we saved our counters in a shared data store? We could use Redis; it’s in-memory and pretty fast.

screen shot 2019-01-25 at 11.28.41 pm

But how are we going to build this counting/rating system? This can be solved using a histogram to get the average, a leaky bucket algorithm or a simplified sliding window proposed by Cloudflare.

Implementing the sliding window algorithm is actually very easy: you keep two counters, one for the last minute and one for the current minute, and then you estimate the current rate by weighting the last minute’s counter as if its requests had arrived at a perfectly constant rate.

To make things easier, let’s walk through an example of this algorithm in action. Let’s say our throttling system allows 10 requests per minute, the past minute counter for a token is 6, the current minute counter is 1, and we are at second 10 of the current minute.

last_counter * ((60 - current_second) / 60) + current_counter
6 * ((60 - 10) / 60) + 1 = 6 # the current rate is 6 which is under 10 req/m
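
The same arithmetic as a tiny Python function, in case you want to play with other values. The function name and the window parameter are just illustrative choices.

```python
def sliding_window_rate(last_counter, current_counter, current_second, window=60):
    """Estimate the rate, assuming last minute's requests were evenly spread."""
    remaining = window - current_second  # part of the last minute still "in view"
    return last_counter * remaining / window + current_counter


rate = sliding_window_rate(last_counter=6, current_counter=1, current_second=10)
print(rate)        # 6.0
print(rate <= 10)  # True, so the request is under the 10 req/m limit
```

At second 0 the whole previous minute still counts; as the current minute advances, its weight shrinks linearly towards zero.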

-- redis_client is an instance of a redis_client
-- key is the limit parameter, in this case ngx.var.arg_token
local redis_rate = {}

redis_rate.measure = function(redis_client, key)
  local current_time = math.floor(ngx.now())
  local current_minute = math.floor(current_time / 60) % 60
  -- wrap around so that minute 0 looks back at minute 59
  local past_minute = (current_minute - 1) % 60
  local current_key = key .. current_minute
  local past_key = key .. past_minute

  local resp, err = redis_client:get(past_key)
  if err then return nil, err end
  -- a missing key means there were no requests in the past minute
  local last_counter = tonumber(resp) or 0

  resp, err = redis_client:incr(current_key)
  if err then return nil, err end
  -- INCR returns the value after the increment, so discount this request
  local current_counter = tonumber(resp) - 1

  resp, err = redis_client:expire(current_key, 2 * 60)
  if err then return nil, err end

  local current_rate = last_counter * ((60 - (current_time % 60)) / 60) + current_counter
  return current_rate, nil
end

return redis_rate

To store the counters we used three simple (O(1)) redis operations:

  • GET to retrieve the last counter
  • INCR to count the current counter and retrieve its current value.
  • EXPIRE to set an expiration for the current counter, since it won’t be useful after two minutes.
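
If you want to try the three operations without a Redis server, here is a rough Python rendering of the measurement. InMemoryStore is a hypothetical stand-in that mimics only GET, INCR and EXPIRE, and the clock is injected so the example can be stepped through time by hand; it is a sketch of the idea, not the library itself.

```python
import time


class InMemoryStore:
    """Hypothetical stand-in for Redis: only GET, INCR and EXPIRE."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._values = {}
        self._deadlines = {}

    def _drop_if_expired(self, key):
        deadline = self._deadlines.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._values.pop(key, None)
            self._deadlines.pop(key, None)

    def get(self, key):
        self._drop_if_expired(key)
        return self._values.get(key)

    def incr(self, key):
        self._drop_if_expired(key)
        self._values[key] = self._values.get(key, 0) + 1
        return self._values[key]

    def expire(self, key, seconds):
        self._deadlines[key] = self._clock() + seconds


def measure(store, key, now):
    """Sliding-window rate for `key`; `now` is a timestamp in seconds."""
    current_minute = int(now // 60) % 60
    past_minute = (current_minute - 1) % 60
    current_key = f"{key}{current_minute}"

    last_counter = store.get(f"{key}{past_minute}") or 0  # GET
    current_counter = store.incr(current_key) - 1         # INCR, minus this request
    store.expire(current_key, 2 * 60)                     # EXPIRE after two minutes

    elapsed = int(now) % 60
    return last_counter * (60 - elapsed) / 60 + current_counter


# Six requests during minute 0, then one request at second 10 of minute 1.
fake_now = [5.0]
store = InMemoryStore(clock=lambda: fake_now[0])
for _ in range(6):
    measure(store, "token", fake_now[0])
fake_now[0] = 70.0
print(measure(store, "token", fake_now[0]))  # 5.0
```

The last call sees 6 requests in the past minute, weighted by 50/60, plus 0 earlier requests in the current minute.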

We decided not to use the MULTI operation, so in theory a very small percentage of requests can be wrongly allowed. One of the reasons to dismiss MULTI was that the Lua Redis cluster driver we use doesn’t support it, but we do use pipelining and hash tags to save 2 extra round trips.

Now it’s time to integrate the Lua sliding window rate algorithm into nginx.

http {
server {
listen 8080;
location /lua_content {
default_type 'text/plain';
content_by_lua_block {
-- redis_cluster, redis_rate and config are assumed to be loaded/defined beforehand
local redis_client = redis_cluster:new(config)
local rate, err = redis_rate.measure(redis_client, ngx.var.arg_token)
if err then
ngx.log(ngx.ERR, "err: ", err)
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end
if rate > 10 then
ngx.exit(ngx.HTTP_FORBIDDEN)
end
ngx.say(rate)
}
}
}
}

You probably want to use the access_by_lua phase of the nginx request cycle instead of content_by_lua.

The nginx configuration is easy to understand: it uses the token argument as the key, and if the rate is above 10 req/m we just reply with 403. Simple solutions are usually elegant and can be scalable and good enough.

The Lua library and this complete example are on GitHub, and you can run them locally and test them without great effort.