Skip to main content

Get the most up-to-date crypto data

· 3 min read
Stable Diffusion img2img prompt: a colorful pinwheel spinning in a pile of gold bitcoins, 3d render trending on artstation, octane beautifully detailed render, post-processing, extremely hyperdetailed
Jeremiah Cheng
JJ Brosnan
Using Apache Airflow to transform up-to-date crypto data

The biggest players in the crypto space all use AI to predict prices and manage investments. So should you. Doing so isn't as difficult as you might think.

With the right tools, AI workflows are intuitive and easy to manage.

This is the first in a six-part blog series on real-time crypto price predictions with AI. In this blog, I'll cover the use of Apache Airflow for the acquisition of up-to-date crypto data.

Throughout this series, you'll learn how to:

  1. Acquire up-to-date crypto data with Apache Airflow
  2. Implement real-time AI with TensorFlow
  3. Implement real-time AI with Nvidia RAPIDS
  4. Test the models on simulated real-time data
  5. Implement the models on real-time crypto data from Coinbase
  6. Share AI predictions with URIs

In this blog, I will walk you through the first stage of obtaining crypto data. Apache Airflow is a powerful tool for automating the acquisition of crypto data.

Automate data acquisition with Apache Airflow

To get started with Apache Airflow, follow the setup instructions. Once setup is complete, run:

airflow webserver --port 8080
airflow scheduler

This starts the Apache Airflow server.

A directed acyclic graph (DAG) is a concept in computing that is used to define the order of calculations. Apache Airflow uses DAGs to define the relationships, order, and scheduling of tasks in a workflow. Deephaven uses DAGs to organize the flow of data through queries.

The code below uses a simple Airflow DAG to fetch and transform BTC data on the Gemini exchange from cryptodatadownload. The DAG defines the order and scheduling of operations.

Click to see the code!
# Imports
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd
import requests
import csv

url = ""

def get_data(**kwargs):
with requests.Session() as s:
download = s.get(url)
decoded_content = download.content.decode('utf-8')
cr = csv.reader(decoded_content.splitlines(), delimiter=',')
my_list = list(cr)[2:]
return my_list

def transform_data(**kwargs):
ti = kwargs['ti']
BTC_gemini_data = ti.xcom_pull(key=None, task_ids=['fetch_btc_gemini'])[0]
now ="%m%d%Y%H%M%S")
price = []
date = []

for element in BTC_gemini_data:
time = element[1]
time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
p = float(element[-2])

d = {"Date":date,"Price":price}
df = pd.DataFrame(data=d)
table = pa.Table.from_pandas(df)
file_name = "all_data/data"+now+".parquet"
pq.write_table(table, file_name)

default_args = {
'owner': 'jeremiah',
'start_date': datetime(2021, 10, 4, 11, 00, 00),
'concurrency': 1,
'retries': 0

with DAG('crypto',
catchup = False,
default_args = default_args,
schedule_interval = '*/1 * * * *',
# schedule_interval=None,
) as dag:
fetch_binance_ohlcv = PythonOperator(task_id = 'fetch_btc_gemini',
python_callable = get_data)
transform_data = PythonOperator(task_id = 'transform_data',
python_callable = transform_data)

fetch_binance_ohlcv >> transform_data

The workflow presented in the code is straightforward:

  • Obtain BTC data from cryptodatadownload.
  • Transform the data and save timestamps and prices to Parquet.

Airflow manages this workflow by calling it at scheduled intervals. This is managed by the schedule_interval input to the DAG. Check port 8080 in your web browser to see more details about the DAG, including number of runs, previous run, next run, interval, and more.


With the dynamic data pipeline set up, next comes the fun part: implementing machine learning models. Stay tuned for the next blog in the series where I'll implement TensorFlow and PyTorch LSTM models to predict prices.