diff --git a/training/gordon-group/seed/ActionRuntime/py-gordon-ML_2_0_0.json b/training/gordon-group/seed/ActionRuntime/py-gordon-ML_2_0_0.json new file mode 100644 index 00000000..34699175 --- /dev/null +++ b/training/gordon-group/seed/ActionRuntime/py-gordon-ML_2_0_0.json @@ -0,0 +1,24 @@ +{ + "language": "Python", + "runtimeVersion": "3.8.10", + "modules": { + "conda.scikit-learn":"=0.24.2", + "conda.pandas":"=1.3.0", + "conda.pytorch":"=1.12.1", + "conda.torchvision":"=0.13.1", + "conda.torchaudio":"=0.12.1", + "conda.cudatoolkit":"=11.3", + "conda.libgcc":"=7.2.0", + "conda.gpytorch":"=1.9.0", + "conda.dill":"=0.2.8.2" + }, + "repositories": [ + "https://repo.continuum.io/pkgs/main", + "conda-forge", + "pytorch", + "anaconda" + ], + "runtime": "CPython", + "name": "py-gordon-ML_2_0_0", + "id": "py-gordon-ML_2_0_0" +} \ No newline at end of file diff --git a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.c3typ b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.c3typ index 294929a2..3a6a55fe 100644 --- a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.c3typ +++ b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.c3typ @@ -5,13 +5,14 @@ /** * GaussianProcessRegressionPipe.c3typ * Performs Scikit-Learn's GP Regression. +* Bogus comment to reprovision app. */ @db(unique=['technique, dataSourceSpec']) entity type GaussianProcessRegressionPipe extends MLLeafPipe type key 'GPREG' { // the technique for this regression technique: !GaussianProcessRegressionTechnique // data source spec for this regression - dataSourceSpec: !GPRDataSourceSpec + dataSourceSpec: GPRDataSourceSpec // get features data @py(env='gordon-ML_1_0_0') @@ -28,4 +29,10 @@ entity type GaussianProcessRegressionPipe extends MLLeafPipe t // guarantee that process() is only allowed after train() @py(env='gordon-ML_1_0_0') isProcessable: ~ + // train large model with AOD staged data + @py(env='gordon-ML_1_0_0') + trainWithStagedAOD: member function(modelIds: any): integer + // train with list of GSTPs + @py(env='gordon-ML_1_0_0') + trainWithListOfAODModels: member function(modelIds: any, excludeFeatures: any): integer } diff --git a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.py b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.py index 8f032df0..5ba1084e 100644 --- a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.py +++ b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionPipe.py @@ -12,6 +12,18 @@ def train(this, input, targetOutput, spec): X = c3.Dataset.toNumpy(dataset=input) y = c3.Dataset.toNumpy(dataset=targetOutput) + if (technique.centerTarget): + targetMean = float(y.mean()) + y = y - y.mean() + + if (technique.validation): + rng = np.random.RandomState(technique.randomSeed) + rng.shuffle(X) + X = X[0:int((1.0 - technique.splitFraction)*len(X))] + rng.shuffle(y) + y = y[0:int((1.0 - technique.splitFraction)*len(y))] + + # get kernel object from c3, make it python again kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel) @@ -19,7 +31,17 @@ def train(this, input, targetOutput, spec): gp = GaussianProcessRegressor(kernel=kernel) gp.fit(X, y) - this.trainedModel = c3.MLTrainedModelArtifact(model=c3.PythonSerialization.serialize(obj=gp)) + if (technique.centerTarget): + params = {} + params["targetMean"] = targetMean + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + parameters=params + ) + else: + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + ) return this @@ -71,8 +93,26 @@ def getFeatures(this): import pandas as pd dataSourceSpec = c3.GPRDataSourceSpec.get(this.dataSourceSpec.id) - featuresType = dataSourceSpec.featuresType.toType() + + if (featuresType.name == "StagedFeatures"): + features = c3.StagedFeatures.fetch({ + "limit": -1, + "order": "id" + }).objs.toJson() + + df = pd.DataFrame(features) + keys = df.iloc[0]["features"].keys() + + for key in keys: + df[key] = df["features"].apply(lambda x: x[key]) + + df.drop("version", axis=1, inplace=True) + df = df.select_dtypes(["number"]) + + return c3.Dataset.fromPython(df) + + inputTableC3 = featuresType.fetch(dataSourceSpec.featuresSpec).objs.toJson() inputTablePandas = pd.DataFrame(inputTableC3) inputTablePandas = inputTablePandas.drop("version", axis=1) @@ -94,8 +134,26 @@ def getTarget(this): import pandas as pd dataSourceSpec = c3.GPRDataSourceSpec.get(this.dataSourceSpec.id) - targetType = dataSourceSpec.targetType.toType() + + if (targetType.name == "StagedTargets"): + targets = c3.StagedTargets.fetch({ + "limit": -1, + "order": "id" + }).objs.toJson() + + df = pd.DataFrame(targets) + keys = df.iloc[0]["targets"].keys() + + for key in keys: + df[key] = df["targets"].apply(lambda x: x[key]) + + df.drop("version", axis=1, inplace=True) + df = df.select_dtypes(["number"]) + + return c3.Dataset.fromPython(df) + + outputTableC3 = targetType.fetch(dataSourceSpec.targetSpec).objs.toJson() outputTablePandas = pd.DataFrame(outputTableC3) outputTablePandas = outputTablePandas.drop("version", axis=1) @@ -112,3 +170,126 @@ def getTarget(this): outputTablePandas = pd.DataFrame(outputTablePandas[dataSourceSpec.targetName]) return c3.Dataset.fromPython(outputTablePandas) + + +def trainWithStagedAOD(this, modelIds): + """ + This method trains a large model with data coming from previously trained + GPR models with AOD data. + + Inputs: + ids: list of GaussianProcessRegressionPipes ids + + Returns: + int: 0 if method worked, 1 otherwise + """ + from sklearn.gaussian_process import GaussianProcessRegressor + + # stage features and targets + c3.StagedFeatures.stageFromAODGPRModelIdsList(modelIds) + c3.StagedTargets.stageFromAODGPRModelIdsList(modelIds) + # get data + X = c3.Dataset.toNumpy(dataset=this.getFeatures()) + y = c3.Dataset.toNumpy(dataset=this.getTarget()) + + # generate training technique + technique = c3.GaussianProcessRegressionTechnique.get(this.technique.id) + serializedKernel = c3.SklearnGPRKernel.get(technique.kernel.id) + + if (technique.centerTarget): + targetMean = float(y.mean()) + y = y - y.mean() + + # get kernel object from c3, make it python again + kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel) + + # build and train GPR + gp = GaussianProcessRegressor(kernel=kernel) + gp.fit(X, y) + + if (technique.centerTarget): + params = {} + params["targetMean"] = targetMean + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + parameters=params + ) + else: + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + ) + + this.upsert() + + return 0 + +def trainWithListOfAODModels(this, modelIds, excludeFeatures): + """ + This method trains a large model with data coming from previously trained + GPR models with AOD data. + + Inputs: + ids: list of GaussianProcessRegressionPipes ids + + Returns: + int: 0 if method worked, 1 otherwise + """ + from sklearn.gaussian_process import GaussianProcessRegressor + import pandas as pd + from datetime import timedelta + + # get data + X = pd.DataFrame() + y = pd.DataFrame() + for model_id in modelIds: + model = c3.GaussianProcessRegressionPipe.get(model_id) + data_source_spec = c3.GPRDataSourceSpec.get(model.dataSourceSpec.id, "targetSpec") + gstp_id = data_source_spec.targetSpec.filter.split(" == ")[1].replace('"', '') + gstp = c3.GeoSurfaceTimePoint.get(gstp_id) + my_time = gstp.time.timetuple() + px = c3.Dataset.toPandas(model.getFeatures()) + px["latitude"] = gstp.latitude + px["longitude"] = gstp.longitude + px["time"] = timedelta( + days=my_time.tm_yday, + minutes=my_time.tm_min, + hours=my_time.tm_hour + ).total_seconds() / 3600 + X = pd.concat([X,px], ignore_index=True) + + py = c3.Dataset.toPandas(model.getTarget()) + y = pd.concat([y,py], ignore_index=True) + X.drop(columns=excludeFeatures, inplace=True) + X = X.to_numpy() + y = y.to_numpy() + + # generate training technique + technique = c3.GaussianProcessRegressionTechnique.get(this.technique.id) + serializedKernel = c3.SklearnGPRKernel.get(technique.kernel.id) + + if (technique.centerTarget): + targetMean = float(y.mean()) + y = y - y.mean() + + # get kernel object from c3, make it python again + kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel) + + # build and train GPR + gp = GaussianProcessRegressor(kernel=kernel) + gp.fit(X, y) + + if (technique.centerTarget): + params = {} + params["targetMean"] = targetMean + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + parameters=params + ) + else: + this.trainedModel = c3.MLTrainedModelArtifact( + model=c3.PythonSerialization.serialize(obj=gp), + ) + + this.upsert() + + return 0 \ No newline at end of file diff --git a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionTechnique.c3typ b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionTechnique.c3typ index 71915854..6e1543f0 100644 --- a/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionTechnique.c3typ +++ b/training/gordon-group/src/CustomMLPipeline/machineLearning/GPRegression/GaussianProcessRegressionTechnique.c3typ @@ -10,4 +10,16 @@ entity type GaussianProcessRegressionTechnique mixes MLTechnique schema name 'GP // the kernel object @ML(hyperParameter=true) kernel: SklearnGPRKernel + // center target data before fitting + @ML(hyperParameter=true) + centerTarget: boolean=false + // leave fraction of rows for post-validation + @ML(hyperParameter=true) + validation: boolean=false + // random seed to be used by numpy.shuffle + @ML(hyperParameter=true) + randomSeed: integer=42 + // fraction to be left for validation + @ML(hyperParameter=true) + splitFraction: float=0.2 } \ No newline at end of file diff --git a/training/gordon-group/src/Utils/AOD/AODGPRModelFinder.js b/training/gordon-group/src/Utils/AOD/AODGPRModelFinder.js index be6bbd64..29fe2001 100644 --- a/training/gordon-group/src/Utils/AOD/AODGPRModelFinder.js +++ b/training/gordon-group/src/Utils/AOD/AODGPRModelFinder.js @@ -23,7 +23,8 @@ function getPipe(excFeats, gstpId, targetName, technique) { }).objs.map(obj => obj.id); // find the techniques - filter = Filter.intersects("kernel.id", kernelIds); + filter = Filter.intersects("kernel.id", kernelIds) + .and().eq("centerTarget", technique.centerTarget); var techIds = GaussianProcessRegressionTechnique.fetch({ "filter": filter.value, "limit": -1, diff --git a/training/gordon-group/src/Utils/DataStaging/StagedFeatures.c3typ b/training/gordon-group/src/Utils/DataStaging/StagedFeatures.c3typ new file mode 100644 index 00000000..d0bd8c42 --- /dev/null +++ b/training/gordon-group/src/Utils/DataStaging/StagedFeatures.c3typ @@ -0,0 +1,15 @@ +/** +* Copyright (c) 2022, C3 AI DTI, Development Operations Team +* All rights reserved. License: https://github.com/c3aidti/.github +**/ +/** +* This type hosts data for models that cannot obtain Features +* directly from other entity types. +*/ +entity type StagedFeatures schema name "STGD_FTRS" { + // the features to be staged + features: map schema suffix "FTR" + // method to stage from gstp list + @py(env='gordon_1_0_0') + stageFromAODGPRModelIdsList: function(ids: !any): integer +} diff --git a/training/gordon-group/src/Utils/DataStaging/StagedFeatures.py b/training/gordon-group/src/Utils/DataStaging/StagedFeatures.py new file mode 100644 index 00000000..861e8942 --- /dev/null +++ b/training/gordon-group/src/Utils/DataStaging/StagedFeatures.py @@ -0,0 +1,46 @@ +def stageFromAODGPRModelIdsList(ids): + """ + Given a list of GaussianProcessRegressionPipes trained with + AOD data, stage the features for each model. + + Input: + ids: list of model ids + + Return: + int: zero if it worked, 1 if it failed + """ + import pandas as pd + from datetime import timedelta + + c3.StagedFeatures.removeAll() + + df = pd.DataFrame() + for model_id in ids: + model = c3.GaussianProcessRegressionPipe.get(model_id) + data_source_spec = c3.GPRDataSourceSpec.get(model.dataSourceSpec.id, "targetSpec") + gstp_id = data_source_spec.targetSpec.filter.split(" == ")[1].replace('"', '') + gstp = c3.GeoSurfaceTimePoint.get(gstp_id) + pdf = c3.Dataset.toPandas(model.getFeatures()) + pdf["latitude"] = gstp.latitude + pdf["longitude"] = gstp.longitude + my_time = gstp.time.timetuple() + pdf["time"] = timedelta( + days=my_time.tm_yday, + minutes=my_time.tm_min, + hours=my_time.tm_hour + ).total_seconds() / 3600 + df = pd.concat([df,pdf], ignore_index=True) + + def row_to_dict(row): + d = {} + for col in row.index: + d[col] = row[col] + return d + + df_final = pd.DataFrame() + df_final["features"] = df.apply(row_to_dict, axis=1) + df_final["id"] = df_final.index + output_records = df_final.to_dict(orient="records") + c3.StagedFeatures.upsertBatch(objs=output_records) + + return 0 \ No newline at end of file diff --git a/training/gordon-group/src/Utils/DataStaging/StagedTargets.c3typ b/training/gordon-group/src/Utils/DataStaging/StagedTargets.c3typ new file mode 100644 index 00000000..6d31dcab --- /dev/null +++ b/training/gordon-group/src/Utils/DataStaging/StagedTargets.c3typ @@ -0,0 +1,15 @@ +/** +* Copyright (c) 2022, C3 AI DTI, Development Operations Team +* All rights reserved. License: https://github.com/c3aidti/.github +**/ +/** +* This type hosts data for models that cannot obtain Targets +* directly from other entity types. +*/ +entity type StagedTargets schema name "STGD_TRGTS" { + // the staged targets + targets: map schema suffix "TRGT" + // method to stage from gstp list + @py(env='gordon_1_0_0') + stageFromAODGPRModelIdsList: function(ids: !any): integer +} diff --git a/training/gordon-group/src/Utils/DataStaging/StagedTargets.py b/training/gordon-group/src/Utils/DataStaging/StagedTargets.py new file mode 100644 index 00000000..cafd3dad --- /dev/null +++ b/training/gordon-group/src/Utils/DataStaging/StagedTargets.py @@ -0,0 +1,35 @@ +def stageFromAODGPRModelIdsList(ids): + """ + Given a list of GaussianProcessRegressionPipes trained with + AOD data, stage the targets for each model. + + Input: + ids: list of model ids + + Return: + int: zero if it worked, 1 if it failed + """ + import pandas as pd + + c3.StagedTargets.removeAll() + + df = pd.DataFrame() + for model_id in ids: + model = c3.GaussianProcessRegressionPipe.get(model_id) + pdf = c3.Dataset.toPandas(model.getTarget()) + df = pd.concat([df,pdf], ignore_index=True) + + def row_to_dict(row): + d = {} + for col in row.index: + d[col] = row[col] + return d + + df_final = pd.DataFrame() + df_final["targets"] = df.apply(row_to_dict, axis=1) + df_final["id"] = df_final.index + + output_records = df_final.to_dict(orient="records") + c3.StagedTargets.upsertBatch(objs=output_records) + + return 0 diff --git a/training/gordon-group/src/Utils/Predict/PredictAODGPR.c3typ b/training/gordon-group/src/Utils/Predict/PredictAODGPR.c3typ new file mode 100644 index 00000000..608df842 --- /dev/null +++ b/training/gordon-group/src/Utils/Predict/PredictAODGPR.c3typ @@ -0,0 +1,22 @@ +/** +* Copyright (c) 2022, C3 AI DTI, Development Operations Team +* All rights reserved. License: https://github.com/c3aidti/.github +**/ +/** +* This finds {@link GaussianProcessRegressionPipe}s that were trained +* with {@link Simulation3HourlyAODOutput} as targets, +* {@link SimulationModelParameters} as features, +* via a {@link AODGaussianMLTrainingJob} +*/ +type PredictAODGPR { + // Retrieve models based on exluded features, {@link GeoSurfaceTimePoint} instance, target name and training technique + getPipe: function(excFeats: [string], gstpId: string, targetName: string, technique: any): any js server + // Retrieve all models for a certain {@link GeoSurfaceTimePoint} filter + getPipes: function(excFeats: [string], gstpFilter: any, targetName: string, technique: any): any js server + // Extract learned parameters from trained {@link GaussianProcessRegressionPipe}s specified by {@link GeoSurfaceTimePoint} filter, target name, excluded features and {@link GaussianProcessRegressionTechnque} + @py(env='gordon-ML_1_0_0') + makePredictionsJob: function(excFeats: [string], gstpFilter: any, targetName: string, synthDataset: any, technique: any, batchSize: int): any + // Build a pandas dataframe with the hyper parameters once job is complete + @py(env='gordon-ML_1_0_0') + getPredictionsDataframeFromJob: inline function(job: any): any +} \ No newline at end of file diff --git a/training/gordon-group/src/Utils/Predict/PredictAODGPR.js b/training/gordon-group/src/Utils/Predict/PredictAODGPR.js new file mode 100644 index 00000000..93b9c7a0 --- /dev/null +++ b/training/gordon-group/src/Utils/Predict/PredictAODGPR.js @@ -0,0 +1,59 @@ +function getPipe(excFeats, gstpId, targetName, technique) { + // identical to the methods used in AODGPRModelFinder.js + + // find the data source specs + var gstpKey = "geoSurfaceTimePoint.id == \"" + gstpId + "\""; + var filter = Filter.eq("featuresType.typeName", "SimulationModelParameters") + .and().eq("targetType.typeName", "Simulation3HourlyAODOutput") + .and().intersects("excludeFeatures", excFeats) + .and().eq("targetName", targetName) + .and().eq("targetSpec.filter", gstpKey); + + var sourceSpecIds = GPRDataSourceSpec.fetch({ + "filter": filter, + "limit": -1, + "include": "id" + }).objs.map(obj => obj.id); + + // find the kernels + filter = Filter.eq("pickledKernel", technique.kernel.pickledKernel); + var kernelIds = SklearnGPRKernel.fetch({ + "filter": filter.value, + "limit": -1, + "include": "id" + }).objs.map(obj => obj.id); + + // find the techniques + filter = Filter.intersects("kernel.id", kernelIds) + .and().eq("centerTarget", technique.centerTarget); + var techIds = GaussianProcessRegressionTechnique.fetch({ + "filter": filter.value, + "limit": -1, + "include": "id" + }).objs.map(obj => obj.id); + + // now find the models + filter = Filter.intersects("technique.id", techIds) + .and().intersects("dataSourceSpec.id", sourceSpecIds); + var pipes = GaussianProcessRegressionPipe.fetch({ + "filter": filter.value, + "limit": -1 + }).objs; + + return pipes +} + +function getPipes(excFeats, gstpFilter, targetName, technique) { + var gstpIds = GeoSurfaceTimePoint.fetch({ + "filter": gstpFilter, + "limit": -1, + "include": "id" + }).objs.map(obj => obj.id); + + var pipes = gstpIds.map(id => AODGPRModelFinder.getPipe(excFeats, id, targetName, technique)); + var nonNulls = pipes.filter(function (el) { + return el.length != 0; + }); + + return nonNulls +} \ No newline at end of file diff --git a/training/gordon-group/src/Utils/Predict/PredictAODGPR.py b/training/gordon-group/src/Utils/Predict/PredictAODGPR.py new file mode 100644 index 00000000..4daba9c8 --- /dev/null +++ b/training/gordon-group/src/Utils/Predict/PredictAODGPR.py @@ -0,0 +1,103 @@ +def makePredictionsJob( + excFeats, gstpFilter, targetName, synthDataset, technique, batchSize +): + """ + Dynamic map-reduce job to get predictions on synthDataset. + """ + + def cassandra_mapper(batch, objs, job): + models = [] + for obj in objs: + model = c3.AODGPRModelFinder.getPipe( + job.context.value["excludeFeatures"], + obj.id, + job.context.value["targetName"], + job.context.value["technique"] + ) + models.append(model) + + return {batch: models} + + def cassandra_reducer(key, interValues, job): + values = [] + synthDataframe = c3.Dataset.toPandas(job.context.value["syntheticDataset"]) + for iv in interValues: + for val in iv: + for m in val: + # predictions + model_id = m["id"] + centered = m["technique"]["centerTarget"] + if centered: + center = m["trainedModel"].parameters["targetMean"].asfloat() + else: + center = 0 + pickledModel = m["trainedModel"]["model"] + model = c3.PythonSerialization.deserialize(serialized=pickledModel) + mean, sd = model.predict(synthDataframe, return_std=True) + + # location + dssId = m["dataSourceSpec"]["id"] + dss = c3.GPRDataSourceSpec.get(dssId) + gstpId = dss.targetSpec.filter.split(" == ")[1].replace('"', '') + gstp = c3.GeoSurfaceTimePoint.get(gstpId) + lat = gstp.latitude + lon = gstp.longitude + time = gstp.time + values.append((model_id, mean, center, sd, lat, lon, time)) + + + return values + + map_lambda = c3.Lambda.fromPython(cassandra_mapper) + reduce_lambda = c3.Lambda.fromPython(cassandra_reducer, runtime="gordon-ML_1_0_0") + + job_context = c3.MappObj( + value={ + 'excludeFeatures': excFeats, + 'targetName': targetName, + 'technique': technique, + 'syntheticDataset': synthDataset + } + ) + job = c3.DynMapReduce.startFromSpec( + c3.DynMapReduceSpec( + targetType="GeoSurfaceTimePoint", + filter=gstpFilter, + mapLambda=map_lambda, + reduceLambda=reduce_lambda, + batchSize=batchSize, + context=job_context + ) + ) + + return job + + +def getPredictionsDataframeFromJob(job): + """ + Iterates over job result and builds dataframe. + """ + import pandas as pd + import numpy as np + + predictions = [] + + if job.status().status == "completed": + for key, value in job.results().items(): + for subvalue in value: + df_m = pd.DataFrame() + df_m["meanResponse"] = np.array(subvalue[1]).flatten() + df_m["meanResponse"] += subvalue[2] + df_m["sdResponse"] = subvalue[3] + df_m["latitude"] = subvalue[4] + df_m["longitude"] = subvalue[5] + df_m["time"] = subvalue[6] + df_m["modelId"] = subvalue[0] + df_m["variant"] = list(range(df_m.shape[0])) + + predictions.append(df_m) + + df = pd.concat(predictions, axis=0).reset_index(drop=True) + return df + else: + return False \ No newline at end of file