"A Preview of Best Practices for Big Data + AI in the Healthcare Domain" (Part 2)

    # continuation of run_xgboost(): evaluate at the selected threshold, then refit on the full dataset
    y_pred = (y_pred_proba[:, 1] >= bestthresh).astype(int)
    precision, recall, thresholds = precision_recall_curve(y_test, y_pred)
    average_precision = average_precision_score(y_test, y_pred)

    # Compute confusion matrix
    cnf_matrix = confusion_matrix(y_test, y_pred)
    np.set_printoptions(precision=2)

    # Plot non-normalized confusion matrix
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes=[0, 1],
                          title='Confusion matrix, without normalization')

    # Plot normalized confusion matrix
    plt.figure()
    plot_confusion_matrix(cnf_matrix, classes=[0, 1], normalize=True,
                          title='Normalized confusion matrix')
    plt.show()

    # Precision-recall curve
    plt.step(recall, precision, color='b', alpha=0.2, where='post')
    plt.fill_between(recall, precision, step='post', alpha=0.2, color='b')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.ylim([0.0, 1.05])
    plt.xlim([0.0, 1.0])
    plt.title('2-class Precision-Recall curve: AP={0:0.5f}'.format(average_precision))
    plt.show()

    auc = roc_auc_score(y_test, y_pred_proba[:, 1])
    print('AUC: %.3f' % auc)
    # calculate roc curve
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba[:, 1])
    # plot no skill
    plt.plot([0, 1], [0, 1], linestyle='--')
    # plot the roc curve for the model
    plt.plot(fpr, tpr, marker='.')
    # show the plot
    plt.show()

    # refit on the full dataset, weighting positives by the class ratio
    unique, counts = np.unique(data['label'], return_counts=True)
    cdict = dict(zip(unique, counts))
    pos_weight = cdict[0] / cdict[1]
    full_model = XGBClassifier(scale_pos_weight=pos_weight)
    full_model.fit(data[feats], data['label'])
    return full_model, bestthresh


def setup_spark_session(param_dict):
    """
    Description : Used to set up a Spark session
    Input  : param_dict - parameter dictionary
    Output : Spark Session, Spark Context, and SQL Context
    """
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"
    try:
        spark.stop()
        print("Stopped a SparkSession")
    except Exception as e:
        print("No existing SparkSession")

    SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"]      # e.g. "10G"
    SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"]          # e.g. "5"
    SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"]  # e.g. "3G"
    SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"]      # e.g. "1"
    AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"]
    AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]
    AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]

    conf = SparkConf().\
        setAppName(param_dict["APP_NAME"]).\
        setMaster('yarn-client').\
        set('spark.executor.cores', SPARK_EXECUTOR_CORE).\
        set('spark.executor.memory', SPARK_EXECUTOR_MEMORY).\
        set('spark.driver.cores', SPARK_DRIVER_CORE).\
        set('spark.driver.memory', SPARK_DRIVER_MEMORY).\
        set('spark.driver.maxResultSize', '0')
    spark = SparkSession.builder.\
        config(conf=conf).\
        getOrCreate()
    sc = spark.sparkContext

    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    sqlContext = SQLContext(sc)
    return spark, sc, sqlContext


def loadDataset(vw_cl_lines_df, datefield, param_dict):
    """
    Description : Runs data through the appropriate transforms to convert it
                  to a suitable format
    Input  : vw_cl_lines_df - input dataframe
             datefield - field used to establish a window for the addFeats functions
             param_dict - parameter dictionary
    Output : Properly formatted dataframe
    """
    vw_cl_lines_df = with_transform(vw_cl_lines_df, param_dict)
    vw_cl_lines_df = vw_cl_lines_df.withColumn(
        datefield + "_unix",
        F.unix_timestamp(F.col(datefield), format='yyyy-MM-dd HH:mm:ss.000'))
    return vw_cl_lines_df
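Everything in this part leans on imports and helpers introduced in Part 1 (with_transform, checkContain, plot_confusion_matrix, and the opening of run_xgboost). As a reading aid, the block below is a plausible reconstruction of those imports plus a minimal, placeholder configuration for setup_spark_session; the memory and core values simply echo the inline comments above and are not the original deployment settings.

# Assumed imports for this part; reconstructed from usage, not copied from the source.
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.metrics import (precision_recall_curve, average_precision_score,
                             confusion_matrix, roc_auc_score, roc_curve)
from xgboost import XGBClassifier
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, Window, functions as F
from pyspark.sql.types import IntegerType, StringType

# Hypothetical settings for setup_spark_session(); every value is a placeholder.
spark_params = {
    "APP_NAME": "claims_feature_pipeline",
    "SPARK_DRIVER_MEMORY": "10G",
    "SPARK_DRIVER_CORE": "5",
    "SPARK_EXECUTOR_MEMORY": "3G",
    "SPARK_EXECUTOR_CORE": "1",
    "AWS_ACCESS_KEY": "<access-key>",
    "AWS_SECRET_KEY": "<secret-key>",
    "AWS_S3_ENDPOINT": "s3.amazonaws.com",
}

spark, sc, sqlContext = setup_spark_session(spark_params)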
def addOneHotsTest(df, oneHots):
    # re-creates the one-hot columns generated during training on a test dataframe
    for item in oneHots:
        field = item.split('~')[0]
        df[item] = np.where(df[field] == item, 1, 0)
    return df


def addCodes(res_df, codeField, topCodes, optUDF, knownPref):
    # Need to get the same codes for testing
    for code in topCodes:
        likeCode = "%" + code + "%"
        res_df = res_df.withColumn(code,
                                   F.when(res_df[codeField].like(likeCode), 1).otherwise(0))

    def checkOtherCodes(x):
        # flag records containing any code outside the topCodes list
        if not x:
            return 0
        codes = set(item.strip() for item in x.split(","))
        if codes.issubset(topCodes):
            return 0
        else:
            return 1

    otherCodesUDF = F.udf(checkOtherCodes, IntegerType())
    if knownPref is not None:
        otherlabel = knownPref + "_" + codeField
    else:
        otherlabel = codeField
    res_df = res_df.withColumn("OTHER_" + otherlabel, otherCodesUDF(res_df[codeField]))
    codesAdded = topCodes
    return res_df, codesAdded


def codeExtract(df, codeField, topCount, optUDF=None, knownPref=None):
    """
    Description : Function to extract code features
    Input  : df - input dataframe
             codeField - field containing the (comma-separated) codes to extract
             topCount - number of code features to be added
             optUDF - optional udf to apply to the field
             knownPref - prefix characterizing a field, if any
    Output : dataframe with code features added
    """
    codeEx_df = df
    if optUDF is not None:
        codeEx_df = codeEx_df.withColumn(codeField, optUDF(codeEx_df[codeField]))
    codeEx_df = codeEx_df.withColumn(codeField, F.explode(F.split(codeEx_df[codeField], ",")))
    code_counts = codeEx_df.groupBy(codeField).count().sort(F.desc("count"))
    if knownPref is not None:
        code_counts = code_counts.filter(code_counts[codeField].like("%" + knownPref + "%"))
    # code_counts.show(10)
    xy = code_counts.toPandas()
    # Generate a list of the topCount most frequently occurring codes (e.g. reject codes)
    topCodes = xy[codeField].head(topCount).tolist()
    topCodes = [x.strip() for x in topCodes]
    res_df = df
    return addCodes(res_df, codeField, topCodes, optUDF, knownPref)


# checks for the presence of a value in a field
def isVal(df, field, value):
    return df.withColumn("is_" + field + "_" + value,
                         F.when(F.col(field) == value, F.lit(1)).otherwise(F.lit(0)))


# sums values of a field within a specified window
def sumVal(df, field, windowval):
    return df.withColumn("TOT_" + field, F.sum(field).over(windowval))


# finds the maximum value of a field within a specified window
def maxVal(df, field, windowval):
    return df.withColumn("MAX_" + field, F.max(field).over(windowval))


# finds the average value of a field within a specified window
def meanVal(df, field, windowval):
    return df.withColumn("MEAN_" + field, F.mean(field).over(windowval))


# finds the ratio between two fields of a record
def fracVal(df, numfield, denomfield, fracName):
    return df.withColumn(fracName, F.col(numfield) / F.col(denomfield))
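The five small helpers above do all of the per-group aggregation work, so it is worth seeing what they produce. The sketch below applies maxVal and sumVal to a toy DataFrame over the same kind of window that addFeatsTrain builds (partitioned by a group key, ordered by a unix-timestamp column, ranging over all strictly earlier rows). The schema and values are invented purely for illustration, the helpers are assumed to be defined as above, and the Spark session created earlier is reused.

# Toy demonstration of the window helpers; the schema and values are invented.
toy_df = spark.createDataFrame(
    [("C1", 1, 100.0), ("C1", 2, 40.0), ("C1", 3, 60.0), ("C2", 1, 10.0)],
    ["CLAIM_ID", "EVENT_unix", "BILLED_AMT"],
)

# Same window shape as addFeatsTrain: all rows of the group strictly before the current one.
windowval = (Window.partitionBy("CLAIM_ID")
             .orderBy("EVENT_unix")
             .rangeBetween(Window.unboundedPreceding, -1))

demo = sumVal(maxVal(toy_df, "BILLED_AMT", windowval), "BILLED_AMT", windowval)
demo.orderBy("CLAIM_ID", "EVENT_unix").show()
# The first row of each CLAIM_ID has no preceding rows, so TOT_/MAX_ come back null there.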
# adds the required engineered fields to the dataframe (training path)
def addFeatsTrain(vw_cl_lines_df, param_dict):
    orig = vw_cl_lines_df
    windowval = (Window.partitionBy(param_dict["groupField"])
                 .orderBy(param_dict["windowField"] + "_unix")
                 .rangeBetween(Window.unboundedPreceding, -1))
    codes_df = orig.withColumn("NUM_LINES", F.sum(F.lit(1)).over(windowval))
    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])
    for field in param_dict["sumFields"]:
        codes_df = sumVal(codes_df, field, windowval)
    for field in param_dict["maxFields"]:
        codes_df = maxVal(codes_df, field, windowval)
    for field in param_dict["meanFields"]:
        codes_df = meanVal(codes_df, field, windowval)
    for fracTuple in param_dict["fracTuples"]:
        codes_df = fracVal(codes_df, fracTuple[0], fracTuple[1], fracTuple[2])

    def remPref(x):
        # drops codes containing 'T' or 'M' and re-joins the remainder
        if x is None:
            return ""
        x = x.split(",")
        y = []
        for item in x:
            if ('T' not in item) and ('M' not in item):
                y.append(item.strip())
        y = ','.join(y)
        return y

    remPrefUDF = F.udf(remPref, StringType())
    allCodes = {}
    for code in param_dict["codeFields"]:
        if len(code) == 1:
            codes_df, toAdd = codeExtract(codes_df, code[0], 20)
        else:
            codes_df, toAdd = codeExtract(codes_df, code[0], 20,
                                          optUDF=remPrefUDF, knownPref=code[1])
        if code[0] in allCodes:
            allCodes[code[0]] = allCodes[code[0]] + toAdd
        else:
            allCodes[code[0]] = toAdd
    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
    return codes_df, addedCols, allCodes


# adds the engineered fields to a test dataframe, re-using the training summary
def addFeatsTest(vw_cl_lines_df, param_dict, summary_df):
    orig = vw_cl_lines_df
    joinfields = [param_dict['groupField'], "NUM_LINES"]
    for field in param_dict["sumFields"]:
        joinfields.append("TOT_" + field)
    for field in param_dict["maxFields"]:
        joinfields.append("MAX_" + field)
    for field in param_dict["meanFields"]:
        joinfields.append("MEAN_" + field)
    for fracTuple in param_dict["fracTuples"]:
        joinfields.append(fracTuple[2])
    codes_df = orig.join(summary_df[joinfields], param_dict['groupField'], how='left')
    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])

    def remPref(x):
        # drops codes containing 'T' or 'M' and re-joins the remainder
        if x is None:
            return ""
        x = x.split(",")
        y = []
        for item in x:
            if ('T' not in item) and ('M' not in item):
                y.append(item.strip())
        y = ','.join(y)
        return y

    remPrefUDF = F.udf(remPref, StringType())
    allCodes = {}
    for code in param_dict["codeFields"]:
        presentInTrain = param_dict["allCodes"][code[0]]
        if len(code) == 1:
            codes_df, added = addCodes(codes_df, code[0], presentInTrain, None, None)
        else:
            codes_df, added = addCodes(codes_df, code[0], presentInTrain,
                                       optUDF=remPrefUDF, knownPref=code[1])
    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
    return codes_df, addedCols


# prepares the data for use in training or inference by adding features
# and the appropriate labels
def prepTrainData(df, baseFeatures, param_dict):
    trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    negCount = trainData.filter(trainData[param_dict["labelField"]] ==
                                param_dict["negativeLabel"]).count()
    posCount = trainData.filter(trainData[param_dict["labelField"]] ==
                                param_dict["positiveLabel"]).count()
    pos_weight = negCount / posCount
    trainData, extraCols, param_dict["allCodes"] = addFeatsTrain(trainData, param_dict)
    vw_cl_lines_pd = trainData.toPandas()
    prep_labelled_data_pd = pd.get_dummies(vw_cl_lines_pd,
                                           columns=param_dict["BASE_FEATURES_CATEGORICAL"],
                                           drop_first=False,
                                           prefix_sep="~~")
    featureCols = extraCols + checkContain(baseFeatures,
                                           prep_labelled_data_pd.columns.tolist(),
                                           param_dict["LOG_TRANSFORM_FEATURES"] +
                                           param_dict["BASE_FEATURES_CATEGORICAL"])
    param_dict["oneHots"] = [x for x in prep_labelled_data_pd.columns.tolist() if "~~" in x]
    # drop features that would leak the label
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in param_dict["isFields"]
                    if x[0] == param_dict["labelField"]]
    featureCols = [x for x in featureCols if x not in leakageFeats]
    return prep_labelled_data_pd, featureCols, pos_weight, param_dict


def prepTestData(df, summary, baseFeatures, param_dict):
    trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    trainData, extraCols = addFeatsTest(trainData, param_dict, summary)
    vw_cl_lines_pd = trainData.toPandas()
    prep_labelled_data_pd = addOneHotsTest(vw_cl_lines_pd, param_dict["oneHots"])
    featureCols = extraCols + checkContain(baseFeatures,
                                           prep_labelled_data_pd.columns.tolist(),
                                           param_dict["LOG_TRANSFORM_FEATURES"] +
                                           param_dict["BASE_FEATURES_CATEGORICAL"])
    # drop features that would leak the label
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in param_dict["isFields"]
                    if x[0] == param_dict["labelField"]]
    featureCols = [x for x in featureCols if x not in leakageFeats]
    return prep_labelled_data_pd, featureCols
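prepTrainData and prepTestData are driven entirely by param_dict, and several of its keys only become clear from the code above. The sketch below lists the keys the pipeline actually reads; the Spark settings reuse the spark_params placeholder from earlier, and every field name used as a value is a hypothetical stand-in rather than a column from the original dataset.

# Keys read by the pipeline above; all field names are hypothetical placeholders.
param_dict = {
    **spark_params,                              # Spark / S3 settings shown earlier
    "groupField": "CLAIM_ID",                    # partition key for the window
    "windowField": "SERVICE_DATE",               # window ordering uses SERVICE_DATE_unix
    "custSegOrder": "SERVICE_DATE",              # date field handed to loadDataset; must match windowField
    "labelField": "LINE_STATUS",
    "positiveLabel": "REJECTED",
    "negativeLabel": "PAID",
    "isFields": [("LINE_STATUS", "REJECTED")],   # -> is_LINE_STATUS_REJECTED (later dropped as leakage)
    "sumFields": ["BILLED_AMT"],                 # -> TOT_BILLED_AMT
    "maxFields": ["BILLED_AMT"],                 # -> MAX_BILLED_AMT
    "meanFields": ["BILLED_AMT"],                # -> MEAN_BILLED_AMT
    "fracTuples": [("PAID_AMT", "BILLED_AMT", "PAID_FRAC")],   # numerator, denominator, new column
    "codeFields": [["REJECT_CODES"], ["PROC_CODES", "J"]],     # optional second item = known prefix
    "baseFeatures": ["BILLED_AMT", "MEMBER_AGE"],
    "BASE_FEATURES_CATEGORICAL": ["PROVIDER_SPECIALTY"],
    "BASE_FEATURES_TIMESTAMP": ["SERVICE_DATE"],
    "LOG_TRANSFORM_FEATURES": [],
}
# "allCodes" and "oneHots" are filled in by prepTrainData, and "trainedCols" by
# trainXGBModel; they do not need to be supplied up front.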
param_dict["labelField"]]featureCols = [x for x in featureCols if x not in leakageFeats]return prep_labelled_data_pd, featureCols# trains and returns an XGBoost Classifierdef trainXGBModel(df, param_dict):# ,onlyWarn = False):pdf, feats, ratio, param_dict = prepTrainData(df, param_dict["baseFeatures"], param_dict)for col in param_dict["BASE_FEATURES_TIMESTAMP"]:pdf[col] = pd.to_datetime(pdf[col], errors='coerce')adf = pdf.replace([np.inf,-np.inf], 0)cols = pdf[feats].columnslabel = np.where(adf[param_dict["labelField"]] ==param_dict["positiveLabel"], 1, 0)x = adf[feats].values #returns a numpy arraystandard_scaler = preprocessing.StandardScaler()x_scaled = standard_scaler.fit_transform(x)adf = pd.DataFrame(x_scaled, columns=adf[feats].columns)adf['label'] = label#X_train, y_train = adf[feats], adf['label']xgb_model, bestThresh = run_xgboost(adf[feats + ['label']], feats, scale_pos_weight= ratio)param_dict["trainedCols"] = list(feats)return xgb_model, feats, param_dict, bestThreshdef updateXGBModel(df, param_dict, model):pandas_df, featureCols, pos_weight = prepTestData(df, param_dict["baseFeatures"], param_dict)pandas_df['label'] = np.where(pandas_df[param_dict["labelField"]] ==param_dict["positiveLabel"], 1, 0)pandas_df = pandas_df.fillna(0)y_train = pandas_df['label'].valuesX_train_pd = pandas_df.drop('label', 1)if len(X_train_pd) > 100000 :X = np.array_split(X_train_pd, 100000)y = np.array_split(y_train, 100000)for i in range(len(X)):xgb_class = XGBClassifier(scale_pos_weight=pos_weight)model = xgb_class.fit(X[i],y[i], xgb_model = model)xgb_model = modelreturn xgb_model, featureCols, param_dict# uses a model to predict valuesdef modelPredict(model, test_df, summary, param_dict, posThresh):test_pdf, feats1 = prepTestData(test_df, summary, param_dict["baseFeatures"], param_dict)for col in param_dict["BASE_FEATURES_TIMESTAMP"]:test_pdf[col] = pd.to_datetime(test_pdf[col], errors='coerce')test_adf = test_pdf.replace([np.inf,-np.inf], 0)x = test_adf[feats1].values #returns a numpy arraystandard_scaler = preprocessing.StandardScaler()x_scaled = standard_scaler.fit_transform(x)test_adf = pd.DataFrame(x_scaled, columns=test_adf[feats1].columns)X_test = test_adf[param_dict["trainedCols"]]result_proba = model.predict_proba(X_test)result = []result = (result_proba[:,1] >= posThresh).astype(int)#result = model.predict(X_test)return result, result_proba