simisoz
@simisoz
How to fix this error from the production Docker images: ModuleNotFoundError: No module named 'airflow.providers'
alltej
@llntjn_twitter
I have the Admin role in Airflow. Recently I noticed that I cannot pause/unpause a DAG in the Airflow UI. I am using v1.10.10.
I can pause or unpause it using the airflow cli.
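For reference, a minimal sketch of the 1.10.x CLI calls (the dag_id is a placeholder):

    airflow pause my_dag_id
    airflow unpause my_dag_id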
naveeng68
@naveeng68
Hi, how do we pass a parameter from Jenkins?
chenzuoli
@chenzuoli
Hi all, my Airflow version is 1.10.8 and there are 3000 DAGs. The gap between one task finishing and the next starting is about 3 minutes, which is a bit too long for us, so we want to optimize the platform. Any ideas?
airflow.cfg:
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 32
run_duration = -1
num_runs = -1
processor_poll_interval = 1
min_file_process_interval = 0
dag_dir_list_interval = 300
max_threads = 15
store_serialized_dags = True
min_serialized_dag_update_interval = 600
chenzuoli
@chenzuoli
I found a bug in versions above 1.10.8: if, in the UI, you run a no_status task whose upstream tasks are already successful, you get a "dependencies not met" error. Has anyone else run into this?
I updated the Airflow source code:
vim airflow/serialization/serialized_objects.py +582
and replaced the line: dag.task_dict[task_id]._upstream_task_ids.add(task_id) # pylint: disable=protected-access
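For context, here is my reading of what the corrected code looks like in later 1.10.x releases (a sketch reconstructed from the message above, not a verified patch):

    # airflow/serialization/serialized_objects.py -- wiring up task relations
    # after deserialization. The buggy form added each task's own id to its
    # own upstream set:
    #     dag.task_dict[task_id]._upstream_task_ids.add(task_id)
    # The corrected form adds the id of the task that is actually upstream:
    for serializable_task in dag.tasks:
        for task_id in serializable_task.downstream_task_ids:
            dag.task_dict[task_id]._upstream_task_ids.add(serializable_task.task_id)  # pylint: disable=protected-access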
ManiBharataraju
@ManiBharataraju
Hi All, I had a question regarding the ExternalTaskSensor. The operator requires the parent DAG's run date to exactly match the child DAG's run date (including the timestamp); otherwise it keeps poking the parent task. Is that correct? I was thinking it would consider the most recent run for the same date if execution_delta/execution_date_fn is not provided.
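For what it's worth, a minimal sketch of the parameter that relaxes the exact-match behaviour (1.10.x import path; the DAG and task ids are hypothetical):

    from datetime import timedelta
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    # By default the sensor pokes for a parent run whose execution_date matches
    # the child's exactly; execution_delta (or execution_date_fn) shifts the
    # execution_date it looks for.
    wait_for_parent = ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="parent_dag",        # hypothetical parent DAG id
        external_task_id="parent_task",      # hypothetical parent task id
        execution_delta=timedelta(hours=1),  # parent runs one hour before the child
        dag=dag,                             # assumes an existing DAG object
    )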
Mohit Bhandari
@mohitbhandari37
Hi everyone, good evening. I am new to Apache Airflow; I have set it up on my local env, but when I run a task from the UI it gets stuck at "Adding to queue":
[2020-08-14 19:30:09,730] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'hello_world', 'hello_task', '2020-08-14T14:00:07.217726+00:00', '--local', '--pool', 'default_pool', '-sd', '/Users/user/airflow/dags/hello_world.py']
I can run the same task from the command line.
Any help?
lopesdiego122
@lopesdiego122
Hi All, is anybody deploying DAG folders from AWS S3 with the Celery executor? Which strategy are you using: rsync, cp, or mounting the S3 bucket on the Airflow server?
Kenney He
@kenney2_gitlab
Can anyone help me with the Bitnami Airflow OAuth setup? I used webserver_config.py following the Airflow webserver_config instructions and environment variables, but it still fails.
Ankur Nayyar
@anayyar82
Hello... in Airflow, how do you capture the LDAP user name?
neil90
@neil90
Can somebody help me understand the point of the official Airflow Docker image?
From my understanding I still need to do my own level of orchestration,
e.g. deciding which daemons I want to start.
Ashish Mishra
@ashismi
I just want to make sure of one thing. Suppose a dag.py file contains, let's say, two tasks A and B. Is dag.py parsed/run for each task? I see "Filling up the DagBag from /usr/local/airflow/dags/dag.py" in the log of each task. Is that correct, or have we configured something wrong?
If dag.py contains sleep(3000), will it sleep for both task runs, or does parsing happen only once?
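As far as I know that log line is expected: every task run re-parses the DAG file to fill its own DagBag, so module-level code executes once per task run (and again in the scheduler's parsing loop). A tiny illustration with a hypothetical dag.py:

    # dag.py -- anything at module level runs on every parse, so a top-level
    # sleep is paid by each task run, not once overall.
    import time
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils import timezone

    time.sleep(3000)  # executed for task A's run AND for task B's run

    dag = DAG("example", start_date=timezone.datetime(2020, 1, 1), schedule_interval="@daily")
    task_a = DummyOperator(task_id="A", dag=dag)
    task_b = DummyOperator(task_id="B", dag=dag)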
Thiago Salgado
@ScrimForever
Hello all, I have a new Docker image on the hub. Does anyone know how I can deploy this image correctly?
Elhay Efrat
@shdowofdeath
Has anyone seen this issue when running long tasks? [2020-09-14 16:45:26,189] {{taskinstance.py:1150}} ERROR - (psycopg2.OperationalError) could not translate host name "airflow-dev-postgresql" to address: Temporary failure in name resolution
Nikolaas Steenbergen
@nikste

Hi all, I'm trying to run a complete DAG from within a unit test.
For a simple sample graph it works; however, for the actual test graph I get:

[2020-09-24 16:55:57,343] {backfill_job.py:461} ERROR - Task instance <TaskInstance: dag_name.print_the_context 2020-05-22 12:00:00+00:00 [failed]> with failed state

Is there a default log directory with more information on what exactly goes wrong?
I can't seem to find anything.

Nikolaas Steenbergen
@nikste
OK, it seems it's very important to call .clear() on the DAG before execution; then this won't fail.
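In case it helps others, a minimal sketch of that pattern under the 1.10.x API (the dag folder and id are placeholders):

    from airflow.models import DagBag
    from airflow.utils import timezone

    EXECUTION_DATE = timezone.datetime(2020, 5, 22, 12, 0)

    dagbag = DagBag(dag_folder="dags/")  # hypothetical dag folder
    dag = dagbag.get_dag("dag_name")
    # Clear stale TaskInstance state first; leftover failed/success rows are
    # what makes the backfill report "Task instance ... with failed state".
    dag.clear(start_date=EXECUTION_DATE, end_date=EXECUTION_DATE)
    dag.run(start_date=EXECUTION_DATE, end_date=EXECUTION_DATE, local=True)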
Daniel Papp
@jakab922
Hello,
I deployed Airflow via the Helm chart from the stable repository.
It seems to be working fine.
I just don't know how to use the airflow CLI tool to upload a DAG.
Can someone help me with that?
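As far as I know the airflow CLI has no "upload" command; with the stable Helm chart, DAGs usually arrive via git-sync, an image bake, or a shared volume. If your DAGs live on a persistent volume, one crude way to drop a file in is kubectl cp (pod name and path are hypothetical):

    kubectl cp ./my_dag.py airflow-web-0:/usr/local/airflow/dags/my_dag.py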
Akshay Krishnan R
@akshaykrishnanr_twitter
Hi, what is bind_password in Apache Airflow? And why does the official documentation ask to set it to "insecure"?
https://airflow.apache.org/docs/stable/security.html#ldap
Ganapathi Chidambaram
@ganapathichidambaram
Can anybody please guide me on getting filenames matching a pattern with the FileSensor?
I need to listen for files in a particular local folder, get the list of filenames, and pass it to another task to process the files.
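As far as I know the 1.10.x FileSensor only tells you that something appeared; it doesn't hand back the matched filenames. A sketch of one workaround (the folder, pattern, and task wiring are hypothetical):

    import glob
    from airflow.operators.python_operator import PythonOperator

    def list_matching_files(**context):
        # Find files matching a glob pattern; the returned list is pushed to
        # XCom automatically, so a downstream task can pull and process it.
        return glob.glob("/data/incoming/*.csv")  # hypothetical folder/pattern

    list_files = PythonOperator(
        task_id="list_files",
        python_callable=list_matching_files,
        provide_context=True,
        dag=dag,  # assumes an existing DAG object
    )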
Ganapathi Chidambaram
@ganapathichidambaram
Please help me write a plugin on Airflow version 1.10.6.
ebernhardson
@ebernhardson
What will Airflow do if a task completes but the SQLAlchemy connection is unavailable? Our DB is going down for maintenance for a couple of hours tomorrow, and I'm planning to stop the scheduler so nothing is running then, but I'm wondering what would happen if we just let it go.
Emma Grotto
@GrottoEmma_twitter
Hey Everyone!
I downgraded Airflow from 1.10.12 to 1.10.10 and I am getting:
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DuplicateColumn) column "operator" of relation "task_instance" already exists
Running resetdb is not an option for me. Does anyone know how to fix this?
Neo-vijayk
@Neo-vijayk
Hello team,
Can anyone help me with this issue? We are facing a problem where tasks are not getting started; they just stay queued due to a Celery task timeout. Our Airflow scheduler encounters this AirflowTaskTimeout error ("name": "airflow.executors.celery_executor.CeleryExecutor", "level": "ERROR"):

Error sending Celery task: Timeout, PID: 3844
Celery Task ID: ('tpf_daily_price', 'start', datetime.datetime(2020, 11, 3, 2, 30, tzinfo=<Timezone [UTC]>), 1)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 42, in __call__
    return self.value
AttributeError: 'ChannelPromise' object has no attribute 'value'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 921, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
    result = task.apply_async(args=[command], queue=queue)
  File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
    **options
  File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 718, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 547, in send_task_message
    **properties
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 518, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 44, in __call__
    value = self.value = self.contract()
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in default_channel
    self.ensure_connection(**conn_opts)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 430, in ensure_connection
    callback, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 343, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 283, in connect
    return self.connection
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 837, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 792, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 941, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 923, in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 521, in __init__
    self.client.ping()
  File "/usr/local/lib/python3.7/site-packages/redis/client.py", line 1351, in ping
    return self.execute_command('PING')
  File "/usr/lo
I would appreciate any advice or a workaround on this issue, please.
Neo-vijayk
@Neo-vijayk
Ours is 1.10.3 with Redis and MySQL, with nodes on EC2 instances.
aakashroy-ds
@aakashroy-ds

Hi Experts,

I have Apache Airflow running on an EC2 instance (Ubuntu). Everything is running fine. The DB is SQLite and the executor is the Sequential Executor (provided as the default). But now I would like to run some DAGs that need to run at the same time, every hour and every 2 minutes. My question is: how can I upgrade my current setup to the Celery executor and a Postgres DB to get the advantage of parallel execution?

Will it work if I install and set up Postgres, RabbitMQ, and Celery, and make the necessary changes in the airflow.cfg configuration file?

Or do I need to re-install everything from scratch (including Airflow)?

Please guide me on this.
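For what it's worth, an in-place migration generally works: install Postgres, RabbitMQ, and Celery, edit airflow.cfg, then run airflow initdb against the new database. A sketch of the settings that change (the connection strings are examples):

    [core]
    executor = CeleryExecutor
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

    [celery]
    broker_url = amqp://guest:guest@localhost:5672//
    result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow

Note that the existing SQLite metadata (run history, connections, variables) does not carry over automatically.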

Kirill
@KIRILLxBREAK
Hi all.
I'm trying to deploy Airflow using Docker (building the image with the Dockerfile from the official repo https://github.com/apache/airflow). Can I change the user in the container to root? How can I do this?
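If extending the published image is an option, a minimal sketch (the tag is an example):

    # Dockerfile -- switch to root in a derived image
    FROM apache/airflow:1.10.12
    USER root
    # ...install extra OS packages here...
    # optionally drop back afterwards:
    # USER airflow

For a one-off root shell in a running container there is also docker exec -u root <container> bash.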
Ashish
@ashish05_gitlab

Hi all -
I'm trying to deploy Airflow on Kubernetes, and I'm running into this error:

Traceback (most recent call last):
  File "/app/py3ven/bin/airflow", line 21, in <module>
    from airflow import configuration
  File "/app/py3ven/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
    from airflow.utils.log.logging_mixin import LoggingMixin
  File "/app/py3ven/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
    from .decorators import apply_defaults as _apply_defaults
  File "/app/py3ven/lib/python3.6/site-packages/airflow/utils/decorators.py", line 34, in <module>
    from airflow import settings
  File "/app/py3ven/lib/python3.6/site-packages/airflow/settings.py", line 83, in <module>
    prefix=conf.get('scheduler', 'statsd_prefix'))
  File "/app/py3ven/lib/python3.6/site-packages/statsd/client/udp.py", line 35, in __init__
    host, port, fam, socket.SOCK_DGRAM)[0]
  File "/usr/local/lib/python3.6/socket.py", line 745, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -5] No address associated with hostname

My config is as follows:
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080

Does anybody know what could be wrong?
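For what it's worth, the traceback shows settings.py building a StatsClient, so this looks like the [scheduler] statsd settings pointing at an unresolvable host (my reading of the trace, not a confirmed diagnosis). The relevant airflow.cfg keys:

    [scheduler]
    statsd_on = False
    # or keep it on and point at a resolvable address (value is hypothetical):
    # statsd_host = statsd.default.svc.cluster.local
    # statsd_port = 8125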

Ramonzin
@RamonBarros19_twitter

To run the jar in this way, you need to:

Either change Spark Master Address in template projects or simply delete it. Currently, they are hard coded to local[4] which means run locally with 4 cores.

Change the dependency packaging scope of Apache Spark from "compile" to "provided". This is a common packaging strategy in Maven and SBT which means do not package Spark into your fat jar. Otherwise, this may lead to a huge jar and version conflicts!

Make sure the dependency versions in build.sbt and POM.xml are consistent with your Spark version.

My difficulty is with the submitting part: I don't know where to make the changes from these steps you sent. If you could explain the locations, I would be grateful.
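To illustrate the "provided" scope change from the steps above, this is what it looks like in build.sbt (artifact names and versions are examples; match them to your Spark cluster):

    // build.sbt -- "provided" keeps Spark out of the fat jar
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-sql"  % "2.4.5" % "provided"

In pom.xml the equivalent is <scope>provided</scope> on the Spark dependencies.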
ramonbarrosk
@ramonbarrosk

How do I create a PolygonRDD from a shapefile input in Python?
input_location = ShapefileReader.readToGeometryRDD(sc, "Estados")

The data in the shapefile is of the multi-polygon type; does the PolygonRDD function in Python support this data type as well?

Anurag-Shetty
@Anurag-Shetty
I am getting this error while running the example DAGs: "from airflow.operators.bash import BashOperator
ModuleNotFoundError: No module named 'airflow.operators.bash'"
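That module path only exists from Airflow 2.0 onward; on 1.10.x the import is:

    # Airflow 1.10.x module path:
    from airflow.operators.bash_operator import BashOperator
    # (from airflow.operators.bash import BashOperator works only on Airflow >= 2.0)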