My co-worker and I have been setting up, configuring, and testing Dask for about a week, and everything is working great (we can't speak highly enough of how easy, straightforward, and powerful it is). Now that we are trying to use it for more than testing, we have run into an issue. We believe it is a fairly simple one that comes down to syntax and a gap in our understanding. Any help getting it running is greatly appreciated, as is any guidance toward a more optimal approach.
We got fairly close with these two posts:
High level flow:
- Open data in pandas & clean it (we plan on moving this to a pipeline)
- From there, convert the cleaned data set for regression into a dask data frame
- Set the x & y variables and create all unique x combination sets
- Create all unique formulas (y ~ x1 + x2 + 0)
- Run each formula with the data through a lasso LARS linear model (LassoLarsIC) to get an AIC per formula, which we use for ranking
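To make the combination and formula steps above concrete, here is a minimal sketch using only the standard library. The helper name `build_formulas` and the three-variable list are illustrative assumptions, not part of our actual code; the real variable list is much longer.

```python
from itertools import combinations


def build_formulas(y_var, x_vars):
    """Return patsy-style formula strings for every single x and every pair of x's.

    build_formulas is a hypothetical helper used only for illustration.
    """
    # One-element tuples for the single-variable models, then all unique pairs.
    combos = [(x,) for x in x_vars] + list(combinations(x_vars, 2))
    # The trailing "+0" removes the intercept, as in the flow described above.
    return [f"{y_var}~{'+'.join(combo)}+0" for combo in combos]


formulas = build_formulas("y", ["x1", "x2", "x3"])
print(formulas)
```

With three x variables this yields three single-variable formulas plus three pairwise ones.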
Current Issue:
- Run each individual formula set (~1700 formulas) with the data (1 single data set which doesn’t vary with each run) on the dask cluster and get the results back
- Optimize the calculation & return the final data
Code:
# In[]
# Imports:
import logging as log
import datetime as dat
from itertools import combinations
import numpy as np
import pandas as pd
from patsy import dmatrices
import sklearn as sk
from sklearn.linear_model import LogisticRegression, SGDClassifier, LinearRegression
import dask
import dask.dataframe as dk
from dask.distributed import Client
# In[]
# logging, set the dask client, open & clean the data, pass into a dask dataframe
log.basicConfig(level=log.INFO,
                format='%(asctime)s %(message)s',
                datefmt="%m-%d %H:%M:%S"
                )
c = Client('ip:port')
ST = dat.datetime.now()
data_pd = pd.read_csv('some.txt', sep="\t")
#fill some na/clean up the data a bit
data_pd['V9'] = data_pd.V9.fillna("Declined")
data_pd['y'] = data_pd.y.fillna(0)
data_pd['x1'] = data_pd.x1.fillna(0)
#output the clean data and re-import it into dask; we could also use from_pandas to get a dask dataframe
data_pd.to_csv('clean_rr_cp.csv')
data = dk.read_csv(r'C:\path\*.csv', sep=",")
# set x & y variables - the below is truncated
y_var = "y"
x_var = ['x1',
'x2',
'x3',
'x4',......
#list of all variables
all_var = [y_var] + x_var  # list(y_var) would split a multi-character name into single characters
#all unique two-variable combinations
x_var_combos = list(combinations(x_var, 2))
#add single variables for testing as well (one-element tuples, so formula() takes its single-variable branch)
for i in x_var:
    x_var_combos.append((i,))
# create formulas from our y, x variables
def formula(y_var, combo):
    combo_len = len(combo)
    if combo_len == 2:
        formula_str = y_var + "~" + combo[0] + "+" + combo[1] + "+0"
    else:
        formula_str = y_var + "~" + combo[0] + "+0"
    return formula_str
@dask.delayed
def model_aic(dt, formula):
    # k is the parameter count used in the small-sample (AICc-style) correction below
    k = 2
    y_df, x_df = dmatrices(formula, dt, return_type='dataframe')
    y_df = np.ravel(y_df)
    log.info('dmatrices successful')
    LL_model = sk.linear_model.LassoLarsIC(max_iter=100)
    # lowest information criterion along the LARS path, plus the correction term
    AIC_Value = min(LL_model.fit(x_df, y_df).criterion_) + ((2*(k**2) + 2*k) / (len(x_df) - k - 1))
    log.info('AIC_Value: %s', AIC_Value)
    oup = [formula, AIC_Value, len(dt) - AIC_Value]
    return oup
# ----------------- here's where we're stuck ---------------------
# ----------------- we think this is correct ----------------------
# ----------------- create a list of all formula to execute -------
# In[]
out = []
for i in x_var_combos:
    var = model_aic(data, formula(y_var, i))
    out.append(var)
# ----------------- but we're stuck figuring out how to -----------
# ------------------make it compute & return the result -----------
ans = c.compute(*out)
ans2 = c.compute(out[1])
print (ans2)
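For comparison, here is a minimal sketch of the pattern we are trying to reach: build a list of delayed calls, then compute them all in one go. It is a toy under stated assumptions. `score` is a hypothetical stand-in for `model_aic`, and it runs on dask's local synchronous scheduler rather than on our cluster.

```python
import dask


@dask.delayed
def score(formula_str):
    # Hypothetical stand-in for model_aic: returns the formula and its
    # length instead of fitting a model and returning an AIC.
    return [formula_str, len(formula_str)]


# Build the lazy task list, one delayed call per formula.
tasks = [score(f) for f in ["y~x1+0", "y~x2+0", "y~x1+x2+0"]]

# dask.compute accepts several delayed objects and returns a tuple of
# concrete results; the synchronous scheduler keeps this example local.
results = dask.compute(*tasks, scheduler="synchronous")
print(results)
```

Our understanding, not yet verified on our cluster, is that with a `dask.distributed.Client` the equivalent is to pass the list itself to `c.compute(out)` rather than unpacking it, which returns futures, and then to call `c.gather(...)` on those futures to retrieve the results.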