Let's copy some bytes using Airbyte Protocol and Rust

Replication Driver

Airbyte replication connectors run as containers, so we need a container runtime such as Docker Desktop.

Unlike the official Airbyte Worker job, however, the driver described here is agnostic to the container runtime.

The architecture differs in how the driver coordinates replication from Source to Destination. The driver plays two roles, Source and Destination, and runs inside the connector containers.

The driver processes inside the containers coordinate with each other through IPC (inter-process communication). They execute an initialization handshake protocol to start the Source and Destination processes.
If both processes start successfully, the Source driver begins replicating data into the Destination process.
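To make the handshake concrete, here is a minimal Rust sketch that simulates the two driver roles as threads. It is a sketch under stated assumptions, not the driver's code: in-memory std::sync::mpsc channels stand in for the named pipes and socket the real driver uses, and the Control messages (DestinationReady, Done, Ack) and the function names are hypothetical, since the driver's actual wire format isn't covered in this post.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical control messages exchanged by the two driver roles.
#[derive(Debug, PartialEq)]
enum Control {
    DestinationReady,
    Done { sent: usize },
    Ack { received: usize },
}

// Source role: wait for the destination, stream records, then start shutdown.
fn run_source(
    records: Vec<String>,
    data_tx: Sender<String>,
    ctrl_tx: Sender<Control>,
    ctrl_rx: Receiver<Control>,
) -> usize {
    // Handshake: block until the destination reports that it is ready.
    assert_eq!(ctrl_rx.recv().unwrap(), Control::DestinationReady);
    let sent = records.len();
    for record in records {
        data_tx.send(record).unwrap();
    }
    // Dropping the sender closes the data stream for the destination.
    drop(data_tx);
    ctrl_tx.send(Control::Done { sent }).unwrap();
    // Shutdown: wait for the destination to acknowledge what it received.
    match ctrl_rx.recv().unwrap() {
        Control::Ack { received } => received,
        other => panic!("unexpected message: {:?}", other),
    }
}

// Destination role: announce readiness, consume records, acknowledge shutdown.
fn run_destination(
    data_rx: Receiver<String>,
    ctrl_tx: Sender<Control>,
    ctrl_rx: Receiver<Control>,
) -> Vec<String> {
    ctrl_tx.send(Control::DestinationReady).unwrap();
    // The iterator ends once the source has dropped its data sender.
    let written: Vec<String> = data_rx.iter().collect();
    match ctrl_rx.recv().unwrap() {
        Control::Done { sent } => assert_eq!(sent, written.len()),
        other => panic!("unexpected message: {:?}", other),
    }
    ctrl_tx.send(Control::Ack { received: written.len() }).unwrap();
    written
}

// Wires both roles together and returns (acknowledged count, records written).
fn replicate(records: Vec<String>) -> (usize, Vec<String>) {
    let (data_tx, data_rx) = channel();
    let (to_source_tx, to_source_rx) = channel();
    let (to_dest_tx, to_dest_rx) = channel();
    let destination = thread::spawn(move || run_destination(data_rx, to_source_tx, to_dest_rx));
    let acked = run_source(records, data_tx, to_dest_tx, to_source_rx);
    (acked, destination.join().unwrap())
}

fn main() {
    let (acked, written) = replicate(vec!["a".to_string(), "b".to_string()]);
    println!("destination acknowledged {} records: {:?}", acked, written);
}
```

Keeping the data stream separate from the control messages mirrors the separate pipes in the real setup: the destination learns that the data is finished when the stream closes, and the explicit Done/Ack exchange gives both sides a graceful shutdown point.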

The Source driver tracks the record flow and publishes stats and metrics, while the Destination driver takes care of state persistence.
Once replication is complete, the drivers again coordinate the shutdown procedure through IPC and exit gracefully. This protocol relies on a shared volume for the process pipes, so an init command runs at startup; in the log below, it copies the driver binary to /storage/airbyte-replication-driver before the Source and Destination containers start.
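Here is a minimal sketch of Source-side record-flow tracking. It assumes the connector writes newline-delimited Airbyte messages, each a JSON object with a top-level "type" field such as RECORD, STATE or LOG. FlowStats, message_type and track are hypothetical names, and the naive string scan is a simplification standing in for a proper JSON parser such as serde_json.

```rust
use std::collections::BTreeMap;
use std::io::{self, BufRead};

/// Message counts per Airbyte message type, plus payload bytes (newlines excluded).
#[derive(Debug, Default)]
struct FlowStats {
    by_type: BTreeMap<String, u64>,
    bytes: u64,
}

/// Naively extracts the value of the first `"type"` key in a JSON line.
/// Simplification: a nested `"type"` key inside record data could come first,
/// so a real tracker should deserialize the message instead.
fn message_type(line: &str) -> Option<&str> {
    let key = "\"type\"";
    let rest = &line[line.find(key)? + key.len()..];
    let rest = rest.trim_start().strip_prefix(':')?.trim_start();
    let value = rest.strip_prefix('"')?;
    value.find('"').map(|end| &value[..end])
}

/// Reads newline-delimited Airbyte messages and tallies them by type.
fn track<R: BufRead>(reader: R) -> io::Result<FlowStats> {
    let mut stats = FlowStats::default();
    for line in reader.lines() {
        let line = line?;
        stats.bytes += line.len() as u64;
        let kind = message_type(&line).unwrap_or("UNKNOWN").to_string();
        *stats.by_type.entry(kind).or_insert(0) += 1;
    }
    Ok(stats)
}

fn main() -> io::Result<()> {
    let output = concat!(
        "{\"type\":\"LOG\",\"log\":{\"level\":\"INFO\",\"message\":\"starting\"}}\n",
        "{\"type\":\"RECORD\",\"record\":{\"stream\":\"test\",\"data\":{\"id\":1},\"emitted_at\":0}}\n",
        "{\"type\":\"RECORD\",\"record\":{\"stream\":\"test\",\"data\":{\"id\":2},\"emitted_at\":0}}\n",
        "{\"type\":\"STATE\",\"state\":{\"data\":{\"cursor\":2}}}\n",
    );
    let stats = track(output.as_bytes())?;
    println!("{:?} ({} bytes)", stats.by_type, stats.bytes);
    Ok(())
}
```

Because track accepts any BufRead, the same function works on a pipe, a file or an in-memory buffer, which keeps the tracking logic testable without containers.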

When using Docker, we can run the replication with Docker Compose.

The end result is that we run just two containers (plus a short-lived init container). 🎉🎉

Now that the introduction is out of the way, let's copy a file from one folder to another using Airbyte replication.

We'll replicate from the Airbyte Source File connector to the Airbyte Destination CSV connector.

À la cp local/input.csv local/_airbyte_raw_test.csv! 😃

First, clone the sample configuration from the GitHub repository:

git clone git@github.com:replication-rs/airbyte-replication-operator-external.git
cd airbyte-replication-operator-external

The repository currently contains a few scenarios used for testing; going forward, they will serve as the E2E tests.

cd e2e/hello-airbyte-file-to-csv/docker
docker compose up

The output should look something like this:

h7kanna@Harshas-MBP-6 docker % docker compose up
[+] Running 4/2
 ⠿ Network docker_default          Created    0.0s
 ⠿ Container docker-init-1         Created    0.0s
 ⠿ Container docker-destination-1  Created    0.1s
 ⠿ Container docker-source-1       Created    0.1s
Attaching to docker-destination-1, docker-init-1, docker-source-1
docker-init-1         | 2023-03-21T00:58:16.240494Z  INFO airbyte_replication_driver: Starting command="init"
docker-init-1         | 2023-03-21T00:58:16.240520Z  INFO airbyte::driver::init: Copying driver to /storage/airbyte-replication-driver ...
docker-init-1         | 2023-03-21T00:58:16.259013Z  INFO airbyte::driver::init: Driver successfully copied to: "/storage/airbyte-replication-driver"
docker-init-1         | 2023-03-21T00:58:16.259107Z  INFO airbyte::driver::init: Permissions set successfully to: "/storage/airbyte-replication-driver"
docker-init-1 exited with code 0
docker-source-1       | 2023-03-21T00:58:16.613952Z  INFO airbyte_replication_driver: Starting command="source"
docker-destination-1  | 2023-03-21T00:58:16.615943Z  INFO airbyte_replication_driver: Starting command="destination"
docker-destination-1  | 2023-03-21T00:58:16.616358Z  INFO airbyte::driver::pipes: Error while removing named pipes /pipes
docker-destination-1  | 2023-03-21T00:58:16.616390Z  INFO airbyte::driver::pipes: Created "/pipes/source"
docker-destination-1  | 2023-03-21T00:58:16.616424Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stdin.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616432Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stdout.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616438Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/source/stderr.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616458Z  INFO airbyte::driver::pipes: Created "/pipes/destination"
docker-destination-1  | 2023-03-21T00:58:16.616468Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stdin.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616475Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stdout.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616482Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/destination/stderr.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616489Z  INFO airbyte::driver::pipes: Created named pipe "/pipes/replication.pipe"
docker-destination-1  | 2023-03-21T00:58:16.616497Z  INFO airbyte::driver::destination: Replication started at Instant { tv_sec: 42417, tv_nsec: 705905322 }
docker-source-1       | 2023-03-21T00:58:16.616157Z  INFO airbyte::driver::source: /pipes/destination/stdin.pipe is not ready No such device or address (os error 6)
docker-source-1       | 2023-03-21T00:58:16.616379Z  INFO airbyte::driver::source: Waiting for destination
docker-source-1       | 2023-03-21T00:58:17.621428Z  INFO airbyte::driver::source: Waiting for destination client to connect
docker-source-1       | 2023-03-21T00:58:17.623848Z  INFO airbyte::driver::source: Destination client connected!
docker-destination-1  | 2023-03-21T00:58:17.621428Z  INFO airbyte::driver::destination: Destination AIRBYTE_ENTRYPOINT /airbyte/base.sh
docker-destination-1  | 2023-03-21T00:58:17.621456Z  INFO airbyte::driver::destination: Destination command /airbyte/base.sh
docker-destination-1  | 2023-03-21T00:58:17.621542Z  INFO airbyte::driver::destination: Destination output tracker spawned
docker-destination-1  | 2023-03-21T00:58:17.622338Z  INFO airbyte::driver::destination: Servery key /tmp/.tmpqjfsIU/socket
docker-destination-1  | 2023-03-21T00:58:17.622490Z  INFO airbyte::driver::destination: Client initiated
docker-source-1       | 2023-03-21T00:58:17.640206Z  INFO airbyte::driver::source: Source AIRBYTE_ENTRYPOINT python /airbyte/integration_code/main.py
docker-source-1       | 2023-03-21T00:58:17.640242Z  INFO airbyte::driver::source: Source command python
docker-source-1       | 2023-03-21T00:58:17.642231Z  INFO airbyte::driver::source: Source output tracker spawned
docker-destination-1  | 2023-03-21T00:58:18.074393Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "integration args: {catalog=/secrets/catalog-file.json, write=null, config=/secrets/destination-file.json}", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.074576Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Running integration: io.airbyte.integrations.destination.csv.CsvDestination", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.074800Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Command: WRITE", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.077523Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Integration config: IntegrationConfig{command=WRITE, configPath='/secrets/destination-file.json', catalogPath='/secrets/catalog-file.json', statePath='null'}", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.206642Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "initializing consumer.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-source-1       | 2023-03-21T00:58:18.619138Z  INFO airbyte::driver::source: Replication complete
docker-source-1       | 2023-03-21T00:58:18.619162Z  INFO airbyte::driver::source: exit status: 0
docker-destination-1  | 2023-03-21T00:58:18.651731Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Airbyte message consumer: succeeded.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.651797Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "finalizing consumer.", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.655133Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "File output: /local/_airbyte_raw_test.csv", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.655787Z  INFO airbyte::driver::destination: AirbyteMessage { catalog: None, connection_status: None, control: None, log: Some(AirbyteLogMessage { level: Info, message: "Completed integration: io.airbyte.integrations.destination.csv.CsvDestination", stack_trace: None }), record: None, spec: None, state: None, trace: None, type_: Log }
docker-destination-1  | 2023-03-21T00:58:18.687422Z  INFO airbyte::driver::destination: Tracking complete
docker-destination-1  | 2023-03-21T00:58:18.687538Z  INFO airbyte::driver::destination: exit status: 0
docker-destination-1  | 2023-03-21T00:58:18.687551Z  INFO airbyte::driver::destination: Replication completed in 0 minutes
docker-source-1 exited with code 0
docker-destination-1 exited with code 0
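Two details in this log are worth a closer look: the named-pipe layout that the destination driver creates under /pipes, and the source reporting that /pipes/destination/stdin.pipe "is not ready" with os error 6 (ENXIO) before it starts waiting. Per POSIX, a non-blocking open of a FIFO for writing fails with ENXIO while no process has it open for reading, so the writer has to retry. The sketch below reproduces the layout from the log and a retry loop around a caller-supplied open function. pipe_layout and wait_for_reader are hypothetical names, and I'm assuming, not stating, that the driver waits in this way.

```rust
use std::io;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;

/// The named-pipe layout that appears in the destination log above.
fn pipe_layout(root: &Path) -> Vec<PathBuf> {
    let mut pipes = Vec::new();
    for role in &["source", "destination"] {
        for stream in &["stdin", "stdout", "stderr"] {
            pipes.push(root.join(role).join(format!("{}.pipe", stream)));
        }
    }
    pipes.push(root.join("replication.pipe"));
    pipes
}

/// ENXIO, "No such device or address" (os error 6 on Linux and macOS).
const ENXIO: i32 = 6;

/// Retries `open` while it fails with ENXIO (no reader yet), sleeping `delay`
/// between attempts and giving up after `max_attempts`.
fn wait_for_reader<T>(
    mut open: impl FnMut() -> io::Result<T>,
    max_attempts: u32,
    delay: Duration,
) -> io::Result<T> {
    let mut attempt = 1;
    loop {
        match open() {
            Err(e) if e.raw_os_error() == Some(ENXIO) && attempt < max_attempts => {
                eprintln!("not ready: {}; waiting for destination", e);
                attempt += 1;
                thread::sleep(delay);
            }
            result => return result,
        }
    }
}

fn main() {
    for pipe in pipe_layout(Path::new("/pipes")) {
        println!("{}", pipe.display());
    }
    // Stand-in for a non-blocking open: fails with ENXIO twice, then "succeeds".
    let mut calls = 0;
    let opened = wait_for_reader(
        || {
            calls += 1;
            if calls < 3 {
                Err(io::Error::from_raw_os_error(ENXIO))
            } else {
                Ok(calls)
            }
        },
        10,
        Duration::from_millis(10),
    );
    println!("opened on attempt {:?}", opened.ok());
}
```

In a real driver, the closure would open the FIFO with std::fs::OpenOptions::new().write(true) and the O_NONBLOCK flag set through std::os::unix::fs::OpenOptionsExt::custom_flags (the constant comes from the libc crate). Any error other than ENXIO is returned immediately, so a genuinely missing pipe is not masked by the retry loop.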

Let's verify:

chmod +x assert.sh && ./assert.sh

Replication successful.
Please check ./local/_airbyte_raw_test.csv

Cleanup

docker compose down && rm -rf ./local/_airbyte_raw_test.csv

More example compose files are available here.

Replication Operator

Again, since Airbyte replications run as containers, we can run them on any container platform.

I plan to support running on platforms such as Amazon ECS and Fargate.

For our darling Kubernetes, though, the Operator component is a Kubernetes Operator that runs the driver described above as a Pod and manages the lifecycle of a single replication.
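As a rough illustration of "the lifecycle of one replication", here is a small Rust state machine. It assumes, hypothetically, that the replication Pod mirrors the compose setup (an init step followed by Source and Destination driver containers) and that the operator succeeds only when both drivers exit with status 0, as in the log above. Phase, Event and Replication are illustrative names, not the operator's actual types.

```rust
/// Hypothetical phases of one replication as the operator might track them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    Pending,
    Initializing,
    Running,
    Succeeded,
    Failed,
}

#[derive(Debug, Clone, Copy)]
enum Role {
    Source,
    Destination,
}

/// Hypothetical observations about the replication Pod's containers.
#[derive(Debug, Clone, Copy)]
enum Event {
    PodScheduled,
    InitExited(i32),
    Exited(Role, i32),
}

#[derive(Debug)]
struct Replication {
    phase: Phase,
    source_exit: Option<i32>,
    destination_exit: Option<i32>,
}

impl Replication {
    fn new() -> Self {
        Replication { phase: Phase::Pending, source_exit: None, destination_exit: None }
    }

    /// Applies one observation and returns the resulting phase.
    fn observe(&mut self, event: Event) -> Phase {
        self.phase = match (self.phase, event) {
            (Phase::Pending, Event::PodScheduled) => Phase::Initializing,
            (Phase::Initializing, Event::InitExited(0)) => Phase::Running,
            (Phase::Initializing, Event::InitExited(_)) => Phase::Failed,
            (Phase::Running, Event::Exited(role, code)) => {
                match role {
                    Role::Source => self.source_exit = Some(code),
                    Role::Destination => self.destination_exit = Some(code),
                }
                // Succeed only when both drivers exit 0; fail as soon as either does not.
                match (self.source_exit, self.destination_exit) {
                    (Some(0), Some(0)) => Phase::Succeeded,
                    (Some(s), _) if s != 0 => Phase::Failed,
                    (_, Some(d)) if d != 0 => Phase::Failed,
                    _ => Phase::Running,
                }
            }
            // Terminal phases are sticky; out-of-order events are ignored.
            (current, _) => current,
        };
        self.phase
    }
}

fn main() {
    let mut replication = Replication::new();
    let events = vec![
        Event::PodScheduled,
        Event::InitExited(0),
        Event::Exited(Role::Source, 0),
        Event::Exited(Role::Destination, 0),
    ];
    for event in events {
        println!("{:?} -> {:?}", event, replication.observe(event));
    }
}
```

Keeping the transition logic as a pure function of (phase, event) makes it easy to unit-test separately from whatever reconcile loop feeds it events.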

The idea is that higher-level orchestrators can use it as a building block to provide features like scheduling, configuration management, UI, etc.

Kubernetes is also one of the pluggable storage options for persisting replication state.
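One way to picture "pluggable storage" is a small trait that the Destination-side logic depends on, with one implementation per backend. The sketch below is hypothetical: StateStore, MemoryStore, FileStore and persist_latest are illustrative names. A local file backend, which I'm assuming is roughly what --store-path points at in the CLI example later in this post, sits alongside an in-memory one. A Kubernetes backend would implement the same trait, for example by writing to a ConfigMap or custom resource, but that is not shown here.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical interface for pluggable replication-state storage.
trait StateStore {
    fn save(&mut self, replication: &str, state: &str) -> io::Result<()>;
    fn load(&self, replication: &str) -> io::Result<Option<String>>;
}

/// In-memory backend, handy for tests.
#[derive(Default)]
struct MemoryStore {
    states: HashMap<String, String>,
}

impl StateStore for MemoryStore {
    fn save(&mut self, replication: &str, state: &str) -> io::Result<()> {
        self.states.insert(replication.to_string(), state.to_string());
        Ok(())
    }
    fn load(&self, replication: &str) -> io::Result<Option<String>> {
        Ok(self.states.get(replication).cloned())
    }
}

/// File backend: one `<replication>.json` file per replication under `base`.
struct FileStore {
    base: PathBuf,
}

impl FileStore {
    fn path(&self, replication: &str) -> PathBuf {
        self.base.join(format!("{}.json", replication))
    }
}

impl StateStore for FileStore {
    fn save(&mut self, replication: &str, state: &str) -> io::Result<()> {
        fs::create_dir_all(&self.base)?;
        fs::write(self.path(replication), state)
    }
    fn load(&self, replication: &str) -> io::Result<Option<String>> {
        match fs::read_to_string(self.path(replication)) {
            Ok(state) => Ok(Some(state)),
            // A missing file just means no state has been persisted yet.
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }
}

/// Destination-side logic stays backend-agnostic: it only sees the trait.
fn persist_latest(store: &mut dyn StateStore, replication: &str, states: &[&str]) -> io::Result<()> {
    if let Some(latest) = states.last() {
        store.save(replication, latest)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut store = MemoryStore::default();
    persist_latest(&mut store, "hello-airbyte-file-to-csv", &["{\"cursor\":1}", "{\"cursor\":2}"])?;
    println!("{:?}", store.load("hello-airbyte-file-to-csv")?);
    Ok(())
}
```

Swapping backends then means passing a different &mut dyn StateStore; the code that decides which state to persist does not change.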

For example, KubeVela can serve as that higher-level workflow engine. Check the details here.

A video demo is here.

Drivers

CLI

The driver has a CLI for useful operations such as viewing the state and progress of replications.

The CLI also provides basic scheduling (cron) and configuration management for simple use cases.

Further in the future, I envision an Airbyte Desktop: a way to store your personal data in your own data lake using Airbyte replications.

To list all commands:

./airbyte-replication-driver --help

For example, to check the state of the replication:

./airbyte-replication-driver --command state --replication hello-airbyte-file-to-csv --store-path tmp

Finally

Hopefully, this tutorial has helped you understand how to use the replication Driver/Operator combo.

And, more importantly, its value proposition.

So let us start contributing to the awesome Airbyte ecosystem in a unique way.

Shall we?

Check the Roadmap here.

Your feedback is much appreciated.

Yours truly,

Harsha Teja Kanna