V1_0.java [src/java/m/ann/collect] Revision: default  Date:
/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package m.ann.collect;

import csip.Config;
import utils.MongoAccess;
import csip.ModelDataService;
import static csip.ModelDataService.KEY_DESC;
import csip.ModelDataServiceCall;
import csip.ServiceException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.Path;
import oms3.annotations.Description;
import oms3.annotations.Name;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import utils.MongoUtils;
import utils.blockchain.application.java.FeNS_blockchain;

/**
 * Raw Data collection service.
 *
 * @author od, sidereus
 */
@Name("data collection service")
@Description("collect raw data")
@Path("m/collect/1.0")
public class V1_0 extends ModelDataService {

  // collect data fro files in chunks of 1000 per default
  static final int blockSize = Config.getInt("ann.collect.elements_per_block", 10000);

  protected String annName;
  protected java.util.Set paramNames;
  protected MongoUtils.ServiceFunction<String, String> getDescr;

  protected Boolean blockchain;


  // make sure the db connection is closed at the end.
  public static void onContextDestroy() {
    MongoAccess.closeMongo();
    System.out.println("Closed ANN Mongo.");
  }


  @Override
  public void preProcess() throws ServiceException, JSONException {
    if (parameter().getNames().contains("collect")) {
      annName = parameter().getJSONArray("collect").getJSONObject(0).getString("value");
      JSONArray coll = parameter().getJSONArray("collect");
      paramNames = new LinkedHashSet();
      Map<String, JSONObject> t = new LinkedHashMap();
      for (int i = 0; i < coll.length(); i++) {
        JSONObject obj = coll.getJSONObject(i);
        String name = obj.getString("name");
        if (name.equals("blockchain")) {
          blockchain = obj.getBoolean("value");
        } else {
          paramNames.add(name);
          t.put(name, obj);
        }
      }
      LocalPayloadParameter p = new LocalPayloadParameter(t);
      getDescr = p::getDescr;
    } else {
      annName = parameter().getString("annName");
      blockchain = parameter().getBoolean("blockchain", false);
      paramNames = (java.util.Set) parameter().getNames();
      getDescr = parameter()::getDescr;
    }
  }


  /**
   * Add a csv or just a row of data to a newly created or an already existing
   * database
   *
   * @throws ServiceException
   */
  @Override
  public void doProcess() throws ServiceException {

    if (attachments().getFilesCount() > 0) { // add a csv to the database
      // use the first file.
      attachments().getFiles().parallelStream()
          .filter(file -> file.getName().endsWith(".csv"))
          .forEach(file -> {
            collectFromFile(file);
          });
    } else { // add a row to the database
      MongoAccess.collectRawData(annName, (java.util.Set) parameter().getNames(), parameter()::getDouble, parameter()::getDescr);
    }
    results().put("status", "ok");

    if (blockchain) {
      FeNS_blockchain.init();
      FeNS_blockchain.submit("FeNS_collect", MongoAccess.getCollectionHash(annName, "raw"));
    }
  }


  @Override
  public void postProcess() throws ServiceException, Exception {
    if (parameter().getNames().contains("normalize")) {
      ModelDataServiceCall mdsc = new ModelDataServiceCall()
          .put("normalize", getCollection("normalize"))
          .put("train", getCollection("train"))
          .put("select", getCollection("select"))
          .url(Config.getString("ann.normalize.url",
              request().getCodebase() + "m/normalize/1.0"))
          .call();

      if (!mdsc.serviceFinished()) {
        throw new ServiceException("FeNS-normalize service error: " + mdsc.getError());
      }
    }

  }


  public void collectFromFile(File file) {
    try {
      MongoAccess.collectFromFile(
          annName,
          paramNames,
          getDescr,
          file,
          blockSize);
    } catch (IOException | ServiceException ex) {
      throw new RuntimeException(ex.getMessage());
    }
  }


  private Collection getCollection(String subPayload) throws ServiceException, JSONException {
    JSONArray n = parameter().getJSONArray(subPayload);
    List<JSONObject> nl = new ArrayList<>();
    for (int i = 0; i < n.length(); i++) {
      nl.add(n.getJSONObject(i));
    }
    return nl;
  }

  class LocalPayloadParameter {

    private final Map<String, JSONObject> paramMap;


    public LocalPayloadParameter(Map<String, JSONObject> paramMap) {
      this.paramMap = paramMap;
    }


    private Map<String, JSONObject> getParamMap() {
      return paramMap;
    }


    public String getDescr(String name) throws ServiceException {
      try {
        return get(name).getString(KEY_DESC);
      } catch (JSONException ex) {
        throw new ServiceException("No description for " + name);
      }
    }


    private JSONObject get(String name) throws ServiceException {
      JSONObject p = getParamMap().get(name);
      if (p == null) {
        throw new ServiceException("Parameter not found: '" + name + "'");
      }
      return p;
    }

  }

}