Hey,
I was recently trying to store data in the MetaflowData object that gets persisted at the end of a flow. I was using setattr(self, some_id, dict) to store a large number of objects as part of the MetaflowData object. When loading the values back via getattr, the first 1000 elements finish within a second, but it then took 4 minutes to load the next 3000. I am simply iterating through the values via getattr. I assume it is fetching the data via the metadata service and unpickling the objects, but what could be the reason for such latency gaps?
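For reference, this is roughly the pattern, with made-up names and sizes:

from metaflow import FlowSpec, step

class ManyArtifactsFlow(FlowSpec):

    @step
    def start(self):
        # one artifact per id, set dynamically on the flow
        self.ids = ['item_%d' % i for i in range(4000)]
        for some_id in self.ids:
            setattr(self, some_id, {'id': some_id, 'payload': [0] * 100})
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ManyArtifactsFlow()

and then, after the run, reading everything back through the Client API:

from metaflow import Flow

data = Flow('ManyArtifactsFlow').latest_run.data
for some_id in data.ids:
    value = getattr(data, some_id)  # this loop slows down sharply after ~1000 items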
Hey guys, I am a newbie to Metaflow and just finished the first three tutorials. I have the following questions. (1) The documentation says to use IncludeFile to read a local .csv file. I noticed that IncludeFile generates a compressed file in the .metaflow folder, but using pd.read_csv to read this compressed file is slower than using pd.read_csv to read the original csv file directly, especially when the .csv file is large. So what is the reason Metaflow uses this IncludeFile function to pre-read the file? (2) If I want to read a roughly 7GB csv file, directly using pd.read_csv is fine. However, if I use pd.read_csv inside Metaflow, after a couple of minutes it gives me an out-of-memory error. What is the reason for this error, how can I avoid it, and is there any way to read this kind of file faster? Thanks!
Plus, I used the resources decorator (memory=16000, cpu=8) to ask for more resources, but it still didn't work.
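Roughly what I'm doing, simplified (the file name and flow name are made up):

from io import StringIO

import pandas as pd
from metaflow import FlowSpec, IncludeFile, step

class CsvFlow(FlowSpec):

    # the csv is included as a flow-level artifact
    data_file = IncludeFile('data_file', help='local csv file', default='data.csv')

    @step
    def start(self):
        # IncludeFile hands back the file contents as a string, so wrap it for pandas
        df = pd.read_csv(StringIO(self.data_file))
        print(df.shape)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    CsvFlow()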
Hey everyone, what's Metaflow's recommended approach for installing a dependency in a remote Conda environment when the package, or the specific version of it, is not available in Conda but is available via pip?
From the documentation I've found that I have two options:
os.system('pip install my_package')
in my step's code, which looks like it should work but doesn't seem like a great solution. Are there any options I'm not considering? Perhaps a cleaner approach?
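To be concrete, the workaround I have in mind is something like this inside a step (my_package is a placeholder):

import subprocess
import sys

from metaflow import FlowSpec, step

class PipWorkaroundFlow(FlowSpec):

    @step
    def start(self):
        # install the pip-only dependency at runtime, inside the remote environment
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'my_package'])
        import my_package  # noqa: F401  # import only after the install finishes
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    PipWorkaroundFlow()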
Hi, I'm not able to view the logs from the metadata_service when running it in a Docker container. A minimal reproduction: create a virtual env, activate it, and run pip install. I also did
docker pull netflixoss/metaflow_metadata_service
ds up -d
ds logs
gives me:
metadata_service | 2020/09/21 15:33:31 OK 1_create_tables.sql
metadata_service | 2020/09/21 15:33:31 OK 20200603104139_add_str_id_cols.sql
metadata_service | 2020/09/21 15:33:31 goose: no migrations to run. current version: 20200603104139
my_postgres | The files belonging to this database system will be owned by user "postgres".
my_postgres | This user must also own the server process.
...
But I don't see serving on ('0.0.0.0', 8080), which I would expect.
This is on macOS Catalina 10.15.6.
Hey guys, is there a recommended way to pass secrets to a remote Batch job? I found this asked in Gitter back on 2019-12-07 here:
https://gitter.im/metaflow_org/community?at=5dee3c74d64a052fb6e2afaf
Basically the same question, but with a specific focus on the values of the environment being treated as sensitive, so the environment decorator or any other solution that stores the secret in code does not work.
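To make it concrete, this is the kind of thing I want to avoid, since the value ends up in the code (the variable name and value are made up):

from metaflow import FlowSpec, step, environment

class SecretFlow(FlowSpec):

    # the secret is hard-coded in the flow file, which is exactly the problem
    @environment(vars={'MY_API_KEY': 'not-actually-secret-anymore'})
    @step
    def start(self):
        import os
        print('key present:', 'MY_API_KEY' in os.environ)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    SecretFlow()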
I read that Savin said the AWS folks are able to seamlessly switch between running locally and remotely, so there must be a solution to this. I just wanted to ask you guys before trying to come up with any custom way.
Thanks!
Hi all, I'm working on converting a legacy pipeline to Metaflow and was wondering whether there is any way to do something like the following.
@step
def map_step(self):
    self.vars = ['a', 'b']
    self.next(self.do_compute, foreach='vars')

@step
def do_compute(self):
    self.var = self.input
    self.artifact1 = do_something(self.var)
    self.artifact2 = do_something_else(self.var)
    self.artifact3 = do_something_else_yet(self.var)
    self.next(self.join_step)

@step
def join_step(self, inputs):
    self.artifact_dict = dict()
    for inp in inputs:
        self.artifact_dict[inp.var] = inp
I was hoping that this would give me programmatic, lazily-loading access to the artifacts computed in do_compute for each value of var (a la self.artifact_dict['a'].artifact1), but of course I am getting this error message:
Flows can't be serialized. Maybe you tried to assign self or one of the inputs to an attribute? Instead of serializing the whole flow, you should choose specific attributes, e.g. input.some_var, to be stored.
Is there a recommended way to achieve this programmatic, lazy access? I see a workaround that programmatically builds names and calls setattr and getattr (sketch below), and searching through this Gitter's history, that approach seems to have been recommended before. Is that still the recommended approach? Thanks!
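For reference, the setattr-based join I have in mind looks roughly like this:

@step
def join_step(self, inputs):
    self.var_values = [inp.var for inp in inputs]
    # one flat artifact per (var, artifact-name) pair instead of storing the inputs themselves
    for inp in inputs:
        for name in ('artifact1', 'artifact2', 'artifact3'):
            setattr(self, '%s_%s' % (inp.var, name), getattr(inp, name))
    self.next(self.end)

and then later something like getattr(run.data, 'a_artifact1') to pull a single artifact lazily.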
Hey Metaflow,
I'm having an issue with running one of my flows on AWS Batch. The issue is as follows:
mkdir: cannot create directory ‘metaflow’: Permission denied
/bin/sh: 1: [: -le: unexpected operator
tar: job.tar: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now
I run it on a pre-built image hosted on ECR. The Dockerfile contains a WORKDIR command which points at /home/my_proj/code. I can successfully build my image locally, bash into it, and mkdir metaflow (under the default /home/my_proj/code/ directory) without an issue.
What I suspect might be happening is that the WORKDIR statement is somehow ignored and the Metaflow command ["/bin/sh","-c","set -e ... is run from within /.
It's worth noting that I have several flows running on AWS Batch already with no problem at all. Their Dockerfiles are almost identical to the one that is having the problem.
Not really sure if it's a Metaflow issue but hoping for somebody to have seen this already.
Thank you.
Hey Metaflow,
I am new to Metaflow and I am trying to update a parameter in a step, but I get "AttributeError: can't set attribute".
Here is the snippet:
from metaflow import FlowSpec, Parameter, step

class TestFlow(FlowSpec):

    param = Parameter(
        'param',
        type=str,
        help='test parameter',
        default='OD'
    )

    @step
    def start(self):
        self.param = self.param.lower()
        ...
    ...
This seems like a common use case, so there's probably some mistake in my usage. What am I missing?
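In case it clarifies what I'm after, this is what I was going to fall back to, assuming Parameters are read-only:

@step
def start(self):
    # store the normalized value under a new artifact name instead of reassigning the Parameter
    self.param_lower = self.param.lower()
    self.next(self.end)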
Hi there,
I have a Flow that is supposed to run completely on batch. I'd like to run one of the steps in a different Docker container than the default one. Is it possible to run something like this:
# pipeline.py
from metaflow import FlowSpec, step, batch

class TestFlow(FlowSpec):

    @batch(image='python:3.6.12-buster')
    @step
    def start(self):
        import sys
        print(sys.version)
        self.next(self.end)

    @step
    def end(self):
        import sys
        print(sys.version)

if __name__ == "__main__":
    TestFlow()
with the following command: python pipeline.py run --with batch
Here the end step should run with the default image and the start step with python:3.6.12-buster.
Hey Metaflow, I stumbled upon a situation that I was hoping you guys could comment on. I'm trying out Metaflow in a code base that already exists, which provides numerous utility files and functionality.
I was reading through this page on managing external libraries (https://docs.metaflow.org/metaflow/dependencies) and was wondering whether there is any other way to let a Metaflow flow import these utility files without having to copy them from all over the repository into the folder the flow is defined in.
I'm aware of symbolic links and file-system approaches, but was wondering whether there is another Metaflow-level approach for a scenario like this.
Hi, I recently upgraded metaflow from 2.0.1 to 2.2.3, and when I execute a parameterized flow I got the following error, which I haven't seen before (truncated to the last few lines):
...
File "/home/ji.xu/.conda/envs/logan_env/lib/python3.7/site-packages/metaflow/includefile.py", line 229, in convert
param_ctx = context_proto._replace(parameter_name=self.parameter_name)
AttributeError: 'NoneType' object has no attribute '_replace'
Any suggestions?
production:FlowName-<deployment #>-<hash>. This can be worked around by specifying the whole production token for the namespace, but I'm curious what you all think about this usage.
[Errno 62] Too many levels of symbolic links
when running a job with AWS Batch on my Mac. It worked fine before but just started happening today. Any ideas why? It only happens with environment=conda; not obvious at the moment.
RetryStrategy at the batch job level, which doesn't support customization, whereas the Step Functions API allows for step-level retry logic that also supports delay intervals, exponential backoffs, etc. It may not be feasible to switch this over, but I wanted to run it by you all and see if it's something you've considered.
Hey guys, I'm aware of the resources decorator for memory, CPU, and GPU requests when running on AWS Batch, but I was wondering how Metaflow recommends handling the need for more disk space.
I've read that one can modify the AMI deployed on Batch to get a volume larger than the default 8GB.
Is there a more friendly way to achieve this? I find myself working with datasets bigger than 8GB for some experiments, while others use much less than 8GB.
Thanks!
Is there any documentation, procedure, or script for migrating one Metadata service to another?
Imagine a user stood up all the infra in region A on AWS and wanted to move to region B without data loss.
I can write the S3 and postgres transfer scripts myself but was hoping to not re-invent the wheel.
Thanks!
Hi guys, I created a preprocessing flow with some (default) parameters. It works well on my local machine and I am able to run steps on AWS. Now I want to use AWS Step Functions to schedule my preprocessing, so I created the state machine in AWS with the step-functions create command, but when I execute it manually from the AWS console, I get a '$.Parameters' error. Here is the whole message:
"An error occurred while executing the state 'start' (entered at the event id #2). The JSONPath '$.Parameters' specified for the field 'Value.$' could not be found in the input '{\n \"Comment\": \"Insert your JSON here\"\n}'"
When I checked the state machine generated in AWS, I see that there is an item in the Environment section as follows:
{
    "Name": "METAFLOW_PARAMETERS",
    "Value.$": "$.Parameters"
}
I checked the following AWS resource: https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html
but I couldn't solve my problem. I believe I shouldn't need to provide any parameters, because I provided a default value for all of them.
Do you have any idea? I appreciate your help.
Hey Metaflow, has anyone been able to create a single Batch Job Queue and Compute Environment that handles both CPU and GPU jobs, say with a p3.2xlarge? I ask because I've seen others online suggest using two separate Job Queues, one for CPU jobs and one for GPU jobs, but Metaflow only supports a single Job Queue.
While my Compute Environment has successfully spun up p3.2xlarge instances, I have been unable to get a single GPU step to leave the RUNNABLE state. I've been exploring whether this is related to the AWS Launch Template I created to increase the disk size of my instances.
If anyone has any advice, documentation, or examples of running GPU jobs alongside CPU jobs in the same Batch Job Queue and Compute Environment with Metaflow, I'd very much appreciate it.
Hey guys, I nearly forgot to follow up and share some advice on creating Batch compute environments that was especially relevant to my earlier issues with SFN-executed, highly parallel, short-running jobs being co-located on large instances:
The problem was using the default Batch ECS-optimized AMIs, which are still based on the soon-to-be-deprecated Amazon Linux 1, instead of the latest ECS-optimized Amazon Linux 2 AMIs.
The Linux 1 AMI uses the Docker devicemapper storage driver and preallocates 10GB of per-container storage. The Linux 2 AMIs use the Docker overlay2 storage driver, which exposes all unused space on the disk to running containers.
Manually setting my Batch compute environments to use the latest ECS-optimized Linux 2 AMIs seems to be the cleanest approach, rather than playing with custom ECS agent Docker cleanup parameters. I also reached out to AWS support to see if there's a reason why Batch hasn't updated their default AMI, even though the Linux 1 AMI reaches end-of-life in two months. No information was given, but they mentioned that they have an internal feature request for it, without any guarantees or ETA on when it'd be changed.
Sharing in case this is useful for anyone else!
python parameter_flow.py --with retry step-functions create
but I get a "no such command" error for step-functions. Can someone maybe refer me to good documentation?
Hey people, I would like to know how to restrict the amount of parallelism Metaflow uses on my local machine at any point in time (parallelism meaning the number of CPU cores used by the program). Say I have a task that has to be executed in parallel as 50 threads, each requiring 2 cores, and my machine has 32 cores: Metaflow runs ~15-16 threads at a time, utilizing all the cores in the machine. I would like to restrict this to, say, 12 threads at any given point in time.
In Python's multiprocessing library, there is an option to set the number of pool workers to a given number. Is there a way to achieve the same with Metaflow?
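For comparison, this is the multiprocessing pattern I mean:

from multiprocessing import Pool

def work(item):
    # stand-in for the real task that needs 2 cores
    return item * item

if __name__ == '__main__':
    # at most 12 workers run at any given time, even though 50 tasks are queued
    with Pool(processes=12) as pool:
        results = pool.map(work, range(50))
        print(results[:5])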
Hello, I see that Metaflow snapshots the code used in a run.
From the docs: "Code package is an immutable snapshot of the relevant code in the working directory, stored in the datastore, at the time when the run was started. A convenient side-effect of the snapshot is that it also works as a code distribution mechanism for runs that happen in the cloud."
How would I access the code from previous runs?
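I was imagining something along these lines with the Client API, though I'm not sure these are the right properties (the flow name is a placeholder):

from metaflow import Flow

run = Flow('MyFlow').latest_run
task = run['start'].task
code = task.code            # my guess: the code package of the run, if one was stored
if code is not None:
    print(code.flowspec)    # the source of the flow file, if I understand the docs correctly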
Thanks!
Hey all, I assessed Metaflow as an alternative to our Kedro + Airflow infra and thought I'd share my assessment. One blocker for adopting Metaflow is the inability to separate parameters from pipeline definitions.
For context, we currently use Kedro to generate many "flavors" of the same pipeline for different scenarios. For instance, we use the same template inference pipeline for model validation, active learning, detecting label noise, etc. We do this by defining our parameters separately from our DAGs. It would be nice if Metaflow had an integration with (say) Facebook's Hydra so that we could easily compose config files and separate parameter definitions from DAG definitions.
Hey all, I have a question about logging. In our project, we use Python's standard logging (https://docs.python.org/3/howto/logging.html). When we emit warning or debug logs with it, Metaflow overrides these levels and sends everything as info.
Here is a code example:
import logging.config

from metaflow import FlowSpec, step

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '[%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['default'],
            'level': 'INFO',
            'propagate': False
        },
    }
}

class DebugFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.a, self.b)

    @step
    def a(self):
        logger.debug("Hello Debug log")
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = int('2')
        logger.warning("Hello warning log")
        self.next(self.join)

    @step
    def join(self, inputs):
        logger.info('a is %s', inputs.a.x)
        logger.info('b is %s', inputs.b.x)
        logger.info('total is %d', sum(inp.x for inp in inputs))
        logger.error("Hello error log")
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    # apply the logging config before the flow runs
    logging.config.dictConfig(LOGGING_CONFIG)
    logger = logging.getLogger('DebugFlow')
    DebugFlow()
When I took a look at how Metaflow handles logging, I realized that Metaflow uses its own logging system. I also tested the logging configuration with --event-logger, but it looks like it doesn't work either.
import logging.config

from metaflow.plugins import LOGGING_SIDECAR, SIDECAR
from metaflow import FlowSpec, step

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '[%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['default'],
            'level': 'INFO',
            'propagate': False
        },
    }
}

class DebugFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.a, self.b)

    @step
    def a(self):
        logger.debug("Hello Debug log")
        self.x = 1
        self.next(self.join)

    @step
    def b(self):
        self.x = int('2')
        logger.warning("Hello warning log")
        self.next(self.join)

    @step
    def join(self, inputs):
        logger.info('a is %s', inputs.a.x)
        logger.info('b is %s', inputs.b.x)
        logger.info('total is %d', sum(inp.x for inp in inputs))
        logger.error("Hello error log")
        self.next(self.end)

    @step
    def end(self):
        pass

class CustomEventLogger(object):
    TYPE = 'customEventLogger'

    def __init__(self):
        self.logger = logging.getLogger('DebugFlow')

    def log(self, msg):
        self.logger.info('event_logger: %s', str(msg))

    def process_message(self, msg):
        # type: (Message) -> None
        self.log(msg.payload)

    def shutdown(self):
        pass

def setup_logger():
    logger_config = {
        'customEventLogger': CustomEventLogger
    }
    LOGGING_SIDECAR.update(logger_config)
    SIDECAR.update(logger_config)
    logging.config.dictConfig(LOGGING_CONFIG)

if __name__ == '__main__':
    setup_logger()
    logger = logging.getLogger('DebugFlow')
    DebugFlow()
python debug_flow.py --event-logger=customEventLogger run
How can I configure the Metaflow logger? If that is not possible, how can I send debug and warning logs with the Metaflow logger? Thanks.
Hello everyone! I am exploring options for my next project implementation. Based on the initial documentation, Metaflow seems to hit all the points my team is looking for in a framework. The only question I have is:
Our team uses Azure, not AWS. Are there going to be issues deploying and scaling Metaflow-based solutions on Azure?
hi all,
I'd like to know the best way of passing a variable that is defined in a step before a split so that it can be used after the join.
I could do something like self.merge_artifacts(inputs, include=[<vars>])? I'm sure inputs[0].<var> also works. These are fine, but I'm not sure how efficient they are or how they will cope with many more splits.
Here's a fuller simple example to show what I mean:
from metaflow import FlowSpec, step

class Foo(FlowSpec):

    @step
    def start(self):
        self.msg = 'hi %s'
        self.steps = list(range(0, 10))
        self.next(self.bar, foreach='steps')

    @step
    def bar(self):
        print(self.input)
        print(self.msg % ' from bar')
        self.next(self.join)

    @step
    def join(self, inputs):
        # to be able to use self.msg in the next step, use merge_artifacts
        self.merge_artifacts(inputs, include=['msg'])
        self.next(self.end)

    @step
    def end(self):
        print(self.msg % ' from end')
        print('end')

if __name__ == "__main__":
    Foo()
I want to make sure I'm doing this in the best way.
Cheers. Loving Metaflow, by the way; top work on all the docs!
Hello Metaflow community! After setting up Airflow for a proof of concept and evaluating the other obvious/recent options, I am trying to decide between Prefect (self-hosted) and Metaflow for next steps.
There seems to be a gap when it comes to monitoring Metaflow jobs (no UI/dashboard). How do you handle this? Am I missing something, or do you fall back on AWS monitoring features?