    Paul Rhodes
    Sorry it's been a few days. It would be good to catch up with it.
    Vinay Kumar
    I am trying to schedule the following task, but it is not firing the Ansible function. Could anyone tell me whether BranchPythonOperator has been used appropriately here?

    import os
    import ConfigParser
    from time import time
    import time as sleep_time
    import yaml

    # Airflow Related Imports

    from builtins import range
    from airflow.operators import BashOperator, DummyOperator,BranchPythonOperator,PythonOperator
    from airflow.models import DAG
    from datetime import datetime, timedelta

    def fire_ansible():
        print "Ansible fired"

    # Assume Condition 1, 2, 3 are functions which return true or false

    def should_run( ):
    print "firing ansible...."
    return 'fire_ansible_task'

            return 'fire_ansible_task'
            print "Waiting for files.. or next slot:"
            return 'sleep_for_task'
        print "Waiting for files2.. or next slot:"
        return 'sleep_for_task'

    def sleep_for():
    '''This is a function that will run within the DAG execution'''


    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
    args = {
    'owner': 'Episource',

    # If the last task is successful, only then run this task
    'depends_on_past': True,
    'start_date': seven_days_ago,
    'retries': 1,


    dag = DAG(dag_id='Cron',
    default_args=args,schedule_interval='0 ')
    fire_ansible_task = PythonOperator(

    sleep_for_task = PythonOperator(
    for i in range(3):
        task = DummyOperator(task_id='runme'+str(i), dag=dag)
        cond = BranchPythonOperator(task_id='condition'+str(i),python_callable=should_run,dag=dag)

    if __name__ == "__main__":
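
The pasted snippet is truncated, so the following is only a guess at the intent: a minimal sketch of how a branch callable and its two candidate tasks are usually connected. The `files_ready` flag and the `branch_sketch` DAG id are hypothetical, and the import paths assume an Airflow 1.x installation. The point it illustrates is that the string returned by the callable has to match the `task_id` of a task placed directly downstream of the branch task.

```python
from datetime import datetime, timedelta


def choose_branch(files_ready):
    """Return the task_id to follow; `files_ready` is a hypothetical condition."""
    if files_ready:
        return "fire_ansible_task"
    return "sleep_for_task"


def fire_ansible():
    print("Ansible fired")


def sleep_for():
    print("Waiting for files or the next slot")


def build_dag(files_ready=True):
    # Imported lazily so the branching logic can be exercised without Airflow.
    # These import paths assume an Airflow 1.x installation.
    from airflow.models import DAG
    from airflow.operators.python_operator import (
        BranchPythonOperator,
        PythonOperator,
    )

    dag = DAG(
        dag_id="branch_sketch",  # hypothetical name
        start_date=datetime(2017, 1, 1),
        schedule_interval=timedelta(hours=1),
    )
    cond = BranchPythonOperator(
        task_id="condition",
        python_callable=choose_branch,
        op_kwargs={"files_ready": files_ready},
        dag=dag,
    )
    fire_ansible_task = PythonOperator(
        task_id="fire_ansible_task", python_callable=fire_ansible, dag=dag
    )
    sleep_for_task = PythonOperator(
        task_id="sleep_for_task", python_callable=sleep_for, dag=dag
    )
    # Both candidate branches sit directly downstream of the branch task;
    # the returned task_id selects which one runs and the other is skipped.
    cond.set_downstream(fire_ansible_task)
    cond.set_downstream(sleep_for_task)
    return dag
```

In a real DAG file the result would be assigned at module level (for example `dag = build_dag()`) so that the scheduler can discover it.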

    Paul Rhodes
    @r39132 heya
    Sid Anand
    Hiya.. btw.. we can't use private rooms anymore :-0
    Saw your email. My schedule is a bit packed during my short London stay.. let me see if we can meet. I'll respond to your email shortly.
    The private room thing is an Apache rule..
    @withnale ^
    Sam Elamin
    Hi @here, I am trying to launch an EMR cluster with Airflow and am wondering if there is a package to do that. I can see an example here: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py but it depends on other parts of the repo. Is there an easier way than duplicating the example and all its dependencies?
    Sam Elamin

    Hi folks

    We have had a discussion within the team about Airflow best practices, and there seems to be some confusion about the responsibilities of Airflow. Basically, we want to run Airflow tasks on a Spark cluster, and the discussion revolves around whether Airflow should launch the cluster or not. We narrowed it down to 2 options. Option 1) Spin up a machine with Airflow that then launches a Spark cluster (EMR in our case), runs tasks A, B, C, then terminates the cluster. Option 2) Have the cluster already up; something (Data Pipeline in AWS) launches a machine, and when Airflow comes up it gets a URL for the Spark cluster and submits jobs/tasks A, B, C. Again, it comes down to whether Airflow should be responsible for cluster creation. It seems to me that Option 1 is simpler, but my colleague thinks that Airflow shouldn't be responsible for cluster creation. He suggested thinking of a cluster as an end database; in that scenario Airflow should not really be creating databases, it should just run the tasks. I am inclined to agree with him, but can't help feeling that Option 1 is still simpler, which is why I am asking the opinion of the experts here :)

    @/all ^^
    Sam Elamin
    Hello Airflow community, a quick question. I built and installed Airflow from master, and the contrib folder has a lot of useful scripts to run on AWS. My question is about https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/emr_create_job_flow_operator.py: it seems all EMR clusters are being created in us-west-1. Looking at the initdb script, this seems to be because the EMR default is set to us-west-1 when Airflow is initially installed. How do I launch new EMR clusters in eu-west-1? Do I have to manually add an entry to the database, or is there a way to pass it in via the JOB_FLOW_OVERRIDES?
    Al Johri
    hey @samelamin did you figure out how to use EMR with Airflow?
    charles adetiloye
    @all I am using the EMR create cluster; how do I set the EMR emr_conn_id?
    Saar Levy
    Somewhat related to @charlesa101's question: I'm trying to create an EMR cluster, but I have no idea what aws_conn_id and emr_conn_id are supposed to be, or where they can be found. Thanks
    Tim Chan
    @saarlevy_twitter @charlesa101 the aws_conn_id can be set up in the Airflow web UI under Admin -> Connections. Use the AWS access key ID as the username and the AWS secret key as the password. Also put your region preference in the extras field, like so: {"region_name": "us-east-1"}. You can name it whatever you want and use that name in your DAG as your aws_conn_id.
    The emr_conn_id is created in the same way, except that only the extras field is used. In that field you would put JSON like this: https://github.com/apache/incubator-airflow/blob/master/airflow/utils/db.py#L208, but customized to your situation.
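
As a rough illustration of the two extras fields described above, the sketch below builds the JSON strings that could be pasted into the connection forms. All values (cluster name, release label, instance type, role names, the `eu-west-1` region) are placeholders chosen for the example, not settings taken from this conversation; the keys in the EMR extras follow the keyword arguments of boto3's `run_job_flow`.

```python
import json

# Extras for the AWS connection: only the region preference goes here.
aws_extra = json.dumps({"region_name": "eu-west-1"})

# Extras for the EMR connection: keyword arguments for the cluster-creation
# call. Every value below is a placeholder, not a recommended setting.
emr_extra = json.dumps(
    {
        "Name": "example-cluster",
        "ReleaseLabel": "emr-5.11.0",
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Master",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m4.large",
                    "InstanceCount": 1,
                }
            ],
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    },
    indent=2,
)

print(aws_extra)
print(emr_extra)
```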
    Saar Levy
    @timchan-lumoslabs thank you so much for your reply. I will try that right away; that seems to be the last thing I need to configure :)
    Shubham goyal
    Hey, can anyone tell me how to completely delete a DAG?
    Is there a database in Airflow?
    I have a DAG whose UI is not updating. It runs correctly on the first instance, but on the second instance the UI still shows the first instance while the tasks run in the background. Is there a way to clean it up?
    Anyone, please?
    Sergio Herrera
    Hi everyone, I suppose this question may have been asked before, but how can I test a DAG against EMR without making calls to the real EMR service (something like a mocked service)?
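
One way to approach this, sketched under assumptions: if the code that talks to EMR receives its client as an argument, a test can pass a stand-in built with the standard library's `unittest.mock` instead of a real boto3 client. The `create_cluster` helper and the `j-FAKE123` id are hypothetical; `run_job_flow` and the `JobFlowId` response key are the names boto3's EMR client uses.

```python
from unittest import mock


def create_cluster(emr_client, job_flow_overrides):
    """Start a cluster through an injected client and return its id."""
    response = emr_client.run_job_flow(**job_flow_overrides)
    return response["JobFlowId"]


def test_create_cluster_without_aws():
    # The mock records calls and returns a canned response; no network is used.
    fake_emr = mock.Mock()
    fake_emr.run_job_flow.return_value = {"JobFlowId": "j-FAKE123"}

    cluster_id = create_cluster(fake_emr, {"Name": "test-cluster"})

    assert cluster_id == "j-FAKE123"
    fake_emr.run_job_flow.assert_called_once_with(Name="test-cluster")


test_create_cluster_without_aws()
```

Libraries such as moto take a fuller approach by emulating the AWS API itself, which may suit tests that exercise the Airflow EMR operators end to end.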
    Hi @timchan-lumoslabs, for emr_default, is the AWS secret key required? Or, if the Airflow instance I set up has enough permissions (an IAM role that lets boto3 assume a role) to launch an EMR cluster, should this Airflow default setting also be able to create the cluster without providing the AWS secret? Thanks
    Sam Elamin
    @AlJohri yes we ended up just doing spark submit commands via a bash operator
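
A minimal sketch of that approach, assuming the command is assembled in Python and then handed to a BashOperator as its `bash_command`. The helper name, the S3 path and the arguments are hypothetical; only standard `spark-submit` flags are used.

```python
import shlex


def spark_submit_command(app_path, master="yarn", deploy_mode="cluster", app_args=()):
    """Build a shell-safe spark-submit command line."""
    parts = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode, app_path]
    parts.extend(app_args)
    # Quote every part so paths or arguments with spaces survive the shell.
    return " ".join(shlex.quote(str(part)) for part in parts)


command = spark_submit_command(
    "s3://my-bucket/jobs/task_a.py",  # hypothetical application path
    app_args=["--date", "2018-01-01"],
)
print(command)
```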
    Hi everyone, I am trying to migrate some tables from MS SQL to S3. What are the best options: Sqoop, Spark SQL, or anything else in EMR?
    Junyoung Park
    In my case, Spark SQL was faster. EMR is just a managed cluster platform.
    Hey guys, has anyone ever seen this:
    Partial credentials found in explicit
    when using EmrCreateJobFlowOperator?
    This is happening no matter what I put in for login and password in the connection for mits_emr_6
    and I'm using it like this:
    guys I figured this out actually
    had to delete and re-add the aws_default connection
    Shridhar Manvi
    Hey, I am unable to use EmrStepSensor to check the status of a step. The error I get is
    [2018-04-05 19:48:16,896] {base_task_runner.py:95} INFO - Subtask:     raise NoRegionError()
    [2018-04-05 19:48:16,896] {base_task_runner.py:95} INFO - Subtask: botocore.exceptions.NoRegionError: You must specify a region.
    I have mentioned "region_name" in aws connection on the UI
    Has anyone come across this issue?
    I have edited aws_default connection to this
    { "region_name":"us-west-1", "aws_access_key_id":"XXXX", "aws_secret_access_key": "YYYY"}
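
botocore raises NoRegionError when no region reaches the client it builds. Whether a given hook or sensor forwards `region_name` from the connection extras depends on the Airflow version, so the following is only a debugging sketch: a hypothetical helper that reports which region would be found, looking first at the extras JSON and then at the `AWS_DEFAULT_REGION` environment variable that boto3 also reads.

```python
import json
import os


def resolve_region(extra_json, env=None):
    """Return the region from connection extras, else from AWS_DEFAULT_REGION."""
    env = os.environ if env is None else env
    extra = json.loads(extra_json) if extra_json else {}
    region = extra.get("region_name") or env.get("AWS_DEFAULT_REGION")
    if not region:
        raise ValueError("No region in connection extras or AWS_DEFAULT_REGION")
    return region


print(resolve_region('{"region_name": "us-west-1"}', env={}))
```

If the extras look right but the error persists, exporting `AWS_DEFAULT_REGION` in the environment of the Airflow worker is one workaround that may be worth trying.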
    Saran Pal
    Hello All
    task_emr_spinup = EmrOperator(
    Does cluster_action='spinup' here mean starting EMR?
    hey guys
    I am running an EMR cluster to run Spark jobs, but after some time, in the YARN ResourceManager UI, all the nodes drop out, showing Active Nodes = 0 or Active Nodes = 1,
    and my job gets stuck..
    Hello there, I am facing an issue with Apache Airflow. I looked through the scheduler logs, and the output says " DagFileProcessor1 INFO - Failing jobs without heartbeat after 'some date' ". One of my colleagues said to clear all tasks and restart as a fresh DAG. I did that too, but it is still not running the job. Do I need to change the procedure to test the tasks through DAGs?
    Here is my procedure:
    I have start and stop scripts for EMR under the start_stop_emr directory,
    and test.py is the DAG containing the tasks to run:
    running as a bash command 'python /path/to/dir/start.py'
    sleep 120
    running as a bash command 'python /path/to/dir/stop.py'
    The above three tasks have to run by themselves. If the DAG is scheduled for a particular time, how can I change that time? There is no error as of now :( Can anyone help me with this, please? I can share my screen if anyone is using Hangouts.
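
The three-step procedure above could be sketched as a chain of BashOperator tasks, with the run time controlled by the DAG's `schedule_interval`. This is a sketch under assumptions: the DAG id, the task ids, the start date and the `30 6 * * *` cron expression (06:30 every day) are made up for illustration, the script paths are the placeholders from the message, and the import paths assume Airflow 1.x.

```python
from datetime import datetime

# (task_id, bash command) pairs in the order they should run.
STEPS = [
    ("start_emr", "python /path/to/dir/start.py"),
    ("wait", "sleep 120"),
    ("stop_emr", "python /path/to/dir/stop.py"),
]


def build_dag(schedule="30 6 * * *"):
    # Imported lazily so the step list can be inspected without Airflow.
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG(
        dag_id="start_stop_emr",  # hypothetical name
        start_date=datetime(2019, 1, 1),
        schedule_interval=schedule,  # change this cron string to change the run time
    )
    previous = None
    for task_id, command in STEPS:
        task = BashOperator(task_id=task_id, bash_command=command, dag=dag)
        if previous is not None:
            # Each task runs only after the one before it has succeeded.
            previous.set_downstream(task)
        previous = task
    return dag
```

In the DAG file itself the result would be assigned at module level, for example `dag = build_dag()`, so that the scheduler picks it up.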
    Anybody please 🤗
    Hi all
    Can anyone help with troubleshooting Airflow DAGs, please?
    hey @panduraju50, are you sure you have set up Airflow properly?
    @shridharmanvi - Please check this blog; I was able to set up a basic EMR cluster using it. https://vulpala.com/2019/08/22/spin-aws-emr-cluster-using-apache-airflow/
    Vivek Mittal
    Hi guys. I am trying to run a simple Spark application that subscribes to a Kafka topic and streams it to another Kafka topic with exactly the same details. I am using AWS EMR, but when I run the step, it completes even though I use awaitTermination. Any idea what I might be doing wrong here? I want the step to keep running as a stream rather than terminate. I am using Structured Streaming. Any help is greatly appreciated.
    The job doesn't report any errors or failures. It is marked as completed.
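
For comparison, here is a minimal sketch of a Kafka-to-Kafka copy with Structured Streaming, assuming PySpark with the spark-sql-kafka package on the classpath. The function names, broker address, topic names and checkpoint path are hypothetical. It does not diagnose the problem above; it only shows `awaitTermination()` being called on the query object returned by `start()`, which is what keeps the driver alive.

```python
def kafka_options(bootstrap_servers, **extra):
    """Collect the Kafka source/sink options into one dict."""
    options = {"kafka.bootstrap.servers": bootstrap_servers}
    options.update(extra)
    return options


def run_copy(bootstrap_servers, source_topic, target_topic, checkpoint_dir):
    # Imported lazily; requires PySpark plus the spark-sql-kafka package.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka-copy").getOrCreate()
    source = (
        spark.readStream.format("kafka")
        .options(**kafka_options(bootstrap_servers, subscribe=source_topic))
        .load()
    )
    query = (
        source.select("key", "value")  # forward only the record key and payload
        .writeStream.format("kafka")
        .options(**kafka_options(bootstrap_servers, topic=target_topic))
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
    # Blocks the driver until the query stops or fails.
    query.awaitTermination()
```

If the driver does block like this and the EMR step still completes, the way the step is submitted (for example, whether the submit command waits for the application to finish) may be worth checking.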