Submitting a job in Java using WS GRAM

The following is a general scenario for submitting a job using the Java stubs and APIs. Please consult the Java WS Core API, Delegation API, Reliable File Transfer API, and WS-GRAM API documentation for details on classes referenced in the code excerpts.

Also, it will probably be helpful to look at the GramJob class source code as a functioning example.

1. Class imports

The following imports will be needed for these examples:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.URL;
    import java.security.cert.X509Certificate;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Vector;
    import javax.xml.namespace.QName;
    import javax.xml.rpc.Stub;
    import javax.xml.soap.SOAPElement;
    import org.apache.axis.components.uuid.UUIDGenFactory;
    import org.apache.axis.message.addressing.AttributedURI;
    import org.apache.axis.message.addressing.EndpointReferenceType;
    import org.globus.delegation.DelegationUtil;
    import org.globus.exec.generated.CreateManagedJobInputType;
    import org.globus.exec.generated.CreateManagedJobOutputType;
    import org.globus.exec.generated.JobDescriptionType;
    import org.globus.exec.generated.ManagedJobFactoryPortType;
    import org.globus.exec.generated.ManagedJobPortType;
    import org.globus.exec.generated.MultiJobDescriptionType;
    import org.globus.exec.generated.ReleaseInputType;
    import org.globus.exec.utils.ManagedJobConstants;
    import org.globus.exec.utils.ManagedJobFactoryConstants;
    import org.globus.exec.utils.client.ManagedJobClientHelper;
    import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
    import org.globus.exec.utils.rsl.RSLHelper;
    import org.globus.wsrf.NotificationConsumerManager;
    import org.globus.wsrf.WSNConstants;
    import org.globus.wsrf.encoding.ObjectDeserializer;
    import org.globus.wsrf.impl.security.authentication.Constants;
    import org.globus.wsrf.impl.security.authorization.Authorization;
    import org.globus.wsrf.impl.security.authorization.HostAuthorization;
    import org.globus.wsrf.impl.security.authorization.IdentityAuthorization;
    import org.globus.wsrf.impl.security.authorization.SelfAuthorization;
    import org.globus.wsrf.impl.security.descriptor.ClientSecurityDescriptor;
    import org.globus.wsrf.impl.security.descriptor.GSISecureMsgAuthMethod;
    import org.globus.wsrf.impl.security.descriptor.GSITransportAuthMethod;
    import org.globus.wsrf.impl.security.descriptor.ResourceSecurityDescriptor;
    import org.gridforum.jgss.ExtendedGSSCredential;
    import org.gridforum.jgss.ExtendedGSSManager;
    import org.ietf.jgss.GSSCredential;
    import org.oasis.wsn.Subscribe;
    import org.oasis.wsn.SubscribeResponse;
    import org.oasis.wsn.SubscriptionManager;
    import org.oasis.wsn.TopicExpressionType;
    import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator;
    import org.oasis.wsrf.lifetime.Destroy;
    import org.oasis.wsrf.properties.GetMultipleResourceProperties_Element;
    import org.oasis.wsrf.properties.GetMultipleResourcePropertiesResponse;
    import org.oasis.wsrf.properties.GetResourcePropertyResponse;

2. Loading the job description

// Read the job description document from disk through the RSL helper.
JobDescriptionType jobDescription
    = RSLHelper.readRSL(new File("myjobdesc.xml"));

The object jobDescription will be of sub-type MultiJobDescriptionType if the file contains a multi-job description.

3. Creating the factory service stub

// Pick the factory type and resolve the contact string to the service URL.
String factoryType
    = ManagedJobFactoryConstants.FACTORY_TYPE.<factory type constant>;
URL factoryUrl
    = ManagedJobFactoryClientHelper.getServiceURL(contactString).getURL();

// Build the factory endpoint reference, then obtain the stub bound to it.
EndpointReferenceType factoryEndpoint
    = ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl, factoryType);
ManagedJobFactoryPortType factoryPort
    = ManagedJobFactoryClientHelper.getPort(factoryEndpoint);

The format of contactString is [protocol://]host[:port][/servicepath].

4. Loading a proxy from a file

  • Default proxy file:

    // Obtain the credential manager and ask it for a credential without
    // supplying any proxy data (the "default proxy file" case).
    ExtendedGSSManager manager =
        (ExtendedGSSManager) ExtendedGSSManager.getInstance();

    // Named "proxy" (not "cred") so that it matches the variable used by
    // secDesc.setGSSCredential(proxy) when setting stub security parameters.
    GSSCredential proxy = manager.createCredential(
        GSSCredential.INITIATE_AND_ACCEPT);
    

  • Specific proxy file:

    // Load the raw bytes of the proxy file into memory.
    File proxyFile = new File("proxy_file");
    byte[] proxyData = new byte[(int) proxyFile.length()];
    FileInputStream inputStream = new FileInputStream(proxyFile);
    try {
        // A single read() may return fewer bytes than requested, so keep
        // reading until the buffer is completely filled.
        int offset = 0;
        while (offset < proxyData.length) {
            int count = inputStream.read(
                proxyData, offset, proxyData.length - offset);
            if (count < 0) {
                throw new IOException(
                    "Unexpected end of proxy file: " + proxyFile);
            }
            offset += count;
        }
    } finally {
        // Always release the file handle, even if reading failed.
        inputStream.close();
    }

    ExtendedGSSManager manager =
        (ExtendedGSSManager) ExtendedGSSManager.getInstance();

    // Import the opaque proxy bytes as a GSS credential.
    GSSCredential proxy = manager.createCredential(
        proxyData,
        ExtendedGSSCredential.IMPEXP_OPAQUE,
        GSSCredential.DEFAULT_LIFETIME,
        null,
        GSSCredential.ACCEPT_ONLY);
    

5. Setting stub security parameters

// Describe how the client stub should secure its calls.
ClientSecurityDescriptor secDesc = new ClientSecurityDescriptor();
secDesc.setGSITransport(Constants.<protection level constant>);
secDesc.setAuthz(<Authorization sub-class instance>);
// Only set a credential explicitly when one was loaded.
if (proxy != null) {
    secDesc.setGSSCredential(proxy);
}
// Attach the descriptor to the factory stub created earlier (factoryPort).
((Stub) factoryPort)._setProperty(Constants.CLIENT_DESCRIPTOR, secDesc);

Use setGSISecureMsg() for GSI Secure Message.

6. Querying for factory resource properties

6.1. One at a time

// Ask the factory stub (factoryPort) for a single resource property; the
// property names for the factory come from ManagedJobFactoryConstants.
GetResourcePropertyResponse response
    = factoryPort.getResourceProperty(ManagedJobFactoryConstants.<RP constant>);

// The property value(s) come back as raw SOAP elements.
SOAPElement[] any = response.get_any();

// Convert the first element into its Java type.
... = ObjectDeserializer.toObject(any[0], <RP type>.class);

6.2. Many at a time

// Request several factory resource properties in one call.
GetMultipleResourceProperties_Element rpRequest
    = new GetMultipleResourceProperties_Element();
rpRequest.setResourceProperty(new QName[] {
    ManagedJobFactoryConstants.<RP constant #1>,
    ManagedJobFactoryConstants.<RP constant #2>,
    ManagedJobFactoryConstants.<RP constant #N>
});
GetMultipleResourcePropertiesResponse response
    = factoryPort.getMultipleResourceProperties(rpRequest);

SOAPElement[] any = response.get_any();

// Each property is read from its own array slot rather than always slot 0.
// NOTE(review): assumes every requested property yields exactly one element,
// returned in request order -- confirm against the service's behavior.
... = ObjectDeserializer.toObject(any[0], <RP #1 type>.class);
... = ObjectDeserializer.toObject(any[1], <RP #2 type>.class);
... = ObjectDeserializer.toObject(any[<N - 1>], <RP #N type>.class);

7. Delegating credentials (if needed)

X509Certificate certToSign = DelegationUtil.getCertificateChainRP(
    delegFactoryEndpoint,   //EndpointReferenceType
    secDesc,                //ClientSecurityDescriptor
)[0];   //first element in the returned array
EndpointReferenceType credentialEndpoint = DelegationUtil.delegate(
    delegFactoryurl,        //String
    credential,             //GlobusCredential
    certToSign,             //X509Certificate
    lifetime,               //int (seconds)
    fullDelegation,         //boolean
    secDesc);               //ClientSecurityDescriptor

There are three types of delegated credentials:

  1. Credential used by the job to generate user-owned proxy:

    // Setter name follows the same "...CredentialEndpoint" pattern as
    // setStagingCredentialEndpoint.
    jobDescription.setJobCredentialEndpoint(credentialEndpoint);
  2. Credential used to contact RFT for staging and file clean up:

    // Store the delegated credential endpoint as the staging credential.
    jobDescription.setStagingCredentialEndpoint(credentialEndpoint);
  3. Credential used by RFT to contact GridFTP servers:

    // Attach the delegated credential endpoint to the stage-out request.
    TransferRequestType stageOut = jobDescription.getFileStageOut();
    stageOut.setTransferCredentialEndpoint(credentialEndpoint);

    Do the same for fileStageIn and fileCleanUp.

8. Creating the job resource

CreateManagedJobInputType jobInput = new CreateManagedJobInputType();
// The job ID is a URI of the form "uuid:<generated UUID>", with no space
// after the colon.
jobInput.setJobID(new AttributedURI(
    "uuid:" + UUIDGenFactory.getUUIDGen().nextUUID()));
jobInput.setInitialTerminationTime(<Calendar instance>);
// A multi-job description goes in the multi-job slot; jobDescription is
// declared as JobDescriptionType, so it must be cast to its sub-type.
if (multiJob) {
    jobInput.setMultiJob((MultiJobDescriptionType) jobDescription);
} else {
    jobInput.setJob(jobDescription);
}
// subscriptionReq is the Subscribe request built in the section
// "Subscribing for job state notifications".
if (subscribeOnCreate) {
    jobInput.setSubscribe(subscriptionReq);
}
// Create the job resource and keep the endpoint that identifies it.
CreateManagedJobOutputType createResponse
    = factoryPort.createManagedJob(jobInput);
EndpointReferenceType jobEndpoint = createResponse.getManagedJobEndpoint();

9. Creating the job service stub

// Obtain the stub bound to the job resource endpoint returned at creation.
ManagedJobPortType jobPort = ManagedJobClientHelper.getPort(jobEndpoint);

You must set the appropriate security parameters for the job service stub (jobPort) as well.

10. Subscribing for job state notifications

// Security settings for the notification consumer resource.
ResourceSecurityDescriptor consumerSecDesc = new ResourceSecurityDescriptor();
consumerSecDesc.setAuthz(Authorization.<authz type constant>);
Vector consumerAuthMethods = new Vector();
consumerAuthMethods.add(GSITransportAuthMethod.BOTH);
consumerSecDesc.setAuthMethods(consumerAuthMethods);

// The topic of interest is the job's state resource property.
List stateTopicPath = new LinkedList();
stateTopicPath.add(ManagedJobConstants.RP_STATE);

// Start listening before registering the consumer.
NotificationConsumerManager notifConsumerManager
    = NotificationConsumerManager.getInstance();
notifConsumerManager.startListening();

// "this" is handed over as the object to be notified.
// NOTE(review): presumably it must implement the notification callback
// interface expected by createNotificationConsumer -- confirm.
EndpointReferenceType notificationConsumerEndpoint
    = notifConsumerManager.createNotificationConsumer(
        stateTopicPath, this, consumerSecDesc);

// Build the subscription request: who to notify, and about which topic.
Subscribe subscriptionReq = new Subscribe();
subscriptionReq.setConsumerReference(notificationConsumerEndpoint);
subscriptionReq.setTopicExpression(new TopicExpressionType(
    WSNConstants.SIMPLE_TOPIC_DIALECT,
    ManagedJobConstants.RP_STATE));

// Assigned when subscribing after job creation.
EndpointReferenceType subscriptionEndpoint;

  • Subscribe on creation

    // Attach the subscription request to the job creation input.
    jobInput.setSubscribe(subscriptionReq);
  • Subscribe after creation

    // Send the subscription request (subscriptionReq) through the job stub
    // and keep the returned subscription endpoint.
    SubscribeResponse subscribeResponse
        = jobPort.subscribe(subscriptionReq);
    subscriptionEndpoint = subscribeResponse.getSubscriptionReference();
    

11. Releasing any state holds (if necessary)

// Invoke the release operation on the job stub with an empty input message.
jobPort.release(new ReleaseInputType());

12. Destroying resources

/* Destroy the subscription resource first. */
WSBaseNotificationServiceAddressingLocator subscriptionLocator
    = new WSBaseNotificationServiceAddressingLocator();
SubscriptionManager subscriptionManagerPort
    = subscriptionLocator.getSubscriptionManagerPort(subscriptionEndpoint);

// Set stub security parameters on subscriptionManagerPort here.

subscriptionManagerPort.destroy(new Destroy());


/* Then destroy the job resource itself. */
jobPort.destroy(new Destroy());