Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
c60d7aa
try staging data
babreu-ncsa Aug 31, 2022
cbc7b46
lil fix
babreu-ncsa Aug 31, 2022
56cbdb2
frankeinteining
babreu-ncsa Aug 31, 2022
2425ad7
try targets now
babreu-ncsa Aug 31, 2022
3e2601c
adding target center
babreu-ncsa Sep 16, 2022
7029e87
lil fix
babreu-ncsa Sep 16, 2022
b1f76b6
typo
babreu-ncsa Sep 16, 2022
33f8e38
another fix
babreu-ncsa Sep 16, 2022
392c4ee
add centerTarget to getPipe method
babreu-ncsa Sep 23, 2022
ce27a66
call fix
babreu-ncsa Sep 26, 2022
a43051c
first stab at method
babreu-ncsa Oct 4, 2022
f6bf080
typo
babreu-ncsa Oct 4, 2022
0d3cea2
fix syntax bug
babreu-ncsa Oct 4, 2022
22da146
making it more flexible
babreu-ncsa Oct 4, 2022
dc1fa70
try without timestamps
babreu-ncsa Oct 4, 2022
abae3a9
clear table before start
babreu-ncsa Oct 4, 2022
ee487af
okay
babreu-ncsa Oct 4, 2022
e464e52
simpler!
babreu-ncsa Oct 4, 2022
ee35cf2
extra method call
babreu-ncsa Oct 4, 2022
6c6c4ed
cleaner bur likely slower
babreu-ncsa Oct 4, 2022
9c8539c
forgot function def
babreu-ncsa Oct 4, 2022
72d9d93
method to train with staged data
babreu-ncsa Oct 5, 2022
13c7fdf
change argument name
babreu-ncsa Oct 5, 2022
6e5e09c
env for gpytorch -- first of many
babreu-ncsa Oct 12, 2022
36e9bad
right
babreu-ncsa Oct 12, 2022
fc68638
add pandas
babreu-ncsa Oct 12, 2022
bb93223
forgot something
babreu-ncsa Oct 12, 2022
6483836
adding validation to GPR train method
babreu-ncsa Oct 12, 2022
2c13dd5
shuffling feats and targs
babreu-ncsa Oct 12, 2022
1bd5d28
fix typos
babreu-ncsa Oct 12, 2022
b2d3452
add torch
babreu-ncsa Oct 13, 2022
07698ce
typo
babreu-ncsa Oct 13, 2022
cc690e8
ligcc
babreu-ncsa Oct 14, 2022
46d0525
add gpytorch
babreu-ncsa Oct 14, 2022
ccb8795
adding dill for serialization
babreu-ncsa Oct 14, 2022
4485189
keep data in memory
babreu-ncsa Oct 27, 2022
97146a3
add pandas
babreu-ncsa Oct 27, 2022
a76dc7e
another trial
babreu-ncsa Oct 27, 2022
81f4219
source spec not required
babreu-ncsa Oct 27, 2022
321a00e
Include time in hours as staged feature
JamesCarzon Oct 28, 2022
d3f0dd4
add excl feats
babreu-ncsa Nov 3, 2022
b71cbb9
making it more readable
babreu-ncsa Nov 3, 2022
3127792
Include time in hours as action feature
JamesCarzon Nov 3, 2022
4e86563
Predict using DynMapReduce
JamesCarzon Nov 6, 2022
0c5316b
Match order of arguments
JamesCarzon Nov 6, 2022
e022e7f
Try prediction with unpickled model
JamesCarzon Nov 7, 2022
f9370fe
Grab synthDataset from job context
JamesCarzon Nov 7, 2022
72b6593
Import pandas for method
JamesCarzon Nov 7, 2022
cd34430
Keep predictions without pandas
JamesCarzon Nov 7, 2022
872eb02
Typo: returned unused object
JamesCarzon Nov 7, 2022
928e552
Restructure resultant df of predictions
JamesCarzon Nov 7, 2022
c7eb9e1
Add lat, lon, time to prediction results
JamesCarzon Nov 8, 2022
67e11bf
Don't save synthDataset in results
JamesCarzon Nov 8, 2022
0b2489d
Indexing subvalues correctly
JamesCarzon Nov 8, 2022
f02f52a
Clean up predictions data frame
JamesCarzon Nov 9, 2022
049256e
Return one df rather than list of dfs
JamesCarzon Nov 9, 2022
787a0bb
Track model variants in predictions
JamesCarzon Nov 14, 2022
95438ad
bogus comment to kick workflow
babreu-ncsa Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions training/gordon-group/seed/ActionRuntime/py-gordon-ML_2_0_0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"language": "Python",
"runtimeVersion": "3.8.10",
"modules": {
"conda.scikit-learn":"=0.24.2",
"conda.pandas":"=1.3.0",
"conda.pytorch":"=1.12.1",
"conda.torchvision":"=0.13.1",
"conda.torchaudio":"=0.12.1",
"conda.cudatoolkit":"=11.3",
"conda.libgcc":"=7.2.0",
"conda.gpytorch":"=1.9.0",
"conda.dill":"=0.2.8.2"
},
"repositories": [
"https://repo.continuum.io/pkgs/main",
"conda-forge",
"pytorch",
"anaconda"
],
"runtime": "CPython",
"name": "py-gordon-ML_2_0_0",
"id": "py-gordon-ML_2_0_0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
/**
* GaussianProcessRegressionPipe.c3typ
* Performs Scikit-Learn's GP Regression.
* Bogus comment to reprovision app.
*/
@db(unique=['technique, dataSourceSpec'])
entity type GaussianProcessRegressionPipe extends MLLeafPipe<Dataset, Dataset> type key 'GPREG' {
// the technique for this regression
technique: !GaussianProcessRegressionTechnique
// data source spec for this regression
dataSourceSpec: !GPRDataSourceSpec
dataSourceSpec: GPRDataSourceSpec

// get features data
@py(env='gordon-ML_1_0_0')
Expand All @@ -28,4 +29,10 @@ entity type GaussianProcessRegressionPipe extends MLLeafPipe<Dataset, Dataset> t
// guarantee that process() is only allowed after train()
@py(env='gordon-ML_1_0_0')
isProcessable: ~
// train large model with AOD staged data
@py(env='gordon-ML_1_0_0')
trainWithStagedAOD: member function(modelIds: any): integer
// train with list of GSTPs
@py(env='gordon-ML_1_0_0')
trainWithListOfAODModels: member function(modelIds: any, excludeFeatures: any): integer
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,36 @@ def train(this, input, targetOutput, spec):
X = c3.Dataset.toNumpy(dataset=input)
y = c3.Dataset.toNumpy(dataset=targetOutput)

if (technique.centerTarget):
targetMean = float(y.mean())
y = y - y.mean()

if (technique.validation):
rng = np.random.RandomState(technique.randomSeed)
rng.shuffle(X)
X = X[0:int((1.0 - technique.splitFraction)*len(X))]
rng.shuffle(y)
y = y[0:int((1.0 - technique.splitFraction)*len(y))]


# get kernel object from c3, make it python again
kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel)

# build and train GPR
gp = GaussianProcessRegressor(kernel=kernel)
gp.fit(X, y)

this.trainedModel = c3.MLTrainedModelArtifact(model=c3.PythonSerialization.serialize(obj=gp))
if (technique.centerTarget):
params = {}
params["targetMean"] = targetMean
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
parameters=params
)
else:
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
)

return this

Expand Down Expand Up @@ -71,8 +93,26 @@ def getFeatures(this):
import pandas as pd

dataSourceSpec = c3.GPRDataSourceSpec.get(this.dataSourceSpec.id)

featuresType = dataSourceSpec.featuresType.toType()

if (featuresType.name == "StagedFeatures"):
features = c3.StagedFeatures.fetch({
"limit": -1,
"order": "id"
}).objs.toJson()

df = pd.DataFrame(features)
keys = df.iloc[0]["features"].keys()

for key in keys:
df[key] = df["features"].apply(lambda x: x[key])

df.drop("version", axis=1, inplace=True)
df = df.select_dtypes(["number"])

return c3.Dataset.fromPython(df)


inputTableC3 = featuresType.fetch(dataSourceSpec.featuresSpec).objs.toJson()
inputTablePandas = pd.DataFrame(inputTableC3)
inputTablePandas = inputTablePandas.drop("version", axis=1)
Expand All @@ -94,8 +134,26 @@ def getTarget(this):
import pandas as pd

dataSourceSpec = c3.GPRDataSourceSpec.get(this.dataSourceSpec.id)

targetType = dataSourceSpec.targetType.toType()

if (targetType.name == "StagedTargets"):
targets = c3.StagedTargets.fetch({
"limit": -1,
"order": "id"
}).objs.toJson()

df = pd.DataFrame(targets)
keys = df.iloc[0]["targets"].keys()

for key in keys:
df[key] = df["targets"].apply(lambda x: x[key])

df.drop("version", axis=1, inplace=True)
df = df.select_dtypes(["number"])

return c3.Dataset.fromPython(df)


outputTableC3 = targetType.fetch(dataSourceSpec.targetSpec).objs.toJson()
outputTablePandas = pd.DataFrame(outputTableC3)
outputTablePandas = outputTablePandas.drop("version", axis=1)
Expand All @@ -112,3 +170,126 @@ def getTarget(this):
outputTablePandas = pd.DataFrame(outputTablePandas[dataSourceSpec.targetName])

return c3.Dataset.fromPython(outputTablePandas)


def trainWithStagedAOD(this, modelIds):
"""
This method trains a large model with data coming from previously trained
GPR models with AOD data.

Inputs:
ids: list of GaussianProcessRegressionPipes ids

Returns:
int: 0 if method worked, 1 otherwise
"""
from sklearn.gaussian_process import GaussianProcessRegressor

# stage features and targets
c3.StagedFeatures.stageFromAODGPRModelIdsList(modelIds)
c3.StagedTargets.stageFromAODGPRModelIdsList(modelIds)
# get data
X = c3.Dataset.toNumpy(dataset=this.getFeatures())
y = c3.Dataset.toNumpy(dataset=this.getTarget())

# generate training technique
technique = c3.GaussianProcessRegressionTechnique.get(this.technique.id)
serializedKernel = c3.SklearnGPRKernel.get(technique.kernel.id)

if (technique.centerTarget):
targetMean = float(y.mean())
y = y - y.mean()

# get kernel object from c3, make it python again
kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel)

# build and train GPR
gp = GaussianProcessRegressor(kernel=kernel)
gp.fit(X, y)

if (technique.centerTarget):
params = {}
params["targetMean"] = targetMean
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
parameters=params
)
else:
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
)

this.upsert()

return 0

def trainWithListOfAODModels(this, modelIds, excludeFeatures):
"""
This method trains a large model with data coming from previously trained
GPR models with AOD data.

Inputs:
ids: list of GaussianProcessRegressionPipes ids

Returns:
int: 0 if method worked, 1 otherwise
"""
from sklearn.gaussian_process import GaussianProcessRegressor
import pandas as pd
from datetime import timedelta

# get data
X = pd.DataFrame()
y = pd.DataFrame()
for model_id in modelIds:
model = c3.GaussianProcessRegressionPipe.get(model_id)
data_source_spec = c3.GPRDataSourceSpec.get(model.dataSourceSpec.id, "targetSpec")
gstp_id = data_source_spec.targetSpec.filter.split(" == ")[1].replace('"', '')
gstp = c3.GeoSurfaceTimePoint.get(gstp_id)
my_time = gstp.time.timetuple()
px = c3.Dataset.toPandas(model.getFeatures())
px["latitude"] = gstp.latitude
px["longitude"] = gstp.longitude
px["time"] = timedelta(
days=my_time.tm_yday,
minutes=my_time.tm_min,
hours=my_time.tm_hour
).total_seconds() / 3600
X = pd.concat([X,px], ignore_index=True)

py = c3.Dataset.toPandas(model.getTarget())
y = pd.concat([y,py], ignore_index=True)
X.drop(columns=excludeFeatures, inplace=True)
X = X.to_numpy()
y = y.to_numpy()

# generate training technique
technique = c3.GaussianProcessRegressionTechnique.get(this.technique.id)
serializedKernel = c3.SklearnGPRKernel.get(technique.kernel.id)

if (technique.centerTarget):
targetMean = float(y.mean())
y = y - y.mean()

# get kernel object from c3, make it python again
kernel = c3.PythonSerialization.deserialize(serialized=serializedKernel.pickledKernel)

# build and train GPR
gp = GaussianProcessRegressor(kernel=kernel)
gp.fit(X, y)

if (technique.centerTarget):
params = {}
params["targetMean"] = targetMean
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
parameters=params
)
else:
this.trainedModel = c3.MLTrainedModelArtifact(
model=c3.PythonSerialization.serialize(obj=gp),
)

this.upsert()

return 0
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,16 @@ entity type GaussianProcessRegressionTechnique mixes MLTechnique schema name 'GP
// the kernel object
@ML(hyperParameter=true)
kernel: SklearnGPRKernel
// center target data before fitting
@ML(hyperParameter=true)
centerTarget: boolean=false
// leave fraction of rows for post-validation
@ML(hyperParameter=true)
validation: boolean=false
// random seed to be used by numpy.shuffle
@ML(hyperParameter=true)
randomSeed: integer=42
// fraction to be left for validation
@ML(hyperParameter=true)
splitFraction: float=0.2
}
3 changes: 2 additions & 1 deletion training/gordon-group/src/Utils/AOD/AODGPRModelFinder.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ function getPipe(excFeats, gstpId, targetName, technique) {
}).objs.map(obj => obj.id);

// find the techniques
filter = Filter.intersects("kernel.id", kernelIds);
filter = Filter.intersects("kernel.id", kernelIds)
.and().eq("centerTarget", technique.centerTarget);
var techIds = GaussianProcessRegressionTechnique.fetch({
"filter": filter.value,
"limit": -1,
Expand Down
15 changes: 15 additions & 0 deletions training/gordon-group/src/Utils/DataStaging/StagedFeatures.c3typ
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Copyright (c) 2022, C3 AI DTI, Development Operations Team
* All rights reserved. License: https://github.com/c3aidti/.github
**/
/**
* This type hosts data for models that cannot obtain Features
* directly from other entity types.
*/
entity type StagedFeatures schema name "STGD_FTRS" {
// the features to be staged
features: map<string, double> schema suffix "FTR"
// method to stage from gstp list
@py(env='gordon_1_0_0')
stageFromAODGPRModelIdsList: function(ids: !any): integer
}
46 changes: 46 additions & 0 deletions training/gordon-group/src/Utils/DataStaging/StagedFeatures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
def stageFromAODGPRModelIdsList(ids):
"""
Given a list of GaussianProcessRegressionPipes trained with
AOD data, stage the features for each model.

Input:
ids: list of model ids

Return:
int: zero if it worked, 1 if it failed
"""
import pandas as pd
from datetime import timedelta

c3.StagedFeatures.removeAll()

df = pd.DataFrame()
for model_id in ids:
model = c3.GaussianProcessRegressionPipe.get(model_id)
data_source_spec = c3.GPRDataSourceSpec.get(model.dataSourceSpec.id, "targetSpec")
gstp_id = data_source_spec.targetSpec.filter.split(" == ")[1].replace('"', '')
gstp = c3.GeoSurfaceTimePoint.get(gstp_id)
pdf = c3.Dataset.toPandas(model.getFeatures())
pdf["latitude"] = gstp.latitude
pdf["longitude"] = gstp.longitude
my_time = gstp.time.timetuple()
pdf["time"] = timedelta(
days=my_time.tm_yday,
minutes=my_time.tm_min,
hours=my_time.tm_hour
).total_seconds() / 3600
df = pd.concat([df,pdf], ignore_index=True)

def row_to_dict(row):
d = {}
for col in row.index:
d[col] = row[col]
return d

df_final = pd.DataFrame()
df_final["features"] = df.apply(row_to_dict, axis=1)
df_final["id"] = df_final.index
output_records = df_final.to_dict(orient="records")
c3.StagedFeatures.upsertBatch(objs=output_records)

return 0
15 changes: 15 additions & 0 deletions training/gordon-group/src/Utils/DataStaging/StagedTargets.c3typ
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Copyright (c) 2022, C3 AI DTI, Development Operations Team
* All rights reserved. License: https://github.com/c3aidti/.github
**/
/**
* This type hosts data for models that cannot obtain Targets
* directly from other entity types.
*/
entity type StagedTargets schema name "STGD_TRGTS" {
// the staged targets
targets: map<string, double> schema suffix "TRGT"
// method to stage from gstp list
@py(env='gordon_1_0_0')
stageFromAODGPRModelIdsList: function(ids: !any): integer
}
Loading