Outskirt development

Serverless Mediaserver on AWS

January 10, 2019

cloud
nodejs
programming

Building a mediaserver is really simple, here’s the recipe:

You make an api with 2 endpoints proxied by nginx, one public route to retrieve the medias, and one private to upload the resources. Whenever you upload the files these are written to the hard disk and the route hands back an url composed with a hash, you’ll use this url to retrieve the media you’ve just uploaded. The retrieving endpoint will need a way to pass in parameters that define dimensions, watermarks, and other details of the returning payload, then you plumb everything through imagemagick and woilá!

This will work, shouldn’t it?

This might even work for a while but in reality it is much more complicated than this, dealing with an on premise mediaserver is always a delicate issue, it usually starts dropping request when you most need to serve them;
configuring the machine is hard, IO, virtual memory and many other low level factors are at stake, getting them right is a trial/error process that involves a lot of time.

Recently I had to prepare a mediaserver in a situation where there will be little time for maintenance, and with my previous experience I decided to go serverless on AWS.

The way I decided to structure this is to modularize the functionalities into two aws lambda functions, one for uploads and one for retrieving medias.
The lambdas are where the program resides and also the communication layer between services.

In the rest of the post I will describe the uploading functionality.

The services we will rely on for uploading images are:

  • Api Gateway to trigger the function and manage http requests.
  • A DynamoDB table to store associations of images, original urls, api urls.
  • An S3 bucket to store the raw images.
  • One Lambda function.

Configuring our services

API Gateway

We will need a single POST route to submit our medias, since we will use this route from our server a token auth can be enough for now, you can hookup the trigger directly in lambda console, but for configuring the method and token you will need to go to the Api Gateway Console (it’s quite easy so I won’t got through this).

DynamoDB and S3

In the code below we reference a dynamodb table called YourTableNameHere with a primary key named urlName, and also a S3 bucket called YourBucketName. The lambda will need the appropriate permission to write and read from these.

Building the lambda

The function that acts as main method in the lambda nodejs environment is exports.handler, the event argument passed in represents the private api route call.

1. First thing to do is extract the list of urls which have been passed in:

exports.handler = async (event) => {
  const body = JSON.parse(event.body)
  const { urls } = body

  const urlsIsArray = Array.isArray(urls)
  // throw if urls is not and array with something in it
  if (!urlsIsArray && !!urls.length) {
    return {
      statusCode: 200,
      body: JSON.stringify('urls parameter needs to be a non empty array')
    }
  }
  ...

The api has been configured to only accept post call so our payload will be in body, the only field we’re interested in is urls (the list of original urls to be processed).

2. Check if the urls have already been processed.

All processed urls are written in a DynamoDB table (to avoid processing an image that has already been inserted in S3).
In this phase we check which urls we already have.
Inside the handler function we make a call to checkKeys

  const dbCheck = await checkKeys(urls)

and here’s the checkKeys function:

// client connection to DynamoDB
const client = new AWS.DynamoDB.DocumentClient()

const checkKeys = async hashes => {
  const params = {
    RequestItems: {
      YourTableNameHere: {
        Keys: hashes.map(x => ({ urlName: x }))
      }
    }
  }

  let data = ''
  try {
    data = await client.batchGet(params).promise()
  }
  catch (e) {
    data = e
  }
  return data
}

Inside this function we check on YourTableNameHere table which urls have already been processed.

3. Once we’ve received the response from dynamoDB we normalize it and see the urls that need processing.

...
  const dbCheck = checkKeys(urls)
  const urlsInDb = dbCheck.Responses.YourTableNameHere.map(x => x.urlName)

  // we filter the ones that are missing
  const missingUrls = filterAreadyInserted(urlsInDb, urls)
...

// this can be placed outside handler
const filterAreadyInserted = (inDb, urls) =>
  urls.filter(x => !inDb.includes(x))

4. Process the missing urls adding files to S3.

... 
  const uploads = await uploadImagesToS3(missingUrls)
...

and here is the function and it’s callback

const uploadImagesToS3 = async list => 
  Promise.all(list.map(processSingleImage))


const s3 = new AWS.S3()
const uniqid = require('uniqid')
const fetch = require('node-fetch')
const crypto = require('crypto')

const processSingleImage = orgUrl => {
  const Bucket = 'YourBucketName'
  const ext = orgUrl.split('.').pop().toLowerCase()
  // we use uniq to create the filename
  const uid = uniqid()
  // storing the files in r/ directory inside S3
  const fileName = `${uid}.${ext}`
  const Key = `r/${fileName}`

  // getting the content of the media
  const blob = fetch(orgUrl)
    .then(x => x.buffer())

  // writing to S3
  const putOperation = blob.then(data => {
    const params = {
      Bucket,
      Key,
      Body: data ,
      CacheControl: 'max-age=31536000'
    }
    return s3.putObject(params).promise()
  })

  return Promise.all([blob, putOperation])
    .then(([b, x]) => {
      // if ETag is present it means the image as been written properly
      if (!!x.ETag) {
        const ETag = JSON.parse(x.ETag)

        // we eventually check(not implemented here) that the ETag given back is s3
        // is equal to the one calculated in the lambda (it should)
        const md5hash = crypto.createHash('md5').update(b).digest('hex')

        return {
          status: 'OK',
          mSUrl: `https://s3.eu-west-3.amazonaws.com/${Bucket}/${Key}`,
          urlName: orgUrl,
          s3Payload: x,
          md5hash,
          md5SumRes: ETag === md5hash,
          insertedAt: Date.now()
        }
      }

      // this is our error case
      return {
        status: 'KO',
        orgUrl,
        payload: x
      }
    })
    // we set status to 'KO' if md5SumRes is false
    .then(x => {
      if (x.status === 'OK' && !x.md5SumRes)
        x.status = 'KO'

      return x
    })
}

What happens here is we map the array of urls through processSingleImage, this function gets the content of the url with node-fetch and it’s convenient buffer method which returns the full binary data.

Then we send the binary data (Body) to S3 through it’s wrapper putObject method, in params we also set other fields such as filename, bucket and cache max-age.

At the end we check the insertion has succeeded, we also check the ETag returned from S3 is equal to the one calculated.

5. Insert the newly uploaded s3 files to the database

  // we update dynamodb with the new urls
  const urlInsertionInDb = await insertions(uploads)
const insertions = async arr => {
  const promArr = arr.map(async (x) => {
    const params = {
      TableName: 'YourTableNameHere',
      Item: x
    }

    try {
      let data = await client.put(params).promise()
    }
    catch (e) {
      throw new Error('from insertions in DynamoDB: ', e)
    }
    return x
  })
  // we return a single promise that contains an array
  return Promise.all(promArr)
}

6. Finally we merge the starting urls with the one already in the database and the ones created while uploading the files to S3 and return.

  const mergedUrls = mergeUrls(urls, urlsInDb, urlInsertionInDb)

  const response = {
    statusCode: 200,
    body: JSON.stringify(mergedUrls)
  }
  return response
}

const mergeUrls = (urls, urlsFromDb, urlsFromInsertion) => (
  urls.map(url => {
    // if we have an url in db response that's it
    const fromDb = urlsFromDb && urlsFromDb.find(x => x.urlName === url)
    if (fromDb)
      return fromDb

    // if we have something here we're done
    const fromInsertion = urlsFromInsertion.find(x => x.urlName === url)
    if (fromInsertion)
      return fromInsertion

    // we should never get here but if we do we reconstruct the response
    // passing null as the value for mSUrl
    return {
      urlName: url,
      mSUrl: null
    }
  })
)

So this is how I built the uploading functionality of the mediaserver, if you have questions or want to share some comments you can contact me through twitter or github (link to the related repo here).


Francesco Calo developing on linux in La Spezia.
Just a programming journey.

We're using cookies to do some basic tracking analytics, closing this popup you express you're OK with that.