How We Migrated All User Files to AWS S3

Migrating data is often a challenging job, especially when every little mistake can instantly impact a user’s experience. Thus, it is necessary to research the task properly, pinpoint possible issues and make sure the description is totally clear and everyone knows the expected result. Nevertheless, errors may (and most probably will) occur during the process; it is important to keep that in mind, and to design the solution in a way that anticipates those errors and handles them accordingly. In an ideal scenario, we want to complete the migration process without anyone even noticing that something has changed.

In this post, we will take a look at one such situation, which we've dealt with on one of our projects. The assignment from the manager was: “We need to migrate all user data from MS Azure to AWS. The reason was to gradually get rid of technical debt and move everything under one roof. And as we'd already embraced AWS for all of our new services, the choice was obvious.

INSPECTING THE TASK

As already mentioned, a properly defined task is a prerequisite for a successful result. Let’s do this now. The project we are migrating data in is a dating app, so there are many user accounts and each has a number of good-looking photos. Additionally, on each photo upload, we generate and store five different versions for performance (large, medium, small) and privacy (if a user doesn’t have active premium) reasons, plus the originally uploaded one. With this approach in place, it might easily happen that one can have dozens of different files associated with the account (and that's not even mentioning other files, such as verification photos, etc.). Now that we know what to migrate, let’s make sure we don’t miss any details.

DESIGNING A SOLUTION

Our solution consists of two main points.

  • Start uploading new files to AWS S3 instead of Azure
  • Migrate already existing files to AWS S3

Uploading New Files to S3

This step is quite simple to achieve. We are using pre-signed URLs for uploading files. This way, the API generates a URL which is then read by a client app and subsequently uploads a file directly to a cloud provider via this URL. As a result, the whole uploading process is handled by the cloud provider and we don’t have to deal with anything.

It’s also important to mention that we use Heroku for running our API. Heroku uses so-called dynos, which are mortal, so it could easily happen that those instances get restarted when a file is uploading—resulting in a failed upload and a lost file. Not good. This is not a problem specific to Heroku but, rather, to all cloud providers that run managed applications. You should therefore always strive for designing them to run in a stateless way, ensuring that any unexpected restarts will not do any damage.

TL;DR: All we need to do to redirect all new photos to be uploaded to the new location is just change the library (use aws-sdkinstead of azure-storage) for generating these URLs. That’s it.

Migration Script

  • First, we need to select users for whom we want to migrate data. This sounds simple, but some users are already deleted, don’t have any photos uploaded or have already been migrated, so we want to eliminate those records. Regarding the latter, it’s good practice to implement a flag column so we know which rows have already been migrated. For this purpose, we add a new column: photos_migrated_at .
  • Once we have a list of users, we get an array of all stored files and download all of them one by one from the original storage.
  • Next, we upload those downloaded files to the new location—in our case, AWS S3. (Note that we used Node.js’s Streams, which is much more memory efficient as we don’t need to save the whole file before the actual upload can start.)
export const migrateFileToAWS = async (fileUrl, newDestinationPath) => {
  // s3.upload() accepts readable stream for the Body parameter,
  // but calling request().get() doesn't provide a true readable stream therefore
  // we need to use PassThrough() stream
  // See the full explanation here https://github.com/aws/aws-sdk-js/issues/2100#issuecomment-398534493
  const fileWriteStream = new PassThrough()
  
  request(fileUrl) // download file via request library
    .pipe(fileWriteStream) // pipe the download response stream 
  
  await s3
    .upload({ // upload to S3
      Body: fileWriteStream, // pass the stream here
      Bucket: 'migrated-files-bucket',
      Key: newDestinationPath,
    })
    .promise()
  return newDestinationPath
}
  • Once we have all files uploaded to the new location, the last step is to update the database to point to those new URLs. This is as simple as one SQL update query. In our case, we also needed to update Firebase, so this means one extra call.

After writing down all the steps, the following is a simplified version of the migration script. We loop through all the users in batches (pagination) and process files for each given user if he/she has any URLs to migrate. (Note that we also check if a file is already stored in S3 before trying to migrate it — isAWSUrl().)

const migrateIfNeededAndReturnUrl = async photoUrl => {
    if (services.storage.isAWSUrl(photoUrl)) {
        return photoUrl // Skipping, this file is already in AWS
    }

    try {
        const migratedUrl = await services.storage.migrateFileToAWS(photoUrl)
        return migratedUrl
    } catch (e) {
        // if upload fails, just return the old url so we don't break anything
        // Also, here is a good point to mark this attempt as failed to process it later again
        return photoUrl
    }
}

const migratePhotos = async photos => Promise.all(photos.map(async photo => {
    photo.original = await migrateIfNeededAndReturnUrl(photo.original)
    // do the similar for the rest
    // ...
    return photo
}))

let lastId = null
while (true) {
    const profiles = await repositories.profile.getProfilesToMigrate({ limit: 50, lastId })
    if (!profiles.length) {
        break
    }

    for (const profile of profiles) {
        if (profile.photos) {
            profile.photos = await migratePhotos(profile.photos)
        }

        // update database row - photos and photos_migrated_aws flag
        await repositories.profile.updateMigratedPhotos(profile.id, profile.photos)

        // update photos in Firebase
        await services.firebase.updateProfilePhotos({
            id: profile.id,
            photos: profile.photos,
        })
    }
}


This code works fine, but there is one issue. Remember when we said that for each photo, we actually need to store five different files? Now, if you look at the code, you may notice that we try to migrate all of them asynchronously at once using Promise.all. Imagine that a user has 10 photos. If we were to migrate all of them simultaneously, it would create 50 pending promises(!). This could get even worse if we decided to migrate all of the users in the current batch asynchronously as well (we use a for-of loop, which makes the processing synchronous).

The more pending promises, the more memory used and open network calls—which is not good for migration on such a big scale. We can mitigate this by limiting the max allowed number of spawned promises at once by a simple helper function, where we combine both the synchronous and asynchronous approaches by splitting an array into smaller chunks, iterating through them synchronously and runningPromise.all only on those smaller parts.


import chunk from 'lodash.chunk'

export const processInChunks = async (array, handlerFn, { chunkSize = 5 } = {}) => {
  const result = []
  for (const dataChunk of chunk(array, chunkSize)) {
    result.push(...await Promise.all(dataChunk.map(handlerFn)))
  }
  return result
}

const migratePhotos = async photos => processInChunks(photos, async photo => {
    photo.original = await migrateIfNeededAndReturnUrl(photo.original)
    // do the similar for the rest
    // ...
    return photo
})

This way, a code will inherently take more time to complete but will be much more efficient. It is a tradeoff here, and you always have to decide what works better in your case and what is more important to you: time or resources.

Note: As I learned later, there is something very similar to what we came up with already implemented in the popular library Bluebird.

LAUNCHING THE MIGRATION

Since we use Heroku, the easiest and most straightforward method is to run a new, separate process. This is as easy as adding a new line to your Procfile.

data-migration: node ./scripts/migrate-user-files.js

Cool, the migration script is running. After a few checks to verify that everything works as expected, we can finally relax, enjoy our cup of coffee and watch how the database is slowly populating with the updated records.

If you are thinking that sounds too easy, you're right. When I initially thoughts it'd be “slowly populating,I hadn't really realized how slow it could actually be. After running the script for a whole day, we ran some statistics queries and quickly realized the script wasn’t performing well. We found out it was processing only about 700 profiles per hour which, after doing simple math, meant that it would have needed to run for several months (!!!) to process all the data. This was run on Heroku standard-1x dyno with 512MB of RAM and a price of $25/month. We decided to try it with a larger instance, standard-2x, that offers 1GB of RAM for $50/month. After upgrading the instance and running the script for another day, we saw a slight improvement in terms of numbers... but it still wasn’t good enough.

RETHINKING THE SOLUTION

At this point, we knew we had to revise our solution and come up with a more performant alternative. If one process can handle 100 users, two workers should process twice as much, right? With that idea, we decided to run multiple workers in parallel. But that's when it gets much more complex, as we need a way to make sure multiple parallel processes don’t touch the same portion of data.

The plan we came up with consisted of one master and several worker processes. For spawning multiple worker processes, we used Node.js’s cluster. The job of the master process is to spawn worker processes and then listen for their requests to fetch a batch of data. The master then queries the database in a lock, which assures only one request at a time will be handled; this way, we avoid multiple workers getting the same data. Once the workers get the batch of data, they start processing it asynchronously. Anytime a worker is done, it just asks for the next payload.

With this change, the script was able to process roughly 4000 profiles an hour. Yay, this looked very promising... but only until you do the math again and still don’t get satisfying numbers. It would have still required a couple of months to finish. Since we had the script ready for parallel processing, the next step was to just take bigger advantage of it and try to scale it more.

BYE HEROKU. HELLO AGAIN, AWS.

As we could see, Heroku is not very generous when it comes to scaling options and money. We didn’t want to pay this absurd amount of money for even more ridiculous offers (I mean, 512MB for 25 bucks? Come on, it’s 2021!)

That’s why we decided to run an EC2 instance in the AWS cloud instead. We opted for t2.mediumwith 4GB of RAM for the price of ~$33/month ($0.0464/hour). We increased the number of workers to 6. After running it for another day, we were astonished. In this configuration, we were able to process almost 25k profiles within an hour. Whoa, that was what we'd been looking for. The whole script didn’t even reach half of the available memory so, theoretically, we could have tried to add even more workers—but we were happy enough with the current solution and speed.

Another idea was to enable the so-called VPC endpoint for S3. VPC allows routing all the traffic between EC2 instances and S3 storage within the internal AWS network, without the need to hit the public internet. This would obviously help to reduce the total time even more but as I said, we didn’t need any further optimizations.

Screen-Shot-2021-01-26-at-2.11.00-PM

ORCHESTRATING PARALLEL PROCESS

Introducing parallel processing saved us a lot of time and sped things up significantly, but it also brought more complexity into the code. While developing the solution, we found it to be useful in other places or similar situations as well; that’s why we eventually decided to offload the logic into a standalone open-source npm package.

The package works on the idea of master and worker processes, and it takes care of a few other useful concepts—such as retries, dead-letter queues and exited processes.

How it works

Coming from a real-world scenario, this tiny library allows you to process lots of data much faster by introducing parallelism. It spawns multiple workers which access an assigned portion of data in the database simultaneously, so you can process much more data at the same time without worrying about synchronization or duplicate processing.

Using the package is as easy as defining two functions.

setFetchNext(async lastId => { ... })

This function defines the way of fetching the next payload from a database. You provide a callback function as a parameter where you can specify how to fetch data. It is always called just once at a time to make sure it won’t return the same portion of data multiple times—which would mean duplicate processing. It gives you a parameter lastId, which can be thought of as a pointer for the next database fetch.

setHandler(async ({ lastId, ...payload }) => { ... })

The second function you need to provide is used for specifying your business logic for processing an assigned range of data from a database. You always operate on data in a variable payload, which gives you secure access to the reserved data portion in the database and makes sure no one else will touch this.

Screen-Shot-2021-01-26-at-2.15.30-PM

Now, when we know how the library is used, here is a rewritten example of the code above:

const limit = 50
const worker = new ParallelWorker({
    redis: services.redis.client,
})

worker.setLoadNextRange(async lastId => {
    const batch = await repositories.verification.getProfilesToMigrate({ limit: 50, lastId })
    if (!batch.length) {
        // Returning null signalizes there is no more data and worker should stop
        return null
    }
    const idsRange = batch.map(item => item.id)

    return {
        // Return last ID from fetched rows as a pointer for next iteration
        // This field is required!
        lastId: _.last(idsRange),
        // You can also add any additional payload data that will be available in setHandler callback
        idsRange,
    }
})

worker.setHandler(async ({ lastId, idsRange }) => {
    const profiles = await repositories.profile.getProfilesToMigrate({ limit: 50, lastId })
    for (const profile of profiles) {
        if (profile.photos) {
            profile.photos = await migratePhotos(profile.photos)
        }

        // update database row - photos and photos_migrated_aws flag
        await repositories.profile.updateMigratedPhotos(profile.id, profile.photos)

        // update photos in Firebase
        await services.firebase.updateProfilePhotos({
            id: profile.id,
            photos: profile.photos,
        })
    }
})

// You can listen for events and act accordingly if needed
worker.on(ParallelWorkerEvent.beforeStop, async () => {
    await Promise.all([
        services.redis.close(),
        services.db.close(),
    ])
})

worker.start()

SEE SURGEAPP/PARALLEL-WORKER ON GITHUB

SUMMARY

The whole process of migration from the initial idea until the last migrated user lasted nearly two months. During this time, we defined what was necessary to do, how we wanted to tackle it, and we designed the first draft. Quite quickly, we encountered some bugs or edge-cases for which the script didn’t operate properly—so further fixing and polishing was required. We learned that Heroku is a very good choice for getting started quickly and without much of a configuration, but this comes with a significant drawback in terms of scaling or further development, as your hands are tied tightly and you don’t have much control. It’s important to keep that in mind and to switch to something more appropriate that gives you better control (e.g. AWS, GCP, …) as soon as your project starts growing.

We had to deal with a kind of “big data” (even though not really), in which case the original idea might not be always a good fit and you need to take into account the amount of data you're looking to process. What works well for 10s, 100s or 1000s of users (rows, data, basically whatever) might naturally not work the same way for millions.

On such a big scale, errors are also more likely to happen. While you can never avoid them fully, you can design your code in a way that embraces it and acts accordingly. As already stated, you can implement retry mechanisms, which should be able to repeat the failed task, dead-letter queues, for keeping the track of erroneous tasks, handling unexpected exits gracefully so you won’t lose data, etc.

BONUS TIP

If you read all the way to here, I really appreciate it—so here you are, one extra bonus tip that we learned the hard way. If you use pm2 process manager, be aware it stores logs by default. It might sound obvious, but it didn’t to us and we ended up in a situation like this:

Screen-Shot-2021-01-26-at-2.28.12-PM-1
RAM utilization on our EC2 instance

As you can see, when the script was running normally it used around 2GB of memory but then, one day and all of a sudden, this big spike occurred. After a little bit of investigation, we learned that pm2 stores all the logs on a disk which, after a few days, resulted in draining all the memory on the instance. The solution was to just delete those logs and add a configuration option to not save anything. After fixing this, the script continued working perfectly until everything got migrated.

P.S.: All of these drops and raises indicate worker processes being killed and respawned, respectively.

Further reading

Share Article
Jozef Cipa

Jozef Cipa

Backend Developer & AWS Solutions Architect

You might also like...