These are chat archives for thunder-project/thunder

22nd Dec 2016
chenminyeh
@chenminyeh
Dec 22 2016 18:04

Question about Spark: I am running ICA on 10 GB of data on a cluster with 3 TB of memory. My executor memory is 100 GB and my driver memory is 100 GB. I got the error below; can anyone suggest how to configure Spark for this job?

error Traceback (most recent call last)

<ipython-input-16-bb7862118cb1> in <module>()
----> 1 algorithm = ICA(k=6, k_pca=6, svd_method='auto', max_iter=20, tol=0.1, seed=None).fit(data)

/home/cyeh/anaconda3/lib/python3.5/site-packages/factorization/base.py in fit(self, X, return_parallel)
27
28 if isinstance(data, BoltArraySpark):
---> 29 results = list(self._fit_spark(data))
30
31 # handle output types

/home/cyeh/anaconda3/lib/python3.5/site-packages/factorization/algorithms/ICA.py in _fit_spark(self, data)
52
53 # reduce dimensionality
---> 54 u, s, v = SVD(k=self.k_pca, method=self.svd_method).fit(data)
55 u = Series(u)
56

/home/cyeh/anaconda3/lib/python3.5/site-packages/factorization/base.py in fit(self, X, return_parallel)
27
28 if isinstance(data, BoltArraySpark):
---> 29 results = list(self._fit_spark(data))
30
31 # handle output types

/home/cyeh/anaconda3/lib/python3.5/site-packages/factorization/algorithms/SVD.py in _fit_spark(self, mat)
93
94 # compute (xx')^-1 through a map reduce
---> 95 xx = mat.times(c_inv).gramian().toarray()
96 xx_inv = inv(xx)
97

/home/cyeh/anaconda3/lib/python3.5/site-packages/thunder/series/series.py in gramian(self)
757 mat += outer(x, x)
758
--> 759 rdd.values().foreach(outer_sum)
760 return self._constructor(mat.value, index=self.index)
761

/home/cyeh/spark/python/pyspark/rdd.py in foreach(self, f)
748 f(x)
749 return iter([])
--> 750 self.mapPartitions(processPartition).count() # Force evaluation
751
752 def foreachPartition(self, f):

/home/cyeh/spark/python/pyspark/rdd.py in count(self)
1006 3
1007 """
-> 1008 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1009
1010 def stats(self):

/home/cyeh/spark/python/pyspark/rdd.py in sum(self)
997 6.0
998 """
--> 999 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1000
1001 def count(self):

/home/cyeh/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
871 # zeroValue provided to each partition is unique from the one provided
872 # to the final reduce call
--> 873 vals = self.mapPartitions(func).collect()
874 return reduce(op, vals, zeroValue)
875

/home/cyeh/spark/python/pyspark/rdd.py in collect(self)
774 """
775 with SCCallSiteSync(self.context) as css:
--> 776 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
777 return list(_load_from_socket(port, self._jrdd_deserializer))
778

/home/cyeh/spark/python/pyspark/rdd.py in _jrdd(self)
2401
2402 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2403 self._jrdd_deserializer, profiler)
2404 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2405 self.preservesPartitioning)

/home/cyeh/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2334 assert serializer, "serializer should not be empty"
2335 command = (func, profiler, deserializer, serializer)
-> 2336 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2337 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2338 sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/home/cyeh/spark
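
For context, a minimal sketch of how executor and driver settings like the values described above are typically passed to PySpark; the app name, core count, and maxResultSize value here are assumptions for illustration, not taken from the chat:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("thunder-ica")                 # illustrative name, not from the chat
        .set("spark.executor.memory", "100g")      # matches the 100 GB executors described above
        .set("spark.executor.cores", "4")          # assumed; fewer cores per executor lowers per-task memory pressure
        .set("spark.driver.maxResultSize", "8g"))  # assumed value; raise it if results collected to the driver are large
# spark.driver.memory cannot be changed from an already-running Python driver;
# it has to be set before the driver JVM starts, e.g. via
# spark-submit --driver-memory 100g or in spark-defaults.conf.
sc = SparkContext(conf=conf)

Note that the traceback above ends inside Series.gramian(), which accumulates outer(x, x) across the RDD (lines 757-759 of series.py), so driver-side memory settings can matter for this step as well as executor memory.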