Delta Lake is an open-source storage framework for building lakehouse architectures
on top of data lakes.
Additionally, it brings reliability to data lakes with features like
ACID transactions, scalable metadata handling, schema enforcement, time travel and many more.
Before you continue reading, please watch the short introduction:
Delta Lake can be used with compute engines like Spark, Flink, Presto, Trino and Hive. It also
has APIs for Scala, Java, Rust, Ruby and Python.
To simplify integrations with Delta Lake I have built a REST API layer called Yummy Delta,
which abstracts multiple Delta Lake tables and provides operations like creating new Delta tables,
writing and querying data, but also optimizing and vacuuming.
I have implemented the whole solution in Rust, based on the delta-rs project.
Delta Lake keeps the data in Parquet files, an open-source,
column-oriented data file format.
Additionally, it writes the metadata to the transaction log:
JSON files containing information about all performed operations.
The transaction log is stored in the table's _delta_log subdirectory.
For example, every data write will create a new Parquet file.
After the data write is done, a new transaction log file is created, which finishes the transaction.
Update and delete operations are handled in a similar way.
On the other hand, when we read data from Delta Lake, the transaction
files are read first, and then, according to the transaction data, the appropriate Parquet files are loaded.
Thanks to this mechanism Delta Lake guarantees ACID transactions.
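To see this mechanism in action, here is a minimal sketch using the deltalake Python package (the Python bindings of the delta-rs project this solution is based on); the local table path is only an example.

import pandas as pd
from deltalake import DeltaTable
from deltalake.writer import write_deltalake

# each write adds new Parquet files plus a JSON entry in _delta_log
write_deltalake("/tmp/clients", pd.DataFrame({"id": [1, 2], "score": [0.1, 0.2]}))
write_deltalake("/tmp/clients", pd.DataFrame({"id": [3], "score": [0.3]}), mode="append")

dt = DeltaTable("/tmp/clients")
print(dt.version())   # latest transaction version, here 1
print(dt.files())     # Parquet files referenced by the current version

# time travel: load the table as it looked after the first write
dt_v0 = DeltaTable("/tmp/clients", version=0)
print(dt_v0.to_pyarrow_table().num_rows)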
There are several Delta Lake integrations, and one of them is the delta-rs Rust library.
To be able to manage multiple Delta tables on multiple stores I have built the Yummy Delta server, which exposes a REST API.
Using the API we can list and create Delta tables, inspect their schemas, append or override data, and run additional operations like optimize or vacuum.
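As a quick illustration of how such a layer can be used, here is a sketch in Python; the endpoint paths and payloads are hypothetical and shown only to illustrate the kind of operations the server exposes, so check the Yummy Delta documentation for the exact routes.

import requests

BASE = "http://localhost:8080"  # assumed Yummy Delta server address

# hypothetical endpoints, for illustration only
requests.post(f"{BASE}/api/tables", json={
    "name": "clients",
    "schema": [{"name": "id", "type": "integer"}, {"name": "score", "type": "float"}],
})
requests.post(f"{BASE}/api/tables/clients/append",
              json={"id": [1, 2], "score": [0.1, 0.2]})
print(requests.get(f"{BASE}/api/tables").json())  # list registered tables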
Real-time model deployment is a stage where performance is critical.
In this article I will show how to speed up MLflow
model serving and decrease resource consumption.
Additionally, benchmark results will be presented.
Before you continue reading, please watch the short introduction:
MLflow is an open-source platform which covers the end-to-end
machine learning lifecycle,
including tracking experiments, organizing code into reusable projects,
model versioning and, finally, model deployment.
With MLflow we can easily serve versioned models.
Moreover, it supports multiple ML frameworks and abstracts
them with a consistent REST API.
Thanks to this we can experiment with multiple model flavors
without affecting the existing integration.
MLflow is written in Python and uses Python to serve real-time models.
This simplifies the integration with ML frameworks which expose a Python API.
On the other hand, real-time model serving is a stage where
prediction latency and resource consumption are critical.
Additionally, serving robustness is required even under higher load.
To check how a Rust implementation would perform, I have implemented
an ML model server which can read MLflow models and expose the same REST API.
For test purposes I have implemented integration with the LightGBM
and CatBoost model flavors, using Rust bindings to the native libraries.
I have used the Vegeta load testing tool to perform load tests and measure the p99 response time for
different numbers of requests per second.
Additionally, I have measured the CPU and memory usage of the model serving container.
All tests have been performed on my local machine.
The performance tests show that the Rust implementation is very promising.
For all models, even at 1000 requests per second, the response time stays low.
CPU usage increases linearly as traffic increases,
and memory usage is constant.
On the other hand, the MLflow Python serving implementation performs much worse: for higher traffic
the response times are higher than 5 seconds, which exceeds the timeout value,
and CPU usage quickly consumes the available machine resources.
The memory usage is stable in all cases.
The Rust implementation is wrapped with a Python API and available in Yummy,
so you can simply install it and run it through the command line or from Python code.
pip install yummy-mlflow
import yummy_mlflow

# yummy_mlflow.model_serve(MODEL_PATH, HOST, PORT, LOG_LEVEL)
yummy_mlflow.model_serve(model_path, '0.0.0.0', 8080, 'error')
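Once the server is running, we can send a scoring request; the sketch below assumes the MLflow 1.x /invocations JSON format and hypothetical feature columns, so adjust the payload to your model's signature.

import requests

# hypothetical feature columns and values
payload = {"columns": ["f0", "f1", "f2"], "data": [[0.1, 0.2, 0.3]]}
response = requests.post("http://localhost:8080/invocations", json=payload)
print(response.json())  # model predictions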
In this article I will introduce the Yummy feature server implemented in Rust.
The feature server is fully compatible with the Feast implementation.
Additionally, benchmark results will be presented.
Before you continue reading, please watch the short introduction:
Another step during the MLOps process creation is feature serving.
A historical feature store is used during model training to fetch a large range of entities
and a large dataset with a small number of queries.
For this process the data fetch latency is important but not critical.
On the other hand, when we serve the model, feature fetching latency is crucial and determines the prediction time.
That's why we use very fast online stores like Redis or DynamoDB.
The question which appears at this point is: shall we call the online store directly or use a feature server?
Because multiple models can reuse already prepared features,
we don't want to add feature store dependencies to the models.
Thus we abstract the online store with a feature server which serves features
using, for example, a REST API.
On the other hand, the latency introduced by this additional layer should be minimized.
Using Feast, we can manage the features lifecycle
and serve features using the built-in feature server
implemented in Python, Java or Go.
According to the provided benchmark, the Feast feature server is very fast.
But can we go faster with a smaller amount of computing resources?
To answer this question I have implemented a feature server in Rust,
which is known for its speed and safety.
One of the basic assumptions was to ensure full compatibility
with Feast and simplicity of use.
I have also decided to start the implementation
with Redis as the online store.
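Because the server is Feast-compatible, it can be queried in the same way as the Feast HTTP feature server; the feature service and entity names below are hypothetical, and the default port is assumed.

import requests

payload = {
    "feature_service": "client_features",   # hypothetical feature service name
    "entities": {"client_id": [1, 2, 3]},   # hypothetical entity keys
}
response = requests.post("http://localhost:6566/get-online-features", json=payload)
print(response.json())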
To reproduce the benchmark we will clone the repository:
git clone https://github.com/yummyml/feature-servers-benchmarks.git
cd feature-servers-benchmarks
For simplicity I will use Docker.
Thus, in the first step, we will prepare all required
images: the Feast and Yummy feature servers, the Vegeta load generator
and Redis.
./build.sh
Then I will use the data generator to prepare the dataset,
apply the feature store and materialize it to Redis.
./materialize.sh
Now we are ready to start the benchmark.
In contrast to the Feast benchmark, where
sixteen feature server instances were used, I will perform
it with a single instance to simulate behavior
with a smaller amount of resources.
The whole benchmark contains multiple scenarios, like
changing the number of entities, the number of features or increasing
the number of requests per second.
# single_run <entities> <features> <concurrency> <rps>

echo "Change only number of rows"
single_run 1 50 $CONCURRENCY 10
for i in $(seq 10 10 100); do single_run $i 50 $CONCURRENCY 10; done

echo "Change only number of features"
for i in $(seq 50 50 250); do single_run 1 $i $CONCURRENCY 10; done

echo "Change only number of requests"
for i in $(seq 10 10 100); do single_run 1 50 $CONCURRENCY $i; done
for i in $(seq 100 100 1000); do single_run 1 50 $CONCURRENCY $i; done
for i in $(seq 10 10 100); do single_run 1 250 $CONCURRENCY $i; done
for i in $(seq 10 10 100); do single_run 100 50 $CONCURRENCY $i; done
for i in $(seq 10 10 100); do single_run 100 250 $CONCURRENCY $i; done
All results are available on GitHub, but here I will limit the discussion to the p99
response time analysis for different numbers of requests.
All tests were performed on my local machine
with 6 CPU cores at 2.59 GHz and 32 GB of memory.
During these tests I will fetch a single entity
with fifty features using a feature service.
To run the Rust feature server benchmark we will run:
./run_test_yummy.sh
For the Rust implementation the p99 response times are stable and below
4 ms, going from 10 requests per second to 100 requests per second.
For Feast, following the documentation,
I have set go_feature_retrieval to True
in feature_store.yaml.
Thus I assume that the Go implementation of the feature server will be used.
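For reference, a minimal feature_store.yaml along these lines (the project name, registry path and Redis address are assumptions):

project: feature_repo
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "redis:6379"
go_feature_retrieval: true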
In this part I have used the official feastdev/feature-server:0.26.0 Feast Docker image.
Again, I will fetch a single entity with fifty features using a feature service.
For 10 requests per second the p99 response time is 92 ms.
Unfortunately, for 20 requests per second and above, the p99 response
time is above 5 s, which exceeds our timeout value.
Additionally, during the Feast benchmark run I have noticed increasing
memory allocation, which may be caused by a memory leak.
This benchmark indicates that the Rust implementation is very promising:
the response times are small and stable,
and the resource consumption is low.
In this article I will show how to generate and
use graph embeddings with a feature store.
Before you continue reading, please watch the short introduction:
Graphs are structures which contain sets of entity nodes and edges
representing the interactions between them.
Such data structures can be used in many areas, like social networks,
web data or even molecular biology, for modeling real-life interactions.
To use the properties contained in graphs in machine learning algorithms,
we need to map them to more accessible representations called embeddings.
In contrast to graphs, embeddings are structures representing the node features
and can be easily used as an input of machine learning algorithms.
Because graphs are frequently represented by large datasets,
embedding calculation can be challenging. To solve this problem,
I will use a very efficient open-source project,
Cleora, which is entirely written in Rust.
Let's follow the Cleora algorithm. In the first step we need to determine
the number of features, which will determine the embedding dimensionality.
Then we initialize the embedding matrix. In the next step, based on
the input data, we calculate the random walk transition matrix.
The matrix describes the relations between nodes and is defined
as the ratio of the number of edges running from the first node to the second node
and the degree of the first node.
The training phase is an iterative multiplication of the embedding matrix
and the transition matrix, followed by L2 normalization of the embedding rows.
Finally, we get the embedding matrix for the defined number of iterations.
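To make the iteration concrete, here is a small NumPy sketch of that scheme (random walk transition matrix, repeated multiplication, L2 row normalization); it only mirrors the idea and not Cleora's optimized implementation.

import numpy as np

def toy_cleora(edges, n_nodes, dim=8, iterations=4, seed=0):
    # random walk transition matrix: edges(i -> j) divided by the degree of i
    adjacency = np.zeros((n_nodes, n_nodes))
    for src, dst in edges:
        adjacency[src, dst] += 1.0
        adjacency[dst, src] += 1.0
    transition = adjacency / np.maximum(adjacency.sum(axis=1, keepdims=True), 1.0)

    rng = np.random.default_rng(seed)
    emb = rng.uniform(-1.0, 1.0, size=(n_nodes, dim))  # initialize the embedding matrix
    for _ in range(iterations):
        emb = transition @ emb  # propagate features over the graph
        emb /= np.maximum(np.linalg.norm(emb, axis=1, keepdims=True), 1e-12)  # L2 normalize rows
    return emb

print(toy_cleora([(0, 1), (1, 2), (2, 0), (2, 3)], n_nodes=4).shape)  # (4, 8)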
Moreover, to be able to simply build a solution, I have extended the project
with the possibility of reading from and writing to an S3 store, and with Apache Parquet format support,
which significantly reduces the embedding size.
Additionally, I have wrapped the Rust code with Python bindings,
thus we can simply install it and use it as a Python package.
Based on the Cleora example,
I will use the Facebook dataset from
SNAP
to calculate embeddings from the page-to-page graph and train a machine learning model
which classifies the page category.
In the first step, we need to prepare the input file in the appropriate clique
or star expansion format.
# based on: https://github.com/Synerise/cleora/blob/master/example_classification.ipynb
import pandas as pd
import s3fs
import numpy as np
import random
from sklearn.model_selection import train_test_split

random.seed(0)
np.random.seed(0)

df_cleora = pd.read_csv("./facebook_large/musae_facebook_edges.csv")
train_cleora, test_cleora = train_test_split(df_cleora, test_size=0.2)

fb_cleora_input_clique_filename = "s3://input/fb_cleora_input_clique.txt"
fb_cleora_input_star_filename = "s3://input/fb_cleora_input_star.txt"

fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': "http://minio:9000"})

with fs.open(fb_cleora_input_clique_filename, "w") as f_cleora_clique, \
     fs.open(fb_cleora_input_star_filename, "w") as f_cleora_star:
    grouped_train = train_cleora.groupby('id_1')
    for n, (name, group) in enumerate(grouped_train):
        group_list = group['id_2'].tolist()
        group_elems = list(map(str, group_list))
        f_cleora_clique.write("{} {}\n".format(name, ' '.join(group_elems)))
        f_cleora_star.write("{}\t{}\n".format(n, name))
        for elem in group_elems:
            f_cleora_star.write("{}\t{}\n".format(n, elem))
Then we use the Cleora Python bindings
to calculate the embeddings and write them as a Parquet file to the S3 MinIO store.
For each node I have added an additional datetime column which represents the timestamp
and will help to check how the calculated embeddings change over time.
Additionally, every embeddings recalculation is saved as
a separate Parquet file, e.g. emb__CliqueNode__CliqueNode_20220910T204145.parquet.
Thus we will be able to follow the embeddings history.
Now we are ready to consume the calculated embeddings
with the Feast feature store and the Yummy extension.
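A rough sketch of what such a definition could look like with plain Feast primitives; the entity, field names and the Parquet path are assumptions, and the Yummy extension provides its own data source classes for S3-backed files.

from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32

# hypothetical entity and embedding fields
node = Entity(name="node_id", join_keys=["node_id"])

embeddings_source = FileSource(
    path="s3://output/emb__CliqueNode__CliqueNode_20220910T204145.parquet",  # example path
    timestamp_field="datetime",
)

embeddings_view = FeatureView(
    name="node_embeddings",
    entities=[node],
    ttl=timedelta(days=1),
    schema=[Field(name=f"f{i}", dtype=Float32) for i in range(8)],
    source=embeddings_source,
)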
In this article I will show how to use Apache Iceberg as a store for the historical feature store.
Moreover, we will build an end-to-end real-time ingestion example with:
Postgres
Kafka Connect
Iceberg on MinIO
Feast with the Yummy extension
Before you continue reading, please watch the short introduction:
Apache Iceberg is a high-performance table format which can be used for huge analytic datasets.
Iceberg offers several features, like schema evolution, partition evolution, hidden partitioning
and many more, which can be used to effectively process petabytes of data.
Read more if you want to learn more about the Iceberg features and how it compares to the other lake formats (Delta Lake and Hudi).
Apache Iceberg is a perfect candidate to use as a historical store, thus
I have decided to integrate it with the Feast feature store through
the Yummy extension.
To show how to use it, I will describe an end-to-end solution with
real-time Iceberg ingestion from other data sources.
To do this, I will use Kafka Connect with the Apache Iceberg sink.
This can be used to build an Iceberg lake on an on-premise S3 store,
or to move your data and build a feature store in the cloud.
You can follow the solution in the notebook example.ipynb
and simply reproduce it using Docker.
Suppose we have our transactional system based on a Postgres database,
where we keep the current client features.
We will track feature changes to build the historical feature store.
Kafka Connect will use the Debezium Postgres connector
to track every data change and put it into Iceberg using the Iceberg sink.
We will store the Iceberg tables on the MinIO S3 store,
but of course you can use AWS S3.
Kafka Connect is based on Kafka, thus we will also need a Kafka instance and ZooKeeper.
We will set up the selected components using Docker.
To start, clone the repository:
git clone https://github.com/yummyml/yummy-iceberg-kafka-connect.git
cd yummy-iceberg-kafka-connect
Then run:
./run.postgres.sh
docker run -it --name postgres --rm --network=app_default \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 postgres:12.11 -c wal_level=logical
./run.zookeeper.sh
docker run -it --rm --name zookeeper --network app_default \
  -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 \
  confluentinc/cp-zookeeper:7.2.0
All the commands below are already in the example.ipynb
notebook, but I will explain all of them.
Kafka Connect will publish database changes to Kafka, thus we also need to create the appropriate topics
if we don't have topic auto-creation enabled.
I have created two topics because we will track two Postgres tables.
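For completeness, one way to create them from Python with confluent-kafka; the broker address and the topic names (following the Debezium <server>.<schema>.<table> convention) are hypothetical here.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})  # assumed broker address
topics = [
    NewTopic("postgres.public.table1", num_partitions=1, replication_factor=1),  # hypothetical names
    NewTopic("postgres.public.table2", num_partitions=1, replication_factor=1),
]
for topic, future in admin.create_topics(topics).items():
    future.result()  # raises if the topic could not be created
    print(f"created {topic}")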
Now we can set up the Postgres connector and the Iceberg sink through the Kafka Connect API.
In the Postgres connector we need to specify the list of tables which we want to track.
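For example, the Debezium source connector can be registered through the Kafka Connect REST API; the host names, credentials and table list below are assumptions matching a Docker setup like the one above, and the Iceberg sink is registered in the same way with its own configuration.

import requests

connector = {
    "name": "postgres-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",   # assumed container name
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "postgres",
        "table.include.list": "public.table1,public.table2",  # hypothetical tables to track
    },
}
requests.post("http://localhost:8083/connectors", json=connector).raise_for_status()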
Currently, you can use Iceberg only with the Spark backend.
You can also add additional Spark configuration, such as catalog configuration or
S3 store configuration.
In the next step, you have to add the Iceberg data source.
In the feature store definition you specify the path to the Iceberg table or the table name, which you want to consume on the filesystem or S3 store respectively.
Of course, you can combine the Iceberg data source with other data sources, like Parquet or CSV files, or even Delta Lake if needed.
Here you can see how to do this.
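The snippet below is only a sketch: the IcebergDataSource import path and its arguments are assumptions made for illustration, so check the Yummy documentation for the exact API.

from datetime import timedelta
from feast import Entity, FeatureView, Field
from feast.types import Float32
from yummy import IcebergDataSource  # hypothetical import path

client = Entity(name="client_id", join_keys=["client_id"])

iceberg_source = IcebergDataSource(
    path="s3a://mybucket/mystats",  # table path or table name, depending on the store
    timestamp_field="datetime",
)

stats_view = FeatureView(
    name="client_stats",
    entities=[client],
    ttl=timedelta(days=1),
    schema=[Field(name="score", dtype=Float32)],
    source=iceberg_source,
)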
Now we are ready to apply the feature store definition and fetch the historical features.