Skip to content

JXIO cannot handle asynchronous ClientSession.sendRequest() calls  #24

Description

@lynus

While the EventQueueHandler thread is running, asynchronous calls to ClientSession.sendRequest() made from another thread are not handled correctly if they are issued too quickly.
I modified the 'JXIO/examples/org/accelio/jxio/SimpleClient.java' to reproduce this issue:

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.accelio.jxio.Msg;
import org.accelio.jxio.MsgPool;
import org.accelio.jxio.EventName;
import org.accelio.jxio.EventReason;
import org.accelio.jxio.ClientSession;
import org.accelio.jxio.EventQueueHandler;

/**
 * Minimal JXIO client used to reproduce a problem with calling
 * {@code ClientSession.sendRequest()} from a thread other than the one
 * running the {@code EventQueueHandler}.
 *
 * <p>The event queue handler is run on a dedicated {@link EventLoop} thread,
 * while {@link #main(String[])} sends {@code maxNumberofReqs} requests from
 * the main thread, pausing briefly between requests.
 */
public class SimpleClient {

    private final static Log LOG = LogFactory.getLog(SimpleClient.class.getCanonicalName());
    private final MsgPool mp;
    private final EventQueueHandler eqh;
    private ClientSession client;
    // Written from the session callbacks and read in main(); the callbacks are
    // presumably invoked on the EventLoop thread (the one running eqh.run()),
    // so the field is volatile to make the latest value visible across threads.
    public volatile int exitStatus = 1;

    public static long numberofReqs = 0;
    public static long numberofRsps = 0;
    public static long maxNumberofReqs = 1000;
    public static long PRINT_COUNTER = 10;

    /** Thread that drives the event queue handler by calling {@code eqh.run()}. */
    class EventLoop extends Thread {
        EventLoop(){}

        @Override
        public void run() {
            eqh.run();
        }
    }

    /**
     * Connects to the hard-coded server, starts the event loop thread, sends
     * {@code maxNumberofReqs} requests from the main thread, then closes the
     * session, releases resources and exits with {@link #exitStatus}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String serverhostname = "172.18.0.14";
        final int port = 9900;

        URI uri = null;
        try {
            uri = new URI("rdma://" + serverhostname + ":" + port + "/");
        } catch (URISyntaxException e) {
            e.printStackTrace();
            return;
        }

        SimpleClient client = new SimpleClient();
        // Connect
        client.connect(uri);
        // Start the event loop on its own thread
        client.run();
        // Send
        for (numberofReqs = 1; numberofReqs <= maxNumberofReqs; ++numberofReqs){
            LOG.trace("sending req " + numberofReqs);
            client.send();
            /*
             * Throttle the sender. Per the issue report, removing this sleep
             * (i.e. issuing sendRequest() calls back-to-back) makes the program
             * pop up random error messages.
             */
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop sending; fall through to
                // the normal shutdown sequence below.
                Thread.currentThread().interrupt();
                LOG.error(e.toString());
                break;
            }
        }
        // Finish
        LOG.info("Closing the session...");
        client.close();
        LOG.info("Client is releasing JXIO resources and exiting");
        client.releaseResources();
        System.exit(client.exitStatus);
    }

    /** Creates the event queue handler and the message pool used by this client. */
    SimpleClient() {
        this.eqh = new EventQueueHandler(null);
        this.mp = new MsgPool(1024, 100, 100);
    }

    /**
     * Creates a new client session towards the given server.
     *
     * @param uri address of the server to connect to
     */
    public void connect(URI uri) {
        LOG.info("Try to establish a new session to '" + uri + "'");
        this.client = new ClientSession(eqh, uri, new MyClientCallbacks(this));
    }

    /**
     * Takes a message from the pool, fills it with a fixed payload and sends
     * it as a request on the current session. Failures are logged, not thrown.
     */
    public void send(){
        Msg msg = this.mp.getMsg();
        if (msg == null) {
            // NOTE(review): assumes MsgPool.getMsg() returns null when the pool
            // has no free message (the pool holds fewer messages than the
            // number of requests sent) - confirm against the JXIO API.
            LOG.error("No free Msg available in the pool; request was not sent");
            return;
        }
        try {
            msg.getOut().put("Simple request".getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            // Just suppress the exception handling in this demo code
        }
        try {
            client.sendRequest(msg);
        } catch (IOException e) {
            //all exceptions thrown extend IOException
            LOG.error(e.toString());
        }
    }

    /** Starts a new {@link EventLoop} thread; returns immediately. */
    public void run() {
        EventLoop loop = new EventLoop();
        loop.start();
    }

    /** Closes the client session. */
    public void close(){
        client.close();
    }

    /** Deletes the message pool and closes the event queue handler. */
    public void releaseResources() {
        mp.deleteMsgPool();
        eqh.close();
    }

    /** Session callbacks that log events and record the exit status on the owning client. */
    class MyClientCallbacks implements ClientSession.Callbacks {
        private final SimpleClient client;

        MyClientCallbacks(SimpleClient client) {
            this.client = client;
        }

        public void onSessionEstablished() {
            LOG.info("[SUCCESS] Session established! Hurray !");
        }

        /**
         * Counts the response, logs every {@code PRINT_COUNTER}-th payload,
         * returns the message to its pool and marks the run as successful.
         */
        public void onResponse(Msg msg) {
            ++numberofRsps;

            if (numberofRsps % PRINT_COUNTER == 0){
                // Read reply message String; stops at the first negative byte
                byte ch;
                StringBuffer buffer = new StringBuffer();
                while (msg.getIn().hasRemaining() && ((ch = msg.getIn().get()) > -1)) {
                    buffer.append((char) ch);
                }
                LOG.info("Got message response " + numberofRsps + ": '" + buffer.toString() + "'");
            }

            msg.returnToParentPool();
            exitStatus = 0; // Success, we got our message response back
        }

        /**
         * Logs the session event, flags failure for anything other than
         * {@code SESSION_CLOSED}, and stops the event queue handler.
         */
        public void onSessionEvent(EventName event, EventReason reason) {
            String str = "[EVENT] Got event " + event + " because of " + reason;
            if (event == EventName.SESSION_CLOSED) { // normal exit
                LOG.info(str);
            } else {
                this.client.exitStatus = 1; // Failure on any kind of error
                LOG.error(str);
            }
            this.client.eqh.stop();
        }

        /**
         * Logs the message error, returns the message to its pool and
         * terminates the JVM with a failure status.
         */
        public void onMsgError(Msg msg, EventReason reason) {
            LOG.info("[ERROR] onMsgErrorCallback. reason=" + reason);
            if (reason == EventReason.MSG_FLUSHED) {
                LOG.info("[STATUS] getIsClosing() = " + this.client.client.getIsClosing());
            }
            msg.returnToParentPool();
            this.client.exitStatus = 1; // Failure on any kind of error
            System.exit(exitStatus);
        }
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions