Graph Embeddings with Feature Store

embeddings

In this video I will show how to generate and use graph embeddings with a feature store.

Before you continue reading, please watch the short introduction:

Graphs are structures that contain sets of entity nodes and edges, where the edges represent the interactions between the nodes. Such data structures can be used to model real-life interactions in many areas, such as social networks, web data, or even molecular biology.

To use the properties contained in graphs in machine learning algorithms, we need to map them to a more accessible representation called embeddings.

In contrast to graphs, embeddings are structures that represent the node features and can easily be used as input to machine learning algorithms.

Because graphs are frequently represented by large datasets, calculating embeddings can be challenging. To solve this problem, I will use a very efficient open source project, Cleora, which is written entirely in Rust.

theory

Let’s follow the Cleora algorithm. In the first step we choose the number of features, which determines the embedding dimensionality. Then we initialize the embedding matrix. In the next step, based on the input data, we calculate the random walk transition matrix. This matrix describes the relations between nodes: each entry is the ratio of the number of edges running from the first node to the second node to the degree of the first node. The training phase is an iterative multiplication of the embedding matrix by the transition matrix, followed by L2 normalization of the embedding rows.

After the defined number of iterations we obtain the final embedding matrix.
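The steps above can be sketched in a few lines of dependency-free Python. This is only an illustration under simplifying assumptions, not Cleora's Rust implementation: the graph is a tiny undirected toy graph, the embedding matrix is initialized with a seeded uniform random generator (the initialization scheme is my assumption), and all function names are made up for this example.

```python
import math
import random


def transition_matrix(num_nodes, edges):
    """Random walk transition matrix: M[a][b] = edges(a, b) / degree(a)."""
    counts = [[0.0] * num_nodes for _ in range(num_nodes)]
    for a, b in edges:
        # Treat the toy graph as undirected: count the edge in both directions.
        counts[a][b] += 1.0
        counts[b][a] += 1.0
    matrix = []
    for row in counts:
        degree = sum(row)
        matrix.append([c / degree if degree else 0.0 for c in row])
    return matrix


def l2_normalize_rows(matrix):
    """Scale every row to unit Euclidean length (zero rows stay zero)."""
    normalized = []
    for row in matrix:
        norm = math.sqrt(sum(v * v for v in row))
        normalized.append([v / norm if norm else 0.0 for v in row])
    return normalized


def matmul(a, b):
    """Plain matrix product of two list-of-lists matrices."""
    return [
        [sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
        for i in range(len(a))
    ]


def cleora_like_embeddings(num_nodes, edges, dimension=4, iterations=3, seed=0):
    rng = random.Random(seed)
    # Assumed initialization: uniform values in [-1, 1].
    emb = [[rng.uniform(-1.0, 1.0) for _ in range(dimension)] for _ in range(num_nodes)]
    m = transition_matrix(num_nodes, edges)
    for _ in range(iterations):
        # One training step: multiply by the transition matrix, then normalize rows.
        emb = l2_normalize_rows(matmul(m, emb))
    return emb


edges = [(0, 1), (1, 2), (2, 0), (2, 3)]
embeddings = cleora_like_embeddings(num_nodes=4, edges=edges)
for node, vector in enumerate(embeddings):
    print(node, [round(v, 3) for v in vector])
```

Each printed row has unit length, and nodes with similar neighbourhoods end up with similar vectors as the number of iterations grows.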

Moreover, to make it simple to build a solution, I have extended the project with the ability to read from and write to an S3 store, and with Apache Parquet output, which significantly reduces the embedding size.

Additionally, I have wrapped the Rust code with Python bindings, so we can simply install it and use it as a Python package.

Based on the Cleora example, I will use the Facebook dataset from SNAP to calculate embeddings from the page-to-page graph and train a machine learning model that classifies the page category.

curl -LO https://snap.stanford.edu/data/facebook_large.zip
unzip facebook_large.zip

As the S3 store we will use MinIO:

docker run --rm -it -p 9000:9000 \
 -p 9001:9001 --name minio \
 -v $(pwd)/minio-data:/data \
 --network app_default \
 minio/minio server /data --console-address ":9001"

Next we set the credentials and create the buckets:

import os
import boto3
from botocore.client import Config

os.environ["AWS_ACCESS_KEY_ID"]= "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"]= "minioadmin"
os.environ["FEAST_S3_ENDPOINT_URL"]="http://minio:9000"
os.environ["S3_ENDPOINT_URL"]= "http://minio:9000"

s3 = boto3.resource('s3', endpoint_url='http://minio:9000')
s3.create_bucket(Bucket="input")
s3.create_bucket(Bucket="output")
s3.create_bucket(Bucket="data")

In the first step, we need to prepare the input file in the appropriate clique or star expansion format.

# based on: https://github.com/Synerise/cleora/blob/master/example_classification.ipynb
import pandas as pd
import s3fs
import numpy as np
import random
from sklearn.model_selection import train_test_split
random.seed(0)
np.random.seed(0)

df_cleora = pd.read_csv("./facebook_large/musae_facebook_edges.csv")
train_cleora, test_cleora = train_test_split(df_cleora, test_size=0.2)

fb_cleora_input_clique_filename = "s3://input/fb_cleora_input_clique.txt"
fb_cleora_input_star_filename = "s3://input/fb_cleora_input_star.txt"

fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': "http://minio:9000"})

with fs.open(fb_cleora_input_clique_filename, "w") as f_cleora_clique, fs.open(fb_cleora_input_star_filename, "w") as f_cleora_star:
    grouped_train = train_cleora.groupby('id_1')
    for n, (name, group) in enumerate(grouped_train):
        group_list = group['id_2'].tolist()
        group_elems = list(map(str, group_list))
        f_cleora_clique.write("{} {}\n".format(name, ' '.join(group_elems)))
        f_cleora_star.write("{}\t{}\n".format(n, name))
        for elem in group_elems:
            f_cleora_star.write("{}\t{}\n".format(n, elem))
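To make the two input formats concrete, here is a small sketch with a made-up three-row edge list that mirrors the grouping logic above without pandas or S3. The helper name `expand` and the sample edges are illustrative assumptions.

```python
from collections import defaultdict


def expand(edges):
    """Return (clique_lines, star_lines) for a list of (id_1, id_2) edges."""
    groups = defaultdict(list)
    for id_1, id_2 in edges:
        groups[id_1].append(id_2)

    clique_lines, star_lines = [], []
    # Sort by id_1 to mimic the default key ordering of pandas groupby.
    for n, (name, neighbours) in enumerate(sorted(groups.items())):
        elems = [str(e) for e in neighbours]
        # Clique expansion: one line per group, all members separated by spaces.
        clique_lines.append("{} {}".format(name, " ".join(elems)))
        # Star expansion: one "cluster_id<TAB>node" line per member of the group.
        star_lines.append("{}\t{}".format(n, name))
        star_lines.extend("{}\t{}".format(n, e) for e in elems)
    return clique_lines, star_lines


clique, star = expand([(0, 1), (0, 2), (3, 2)])
print(clique)  # ['0 1 2', '3 2']
print(star)    # ['0\t0', '0\t1', '0\t2', '1\t3', '1\t2']
```

In the clique file every line is one group of connected nodes, while in the star file every node is linked to a synthetic cluster id, which is later marked as transient in `cols_str`.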

Then we use the Cleora Python bindings to calculate the embeddings and write them as a Parquet file to the MinIO S3 store.

Cleora star expansion training:

import time
import cleora
output_dir = 's3://output'
fb_cleora_input_star_filename = "s3://input/fb_cleora_input_star.txt"

start_time = time.time()
cleora.run(
    input=[fb_cleora_input_star_filename],
    type_name="tsv",
    dimension=1024,
    max_iter=5,
    seed=None,
    prepend_field=False,
    log_every=1000,
    in_memory_embedding_calculation=True,
    cols_str="transient::cluster_id StarNode",
    output_dir=output_dir,
    output_format="parquet",
    relation_name="emb",
    chunk_size=3000,
)
print("--- %s seconds ---" % (time.time() - start_time))

Cleora clique expansion training:

fb_cleora_input_clique_filename = "s3://input/fb_cleora_input_clique.txt"
start_time = time.time()

cleora.run(
    input=[fb_cleora_input_clique_filename],
    type_name="tsv",
    dimension=1024,
    max_iter=5,
    seed=None,
    prepend_field=False,
    log_every=1000,
    in_memory_embedding_calculation=True,
    cols_str="complex::reflexive::CliqueNode",
    output_dir=output_dir,
    output_format="parquet",
    relation_name="emb",
    chunk_size=3000,
)
print("--- %s seconds ---" % (time.time() - start_time))

For each node, I have added an additional column, datetime, which holds the timestamp and helps to check how the calculated embeddings change over time. Additionally, every embeddings recalculation is saved as a separate Parquet file, e.g. emb__CliqueNode__CliqueNode_20220910T204145.parquet. Thus we are able to follow the embeddings history.

Now we are ready to consume the calculated embeddings with the Feast feature store and the Yummy extension.
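Since each recalculation lands in its own file, the history can be listed by parsing the timestamp suffix. The sketch below assumes that the suffix always follows the `YYYYMMDDTHHMMSS` pattern seen in the example name; the second file name and the helper `snapshot_time` are made up for illustration.

```python
import re
from datetime import datetime

# Matches the trailing "_20220910T204145.parquet" part of a file name.
SNAPSHOT_PATTERN = re.compile(r"_(\d{8}T\d{6})\.parquet$")


def snapshot_time(filename):
    """Extract the recalculation timestamp from an embeddings file name."""
    match = SNAPSHOT_PATTERN.search(filename)
    if match is None:
        raise ValueError("no timestamp suffix in {!r}".format(filename))
    return datetime.strptime(match.group(1), "%Y%m%dT%H%M%S")


files = [
    "emb__CliqueNode__CliqueNode_20220914T235942.parquet",  # hypothetical second run
    "emb__CliqueNode__CliqueNode_20220910T204145.parquet",
]
# Print the snapshots from oldest to newest.
for name in sorted(files, key=snapshot_time):
    print(snapshot_time(name).isoformat(), name)
```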

feature_store.yaml

project: repo
registry: s3://data/registry.db
provider: yummy.YummyProvider
backend: polars
online_store:
    type: sqlite
    path: data/online_store.db
offline_store:
    type: yummy.YummyOfflineStore

features.py

from datetime import timedelta
from feast import Entity, Field, FeatureView
from yummy import ParquetSource
from feast.types import Float32, Int32

my_stats_parquet = ParquetSource(
    name="my_stats",
    path="s3://output/emb__CliqueNode__CliqueNode_*.parquet",
    timestamp_field="datetime",
    s3_endpoint_override="http://minio:9000",
)

my_entity = Entity(name="entity", description="entity",)

schema = [Field(name="entity", dtype=Int32)] + [Field(name=f"f{i}", dtype=Float32) for i in range(0,1024)]

mystats_view_parquet = FeatureView(
    name="my_statistics_parquet",
    entities=[my_entity],
    ttl=timedelta(seconds=3600*24*20),
    schema=schema,
    online=True, source=my_stats_parquet, tags={},)

Then we apply the feature store definition:

feast apply

Now we are ready to fetch the embeddings for a defined timestamp.

from feast import FeatureStore
import polars as pl
import pandas as pd
import time
import os
from datetime import datetime
import yummy

store = FeatureStore(repo_path=".")
start_time = time.time()

features = [f"my_statistics_parquet:f{i}" for i in range(0,1024)]

training_df = store.get_historical_features(
    entity_df=yummy.select_all(datetime(2022, 9, 14, 23, 59, 42)),
    features = features,
).to_df()

print("--- %s seconds ---" % (time.time() - start_time))
training_df

Moreover, I have introduced the method:

yummy.select_all(datetime(2022, 9, 14, 23, 59, 42))

which will fetch all entities.
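Conceptually, fetching all entities for a timestamp means picking, for every entity, the most recent row whose timestamp is not later than the requested one. The following sketch illustrates that idea on plain Python data; it is not how Yummy implements `select_all`, and the rows and the helper name `latest_as_of` are invented for the example.

```python
from datetime import datetime


def latest_as_of(rows, as_of):
    """Keep, per entity, the newest row with row['datetime'] <= as_of."""
    latest = {}
    for row in rows:
        if row["datetime"] > as_of:
            continue  # this recalculation happened after the requested time
        current = latest.get(row["entity"])
        if current is None or row["datetime"] > current["datetime"]:
            latest[row["entity"]] = row
    return latest


rows = [
    {"entity": 1, "datetime": datetime(2022, 9, 10, 20, 41, 45), "f0": 0.10},
    {"entity": 1, "datetime": datetime(2022, 9, 14, 23, 0, 0), "f0": 0.25},
    {"entity": 2, "datetime": datetime(2022, 9, 10, 20, 41, 45), "f0": -0.40},
    {"entity": 2, "datetime": datetime(2022, 9, 15, 8, 0, 0), "f0": -0.55},
]

selected = latest_as_of(rows, datetime(2022, 9, 14, 23, 59, 42))
for entity, row in sorted(selected.items()):
    print(entity, row["datetime"].isoformat(), row["f0"])
```

Entity 1 resolves to its second snapshot, while entity 2 falls back to the first one because its newer snapshot was written after the requested timestamp.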

Then we prepare the training data for the SNAP dataset:

import numpy as np
from sklearn.model_selection import train_test_split
df = pd.read_csv("../facebook_large/musae_facebook_target.csv")

classes = df['page_type'].unique()
class_ids = list(range(0, len(classes)))
class_dict = {k:v for k,v in zip(classes, class_ids)}
df['page_type'] = [class_dict[item] for item in df['page_type']]

train_filename = "fb_classification_train.txt"
test_filename = "fb_classification_test.txt"

train, test = train_test_split(df, test_size=0.2)

training_df=training_df.astype({"entity": "int32"})

entities = training_df["entity"].to_numpy()

train = train[["id","page_type"]].to_numpy()
test = test[["id","page_type"]].to_numpy()

df_embeddings=training_df.drop(columns=["event_timestamp"])\
    .rename(columns={ f"f{i}":i+2 for i in range(1024) })\
    .rename(columns={"entity": 0}).set_index(0)

valid_idx = df_embeddings.index.to_numpy()
train = np.array(train[np.isin(train[:,0], valid_idx) & np.isin(train[:,1], valid_idx)])
test = np.array([t for t in test if (t[0] in valid_idx) and (t[1] in valid_idx)])

Finally, we will train page classifiers.

from sklearn.linear_model import SGDClassifier
from sklearn.metrics import f1_score
from tqdm import tqdm
epochs=[20]
batch_size = 256
test_batch_size = 1000
embeddings=df_embeddings
y_train = train[:, 1]
y_test = test[:, 1]

clf = SGDClassifier(random_state=0, loss='log_loss', alpha=0.0001)
for e in tqdm(range(0, max(epochs))):
    for idx in range(0,train.shape[0],batch_size):
        ex=train[idx:min(idx+batch_size,train.shape[0]),:]
        ex_emb_in = embeddings.loc[ex[:,0]].to_numpy()
        ex_y = y_train[idx:min(idx+batch_size,train.shape[0])]
        clf.partial_fit(ex_emb_in, ex_y, classes=[0,1,2,3])
    
    if e+1 in epochs:
        acc = 0.0
        y_pred = []
        for n, idx in enumerate(range(0,test.shape[0],test_batch_size)):
            ex=test[idx:min(idx+test_batch_size,test.shape[0]),:]
            ex_emb_in = embeddings.loc[ex[:,0]].to_numpy()
            pred = clf.predict_proba(ex_emb_in)
            classes = np.argmax(pred, axis=1)
            y_pred.extend(classes)

        f1_micro = f1_score(y_test, y_pred, average='micro')
        f1_macro = f1_score(y_test, y_pred, average='macro')
        print(' epochs: {}, micro f1: {}, macro f1:{}'.format( e+1, f1_micro, f1_macro))

Because the feature store can merge multiple sources, we can easily enrich the graph embeddings with additional features, such as additional page information.

We can also track historical changes of the embeddings.

Moreover, using the feature store we can materialize the embeddings to the online store, which simplifies building a comprehensive MLOps process.

You can find the whole example.ipynb on GitHub and in the Yummy documentation.

Real-time ingested historical feature store with Iceberg, Feast and Yummy.

iceberg

In this video I will show how to use Apache Iceberg as the storage for a historical feature store. Moreover, we will build an end-to-end real-time ingestion example with:

  • Postgres
  • Kafka connect
  • Iceberg on Minio
  • Feast with Yummy extension

Before you continue reading, please watch the short introduction:

Apache Iceberg is a high-performance table format for huge analytic datasets.

Iceberg offers features such as schema evolution, partition evolution, hidden partitioning and many more, which can be used to effectively process petabytes of data.

Read more if you want to learn more about Iceberg features and how it compares to the other lake formats (Delta Lake and Hudi).

Apache Iceberg is a perfect candidate for a historical store, so I have decided to integrate it with the Feast feature store through the Yummy extension.

To show how to use it, I will describe an end-to-end solution with real-time Iceberg ingestion from other data sources.

To do this, I will use Kafka Connect with the Apache Iceberg Sink. This can be used to build an Iceberg lake on an on-premise S3 store, or to move your data and build a feature store in the cloud.

The Kafka Connect integration is based on the article. The source code of the Iceberg sink is available at getindata/kafka-connect-iceberg-sink.

You can follow the solution in the notebook example.ipynb and simply reproduce it using Docker.

architecture

Suppose we have a transactional system based on a Postgres database, where we keep the current client features. We will track feature changes to build a historical feature store.

Kafka Connect will use the Debezium Postgres connector to track every data change and put it into Iceberg using the Iceberg sink.
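To give an intuition of what travels through this pipeline, the sketch below flattens a Debezium-style change event into one append-only row. The envelope fields (`before`, `after`, `op`, `source.ts_ms`) follow the Debezium change event format; the `payload` wrapper, the `__op` column name and the helper `flatten_change_event` are my assumptions for illustration, and the columns actually written by the Iceberg sink may differ. Only `__source_ts` is taken from the feature definitions used later in this article.

```python
from datetime import datetime, timezone


def flatten_change_event(event):
    """Turn a Debezium-style envelope into one flat, append-only row."""
    payload = event["payload"]
    # For deletes 'after' is null, so fall back to the last known state.
    state = payload.get("after") or payload.get("before") or {}
    row = dict(state)
    row["__op"] = payload["op"]  # c = create, u = update, d = delete, r = snapshot read
    # The source timestamp is given in milliseconds since the epoch.
    row["__source_ts"] = datetime.fromtimestamp(
        payload["source"]["ts_ms"] / 1000, tz=timezone.utc
    )
    return row


# Hand-written sample event, not captured from a real connector.
event = {
    "payload": {
        "before": None,
        "after": {"entity_id": 7, "f0": 0.5},
        "source": {"ts_ms": 1641038400000, "table": "mystats_fv1"},
        "op": "c",
    }
}
row = flatten_change_event(event)
print(row)
```

Because every change becomes a new row with its own timestamp, the table keeps the full history of each entity, which is exactly what a historical feature store needs.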

We will store the Iceberg tables on the MinIO S3 store, but of course you can use AWS S3 instead.

Kafka Connect is based on Kafka, so we will also need a Kafka instance and ZooKeeper.

We will set up the selected components using Docker.

To start, clone the repository:

git clone https://github.com/yummyml/yummy-iceberg-kafka-connect.git
cd yummy-iceberg-kafka-connect

Then run ./run.postgres.sh

docker run -it --name postgres --rm --network=app_default \
 -e POSTGRES_PASSWORD=postgres \
 -p 5432:5432 postgres:12.11 -c wal_level=logical

./run.zookeeper.sh

docker run -it --rm --name zookeeper --network app_default \
 -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 \
 confluentinc/cp-zookeeper:7.2.0

./run.kafka.sh

docker run -it --rm --name kafka --network app_default -p 9092:9092 \
 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092 \
 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
 -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
 confluentinc/cp-kafka:7.2.0

./run.minio.sh

docker run --rm -it -p 9000:9000 \
 -p 9001:9001 --name minio \
 -v $(pwd)/minio-data:/data \
 --network app_default \
 minio/minio server /data --console-address ":9001"

./run.connect.sh

docker run -it --name connect --rm --network=app_default -p 8083:8083 \
        -e GROUP_ID=1 \
        -e CONFIG_STORAGE_TOPIC=my-connect-configs \
        -e OFFSET_STORAGE_TOPIC=my-connect-offsets \
        -e BOOTSTRAP_SERVERS=kafka:9092 \
        -e CONNECT_TOPIC_CREATION_ENABLE=true \
        -v $(pwd)/kafka-connect-iceberg-sink/kafka-connect-iceberg-sink-0.1.3-shaded.jar:/kafka/connect/kafka-connect-iceberg-sink/kafka-connect-iceberg-sink-0.1.3-shaded.jar \
        debezium/connect:2.0

Please note that this component setup is not production ready and should be used only for testing purposes.

Finally, we will run local Jupyter notebooks with local Spark: ./run.yummy.sh

docker run -it -p 8887:8888 --rm --shm-size=5.09gb --name yummy \
	--network app_default \
	-v $(pwd)/notebooks:/home/jovyan/notebooks \
	qooba/yummy:v0.0.2_spark /home/jovyan/notebooks/jupyter.sh

where jupyter.sh is:

#!/bin/bash

export FEAST_USAGE=False
export PYSPARK_PYTHON=/opt/conda/bin/python3 
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=/home/jovyan --ip='0.0.0.0' --port=8888 --no-browser --allow-root --NotebookApp.password='' --NotebookApp.token=''"

#pip3 install rise

pyspark \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2,org.apache.hadoop:hadoop-aws:3.3.1,software.amazon.awssdk:s3:2.17.131 \
    --conf "spark.driver.memory=5g" \
    --conf "spark.executor.memory=5g" \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.local.type=hadoop" \
    --conf "spark.sql.catalog.local.warehouse=s3a://mybucket" \
    --conf "spark.hadoop.fs.s3a.endpoint=http://minio:9000" \
    --conf "spark.hadoop.fs.s3a.access.key=minioadmin" \
    --conf "spark.hadoop.fs.s3a.secret.key=minioadmin" \
    --conf "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
    --conf "spark.hadoop.fs.s3a.path.style.access=true" \
    --conf "spark.hadoop.fs.s3a.connection.ssl.enabled=false"

Now open the browser url: http://localhost:8887/tree/notebooks

All the commands below are already in the example.ipynb notebook, but I will explain each of them.

Kafka Connect will publish database changes to Kafka, so we also need to create the appropriate topics if topic auto-creation is not enabled.

from confluent_kafka.admin import AdminClient, NewTopic


admin_client = AdminClient({
    "bootstrap.servers": "kafka:9092"
})

topic_list = []
topic_list.append(NewTopic("postgres.public.mystats_fv1", 1, 1))
topic_list.append(NewTopic("postgres.public.mystats_fv2", 1, 1))
admin_client.create_topics(topic_list)

I have created two topics because we will track two Postgres tables.

Now we can set up the Postgres connector and the Iceberg sink through the Kafka Connect API.
In the Postgres connector, we need to specify the list of tables that we want to track.
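The topic names follow the `<server name>.<schema>.<table>` pattern visible above. A small helper (hypothetical, for illustration only) can derive them from a server name and a comma-separated table list, which avoids typos when more tables are tracked.

```python
def topics_for(server_name, table_include_list):
    """Build '<server>.<schema>.<table>' topic names from a comma-separated table list."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return ["{}.{}".format(server_name, table) for table in tables]


topics = topics_for("postgres", "public.mystats_fv1,public.mystats_fv2")
print(topics)  # ['postgres.public.mystats_fv1', 'postgres.public.mystats_fv2']
```

The resulting list could then be passed to `NewTopic` in a loop instead of appending each topic by hand.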

import requests
import json

data = {
  "name": "postgres-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "postgres",
    "slot.name": "debezium",
    "plugin.name": "pgoutput",
    "table.include.list": "public.mystats_fv1,public.mystats_fv2"
  }
}

headers = { "Content-Type": "application/json" }
url="http://connect:8083/connectors"
requests.post(url, headers=headers, data=json.dumps(data))

Because Debezium has a wide range of integrations, you can also use other databases such as MySQL, MongoDB, Oracle, SQL Server or Db2.

In the next step, we will post the Iceberg sink configuration, where we specify the topics to read as well as the table and S3 store configuration.

import requests
import json
data = {
  "name": "iceberg-sink",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "postgres.public.mystats_fv1,postgres.public.mystats_fv2",
    "upsert": False,
    "upsert.keep-deletes": True,
    "table.auto-create": True,
    "table.write-format": "parquet",
    "table.namespace": "mytable_dbz",
    "table.prefix": "debeziumcdc_",
    "iceberg.warehouse": "s3a://mybucket",
    "iceberg.fs.defaultFS": "s3a://mybucket", 
    "iceberg.catalog-name": "mycatalog", 
    "iceberg.catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog", 
    "iceberg.fs.s3a.path.style.access": True,
    "iceberg.fs.s3a.endpoint": "http://minio:9000",
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "iceberg.fs.s3a.access.key": "minioadmin",
    "iceberg.fs.s3a.secret.key": "minioadmin",
  }
}

headers = { "Content-Type": "application/json" }
url="http://connect:8083/connectors"
requests.post(url, headers=headers, data=json.dumps(data))
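Before generating any data it can be useful to check that both connectors are actually running. The sketch below uses the `GET /connectors/<name>/status` endpoint of the Kafka Connect REST API; the helper names are my own, and the sample response is hand-written to show the expected shape rather than captured from a real cluster.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://connect:8083"):
    """Call GET /connectors/<name>/status on the Kafka Connect REST API."""
    with urlopen("{}/connectors/{}/status".format(base_url, name)) as response:
        return json.load(response)


def is_healthy(status):
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Hand-written sample response; against a live cluster use:
#   is_healthy(fetch_status("iceberg-sink"))
sample = {
    "name": "iceberg-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
}
print(is_healthy(sample))  # False
```

A failed task usually carries a `trace` field with the stack trace, which is the first place to look when no data shows up in the bucket.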

Kafka Connect is ready, so we will simulate database changes using generated data. We will split the features into two tables.

import warnings
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import psycopg2  # PostgreSQL driver used by SQLAlchemy
from sklearn.datasets import make_hastie_10_2
from sqlalchemy import create_engine

warnings.filterwarnings("ignore", category=DeprecationWarning)

DATABASE_HOST = 'postgres'
DATABASE_USER = 'postgres'
DATABASE_PASSWORD = 'postgres'
DATABASE_NAME = 'postgres'

def generate_entities(size):
    return np.random.choice(size, size=size, replace=False)

def generate_data(entities, year=2021, month=10, day=1) -> pd.DataFrame:
    n_samples=len(entities)
    X, y = make_hastie_10_2(n_samples=n_samples, random_state=0)
    df = pd.DataFrame(X, columns=["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9"])
    df["y"]=y
    df['entity_id'] = entities
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(year, month, day, 0,tzinfo=timezone.utc).timestamp(),
                datetime(year, month, day, 22,tzinfo=timezone.utc).timestamp(),
                size=n_samples),
        unit="s",
    )
    df['created'] = pd.to_datetime(
            datetime.now(),
            )
    return df

entities=generate_entities(100)
alchemyEngine = create_engine('postgresql+psycopg2://postgres:postgres@postgres', pool_recycle=3600)
dbConnection = alchemyEngine.connect()

for d in range(1,15):
    data=generate_data(entities,month=1, day=d)
    fv1 = data[["entity_id", "datetime", "f0", "f1", "f2", "f3", "f4"]]
    fv2 = data[["entity_id", "datetime", "f5", "f6", "f7", "f8", "f9", "y"]]
    fv1.to_sql('mystats_fv1', dbConnection, if_exists='replace')
    fv2.to_sql('mystats_fv2', dbConnection, if_exists='replace')

The historical features will be saved into Iceberg tables on MinIO.

[Figure: MinIO console]

Now we are ready to fetch historical features using Feast and Yummy.

To use Yummy with Iceberg you need to install it:

pip install yummy

Then we need to prepare the feature store configuration YAML:

project: example_feature_repo
registry: data/registry.db
provider: local
offline_store:
  type: yummy.YummyOfflineStore
  backend: spark
  config:
    spark.master: "local[*]"
    spark.ui.enabled: "false"
    spark.eventLog.enabled: "false"
    spark.sql.session.timeZone: "UTC"
    spark.sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    spark.sql.catalog.local: "org.apache.iceberg.spark.SparkCatalog"
    spark.sql.catalog.local.type: "hadoop"
    spark.sql.catalog.local.warehouse: "s3a://mybucket"
    spark.hadoop.fs.s3a.endpoint: "http://minio:9000"
    spark.hadoop.fs.s3a.access.key: "minioadmin"
    spark.hadoop.fs.s3a.secret.key: "minioadmin"
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
    spark.hadoop.fs.s3a.path.style.access: "true"
    spark.hadoop.fs.s3a.connection.ssl.enabled: "false"
online_store:
  path: data/online_store.db

Currently, you can use Iceberg only with the Spark backend. You can also add additional Spark configuration, such as the catalog configuration or the S3 store configuration.

In the next step, you have to add the Iceberg data source. In the feature store definition, you specify either a path to the Iceberg table or a table name, depending on whether you consume it from a filesystem or from an S3 store, respectively.

from datetime import datetime, timezone, timedelta
from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from yummy import IcebergDataSource

entity = Entity(name="entity_id", value_type=ValueType.INT64, description="entity id",)

fv1 = FeatureView(
    name="debeziumcdc_postgres_public_mystats_fv1",
    entities=["entity_id"],
    ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f0", dtype=ValueType.FLOAT), Feature(name="f1", dtype=ValueType.FLOAT),
        Feature(name="f2", dtype=ValueType.FLOAT), Feature(name="f3", dtype=ValueType.FLOAT),
        Feature(name="f4", dtype=ValueType.FLOAT), ],
    online=True,
    input=IcebergDataSource(
            path="local.mytable_dbz.debeziumcdc_postgres_public_mystats_fv1",
            event_timestamp_column="__source_ts",
    ), tags={},)

fv2 = FeatureView(
    name="debeziumcdc_postgres_public_mystats_fv2",
    entities=["entity_id"],
    ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f5", dtype=ValueType.FLOAT), Feature(name="f6", dtype=ValueType.FLOAT),
        Feature(name="f7", dtype=ValueType.FLOAT), Feature(name="f8", dtype=ValueType.FLOAT),
        Feature(name="f9", dtype=ValueType.FLOAT), Feature(name="y", dtype=ValueType.FLOAT), ],
    online=True,
    input=IcebergDataSource(
            path="local.mytable_dbz.debeziumcdc_postgres_public_mystats_fv2",
            event_timestamp_column="__source_ts",
    ), tags={},)
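The `path` values above appear to be derived from the sink configuration: the Spark catalog name, then `table.namespace`, then `table.prefix` followed by the topic name with dots replaced by underscores. The helper below only restates that observed pattern; it is an assumption based on the names in this example rather than documented sink behaviour, and the function name is made up.

```python
def iceberg_table_path(topic, catalog="local", namespace="mytable_dbz", prefix="debeziumcdc_"):
    """Map a Kafka topic to the table identifier used in the feature views above."""
    # Dots in the topic name are not valid inside a table name, so use underscores.
    table = prefix + topic.replace(".", "_")
    return "{}.{}.{}".format(catalog, namespace, table)


print(iceberg_table_path("postgres.public.mystats_fv1"))
# local.mytable_dbz.debeziumcdc_postgres_public_mystats_fv1
```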

Of course, you can combine the Iceberg data source with other data sources such as Parquet files, CSV files or even Delta Lake if needed. Here you can see how to do this.

Now we are ready to apply the feature store definition and fetch historical features.

feast apply

import pandas as pd
import numpy as np
from datetime import datetime, timezone, timedelta
from feast import FeatureStore

def generate_entities(size: int):
    return np.random.choice(size, size=size, replace=False)

def entity_df(size:int = 10):
    entities=generate_entities(size)
    entity_df = pd.DataFrame(data=entities, columns=['entity_id'])
    entity_df["event_timestamp"]=datetime.now()
    return entity_df

entity_df = entity_df()
FeatureStore(".").get_historical_features(
    features=[
        "debeziumcdc_postgres_public_mystats_fv1:f0", "debeziumcdc_postgres_public_mystats_fv1:f1",
        "debeziumcdc_postgres_public_mystats_fv1:f2", "debeziumcdc_postgres_public_mystats_fv1:f3",
        "debeziumcdc_postgres_public_mystats_fv1:f4", "debeziumcdc_postgres_public_mystats_fv2:f5",
        "debeziumcdc_postgres_public_mystats_fv2:f6", "debeziumcdc_postgres_public_mystats_fv2:f7",
        "debeziumcdc_postgres_public_mystats_fv2:f8", "debeziumcdc_postgres_public_mystats_fv2:f9",
    ], entity_df=entity_df, full_feature_names=True).to_df()

Green Screen anywhere ? … sure, video matting with AI.

frog

In this article I’d like to show how to predict a video matte using a machine learning model.

Before you continue reading, please watch the short introduction:

In the previous article I showed how to cut the background from an image: AI Scissors – sharp cut with neural networks. This time we will generate a matte for video without a green screen, using a machine learning model.

Video matting is a technique that helps to separate a video into two or more layers, for example foreground and background. Using this method, we generate an alpha matte, which determines the boundaries between the layers and allows us, for example, to substitute the background.

Nowadays these methods are widely used in video conference software, and you probably know them very well.
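The role of the alpha matte can be illustrated with the standard compositing equation, I = alpha * F + (1 - alpha) * B, applied per pixel and per channel. The sketch below works on single RGB pixels with made-up values; a real pipeline would apply the same arithmetic to whole frames.

```python
def composite(foreground, background, alpha):
    """Blend one RGB pixel: alpha=1 keeps the foreground, alpha=0 the background."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be in [0, 1]")
    return tuple(
        round(alpha * f + (1.0 - alpha) * b)
        for f, b in zip(foreground, background)
    )


skin = (200, 160, 140)   # hypothetical foreground pixel
green = (0, 255, 0)      # green screen background

print(composite(skin, green, 1.0))   # (200, 160, 140)
print(composite(skin, green, 0.0))   # (0, 255, 0)
print(composite(skin, green, 0.25))  # (50, 231, 35)
```

Fractional alpha values occur mostly around hair and motion blur, which is why a soft matte gives visibly better edges than a hard segmentation mask.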

But is it possible to process 4K video and generate a high resolution alpha matte without green screen props? Following the article arxiv 2108.11515, we can achieve this using the “Robust High-Resolution Video Matting with Temporal Guidance” method.

The authors used a recurrent architecture to exploit temporal information. As a result, the model predictions are more coherent, which improves matting robustness.

Datasets

Moreover, they proposed a new training strategy that uses both matting datasets (VideoMatte240K, Distinctions-646, Adobe Image Matting) and segmentation datasets (YouTubeVIS, COCO). This mixture helps to achieve better quality on complex data and prevents overfitting.

The neural network architecture consists of three elements.

Neural network architecture

The first element is the Feature-Extraction Encoder, which extracts features from individual frames, in particular accurately locating human subjects. The encoder is based on the MobileNetV3-Large backbone.

The second element is the Recurrent Decoder, which aggregates temporal information. The recurrent approach lets the model learn by itself what information to keep and what to forget on a continuous stream of video.

Finally, the Deep Guided Filter module performs high-resolution upsampling.

Because the authors shared their work and models, I have prepared an easy-to-use Docker-based application that you can use to process your video.

Application screen version 1

To run it you will need Docker. You can run it with or without a GPU.

With GPU:

docker run -it --gpus all -p 8000:8000 --rm --name aimatting qooba/aimatting:robust

Without GPU:

docker run -it -p 8000:8000 --rm --name aimatting qooba/aimatting:robust

Then open address http://localhost:8000/ in your browser.

Because the model does not require any auxiliary inputs such as a trimap or a pre-captured background image, we simply upload our video and choose the required background. Currently we can generate a green screen background, which can then be replaced in video editing software. We can also use a predefined color, an image or even a video.

I have also prepared the app for the older algorithm version: arxiv 2012.07810

To use it, please run:

docker run -it --gpus all -p 8000:8000 --rm --name aimatting qooba/aimatting:background

This version additionally requires the background image but sometimes achieves better results.

Application screen version 2

Yummy - delicious Feast extension

yummy

In this article I’d like to present Yummy, a really delicious Feast extension.

Before you continue reading, please watch the short introduction:

Last time I showed the Feast integration with the Dask framework, which helps to distribute ML solutions across a cluster but doesn’t solve other problems. Currently Feast uses a warehouse-based approach, where Feast builds and executes a query appropriate for a specific database engine. Because of this architecture Feast can’t use multiple data sources at the same time. Moreover, the logic that fetches historical features from offline data sources is duplicated in every data source implementation, which makes it difficult to maintain.

Feast

To solve these problems I have decided to create the Yummy Feast extension, which is also published as a PyPI package.

In Yummy I have used a backend-based approach, which centralizes the logic that fetches historical data from offline stores. Currently the Spark, Dask, Ray and Polars backends are supported. Moreover, because the selected backend is responsible for joining the data, we can use multiple different data sources at the same time.
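The idea of joining several sources in one place can be sketched without any backend. In the toy example below each "source" is just a list of rows, and a single function performs the point-in-time join for all of them while honouring a TTL. It illustrates the concept only; Yummy delegates this work to the selected backend, and every name and value here is invented.

```python
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, sources, ttl):
    """For each entity row, attach the newest feature values from every source
    whose timestamp lies in [event_timestamp - ttl, event_timestamp]."""
    result = []
    for entity in entity_rows:
        joined = dict(entity)
        ts = entity["event_timestamp"]
        for rows in sources.values():
            candidates = [
                r for r in rows
                if r["entity_id"] == entity["entity_id"] and ts - ttl <= r["datetime"] <= ts
            ]
            if candidates:
                newest = max(candidates, key=lambda r: r["datetime"])
                # Copy only the feature columns, not the join keys.
                joined.update(
                    {k: v for k, v in newest.items() if k not in ("entity_id", "datetime")}
                )
        result.append(joined)
    return result


sources = {
    "parquet": [{"entity_id": 1, "datetime": datetime(2021, 10, 1), "f0": 0.1}],
    "csv": [
        {"entity_id": 1, "datetime": datetime(2021, 9, 1), "f11": 5.0},
        {"entity_id": 1, "datetime": datetime(2021, 10, 2), "f11": 7.0},
    ],
}
entities = [{"entity_id": 1, "event_timestamp": datetime(2021, 10, 3)}]

print(point_in_time_join(entities, sources, ttl=timedelta(days=20)))
```

Since the join logic lives in one function rather than in each source, adding another source type only requires a way to load its rows.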

Feast

Additionally with Yummy we can start using a feature store on a single machine and then distribute it using the selected cluster type. We can also use ready to use platforms like: Databricks, Coiled, Anyscale to scale our solution.

To use Yummy we have to install it:

pip install yummy

Then we have to prepare Feast configuration feature_store.yaml:

project: repo
registry: s3://feast/data/registry.db
provider: local
online_store:
    type: redis
    connection_string: "redis:6379"
offline_store:
    type: yummy.YummyOfflineStore
    backend: dask

In this case we will use S3 as the feature store registry and Redis as the online store. Yummy takes over the offline store responsibility, and here we have selected the Dask backend. For the Dask, Ray and Polars backends we don’t have to set up a cluster: if we don’t provide a cluster configuration, they will run locally. For Apache Spark, additional configuration is required on local machines.

In the next step we need to provide the feature store definition in a Python file, e.g. features.py

from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from yummy import ParquetDataSource, CsvDataSource, DeltaDataSource

my_stats_parquet = ParquetDataSource(path="/mnt/dataset/all_data.parquet", event_timestamp_column="datetime",)
my_stats_delta = DeltaDataSource(path="/mnt/dataset/all/", event_timestamp_column="datetime",)
my_stats_csv = CsvDataSource(path="/mnt/dataset/all_data.csv", event_timestamp_column="datetime",)

my_entity = Entity(name="entity_id", value_type=ValueType.INT64, description="entity id",)

mystats_view_parquet = FeatureView(name="my_statistics_parquet", entities=["entity_id"], ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f0", dtype=ValueType.FLOAT),
        Feature(name="f1", dtype=ValueType.FLOAT),
        Feature(name="y", dtype=ValueType.FLOAT),
    ], online=True, input=my_stats_parquet, tags={},)

mystats_view_delta = FeatureView(name="my_statistics_delta", entities=["entity_id"], ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f2", dtype=ValueType.FLOAT),
        Feature(name="f3", dtype=ValueType.FLOAT),
    ], online=True, input=my_stats_delta, tags={},)

mystats_view_csv = FeatureView(name="my_statistics_csv", entities=["entity_id"],
    ttl=Duration(seconds=3600*24*20),
    features=[
        Feature(name="f11", dtype=ValueType.FLOAT),
        Feature(name="f12", dtype=ValueType.FLOAT),
    ], online=True, input=my_stats_csv, tags={},)

In this case we have used three Yummy data sources: ParquetDataSource, DeltaDataSource and CsvDataSource. Beforehand, I generated three data sources:

  • parquet file (/mnt/dataset/all_data.parquet)
  • delta lake (/mnt/dataset/all/)
  • csv file (/mnt/dataset/all_data.csv)

Currently Yummy won’t work with other Feast data sources like BigQuerySource or RedshiftSource.

Then we can apply our feature store definition and keep it on s3:

feast apply

Now we are ready to fetch required features from defined stores. To do this we simply run:

from feast import FeatureStore
import time

store = FeatureStore(repo_path='.')
start_time = time.time()
training_df = store.get_historical_features(
    entity_df=edf,
    features = [
        'my_statistics_parquet:f0',
        'my_statistics_parquet:f1',
        'my_statistics_parquet:y',
        'my_statistics_delta:f2',
        'my_statistics_delta:f3',
        'my_statistics_csv:f11',
        'my_statistics_csv:f12',
    ]
).to_df()
print("--- %s seconds --- " % (time.time() - start_time))
training_df
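
The `edf` variable passed as `entity_df` is not defined in the snippet above. Feast expects an entity dataframe with one column per entity join key (here `entity_id`) and an `event_timestamp` column that says at which point in time the features should be looked up. Below is a minimal sketch of such rows using only the standard library; the helper name `build_entity_rows`, the entity ids and the timestamp are hypothetical values chosen for illustration.

```python
from datetime import datetime


def build_entity_rows(entity_ids, event_timestamp):
    """Return one row per entity with the timestamp to join features at."""
    return [
        {"entity_id": entity_id, "event_timestamp": event_timestamp}
        for entity_id in entity_ids
    ]


# Hypothetical values: three entities, all looked up at the same point in time.
rows = build_entity_rows([1, 2, 3], datetime(2021, 1, 10, 12, 0, 0))
print(rows[0])
```

Wrapping the rows with `pandas.DataFrame(rows)` should give a dataframe that can be passed as `entity_df`.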

We started with the dask backend, but we can simply switch to ray by changing the feature_store.yaml configuration to:

project: repo
registry: s3://feast/data/registry.db
provider: local
online_store:
    type: redis
    connection_string: "redis:6379"
offline_store:
    type: yummy.YummyOfflineStore
    backend: ray

or to polars backend (which is currently the fastest option):

project: repo
registry: s3://feast/data/registry.db
provider: local
online_store:
    type: redis
    connection_string: "redis:6379"
offline_store:
    type: yummy.YummyOfflineStore
    backend: polars

We can also use a Spark cluster, where additional configuration options are available (they are used during Spark session initialization):

project: repo
registry: s3://feast/data/registry.db
provider: local
online_store:
    type: redis
    connection_string: "redis:6379"
offline_store:
    type: yummy.YummyOfflineStore
    backend: spark
    config:
        spark.master: "local[*]"
        spark.ui.enabled: "false"
        spark.eventLog.enabled: "false"
        spark.sql.session.timeZone: "UTC"
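
Since the configurations above differ only in the `backend` value (plus the optional `config` section for Spark), the switch can be scripted. The sketch below renders the file content from a template that copies the values shown above; the helper `render_feature_store_yaml` is a hypothetical name and not part of Feast or Yummy, and the Spark `config` section is left out for brevity.

```python
# Template mirrors the feature_store.yaml files shown above; only the backend changes.
TEMPLATE = """project: repo
registry: s3://feast/data/registry.db
provider: local
online_store:
    type: redis
    connection_string: "redis:6379"
offline_store:
    type: yummy.YummyOfflineStore
    backend: {backend}
"""

# Backends mentioned in this article.
SUPPORTED_BACKENDS = ("dask", "ray", "polars", "spark")


def render_feature_store_yaml(backend):
    """Return the feature_store.yaml content for the selected backend."""
    if backend not in SUPPORTED_BACKENDS:
        raise ValueError(f"unsupported backend: {backend!r}")
    return TEMPLATE.format(backend=backend)


config_text = render_feature_store_yaml("polars")
print(config_text)
```

Writing `config_text` to feature_store.yaml before calling Feast would then select the polars backend.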

Finally we can materialize data from the offline stores to the online store using the preferred backend:

feast materialize 2020-01-03T14:30:00 2023-01-03T14:30:00
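
The two arguments are the start and end of the materialization window as ISO 8601 timestamps. As a small sketch, the command can be assembled from `datetime` objects so that the window is computed rather than typed by hand; `materialize_command` is a hypothetical helper, not a Feast function.

```python
from datetime import datetime


def materialize_command(start, end):
    """Build the `feast materialize` command for the [start, end] window."""
    return "feast materialize {} {}".format(
        start.isoformat(timespec="seconds"),
        end.isoformat(timespec="seconds"),
    )


command = materialize_command(
    datetime(2020, 1, 3, 14, 30), datetime(2023, 1, 3, 14, 30)
)
print(command)
```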

Yummy solves several Feast limitations:

Feast

Distributed Feature Store with Feast and Dask

In this article I will show how to combine Feast and the Dask library to create a distributed feature store.

Before you continue reading, please watch the short introduction:

The feature store is a very important component of the MLOps process; it helps to manage historical and online features. With Feast we can, for example, read historical features from parquet files and then materialize them to Redis as an online store.

But what can we do if the historical data size exceeds our machine's capabilities? The Dask library can help to solve this problem. Using Dask we can distribute the data and calculations across multiple machines. Dask can run on a single machine or on a cluster (k8s, yarn, cloud, HPC, SSH, manual setup). We can start with a single machine and then smoothly move to a cluster if needed. Moreover, thanks to Dask we can read a set of parquet files using a path pattern and run distributed training using libraries like scikit-learn or XGBoost.

Feast with Dask

I have prepared a ready-to-use Docker image, so you can simply reproduce all the steps.

docker run --name feast -d --rm -p 8888:8888 -p 8787:8787 qooba/feast:dask

Then check the Jupyter notebook token, which you will need to log in:

docker logs -f feast

And open (use the token to login):

http://localhost:8888/notebooks/feast-dask/feast-dask.ipynb#/slide-0-0

The notebook is also available on https://github.com/qooba/feast-dask/blob/main/docker/feast-dask.ipynb.

But with Docker you will have the whole environment ready.

In the notebook you can find all the steps:

Random data generation

I have used numpy and scikit-learn to generate 1M entities and historical data (10 features generated with the make_hastie_10_2 function) for 14 days, which I saved as a parquet file (1.34GB).
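
The scikit-learn function make_hastie_10_2 draws ten standard Gaussian features per sample and labels the sample 1 when the sum of their squares exceeds 9.34, otherwise -1. The sketch below re-implements that rule with the standard library only, so it is not the notebook's code: the column names `entity_id` and `datetime`, the start date, the seed and the tiny sizes (5 entities instead of 1M) are assumptions made for illustration.

```python
import random
from datetime import datetime, timedelta


def hastie_row(rng, n_features=10, threshold=9.34):
    """Draw standard-normal features; the label is 1 if their squared sum exceeds the threshold."""
    features = [rng.gauss(0.0, 1.0) for _ in range(n_features)]
    label = 1.0 if sum(x * x for x in features) > threshold else -1.0
    return features, label


def generate_history(n_entities, n_days, start, seed=0):
    """Return one row per entity and day with features f0..f9 and target y."""
    rng = random.Random(seed)
    rows = []
    for day in range(n_days):
        timestamp = start + timedelta(days=day)
        for entity_id in range(n_entities):
            features, label = hastie_row(rng)
            row = {"entity_id": entity_id, "datetime": timestamp, "y": label}
            row.update({f"f{i}": value for i, value in enumerate(features)})
            rows.append(row)
    return rows


# Small, hypothetical sizes so the example runs instantly.
rows = generate_history(n_entities=5, n_days=14, start=datetime(2021, 1, 1))
print(len(rows), sorted(rows[0]))
```

For the real data set the same layout would be produced with numpy and scikit-learn and written to a parquet file.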

Feast configuration and registry

feature_store.yaml - where I use a local registry and a SQLite database as the online store.

features.py - with one file source (the generated parquet file) and the feature definitions.

To create the Feast registry we have to run:

feast apply

Additionally I have created a simple library which helps to inspect the Feast schema directly in the Jupyter notebook:

pip install feast-schema
from feast_schema import FeastSchema

FeastSchema('.').show_schema()

Feast schema

Dask cluster setup

Then I set up a simple Dask cluster with a scheduler and 4 workers.

dask-scheduler --host 0.0.0.0 --port 8786 --bokeh-port 8787 &

dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8701 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8702 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8703 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8704 &

The Dask dashboard is exposed on port 8787, so you can follow the Dask metrics on:

http://localhost:8787/status

Dask dashboard

Fetching historical features

In the next step I fetched the historical features using Feast with Dask:

from feast import FeatureStore

store = FeatureStore(repo_path='.')
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=[
        "my_statistics:f0",
        "my_statistics:f1",
        "my_statistics:f2",
        "my_statistics:f3",
        "my_statistics:f4",
        "my_statistics:f5",
        "my_statistics:f6",
        "my_statistics:f7",
        "my_statistics:f8",
        "my_statistics:f9",
        "my_statistics:y",
    ],
).to_df()
training_df
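
Each feature reference has the form `<feature view name>:<feature name>`. Because the list is long and repetitive, it can also be generated. This is only a convenience sketch; `feature_refs` is a hypothetical helper and the resulting list is equivalent to the one written out above.

```python
def feature_refs(view, feature_names):
    """Build Feast feature references of the form '<view>:<feature>'."""
    return [f"{view}:{name}" for name in feature_names]


# f0..f9 plus the target column y, as in the call above.
names = [f"f{i}" for i in range(10)] + ["y"]
refs = feature_refs("my_statistics", names)
print(refs)
```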

Fetching the historical features with Dask takes about 14 seconds, which is much faster than Feast without Dask:

Pandas
CPU times: user 2min 51s, sys: 6.64 s, total: 2min 57s
Wall time: 2min 52s

Dask
CPU times: user 458 ms, sys: 65.3 ms, total: 524 ms
Wall time: 14.7 s
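
To put the two wall times side by side, the speedup can be worked out directly from the numbers reported above. This is plain arithmetic on the measured values, not an additional benchmark.

```python
def to_seconds(minutes=0, seconds=0.0):
    """Convert a minutes/seconds pair to seconds."""
    return minutes * 60 + seconds


pandas_wall = to_seconds(minutes=2, seconds=52)  # Wall time: 2min 52s
dask_wall = to_seconds(seconds=14.7)             # Wall time: 14.7 s

speedup = pandas_wall / dask_wall
print(f"Dask is about {speedup:.1f}x faster by wall time")
```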

Distributed training with Sklearn

After fetching the data we can start the training. We can use the fetched Pandas dataframe, but we can also fetch a Dask dataframe instead:

from feast import FeatureStore
store=FeatureStore(repo_path='.')
training_dd = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=[
        "my_statistics:f0",
        "my_statistics:f1",
        "my_statistics:f2",
        "my_statistics:f3",
        "my_statistics:f4",
        "my_statistics:f5",
        "my_statistics:f6",
        "my_statistics:f7",
        "my_statistics:f8",
        "my_statistics:f9",
        "my_statistics:y",
    ]
).evaluation_function()

Using the Dask dataframe we can continue with distributed training on the distributed data. On the other hand, if we use the Pandas dataframe, the data will be collected on a single node.

To start distributed training with scikit-learn we can use the Joblib library with the dask backend:

import joblib
from sklearn.ensemble import GradientBoostingClassifier
from dask_ml.model_selection import train_test_split

predictors = training_dd[["f0","f1","f2","f3","f4","f5","f6","f7","f8","f9"]]
targets = training_dd[["y"]]

X_train, X_test, y_train, y_test = train_test_split(predictors, targets, test_size=.3)

with joblib.parallel_backend('dask'):
    clf = GradientBoostingClassifier(n_estimators=100, learning_rate=1.0, max_depth=1, random_state=0, verbose=1).fit(X_train, y_train)
    
    score=clf.score(X_test, y_test)
    
score

Online features materialization

Finally I materialized the data to the local SQLite database:

feast materialize 2021-01-01T01:00:00 2021-01-31T23:59:00

In this case the data for materialization is also prepared using Dask.