ExecutionResource.java

/*
 * Copyright (c) 2007-2017 MetaSolutions AB
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.entrystore.rest.resources;

import org.entrystore.AuthorizationException;
import org.entrystore.Data;
import org.entrystore.Entry;
import org.entrystore.EntryType;
import org.entrystore.GraphType;
import org.entrystore.PrincipalManager.AccessProperty;
import org.entrystore.ResourceType;
import org.entrystore.transforms.Pipeline;
import org.entrystore.transforms.TransformException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.restlet.data.MediaType;
import org.restlet.data.Status;
import org.restlet.ext.json.JsonRepresentation;
import org.restlet.representation.Representation;
import org.restlet.resource.Post;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;


/**
 * This resource executes pipelines etc.
 *
 * <p>A POST request carries a JSON object with a mandatory <code>"pipeline"</code>
 * member (URI of the pipeline entry) and an optional <code>"source"</code> member
 * (URI of the entry that is handed to the pipeline as data source). Both URIs
 * must start with the resource URI of the context this resource is bound to.
 * 
 * @author Hannes Ebner
 */
public class ExecutionResource extends BaseResource {

	static Logger log = LoggerFactory.getLogger(ExecutionResource.class);

	/** Media types this resource is able to answer with; filled in {@link #doInit()}. */
	List<MediaType> supportedMediaTypes = new ArrayList<MediaType>();

	/**
	 * Registers JSON as the only supported response media type.
	 */
	@Override
	public void doInit() {
		supportedMediaTypes.add(MediaType.APPLICATION_JSON);
	}

	// TODO add support for GET to query currently running pipelines; this should
	// be supported together with asynchronous processing of pipeline executions

	/**
	 * Executes a pipeline synchronously and reports the URIs of the processed entries.
	 *
	 * <p>Response statuses set by this method:
	 * <ul>
	 * <li>500 if no context is available, if the pipeline or source entry cannot be
	 * loaded from the context, or if the pipeline fails with a {@link TransformException}</li>
	 * <li>415 if the client's preferred media type is not JSON</li>
	 * <li>422 if the request body is not a JSON object or lacks a string "pipeline" member</li>
	 * <li>400 if a URI does not start with the context's resource URI, is syntactically
	 * invalid, or the pipeline throws an {@link IllegalStateException}</li>
	 * <li>409 if the source is not a local information resource with a mime type and
	 * data, or if the pipeline entry does not have graph type Pipeline</li>
	 * <li>201 together with <code>{"result": [entry URIs]}</code> if at least one entry
	 * was processed</li>
	 * </ul>
	 * If the pipeline yields no entries, neither status nor entity are set here.
	 * Missing write rights on the context's resource are delegated to
	 * <code>unauthorizedPOST()</code>.
	 *
	 * @param r the posted representation; not used directly, the body is read from
	 *          the request entity
	 */
	@Post
	public void acceptRepresentation(Representation r) {
		if (context == null) {
			getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
			return;
		}

		MediaType preferredMediaType = getRequest().getClientInfo().getPreferredMediaType(supportedMediaTypes);
		if (!MediaType.APPLICATION_JSON.equals(preferredMediaType)) {
			getResponse().setStatus(Status.CLIENT_ERROR_UNSUPPORTED_MEDIA_TYPE);
			return;
		}

		JSONObject request = null;
		try {
			request = new JSONObject(getRequest().getEntity().getText());
		} catch (Exception e) {
			// Covers I/O problems as well as malformed or missing JSON
			getResponse().setStatus(Status.CLIENT_ERROR_UNPROCESSABLE_ENTITY);
			log.error(e.getMessage());
			return;
		}

		String pipeline;
		String source = null;

		// TODO support "destination" (destination entry URI) and "async" request
		// members; execution is currently always synchronous

		try {
			pipeline = request.getString("pipeline"); // Pipeline Entry URI
			if (request.has("source")) {
				source = request.getString("source"); // Data source Entry URI
			}
		} catch (JSONException e) {
			getResponse().setStatus(Status.CLIENT_ERROR_UNPROCESSABLE_ENTITY);
			return;
		}

		// Parameter pipeline is required
		if (pipeline == null) {
			getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
			return;
		}

		String contextResUri = context.getEntry().getResourceURI().toString();

		// NOTE(review): this is a plain string prefix comparison; whether a sibling
		// context with a longer identifier could pass it depends on the URI layout,
		// which is not visible here - confirm

		// Pipeline has to be located in the same context
		if (!pipeline.startsWith(contextResUri)) {
			getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
			return;
		}

		// Source (if provided) has to be located in the same context
		if (source != null && !source.startsWith(contextResUri)) {
			getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
			return;
		}

		try {
			// Pipeline execution requires write-rights on context's resource
			getPM().checkAuthenticatedUserAuthorized(context.getEntry(), AccessProperty.WriteResource);

			// The URIs are supplied by the client; a syntactically invalid value is
			// a client error and must not surface as an uncaught exception
			URI pipelineURI;
			URI sourceURI = null;
			try {
				pipelineURI = URI.create(pipeline);
				if (source != null) {
					sourceURI = URI.create(source);
				}
			} catch (IllegalArgumentException e) {
				log.debug("Invalid URI in execution request: " + e.getMessage());
				getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
				return;
			}

			// Load pipeline entry from context
			Entry pipelineEntry = context.getByEntryURI(pipelineURI);
			if (pipelineEntry == null) {
				getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
				return;
			}

			// Load source entry from context
			Entry sourceEntry = null;
			URI listURI = null;
			if (sourceURI != null) {
				sourceEntry = context.getByEntryURI(sourceURI);
				if (sourceEntry == null) {
					getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
					return;
				}

				// A list is only handed to the pipeline if the source is referred
				// to by exactly one list in its context
				Set<URI> lists = sourceEntry.getReferringListsInSameContext();
				if (lists.size() == 1) {
					listURI = lists.iterator().next();
				}

				// Only local information resources with a mime type and data are
				// accepted; instanceof (instead of a cast) also rejects resources
				// of another type and null without throwing
				if (!EntryType.Local.equals(sourceEntry.getEntryType()) ||
						!ResourceType.InformationResource.equals(sourceEntry.getResourceType()) ||
						sourceEntry.getMimetype() == null ||
						!(sourceEntry.getResource() instanceof Data)) {
					getResponse().setStatus(Status.CLIENT_ERROR_CONFLICT);
					return;
				}
			}

			// TODO add support for non-local resources

			if (!GraphType.Pipeline.equals(pipelineEntry.getGraphType())) {
				getResponse().setStatus(Status.CLIENT_ERROR_CONFLICT);
				return;
			}

			Set<Entry> processedEntries = null;
			try {
				processedEntries = new Pipeline(pipelineEntry).run(sourceEntry, listURI);
			} catch (IllegalStateException ise) {
				log.error(ise.getMessage());
				getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
				return;
			} catch (TransformException te) {
				// Server-side failure: keep the stack trace for diagnosis
				log.error(te.getMessage(), te);
				getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
				return;
			}

			if (processedEntries != null && processedEntries.size() > 0) {
				JSONObject result = new JSONObject();
				JSONArray processedEntriesArr = new JSONArray();
				for (Entry e : processedEntries) {
					processedEntriesArr.put(e.getEntryURI().toString());
				}
				try {
					result.put("result", processedEntriesArr);
				} catch (JSONException e) {
					log.error(e.getMessage(), e);
				}

				getResponse().setEntity(new JsonRepresentation(result));
				getResponse().setStatus(Status.SUCCESS_CREATED);
			}

			// TODO support execution status for async executions; perhaps the thread executioner can be
			// shared between listeners and pipelines? this would allow to set a reasonable maximum of
			// concurrent threads per EntryStore instance

		} catch(AuthorizationException e) {
			log.debug("Unauthorized POST");
			unauthorizedPOST();
		}
	}

}