// V1_0.java [src/java/m/ann/collect] Revision: default Date:
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package m.ann.collect;
import csip.Config;
import utils.MongoAccess;
import csip.ModelDataService;
import static csip.ModelDataService.KEY_DESC;
import csip.ModelDataServiceCall;
import csip.ServiceException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import oms3.annotations.Description;
import oms3.annotations.Name;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.MongoUtils;
import utils.blockchain.application.java.FeNS_blockchain;
/**
 * Raw data collection service.
 *
 * <p>Stores raw data through {@link MongoAccess}: either the contents of the
 * attached {@code .csv} files or a single row taken from the request
 * parameters. When requested, the hash of the "raw" collection is submitted
 * to the blockchain, and when the request carries a "normalize" payload the
 * normalize service is called afterwards.
 *
 * @author od, sidereus
 */
@Name("data collection service")
@Description("collect raw data")
@Path("m/collect/1.0")
public class V1_0 extends ModelDataService {

    // Number of elements handed to MongoAccess per block when collecting data
    // from files. Configurable via "ann.collect.elements_per_block";
    // the default is 10000.
    static final int blockSize = Config.getInt("ann.collect.elements_per_block", 10000);

    /** Name of the ANN the collected data belongs to. */
    protected String annName;

    /** Names of the parameters to collect (kept as a raw type for compatibility). */
    protected java.util.Set paramNames;

    /** Resolves the description of a parameter from its name. */
    protected MongoUtils.ServiceFunction<String, String> getDescr;

    /**
     * Whether the raw collection hash should be submitted to the blockchain.
     * Defaults to {@code false} so that a "collect" payload without a
     * "blockchain" entry does not leave this field {@code null}.
     */
    protected Boolean blockchain = Boolean.FALSE;

    /**
     * Closes the Mongo connection; make sure the db connection is closed at
     * the end.
     */
    public static void onContextDestroy() {
        MongoAccess.closeMongo();
        System.out.println("Closed ANN Mongo.");
    }

    /**
     * Resolves the ANN name, the parameter names, the description lookup and
     * the blockchain flag from the request.
     *
     * <p>Two request layouts are handled: a "collect" sub-payload (a JSON
     * array of parameter objects) or a flat request whose parameters are read
     * directly.
     *
     * @throws ServiceException if a required parameter cannot be read
     * @throws JSONException if the "collect" payload is malformed
     */
    @Override
    public void preProcess() throws ServiceException, JSONException {
        if (parameter().getNames().contains("collect")) {
            JSONArray coll = parameter().getJSONArray("collect");
            // The ANN name is the value of the first entry of the payload.
            annName = coll.getJSONObject(0).getString("value");
            paramNames = new LinkedHashSet<String>();
            Map<String, JSONObject> byName = new LinkedHashMap<>();
            for (int i = 0; i < coll.length(); i++) {
                JSONObject obj = coll.getJSONObject(i);
                String name = obj.getString("name");
                if (name.equals("blockchain")) {
                    // "blockchain" is a control flag, not a data parameter.
                    blockchain = obj.getBoolean("value");
                } else {
                    paramNames.add(name);
                    byName.put(name, obj);
                }
            }
            LocalPayloadParameter p = new LocalPayloadParameter(byName);
            getDescr = p::getDescr;
        } else {
            annName = parameter().getString("annName");
            blockchain = parameter().getBoolean("blockchain", false);
            paramNames = (java.util.Set) parameter().getNames();
            getDescr = parameter()::getDescr;
        }
    }

    /**
     * Add a csv or just a row of data to a newly created or an already
     * existing database.
     *
     * @throws ServiceException if collecting the data fails
     */
    @Override
    public void doProcess() throws ServiceException {
        if (attachments().getFilesCount() > 0) { // add csv data to the database
            // Every attached file whose name ends with ".csv" is collected.
            attachments().getFiles().parallelStream()
                    .filter(file -> file.getName().endsWith(".csv"))
                    .forEach(file -> {
                        collectFromFile(file);
                    });
        } else { // add a row to the database
            // NOTE(review): this branch reads names and descriptions straight
            // from the request instead of the paramNames/getDescr resolved in
            // preProcess() - confirm this is intended for "collect" payloads.
            MongoAccess.collectRawData(annName, (java.util.Set) parameter().getNames(), parameter()::getDouble, parameter()::getDescr);
        }
        results().put("status", "ok");
        // Null-safe check: unboxing a null Boolean would throw a
        // NullPointerException.
        if (Boolean.TRUE.equals(blockchain)) {
            FeNS_blockchain.init();
            FeNS_blockchain.submit("FeNS_collect", MongoAccess.getCollectionHash(annName, "raw"));
        }
    }

    /**
     * Calls the normalize service when the request contains a "normalize"
     * payload, forwarding the "normalize", "train" and "select" payloads.
     *
     * @throws ServiceException if the normalize service did not finish
     * @throws Exception if the service call itself fails
     */
    @Override
    public void postProcess() throws ServiceException, Exception {
        if (parameter().getNames().contains("normalize")) {
            ModelDataServiceCall mdsc = new ModelDataServiceCall()
                    .put("normalize", getCollection("normalize"))
                    .put("train", getCollection("train"))
                    .put("select", getCollection("select"))
                    .url(Config.getString("ann.normalize.url",
                            request().getCodebase() + "m/normalize/1.0"))
                    .call();
            if (!mdsc.serviceFinished()) {
                throw new ServiceException("FeNS-normalize service error: " + mdsc.getError());
            }
        }
    }

    /**
     * Collects the content of one file into the database in blocks of
     * {@code blockSize} elements.
     *
     * @param file the file to collect from
     * @throws RuntimeException wrapping the underlying {@link IOException} or
     *         {@link ServiceException}; unchecked so that the method can be
     *         used inside a stream pipeline
     */
    public void collectFromFile(File file) {
        try {
            MongoAccess.collectFromFile(
                    annName,
                    paramNames,
                    getDescr,
                    file,
                    blockSize);
        } catch (IOException | ServiceException ex) {
            // Keep the original exception as the cause so its stack trace is
            // not lost.
            throw new RuntimeException(ex.getMessage(), ex);
        }
    }

    /**
     * Copies the JSON objects of the named sub-payload array into a list.
     *
     * @param subPayload name of the JSON array parameter to read
     * @return the objects of the array, in array order
     * @throws ServiceException if the parameter cannot be read
     * @throws JSONException if an array element is not a JSON object
     */
    private Collection getCollection(String subPayload) throws ServiceException, JSONException {
        JSONArray n = parameter().getJSONArray(subPayload);
        List<JSONObject> nl = new ArrayList<>();
        for (int i = 0; i < n.length(); i++) {
            nl.add(n.getJSONObject(i));
        }
        return nl;
    }

    /**
     * Parameter lookup backed by a map of name to parameter JSON object, used
     * when the parameters arrive inside the "collect" sub-payload.
     */
    class LocalPayloadParameter {

        private final Map<String, JSONObject> paramMap;

        /**
         * @param paramMap parameter JSON objects keyed by parameter name
         */
        public LocalPayloadParameter(Map<String, JSONObject> paramMap) {
            this.paramMap = paramMap;
        }

        private Map<String, JSONObject> getParamMap() {
            return paramMap;
        }

        /**
         * Returns the description ({@code KEY_DESC} entry) of a parameter.
         *
         * @param name the parameter name
         * @return the description string
         * @throws ServiceException if the parameter is unknown or has no
         *         description
         */
        public String getDescr(String name) throws ServiceException {
            try {
                return get(name).getString(KEY_DESC);
            } catch (JSONException ex) {
                throw new ServiceException("No description for " + name);
            }
        }

        /**
         * Looks up the JSON object of a parameter.
         *
         * @param name the parameter name
         * @return the parameter JSON object, never {@code null}
         * @throws ServiceException if no parameter with that name exists
         */
        private JSONObject get(String name) throws ServiceException {
            JSONObject p = getParamMap().get(name);
            if (p == null) {
                throw new ServiceException("Parameter not found: '" + name + "'");
            }
            return p;
        }
    }
}