from pyspark import SparkContext
from pyspark import SparkConf
import numpy as np
# Spark driver setup: turn on the event log (for post-hoc job inspection) and
# raise the driver's result-size cap to 8 GB so the large collect() below fits.
conf = SparkConf().set('spark.eventLog.enabled','true').set('spark.driver.maxResultSize', '8g')
sc = SparkContext(appName='cx_exp',conf=conf)
def clean(x):
    """Parse one CSV line "row,col,value" into a (col, row, value) tuple.

    The first two fields are swapped on purpose: downstream code wants the
    transpose of the on-disk matrix (a tall matrix keyed by column index).
    """
    fields = str(x).split(",")
    i, j, v = fields[0], fields[1], fields[2]
    return int(j), int(i), float(v)
def prepare_matrix(rdd):
    """Group a (key, index, value) entry RDD by its leading key.

    Returns an RDD of (key, (indices, values)) pairs, where indices and
    values are the parallel numpy arrays produced by _indexed.
    """
    by_key = rdd.map(lambda t: (t[0], (t[1], t[2]))).groupByKey()
    as_lists = by_key.map(lambda kv: (kv[0], list(kv[1])))
    return as_lists.map(lambda kv: (kv[0], _indexed(kv[1])))
def _indexed(grouped_list):
    """Split a list of (index, value) pairs into two parallel numpy arrays.

    Returns (indices, values); both are empty float arrays when the input
    list is empty.
    """
    cols = np.array([pair[0] for pair in grouped_list])
    vals = np.array([pair[1] for pair in grouped_list])
    return cols, vals
# Load the striped matrix entries and parse each CSV line; clean() swaps the
# first two indices, so `data` holds the transposed (tall) matrix as
# (col, row, value) tuples.
data = sc.textFile('/scratch1/scratchdirs/msingh/sc_paper/experiments/striped_data/final_matrix').map(lambda x:clean(x))
# NOTE(review): `grouped` is never used below. Being a lazy RDD it costs
# nothing until an action runs, but confirm nothing else needs it before
# removing this line.
grouped = data.map(lambda x:(x[0], (x))).groupByKey().map(lambda x:(x[0], list(x[1])))
srdd = prepare_matrix(data)
# Per-key sum of each value array: row[1] is the (indices, values) pair from
# _indexed, so row[1][1] is the values array. collect() pulls the sums back
# to the driver (hence the 8g maxResultSize above).
summation = srdd.map(lambda row: np.sum( row[ 1 ][ 1 ] )).collect()
np.savetxt('/global/homes/m/msingh/final_mappings/summation_tall', np.array(summation))