If the version is higher than one, the process will look for an existing contract matching the schema name. It will select the latest one and copy it with the new version, expiring the previous contract at the same time.
In both cases, a pull request will be raised for the user to review. In the first case, the user will need to complete several fields. In the second, the user will only need to modify the proposal if a new personal data field was included or if some other information has changed (the execution hour, for instance).
Despite this automation, the data producer can still create a contract manually if needed. But the idea is for them to focus on the business value and data definition, creating only the schema and filling some fields inside the contract. Everything (contract proposal, deployment and scheduling) should then run automatically until the data is available and ready to be exploited in the data lake.
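Purely as an illustration of that flow, the version-handling step could look like the sketch below; `Contract`, `ContractStore` and the field names are hypothetical, not the actual implementation.

```scala
// Hypothetical model of the automated versioning step described above.
case class Contract(name: String, version: Int, expiryDate: Option[String])

trait ContractStore {
  def findLatestByName(name: String): Contract // latest non-expired contract for a schema
  def save(contract: Contract): Unit
}

def proposeContract(schemaName: String, newVersion: Int, store: ContractStore, today: String): Contract =
  if (newVersion == 1) {
    // First version: an empty proposal; the producer fills the remaining fields in the pull request.
    Contract(schemaName, version = 1, expiryDate = None)
  } else {
    // Later versions: copy the latest contract with the new version and expire the previous one.
    val latest = store.findLatestByName(schemaName)
    store.save(latest.copy(expiryDate = Some(today)))
    latest.copy(version = newVersion, expiryDate = None)
  }
```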
Ingestion process
Overview
Once the contract is created and the DAG is deployed and activated, it will trigger as defined in the contract (hourly or daily at 8am for instance). Then a Spark process written in Scala will run for each of the contracts with the following parameters:
| Parameter | Description |
| --- | --- |
| config | JSON config file for the Spark job |
| start_ts | Start timestamp |
| end_ts | End timestamp |
| connector | Source type; only Kafka is allowed so far |
| contract | Contract to be processed |
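For illustration, the parameters could be modelled as in the sketch below; the case class and the key=value parsing are assumptions, not the job's actual entry point.

```scala
// Simplified sketch of the job parameters described above (not the actual entry point).
case class IngestionArgs(
  config: String,     // path to the JSON config file for the Spark job
  startTs: Long,      // start timestamp of the window to ingest
  endTs: Long,        // end timestamp of the window to ingest
  connector: String,  // source type; only "kafka" is supported so far
  contract: String    // contract to be processed
)

object IngestionArgs {
  // Expects arguments as key=value pairs, e.g. connector=kafka start_ts=1690000000
  def parse(args: Array[String]): IngestionArgs = {
    val kv = args.map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap
    IngestionArgs(
      config    = kv("config"),
      startTs   = kv("start_ts").toLong,
      endTs     = kv("end_ts").toLong,
      connector = kv("connector"),
      contract  = kv("contract")
    )
  }
}
```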
As the first step, the Spark process will look for all contracts matching the source and name in the S3 bucket where they are stored. It will load only the contracts valid for the start_ts and end_ts received as parameters, because an event may have several versions alive at the same time.
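A minimal sketch of that selection step, assuming each stored contract carries a validity window (the validFrom/validUntil field names are an assumption):

```scala
// Hypothetical validity fields on a stored contract; the real layout may differ.
case class StoredContract(source: String, name: String, version: Int,
                          validFrom: Long, validUntil: Option[Long])

// Keep every contract version whose validity window overlaps [startTs, endTs],
// since several versions of the same event can be alive at the same time.
def validContracts(all: Seq[StoredContract], source: String, name: String,
                   startTs: Long, endTs: Long): Seq[StoredContract] =
  all.filter { c =>
    c.source == source && c.name == name &&
      c.validFrom <= endTs && c.validUntil.forall(_ >= startTs)
  }
```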
It then connects to the Kafka topic defined in the contract and reads data for the given time frame (start_ts -> end_ts), filtering the read data by two clauses:
- A type field in the event must match the contract name
- A schema version field in the event must match the schema version defined in the contract
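In Spark, that read-and-filter step could look roughly like the sketch below. The broker address, the `type` and `schemaVersion` field names, and the use of the Kafka record timestamp are all assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

// Sketch only: reads a Kafka topic as a batch and applies the two filtering clauses.
// startTs/endTs are assumed to be epoch seconds.
def readEvents(spark: SparkSession, topic: String, contractName: String,
               schemaVersion: Int, startTs: Long, endTs: Long,
               eventSchema: StructType): DataFrame = {
  val raw = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092") // placeholder brokers
    .option("subscribe", topic)
    .load()

  raw
    // keep only the records produced inside the requested time frame
    .filter(unix_timestamp(col("timestamp")).between(startTs, endTs))
    // parse the event payload with the schema referenced by the contract
    .select(from_json(col("value").cast("string"), eventSchema).as("event"))
    .select("event.*")
    // the two clauses: event type matches the contract name, schema version matches the contract
    .filter(col("type") === contractName && col("schemaVersion") === schemaVersion)
}
```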
Then the data is loaded and it's time to clean it up. The consumer will check for personal fields in the contract, separating them into another DataFrame if they have analytical purposes or removing them directly otherwise. See the GDPR management section of this article for more information.
There is no other transformation apart from the column additions and removals explained above. The process also does not perform any kind of schema evolution, relying instead on the evolution logic described in the contract lifecycle section. Once the data is split between personal and non-personal, it will be stored in the proper table partitioned by processingTime, thus ensuring idempotency and allowing consistent retries.
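As a sketch of that split-and-write step, under the assumption that the contract provides the list of personal fields, the subset with analytical value and the relation key, and that the tables are Delta tables written by path:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._

// Split one batch into a non-personal DataFrame and a personal one.
// personalFields / analyticalPii / relationKey are assumed to come from the contract.
def splitPersonalData(events: DataFrame, personalFields: Seq[String],
                      analyticalPii: Seq[String], relationKey: String,
                      processingTime: String): (DataFrame, DataFrame) = {
  val stamped = events.withColumn("processingTime", lit(processingTime))

  // Regular data: drop every personal column.
  val regular = stamped.drop(personalFields: _*)

  // Personal data: keep only the PII columns with analytical value plus the relation key.
  val personalCols = (relationKey +: analyticalPii).map(col) :+ col("processingTime")
  val personal = stamped.select(personalCols: _*)

  (regular, personal)
}

// Partitioning by processingTime is one way to keep retries idempotent:
// a rerun of the same window simply replaces its own partition.
def writePartitioned(df: DataFrame, path: String, processingTime: String): Unit =
  df.write.format("delta")
    .mode(SaveMode.Overwrite)
    .option("replaceWhere", s"processingTime = '$processingTime'")
    .partitionBy("processingTime")
    .save(path)
```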
GDPR management considerations
Many times, when ingesting data into a data platform, we fall into the garbage-in, garbage-out paradigm, where we store as much data as possible regardless of its analytical value. This approach may avoid schema evolutions and backfills in the future. However, it also ingests more data than needed, increasing storage demands and, more importantly, management and maintenance costs.
Another issue when ingesting data into a data platform is that personal fields span several database tables, making it difficult to confine access to authorised users. This complexity increases the time and cost of maintaining compliance, for example with the right to be forgotten.
With that in mind, we defined the main goals of the GDPR management in this ingestion process as:
- Minimise ingestion of non-analytical personal information
- Store all personal information in a single partitioned table
The first goal is achieved by defining in the contract if a personal field has analytical value and removing any that don’t.
The second goal is based on a personal table per website whose schema evolves only with the approval of the security office. So if a producer has a personal field that is not yet present in said table, an assessment must take place to ensure that the field has analytical value.
Here are the details of the personal-data-related fields defined in the contract:
| Field | Description |
| --- | --- |
| PII list | List of fields that contain personal information. Each field contains the attributes below. |
| SDRN | An attribute that identifies a single client in Adevinta in Spain. Each client that has personal data requires an SDRN. |
| user_id | If the personal information does not belong to a client but, for instance, to an employee, a user_id is used instead and the SDRN is omitted. |
| relationKey | Field that relates each row of personal information to its non-personal data equivalent. Usually the id of each event. |
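Modelled in code, the contract's personal-data section might look something like the sketch below; the structure and names are an interpretation of the table above, not the actual contract format.

```scala
// Illustrative model only; the real contract layout is not shown in this article.
case class PiiField(
  name: String,                // name of the field that contains personal information
  hasAnalyticalValue: Boolean  // kept in the personal table if true, dropped otherwise
)

case class PersonalDataSection(
  piiList: Seq[PiiField],      // list of fields that contain personal information
  sdrn: Option[String],        // identifies a single client in Adevinta in Spain
  userId: Option[String],      // used instead of the SDRN when the data belongs to an employee, for instance
  relationKey: String          // relates each personal row to its non-personal equivalent, usually the event id
)
```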
This way, if the input data comes with personal data, two output tables will be updated (regular data and personal data). This raises a new problem: we need to make the whole write atomic. If the write to the personal data table fails, we need to roll back both that write and the one for the regular data. In order to do that, we take advantage of the "time travel" feature of Delta tables.
As the first step, before writing to the regular data table, we read its current version from the Delta history. If that write fails, it will roll back thanks to Delta's atomicity and the process will finish with an error. If, on the other hand, it finishes successfully, we go forward and write to the personal data table, capturing any error so that, if it fails, we can time travel the regular data table back to the previous version before finishing the process.
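A simplified version of that rollback logic, assuming path-based Delta tables and append writes; the helper name and paths are illustrative:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.util.{Failure, Success, Try}

// Sketch of the dual write with rollback via Delta time travel.
def atomicDualWrite(spark: SparkSession, regular: DataFrame, personal: DataFrame,
                    regularPath: String, personalPath: String): Unit = {
  // Capture the current version of the regular table before writing to it.
  val previousVersion =
    DeltaTable.forPath(spark, regularPath).history(1).select("version").head().getLong(0)

  // First write: if this fails, Delta's atomicity means nothing was committed.
  regular.write.format("delta").mode(SaveMode.Append).save(regularPath)

  // Second write: if this one fails, time travel the regular table back to the captured version.
  Try(personal.write.format("delta").mode(SaveMode.Append).save(personalPath)) match {
    case Success(_) => ()
    case Failure(err) =>
      spark.read.format("delta")
        .option("versionAsOf", previousVersion)
        .load(regularPath)
        .write.format("delta").mode(SaveMode.Overwrite).save(regularPath)
      throw err // rethrow so the job still finishes with an error
  }
}
```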
Error management
When there is an error, both the producer and the consumer want to be aware of it, as they may need to fix it or manage the delay of dependent processes respectively. The Data Platform team, as facilitators, needs to be aware of errors in case a bug in the code was promoted to production or there is an issue with the underlying infrastructure.
In order to help the producer to analyse, find and fix the error, the process will write the input data in a special table called Quarantine.
The Quarantine table contains the following fields:
| Field | Description |
| --- | --- |
| rawEvent | Input data as a String |
| error | Error information |
| source | Source of the data |
| event | Event name |
| contractVersion | Contract version |
| processingTime | Execution time |
This table allows the producers to check if the error is due to something related to the source code, to the contract definition or some other issue. Knowing the root cause they can then perform the necessary actions to fix it, whether they are correcting the source, fixing the contract, modifying the schema or something else as required.
Once the error cause is known and fixed, the process can be rerun. Quarantine data is never re-ingested; it's only there for debugging purposes.
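As an illustration, the quarantine write could wrap the main processing step roughly as below; the column names follow the table above, while the wrapper function, the Delta format and the `value` column of the raw input are assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._
import scala.util.{Failure, Success, Try}

// Sketch: if processing fails, park the raw input in the Quarantine table for debugging and rethrow.
def withQuarantine(rawEvents: DataFrame, source: String, event: String,
                   contractVersion: Int, processingTime: String, quarantinePath: String)
                  (process: DataFrame => Unit): Unit =
  Try(process(rawEvents)) match {
    case Success(_) => ()
    case Failure(err) =>
      rawEvents
        .select(col("value").cast("string").as("rawEvent")) // input data as String
        .withColumn("error", lit(err.getMessage))
        .withColumn("source", lit(source))
        .withColumn("event", lit(event))
        .withColumn("contractVersion", lit(contractVersion))
        .withColumn("processingTime", lit(processingTime))
        .write.format("delta").mode(SaveMode.Append).save(quarantinePath)
      throw err // the run still fails and alerts; quarantined data is never re-ingested
  }
```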
Monitoring and Alerting
In order to keep track of errors, data volumetry and everything else that may affect data availability, there are several processes in place.
First, if an Airflow DAG fails, an alert will be sent to a Slack channel, so the producers, the consumers and the Data Platform team (as facilitators) can be aware of it, check it and deal with it in the proper way.
Second, there is a Grafana dashboard to monitor the process. It's a public dashboard, so anyone in the company can check it: interested producers and consumers, as well as Data Platform team members as facilitators and builders of the process.
There are three sections in the dashboard:
- Process monitor: This section contains visualisations showing the amount of data for each event in several views (per event, per owner, per source), so we can keep track of sudden changes in volumetry that may indicate problems that need to be dealt with
- Table management: This one helps to keep track of table creation and schema evolution; it's useful to know when a table evolves
- Errors: Here, the errors raised during the process are shown. As said before, each error will result in the data being loaded into the Quarantine zone and an alert being sent to a Slack channel.
Conclusion
With this solution, we achieved most of the goals we defined at the beginning of the project:
- Data is a consumable output, source aligned and properly versioned
- All the metadata that allows the consumer to know how to consume the data (frequency, schema, personal fields) is present in an easy-to-find public document (the contract)
- Personal fields are separated from the data, making it easier to ensure access control and GDPR compliance
- A single codebase ingests several sources of data in a standardised way, so there is no need to write new code for each new source. In fact, with some minor changes, we can extend this code to other types of events that differ in definition time or in schema format.
- Table creation/update is managed outside runtime, so there is no risk of breaking the process by performing schema evolution at execution time
- This table management technique allows us to swiftly detect errors when defining the contract and the schema
- The producer is the owner of the whole process, without intervention from the facilitator team. Right now, there is still some interaction from us when validating contracts or solving problems, but this will disappear as users grow more used to the process.