A Data Migration Story, Part II: Implementing a Databricks Migration

Noah Kennedy
11 min read · Apr 27, 2021

In Part I, I presented a high-level view of my project’s architecture, and outlined the questions any data migrator should be asking themselves. I also shared some of the choices we made, along with the rationale for each decision. If you haven’t read the first part of this story yet, I would recommend checking that out before continuing.

Photo by Gunnar Ridderström on Unsplash

For the last 6 months, my team has been implementing a solution for migrating our client's Oracle database into Databricks Delta. In that time we've focused mainly on the following:

  1. Setting up the required cloud infrastructure for migration.
  2. Ingesting data from our S3 staging buckets into Databricks Delta tables.
  3. Writing scripts to ensure data quality and to unit-test our streaming code.

For the rest of this article, I plan on discussing the above milestones, providing a closer look at the roadblocks we hit and how we overcame them. My goal is to provide more transparency on what a migration to Databricks looks like, not to list out every step required (a task worthy of a textbook). My college professors would describe what I'm sharing as "tribal knowledge" — knowledge learned from trial and error, wisdom not easily picked up from documentation.

As I said in Part I, this migration is for an AWS RDS-managed Oracle Data Guard Database to Databricks Delta tables. It should be noted that milestone 1 is highly specific to our source database, while milestones 2 and 3 should be applicable to most Databricks migrations, regardless of source.

Milestone 1: Setting up the AWS "pipes"

As I mentioned in Part I, our architecture looks a bit like this —

Generic architecture diagram for our migration.

The two Oracle blocks on the right side were already set up. We needed to configure DMS and S3 so that we could pull data from Oracle and stage it for our Autoloader jobs. We already had a default S3 bucket configured, so I'll focus mainly on DMS.

Mistake #1: Looking back, we should have made a separate S3 bucket specifically for streaming and restricted all user permissions to this S3 bucket. The Databricks IAM role and the DMS IAM Role should be the only roles that can access streaming data. This would prevent the data in our S3 buckets from being accidentally deleted (you can probably guess how we found out about that vulnerability).

Configuring DMS may be simple with the right experience, but with no prior knowledge we had to learn iteratively, uncovering roadblocks and solving them as we went. For those of you not familiar with DMS, here's a brief description of the components:

  • Migration Task (The actual ‘job’)
  • DMS Instance (Basically just an EC2 instance — we used default settings)
  • Source Endpoint (In our case, our Oracle DB)
  • Target Endpoint (For Databricks, an S3 Bucket)

A Migration Task holds basic settings as well as table mappings. Here, you can achieve a very high level of granularity, choosing exactly which tables and schemas you’d like to migrate or skip, as well as adding column filters to each table/schema filter. We actually had minimal hang-ups in our task set-up, and found the base AWS tutorials to be more than sufficient. For the DMS Instance itself, we progressed with default settings, only making changes as performance required. One notable resource that helped us along the way for the Task and Instance settings was the three-part DMS debugging guide found here. The trickier parts for a DMS setup actually lie in the Endpoint configuration.

Our first roadblock was understanding that most of the task settings exist on the endpoints in DMS. Importantly, different kinds of endpoints have different available settings, and it takes a fair bit of digging to understand what manipulation you can do on your endpoints. For example, a "Target S3 Endpoint" has one list of settings, while a "Source S3 Endpoint" has a different list of settings. Furthermore, an Oracle Source Endpoint has different settings than a SQL Server Source Endpoint. The same is true for all other types of sources/targets, so make sure you're reading carefully before trying to apply settings. Applying the right endpoint settings reminded me of writing CSS code — incorrect syntax threw errors, but incorrect rules were just ignored. Some notable mistakes:

Mistake #2: The proper setting for enabling inserts, updates, and deletes is on the S3 Target: ‘cdcInsertsOnly=false;’. Counterintuitively, there is no cdcUpdatesAndDeletes=true; setting.

Mistake #3: LogMiner vs. Binary Reader: We used Binary Reader for many reasons, and if it fits with your architecture, I'd recommend you use it too. LogMiner was a waste of time for us. The proper setting for enabling Binary Reader on the Oracle Source: 'useLogMinerReader=N;useBfile=Y;'.

The second tricky part was understanding the underlying database settings that we had to enable on our source in order to allow DMS to assert certain permissions and properly read logs. This was a headache and a half, and trying to explain all the intricacies of which rules to enable is already outside of my pay grade. Luckily, AWS seems to have improved their documentation! You're best off reading their documentation fully; however, here is an overview of the database settings required for enabling migration on the source:

The settings you enable depend on the underlying characteristics of your source database. For example:

  • Read Replicas have different settings than on-prem or self-managed databases, and RDS instances have different settings than non-RDS-managed databases.
  • If you're using Binary Reader (recommended), the settings are slightly different than if you're using LogMiner (not recommended).
  • Setting up permissions for DMS to do a bulk load is easy, but enabling permissions for CDC is trickier. Make sure you follow the steps for both.

Ultimately, the documentation is so confusing because of the underlying mechanisms that DMS uses to migrate data: you need to provide enough permissions for the DMS job to read the proper logs, while also ensuring that the database is writing the proper logs to begin with!

As I said above — a lot of this was trial and error. Some of the errors didn't surface until much later, after we had migrated the entire database several times in several different file formats! Carpenters have it right: measure twice, cut once. In our case, architect twice, migrate once (if you're a novice architect like myself, best to measure three times). A few more lessons we learned while becoming expert architects:

Mistake #4: We originally started with one target endpoint for both bulk load and streaming data. Down the line we had to separate these out into two separate target endpoints to account for differences in how we handled streamed vs initial load data.

Mistake #5: When splitting out our endpoints, we started our initial load first, assuming we could use the Oracle System Change Number (SCN) to start our streaming job "in the past." This meant we had to re-do everything: we couldn't get a correct SCN, and the archive logs for changes are only held for a maximum of 24 hours, less time than our initial load took. On the second attempt, we started the streaming job first, then started the bulk load, and let our Autoloader code sort out the duplicate data that came from that approach (one way of handling such overlap is sketched below).
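For context, a common way to absorb that kind of overlap when writing to Delta is an upsert keyed on the primary key, so re-delivered rows overwrite themselves instead of piling up as duplicates. The sketch below uses placeholder table and key names and is an illustration of the general technique, not our exact code:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Upsert a micro-batch into the target table on its primary key;
// this shape can be plugged into a streaming foreachBatch sink.
def upsertBatch(batch: DataFrame, batchId: Long): Unit = {
  val deduped = batch.dropDuplicates("ORDER_ID") // guard against duplicate keys within the batch
  DeltaTable.forName("bronze.orders").as("t")
    .merge(deduped.as("s"), "t.ORDER_ID = s.ORDER_ID")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
```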

The above can really be summarized like this: read the documentation for your specific source, and make sure that you are following the right documentation for your exact source setup. Then, read the documentation again with your plan in hand!

Milestone 2: Ingesting Data From S3

You've probably heard the buzzwords "Auto Loader" and "Structured Streaming" (or Spark Streaming) thrown around a lot. I prefer the phrase Spark Streaming because it's the most descriptive. The terms are often used interchangeably (strictly speaking, Auto Loader is Databricks' cloudFiles source built on top of Spark Structured Streaming), and they refer to code like the following block:

Code snippet from a helpful Databricks blog post.
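In rough sketch form, assuming parquet files staged in S3 and placeholder bucket, checkpoint, and target paths, the pattern looks like this:

```scala
// A minimal Auto Loader sketch; bucket, checkpoint, and target paths are placeholders.
val raw = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")        // file format staged in S3 by DMS
  .option("cloudFiles.useNotifications", "true") // SNS/SQS notifications instead of directory listing
  .load("s3://my-staging-bucket/SCHEMA/TABLE/")

raw.writeStream
  .format("delta")
  .option("checkpointLocation", "s3://my-staging-bucket/_checkpoints/TABLE/")
  .start("s3://my-delta-bucket/bronze/TABLE/")
```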

where the cloudFiles.format option can be anything from CSV to Parquet and the available .option settings are myriad.

Auto Loader is the Databricks-recommended way of getting data into Delta, and streaming jobs can be as simple as 5 lines of code or as complex as 500; it really depends on the use case. Since each use case is different, I'm going to stick with some intriguing problems we encountered instead of trying to walk through several hundred lines of code.

Streaming Problem #1:

How do you run a streaming job that is listening to hundreds of tables?

We tried running one Databricks streaming job that had 600 separate Spark streams. It crashed our notebooks quickly and ran unbearably slowly. Next, we tried opening one SNS/SQS task per table, which failed due to a 100-SNS/SQS limit per S3 bucket. Our final solution was initializing a single SNS/SQS on the root directory, using the binaryFile format to pull the table name, dropping the binaryFile content, and reading in the parquet file from the location given in the binaryFile record (sketched below). This has worked pretty well over the last two months. We have minor performance issues on our 600-table job — sometimes it lags behind and struggles to catch up with the sheer volume of data, despite 48 xlarge worker nodes. If we had to do it again, I would recommend examining the option of separating out a few high-throughput tables from the main DMS job and loading those in a separate job. For more background on our cloudFiles approach, Databricks has an example of the cloudFiles SNS/SQS setup here, with additional information here.
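Here is a rough sketch of that router pattern. The bucket layout, table-name parsing, and target table naming are assumptions for illustration, not our production code:

```scala
import org.apache.spark.sql.DataFrame

val rootPath = "s3://my-staging-bucket/cdc/" // placeholder root prefix for DMS output

// One Auto Loader stream over the whole prefix, so a single SNS/SQS setup suffices.
// binaryFile gives us file metadata (path, length, ...) plus the raw bytes.
val fileEvents = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("cloudFiles.useNotifications", "true")
  .load(rootPath)
  .drop("content") // we only need the path; each file gets re-read as parquet below

fileEvents.writeStream
  .option("checkpointLocation", "s3://my-staging-bucket/_checkpoints/router/")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Assuming DMS writes files as <root>/<schema>/<table>/<file>.parquet,
    // the table name can be parsed out of each file path.
    val pathsByTable = batch.select("path").collect().map(_.getString(0))
      .groupBy(path => path.stripPrefix(rootPath).split("/")(1))

    pathsByTable.foreach { case (table, paths) =>
      batch.sparkSession.read.parquet(paths: _*)
        .write.format("delta").mode("append")
        .saveAsTable(s"bronze.$table") // placeholder target naming
    }
  }
  .start()
```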

Streaming Problem #2:

What if I have a high number of tables, but a low throughput of data?

Having 48 worker nodes doesn't mean 48 separate threads; that's not how distributed computing works. A high number of tables requires concurrency, which we implemented using ForkJoinPools in several different jobs. My understanding of a ForkJoinPool is limited, but essentially it is a Java library that allows you to parallelize a collection like a list, so that you can run a multi-threaded map operation on it. A quick example —

forkJoinPool example.
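A minimal sketch of the pattern in Scala, where the table list and the startStreamForTable helper are placeholders for whatever per-table work you need to run:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Placeholder list of tables; ours came from configuration.
val tables = Seq("orders", "customers", "line_items") // ...up to 600 of them

// Hypothetical helper that starts (or processes) one table's stream.
def startStreamForTable(table: String): Unit = { /* per-table streaming code */ }

// Parallelize the collection and cap it at 32 concurrent threads,
// so 32 tables are worked on simultaneously instead of one at a time.
val parTables = tables.par
parTables.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(32))
parTables.foreach(startStreamForTable)
```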

What we're doing above is parallelizing a list so that we can run an operation on 32 of the items simultaneously. This is especially useful for streaming jobs, where a high number of tables can otherwise be prohibitively expensive or prohibitively slow.

Streaming Problem #3:

Our data is in Delta, but reads are extremely slow. What gives?

An important step in a streaming job is optimizing your tables on write, and then setting up a cron job (or Databricks job) that re-optimizes every few hours. At a high level, Z-ordering reorganizes your data, deleting the old parquet files and writing new parquet files that are sequentially ordered by the specified column (in this case, our primary key). This increases read speed by enabling data skipping — much like how a binary tree works, Spark can see the range of values each parquet file holds, and if the 'index' Spark is searching for falls outside a given file's min/max range, it can skip the file rather than reading each record. For a whole lot more information on Z-ordering, I would absolutely recommend this YouTube video, which describes Z-ordering versus partitioning in Databricks.

For our job, we ran the command spark.sql(s"OPTIMIZE ${TABLE} ZORDER BY ${PRIMARY_KEY}"), which dramatically improved read performance later on. We got the primary keys to do so by passing a SQL statement through a JDBC connector to our source database, following this example for inspiration. In order to maintain the Z-ordering on our tables, we used Databricks Jobs to re-run a similar OPTIMIZE command every 2 hours. The frequency required for re-optimizing depends on the amount of data coming in.
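As a rough sketch, the scheduled job amounts to something like the following, where the table-to-key map is a placeholder (ours was built from the JDBC query against the source mentioned above):

```scala
// Placeholder mapping of Delta table -> primary key column.
val primaryKeys: Map[String, String] = Map(
  "bronze.orders"    -> "ORDER_ID",
  "bronze.customers" -> "CUSTOMER_ID"
)

// Re-optimize and Z-order each table; we schedule this as a Databricks job every couple of hours.
primaryKeys.foreach { case (table, pk) =>
  spark.sql(s"OPTIMIZE $table ZORDER BY ($pk)")
}
```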

Milestone 3: Learning to trust your new data warehouse

A step that cannot be overlooked is data quality checks. If you’re streaming from point A to point B, you need confidence in the quality and quantity of your data. The options for doing proper data quality checks are myriad and depend on considerations such as requirements, architecture, and access patterns. Below is our quality control philosophy for Spark Streaming.

Data Quantity Checks:

For data streaming, we want certainty that our stream is up and running. For a complex architecture like ours, there are a half-dozen points of failure, and the most reliable fail-safe is to ensure that source and target record counts are a close match on a daily basis. Our end-state goal for data quantity checks is a system that:

  • Calculates the delta between source and target record counts for the last n hours.
  • Throws an alert if the delta is greater than a pre-determined threshold.
  • Does the above in a performant fashion.

The first two points are easy enough; the performant part, however, is difficult — counts in Oracle are cheap, but counts in Delta are expensive. We employed Z-ordering on the primary key of each table and counted the distinct primary keys, which allowed us to count ~25 billion records across 600 tables in under 10 minutes. We repeat this operation daily. Partitioning on a record timestamp (date) column would allow you to do this even faster, by creating a mechanism to filter back through the last n hours rather than always checking every record in existence.
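In sketch form, the daily check looks something like this. The table-to-key map and the source-side count are placeholders; how you count on Oracle depends on your connector and credentials:

```scala
// Placeholder: however you count rows on the source side, e.g. a
// SELECT COUNT(DISTINCT pk) issued over a JDBC connection to Oracle.
def sourceCount(table: String, pk: String): Long = ???

// Distinct primary keys on the Delta side, per the approach described above.
def targetCount(table: String, pk: String): Long =
  spark.table(s"bronze.$table").select(pk).distinct().count()

val threshold = 1000L // pre-determined acceptable delta, per table
val keys = Map("orders" -> "ORDER_ID", "customers" -> "CUSTOMER_ID") // placeholders

keys.foreach { case (table, pk) =>
  val delta = math.abs(sourceCount(table, pk) - targetCount(table, pk))
  if (delta > threshold) {
    // Placeholder alerting hook (email, Slack, pager, ...).
    println(s"ALERT: $table counts differ by $delta (threshold $threshold)")
  }
}
```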

Data Quality Checks:

This is our next step. Currently, we have a general framework for how we want our data quality checks to work: randomly sample a statistically significant number of recent records in a table, hash each record on both the source and the target, and compare the hashes for accuracy. We're still talking with our SMEs about best practices, but I think we're on the right track. Just like with our data quantity checks, successful data quality checking will depend on Z-ordering and partitioning our tables for the proper access patterns. With Databricks Delta, as with any other distributed database, a key architecture step that cannot be overlooked is optimizing your reads to make steps like quality control performant.
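To make the plan concrete, here is a sketch of the hashing idea. The column handling, sample size, and source-side extract are all placeholders; this reflects the plan, not a finished implementation:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Collapse every non-key column into a single per-record fingerprint.
def fingerprint(df: DataFrame, pk: String): DataFrame = {
  val valueCols = df.columns.filter(_ != pk).sorted.map(col)
  df.select(col(pk), sha2(concat_ws("||", valueCols: _*), 256).as("row_hash"))
}

// Randomly sample recent records on the target side (placeholder sampling logic).
val sampledTarget = spark.table("bronze.orders").orderBy(rand()).limit(1000)

// Hypothetical helper that pulls the same primary keys back out of the source.
def sourceRowsForKeys(keys: DataFrame): DataFrame = ???

val targetHashes = fingerprint(sampledTarget, "ORDER_ID")
val sourceHashes = fingerprint(sourceRowsForKeys(sampledTarget.select("ORDER_ID")), "ORDER_ID")

// Any key whose hashes differ, or that exists on only one side, is a mismatch to investigate.
val mismatches = targetHashes.alias("t")
  .join(sourceHashes.alias("s"), Seq("ORDER_ID"), "full_outer")
  .filter(not(col("t.row_hash") <=> col("s.row_hash")))
```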

Conclusions

It feels like the hard parts of our migration are now behind us. With stable pipelines streaming data into Databricks, and quality control measures starting to show that this data is indeed what we think it is, the ‘vertical’ section of our learning curve is over. Now, we have solid footholds to continue making further progress. What are our next steps?

  • Continue implementing quality checks to gain total certainty in our data.
  • Begin implementing logging and monitoring on our pipeline, integrating logs with our existing ELK stack to enable simple email alerting.
  • Start transforming our data into “gold tables” that our end users can leverage.

Just like any other software engineering project, the road was a bit bumpier than anticipated and most aspects of our implementation were not perfect on the first try. Many of the mistakes I outlined above probably could have been avoided through more intentional planning and a few more sets of expert eyes. In the end, we were lucky: our implementation went largely as expected.

What would I change if I did it again? I would start by more intentionally mapping out the state of our data at each transformation. We were so focused on the structures of our system that we often failed to look at the state of the data within. This would have helped us skip many of our errors. Instead of starting with CSVs, we probably would have recognized that programmatically encoding the schema was extremely important and only truly feasible with parquet. By focusing on the actual data, we would have been forced to confront our access patterns and how our users planned on doing reads and writes.

My brother once said that the real challenge of being a developer is having the awareness to zoom in and out on the problem at hand without getting too caught up in the details or the big picture. Development is an almost meditative practice in that sense, requiring regular switches in mindset to make sure that no single problem steals away too much of your attention. In our case, we were often distracted by the big picture. Diving back into the details of the data a few more times probably would have worked out in our favor.

Thank you to everyone on the Deloitte engineering team as well as the Databricks staff who supported us. The hard work of countless individuals went into the successes I’ve described above, and it was a pleasure to work on such a challenging task. Thank you especially to Christian Stano who was my editor in chief and Databricks confidant for both Pt I and Pt II.


Noah Kennedy

Senior Data Engineer at Tempus AI. All opinions are my own. Easily excited, mainly by topics from dbt, to endurance sports, to pour-overs, to biotech trends.