
How to schedule R scripts with Docker and Apache Airflow

As a data analyst, you may be required to send a report by email on a regular basis. In R, you can use cronR to achieve this; it's simple and easy to use. But today I will introduce Apache Airflow (written in Python) as an alternative way to schedule R scripts.

We will use Apache Airflow to schedule a task that starts a container to run your R script. Here are the steps:

  • Run Apache Airflow in docker
  • Prepare your R docker image
  • Write the dag file (Python) and R script

Run Apache Airflow in docker

Apache Airflow is written in Python, so it helps to know a little Python. We will run Apache Airflow in Docker on Linux, so you will need Docker installed.

I will use my custom Apache Airflow image. It's built from a fork of puckel's docker-airflow repository.

You should build your own custom image too, because you will need to change the Dockerfile. You can check my Dockerfile for reference.
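
As a rough sketch (assuming you have cloned your fork of the repository locally, and adjusting the tag to your own namespace), the build step might look like this:

cd docker-airflow
docker build -t shizidushu/docker-airflow:1.10.2 .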

The key to allowing Apache Airflow (which runs as a container) to start another Docker container is the following line:

groupadd --gid 119 docker

I borrowed it from someone else's Dockerfile; sorry, I don't remember the details. You should change the group ID to match the docker group on your host (it is 119 on mine). Run the id command and you will find it (you may see something like 119(docker)).
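
For example, you can confirm the docker group ID on the host like this:

id                     # look for an entry like 119(docker) in the groups list
getent group docker    # prints something like docker:x:119:youruser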

After building your image, pull it and run Apache Airflow:

docker pull shizidushu/docker-airflow:1.10.2

Then deploy Apache Airflow. I use Docker Swarm. Here is the YAML file; I omit some details, so you should edit it according to your own needs. You may need to read Apache Airflow's documentation.

version: "3.7"
services:
  redis:
    image: redis:5-alpine
    ports:
      - "6379"
    networks:
      - apache-airflow-overlay
    command: redis-server
    deploy:
      replicas: 1
      update_config:
        parallelism: 2
        delay: 10s
      restart_policy:
        condition: on-failure
  
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
      - PGDATA=/var/lib/postgresql/data/pgdata
    volumes:
      - ../persistent-data/airflow/pgdata:/var/lib/postgresql/data
    networks:
      - apache-airflow-overlay
    deploy:
      placement:
        constraints: [node.role == manager]

  webserver:
    image: shizidushu/docker-airflow:1.10.2
    depends_on:
      - postgres
      - redis
    networks:
      - apache-airflow-overlay
    environment:
      - DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
      - LOAD_EX=n
      - AIRFLOW_HOME=/usr/local/airflow
      - FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
      - EXECUTOR=Celery
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /usr/bin/docker:/usr/bin/docker
      - ./dags:/usr/local/airflow/dags
    ports:
      - target: 8080
        published: 8080
        mode: host
    deploy:
      placement:
        constraints: [node.role == manager]
    command: webserver

  flower:
    image: shizidushu/docker-airflow:1.10.2
    depends_on:
      - postgres
      - redis
    networks:
      - apache-airflow-overlay
    environment:
      - DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
      - LOAD_EX=n
      - AIRFLOW_HOME=/usr/local/airflow
      - FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
      - EXECUTOR=Celery
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /usr/bin/docker:/usr/bin/docker
      - ./dags:/usr/local/airflow/dags
    ports:
      - target: 5555
        published: 5555
        mode: host
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
      placement:
        constraints: [node.role == manager]
    command: flower

  scheduler:
    image: shizidushu/docker-airflow:1.10.2
    depends_on:
      - postgres
      - redis
      - webserver
    networks:
      - apache-airflow-overlay
    environment:
      - DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
      - LOAD_EX=n
      - AIRFLOW_HOME=/usr/local/airflow
      - FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
      - EXECUTOR=Celery
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /usr/bin/docker:/usr/bin/docker
      - ./dags:/usr/local/airflow/dags
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
      placement:
        constraints: [node.role == manager]
    command: scheduler

  worker:
    image: shizidushu/docker-airflow:1.10.2
    depends_on:
      - postgres
      - redis
      - webserver
      - scheduler
    networks:
      - apache-airflow-overlay
    environment:
      - DOCKER_VOLUME_BASE=/home/airflow/airflow-in-docker/data_volume
      - LOAD_EX=n
      - AIRFLOW_HOME=/usr/local/airflow
      - FERNET_KEY=E2XgDaOqfPVC51gE2XgDaOqfPVC51gE2XgDaOqfPVC51g
      - EXECUTOR=Celery
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /usr/bin/docker:/usr/bin/docker
      - ./dags:/usr/local/airflow/dags
    deploy:
      replicas: 2
      restart_policy:
        condition: on-failure
    command: worker

networks:
  apache-airflow-overlay:
    driver: overlay
    external: true

Notice I set an environment variable DOCKER_VOLUME_BASE, which I use to refer to the location of my R script. You can omit it if you prefer.
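
Since the overlay network is declared as external, create it before deploying the stack. A minimal sketch, with the stack name and compose file name as placeholders:

docker network create --driver overlay apache-airflow-overlay
docker stack deploy -c docker-compose.yml airflow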

After deploying Apache Airflow, it is time to write the DAG file, which defines the scheduled job. You also need to prepare an R Docker image to run your R script. I use a Docker image based on rocker/r-ver.
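
For reference, here is a minimal sketch of how such an image could be built. The tag my-r-image is just a placeholder, and installing optparse is an assumption based on the example script below (the stock rocker/r-ver image does not ship with it). You would then point the DockerOperator's image argument at your own tag.

# build a custom R image on top of rocker/r-ver with the script's dependencies installed
docker build -t my-r-image - <<'EOF'
FROM rocker/r-ver
RUN R -e "install.packages('optparse', repos = 'https://cloud.r-project.org')"
EOF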

Here is an example of a DAG file:

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta
import os


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 22, 1),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}


# run once a day at 01:05
dag = DAG(
    dag_id='r-script',
    default_args=default_args,
    schedule_interval='5 1 * * *')

# start a container from the R image, mount the script directory, and run the script
test = DockerOperator(
    api_version='auto',
    image='rocker/r-ver',
    network_mode='bridge',
    volumes=[os.path.join(os.environ['DOCKER_VOLUME_BASE'], 'input', 'rscript') + ':/root/rscript'],
    command='Rscript script.R -d "{{ next_execution_date }}"',
    task_id='run-r-script',
    working_dir='/root/rscript',
    dag=dag)

The next_execution_date is a templated variable that is passed to the R script through the -d argument. Please check Apache Airflow's documentation on macros and templating for more.

Here is an example of the R script:

library(optparse)

# parse arguments
## refer to https://www.r-bloggers.com/passing-arguments-to-an-r-script-from-command-lines/
option_list <- list(
  make_option(opt_str = c("-d", "--date"), type="character", help="Execution date")
)
opt_parser <- OptionParser(option_list = option_list)
opt <- parse_args(opt_parser)
# check that the date parameter was supplied
if (is.null(opt$date)) {
  print_help(opt_parser)
  stop("The next execution date must be provided!", call. = FALSE)
}

print(paste("The next execution date is", opt$date))
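
You can also test the script locally with a plain docker run that mirrors the DockerOperator settings above. This is just a sketch: the host path is the one implied by DOCKER_VOLUME_BASE, my-r-image is the placeholder tag from the earlier sketch, and the date is an arbitrary example value.

docker run --rm \
  -v /home/airflow/airflow-in-docker/data_volume/input/rscript:/root/rscript \
  -w /root/rscript \
  my-r-image \
  Rscript script.R -d "2019-01-23"

If everything is set up correctly, it should print the execution date passed via -d.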