ExecutionResource.java
- /*
- * Copyright (c) 2007-2017 MetaSolutions AB
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.entrystore.rest.resources;
- import org.entrystore.AuthorizationException;
- import org.entrystore.Data;
- import org.entrystore.Entry;
- import org.entrystore.EntryType;
- import org.entrystore.GraphType;
- import org.entrystore.PrincipalManager.AccessProperty;
- import org.entrystore.ResourceType;
- import org.entrystore.transforms.Pipeline;
- import org.entrystore.transforms.TransformException;
- import org.json.JSONArray;
- import org.json.JSONException;
- import org.json.JSONObject;
- import org.restlet.data.MediaType;
- import org.restlet.data.Status;
- import org.restlet.ext.json.JsonRepresentation;
- import org.restlet.representation.Representation;
- import org.restlet.resource.Post;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
- /**
- * This resource executes pipelines etc.
- *
- * @author Hannes Ebner
- */
- public class ExecutionResource extends BaseResource {
- static Logger log = LoggerFactory.getLogger(ExecutionResource.class);
- List<MediaType> supportedMediaTypes = new ArrayList<MediaType>();
- @Override
- public void doInit() {
- supportedMediaTypes.add(MediaType.APPLICATION_JSON);
- }
- // TODO add support for GET to query currently running pipelines; this should
- // be supported together with asynchronous processing of pipeline executions
-
- @Post
- public void acceptRepresentation(Representation r) {
- if (context == null) {
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- return;
- }
- MediaType preferredMediaType = getRequest().getClientInfo().getPreferredMediaType(supportedMediaTypes);
- if (!MediaType.APPLICATION_JSON.equals(preferredMediaType)) {
- getResponse().setStatus(Status.CLIENT_ERROR_UNSUPPORTED_MEDIA_TYPE);
- return;
- }
- JSONObject request = null;
- try {
- request = new JSONObject(getRequest().getEntity().getText());
- } catch (Exception e) {
- getResponse().setStatus(Status.CLIENT_ERROR_UNPROCESSABLE_ENTITY);
- log.error(e.getMessage());
- return;
- }
- String pipeline;
- String source = null;
- // String destination;
- // boolean async = false;
- try {
- pipeline = request.getString("pipeline"); // Pipeline Entry URI
- if (request.has("source")) {
- source = request.getString("source"); // Data source Entry URI
- }
- // destination = request.getString("destination"); // Destination Entry URI
- // async = "async".equalsIgnoreCase(request.getString("async")); // sync is default
- } catch (JSONException e) {
- getResponse().setStatus(Status.CLIENT_ERROR_UNPROCESSABLE_ENTITY);
- return;
- }
- // Parameter pipeline is required
- if (pipeline == null) {
- getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
- return;
- }
- String contextResUri = context.getEntry().getResourceURI().toString();
- // Pipeline have to be located in the same context
- if (!pipeline.startsWith(contextResUri)) {
- getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
- return;
- }
- // Source (if provided= have to be located in the same context
- if ( source != null && !source.startsWith(contextResUri)) {
- getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
- return;
- }
- try {
- // Pipeline execution requires write-rights on context's resource
- getPM().checkAuthenticatedUserAuthorized(context.getEntry(), AccessProperty.WriteResource);
- // Load pipeline entry from context
- Entry pipelineEntry = context.getByEntryURI(URI.create(pipeline));
- if (pipelineEntry == null) {
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- return;
- }
- // Load source entry from context
- Entry sourceEntry = null;
- URI listURI = null;
- String sourceMimeType = null;
- Data data = null;
- if (source != null) {
- sourceEntry = context.getByEntryURI(URI.create(source));
- if (sourceEntry == null) {
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- return;
- }
- Set<URI> lists = sourceEntry.getReferringListsInSameContext();
- if (lists.size() == 1) {
- listURI = lists.iterator().next();
- }
- sourceMimeType = sourceEntry.getMimetype();
- data = (Data) sourceEntry.getResource();
- if (!EntryType.Local.equals(sourceEntry.getEntryType()) ||
- !ResourceType.InformationResource.equals(sourceEntry.getResourceType()) ||
- sourceMimeType == null ||
- data == null) {
- getResponse().setStatus(Status.CLIENT_ERROR_CONFLICT);
- return;
- }
- }
- // TODO add support for non-local resources
- if (!GraphType.Pipeline.equals(pipelineEntry.getGraphType())) {
- getResponse().setStatus(Status.CLIENT_ERROR_CONFLICT);
- return;
- }
- Set<Entry> processedEntries = null;
- try {
- processedEntries = new Pipeline(pipelineEntry).run(sourceEntry, listURI);
- } catch (IllegalStateException iae) {
- log.error(iae.getMessage());
- getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
- return;
- } catch (TransformException te) {
- log.error(te.getMessage());
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- return;
- }
- if (processedEntries != null && processedEntries.size() > 0) {
- JSONObject result = new JSONObject();
- JSONArray processedEntriesArr = new JSONArray();
- for (Entry e : processedEntries) {
- processedEntriesArr.put(e.getEntryURI().toString());
- }
- try {
- result.put("result", processedEntriesArr);
- } catch (JSONException e) {
- log.error(e.getMessage());
- }
- getResponse().setEntity(new JsonRepresentation(result));
- getResponse().setStatus(Status.SUCCESS_CREATED);
- }
- // TODO support execution status for async executions; perhaps the thread executioner can be
- // shared between listeners and pipelines? this would allow to set a reasonable maximum of
- // concurrent threads per EntryStore instance
- } catch(AuthorizationException e) {
- log.debug("Unauthorized POST");
- unauthorizedPOST();
- }
- }
- }