Friday, 15 February 2008

Generic compute server in Scala using remote actors

So here's a very basic generic compute server in Scala. The idea is that a "server" accepts some task from a client, runs the task and then returns the result. The server is then ready again to accept another task from another client. The "work" can be anything and the server does not have to be compiled with the classes. It makes use of Scala actors running on the client and server. In this example there is one client and one server but there could be lots of servers, splitting up some parallelizable task.

The tricky bit is getting the classes that represent the task to be run over to the server. As of Scala release 2.7.0-RC2, its now possible to specify the class loader for the remote actor and, in this example, the client sends a message to the server classloader to tell it where to get the classes from (if it hasn't got them already). The server uses a URLClassloader to fetch the classes via http from the client.


Prerequisites:
You need at least Scala 2.7.0-RC2
You need Java 1.5 or above.

There are three Scala files and two java files, all are listed in full in this blog post.

Put the two scala files below on the server. Command.scala contains the commands that the client can send to the server. The "Execute" command takes a function to be executed on the server and could be anything at all. As a result the server will need to be able to dynamically fetch classes from the client in order to execute the function. The AddClassLoaderUrl command allows the client to tell the server where to get the classes from.


/* Commands.scala */

@serializable
case class Message(body: Any)

@serializable
case class AddClassLoaderUrl(url: String)

@serializable
case class Execute(f: () => Any)

@serializable
case class Stop(body: String)


and

/* Server.scala */

import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.remote.RemoteActor
import scala.actors.remote.RemoteActor._
import scala.actors.Debug
import java.net.URLClassLoader
import java.io._
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

// Extend URLClassLoader so we can add a URL dynamically
class MyURLClassLoader(urls:Array[URL], parent:ClassLoader) extends URLClassLoader(urls,parent) {

def add(urlString: String) {
// This is a protected method on URLClassLoader
addURL(new URL(urlString))
}

// override this method for debugging purposes, to see what classes are missing and where we are looking for them
override protected def findClass(name:String) : Class[Any] = {
println("Classloader looking for class: " +name + " at URL: " + getURLs.mkString)
return super.findClass(name).asInstanceOf[Class[Any]]
}
}

object Server extends Application {

// See the low level communication messages between client and server
// Debug.level = 9

// Create a new URLClassLoader whose list of URLs we add to dynamically
val cl = new MyURLClassLoader(new Array[URL](0),getClass().getClassLoader())

// Tell the RemoteActor all about our classloader
// (If we don't as a minimum say RemoteActor.classLoader = getClass().getClassLoader()
// then no application classes are found). See issue https://lampsvn.epfl.ch/trac/scala/ticket/50
RemoteActor.classLoader = cl

actor {
alive(12345)
register('MYSERVICE, self)
// RemoteActor.classLoader is bound to TcpService and JavaSerializer here,
// and classloader cannot then be reassigned
// thats why we have to extend URLClassLoader to get access
// to protected addURL method

loop {
react {
case AddClassLoaderUrl(url) =>
cl.add(url)
// synchronous reply
reply("Thanks for the classloader url:" + url)
case Message(message) =>
// asynchronous reply
sender ! Message("Thanks for the message: " + message)
case Execute(f) =>
val result = f()
// asynchronous reply
sender ! result
case Stop(message) =>
exit()
}
}
}
}

then compile and run

fsc *.scala
scala Server


Now for the client. Put Commands.scala (above) and Client.scala (below) in some directory on the client


/* Client.scala */

import scala.actors.Actor._
import scala.actors.Debug
import scala.actors.remote.Node
import scala.actors.remote.TcpService._
import scala.actors.remote.RemoteActor
import scala.actors.remote.RemoteActor._
import java.net.InetAddress


@serializable
class UnknownToServer {
override def toString = { "I'm a class that the server does not know about, and so has to load dynamically" }
}

object Client extends Application {

// See the low level communication messages between client and server
// Debug.level = 9

// If we don't as a minimum say RemoteActor.classLoader = getClass().getClassLoader() then nothing works
RemoteActor.classLoader =getClass().getClassLoader()


actor {
val computeServer = select(Node("192.168.30.8", 12345), 'MYSERVICE)

// URL of the class file server, where the unkown class files can be server from
// InetAddress.getLocalHost().getHostAddress() should just be this machine's IP address
val classFileServerUrl = "http://" + InetAddress.getLocalHost().getHostAddress() + ":2001/"

// Tell the computeServer how it can find any unknown classes.
// This message is sent syncronously (using !?), because the server needs the classloader setup
// before any further messages are *received* as messages are deserialized *before* being processed.
val reply = computeServer !? AddClassLoaderUrl(classFileServerUrl)
println(reply)

// Send the server a message with a class it knows nothing about
computeServer ! Message(new UnknownToServer)
receive {
case Message(message) =>
println(message)
}

// This function is represented by an anonynous class which has to be loaded dynamically
def fn = () => (1 to 99).foldLeft(0) {println("Hello from a generic function passed dynamically to the server"); _+_}

computeServer ! Execute(fn)
receive {
case x =>
println("Result of generic compute function excuted on server:" + x)
}

// computeServer ! Stop("foo")
exit
}
}


IMPORTANT: Change the 192.168.30.8 above to whatever the IP address of your server is (Find out with ipconfig on Window/ifconfig on Linux).

Next we have two Java files that need to go in the same directory on the client. This is code from Sun that implements a very basic http server that serves up class files for that the server for any classes that it does not know about. This code could be translated into Scala, I just haven't got round to it yet.


/*
* ClassServer.java
*
* Copyright (c) 1996, 1996, 1997 Sun Microsystems, Inc. All Rights Reserved.
*
* SUN MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF THE
* SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE, OR NON-INFRINGEMENT. SUN SHALL NOT BE LIABLE FOR ANY DAMAGES
* SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR DISTRIBUTING
* THIS SOFTWARE OR ITS DERIVATIVES.
*
* CopyrightVersion 1.1_beta
*/

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.URLClassLoader;

/**
* ClassServer is an abstract class that provides the basic functionality of a
* mini-webserver, specialized to load class files only. A ClassServer must be
* extended and the concrete subclass should define the getBytes method
* which is responsible for retrieving the bytecodes for a class.
*


*
* The ClassServer creates a thread that listens on a socket and accepts HTTP
* GET requests. The HTTP response contains the bytecodes for the class that
* requested in the GET header.
*


*
* For loading remote classes, an RMI application can use a concrete subclass of
* this server in place of an HTTP server.
*


*
* @see ClassFileServer
*/
public abstract class ClassServer implements Runnable {

private ServerSocket server = null;
private int port;

/**
* Constructs a ClassServer that listens on port and obtains a
* class's bytecodes using the method getBytes.
*
* @param port
* the port number
* @exception IOException
* if the ClassServer could not listen on port.
*/
protected ClassServer(int port) throws IOException {
this.port = port;
server = new ServerSocket(port);
newListener();
}

/**
* Returns an array of bytes containing the bytecodes for the class
* represented by the argument path. The path is a dot
* separated class name with the ".class" extension removed.
*
* @return the bytecodes for the class
* @exception ClassNotFoundException
* if the class corresponding to path could not be
* loaded.
* @exception IOException
* if error occurs reading the class
*/
public abstract byte[] getBytes(String path) throws IOException,
ClassNotFoundException;

/**
* The "listen" thread that accepts a connection to the server, parses the
* header to obtain the class file name and sends back the bytecodes for the
* class (or error if the class is not found or the response was malformed).
*/
public void run() {
Socket socket;

// accept a connection
try {
socket = server.accept();
} catch (IOException e) {
System.out.println("Class Server died: " + e.getMessage());
e.printStackTrace();
return;
}

// create a new thread to accept the next connection
newListener();

try {
DataOutputStream out = new DataOutputStream(socket
.getOutputStream());
try {
// get path to class file from header
DataInputStream in = new DataInputStream(socket
.getInputStream());
String path = getPath(in);
// retrieve bytecodes
byte[] bytecodes = getBytes(path);
// send bytecodes in response (assumes HTTP/1.0 or later)
try {
out.writeBytes("HTTP/1.0 200 OK\r\n");
out.writeBytes("Content-Length: " + bytecodes.length
+ "\r\n");
out.writeBytes("Content-Type: application/java\r\n\r\n");
out.write(bytecodes);
out.flush();
} catch (IOException ie) {
return;
}

} catch (Exception e) {
// write out error response
out.writeBytes("HTTP/1.0 400 " + e.getMessage() + "\r\n");
out.writeBytes("Content-Type: text/html\r\n\r\n");
out.flush();
}

} catch (IOException ex) {
// eat exception (could log error to log file, but
// write out to stdout for now).
System.out.println("error writing response: " + ex.getMessage());
ex.printStackTrace();

} finally {
try {
socket.close();
} catch (IOException e) {
}
}
}

/**
* Create a new thread to listen.
*/
private void newListener() {
(new Thread(this)).start();
}

/**
* Returns the path to the class file obtained from parsing the HTML header.
*/
private static String getPath(DataInputStream in) throws IOException {

BufferedReader d = new BufferedReader(new InputStreamReader(in));

String line = d.readLine();
String path = "";

System.out.println(line);

// extract class from GET line
if (line.startsWith("GET /")) {
line = line.substring(5, line.length() - 1).trim();

int index = line.indexOf(".class ");
if (index != -1) {
path = line.substring(0, index).replace('/', '.');
}
}

// eat the rest of header
do {
line = d.readLine();
System.out.println(line);
} while ((line.length() != 0) && (line.charAt(0) != '\r')
&& (line.charAt(0) != '\n'));

if (path.length() != 0) {
return path;
} else {
throw new IOException("Malformed Header");
}
}
}


and

/*
* ClassFileServer.java
*
* Copyright (c) 1996, 1996, 1997 Sun Microsystems, Inc. All Rights Reserved.
*
* SUN MAKES NO REPRESENTATIONS OR WARRANTIES ABOUT THE SUITABILITY OF THE
* SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE, OR NON-INFRINGEMENT. SUN SHALL NOT BE LIABLE FOR ANY DAMAGES
* SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING OR DISTRIBUTING
* THIS SOFTWARE OR ITS DERIVATIVES.
*/

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/**
* The ClassFileServer implements a ClassServer that reads class files from the
* file system. See the doc for the "Main" method for how to run this server.
*/
public class ClassFileServer extends ClassServer {

private String classpath;

private static int DefaultServerPort = 2001;

/**
* Constructs a ClassFileServer.
*
* @param classpath
* the classpath where the server locates classes
*/
public ClassFileServer(int port, String classpath) throws IOException {
super(port);
this.classpath = classpath;
}

/**
* Returns an array of bytes containing the bytecodes for the class
* represented by the argument path. The path is a dot
* separated class name with the ".class" extension removed.
*
* @return the bytecodes for the class
* @exception ClassNotFoundException
* if the class corresponding to path could not be
* loaded.
*/
public byte[] getBytes(String path) throws IOException,
ClassNotFoundException {
System.out.println("reading: " + path);
String prefix = classpath.length() == 0 ? "" : classpath
+ File.separator;
File f = new File(prefix + path.replace('.', File.separatorChar)
+ ".class");
int length = (int) (f.length());
if (length == 0) {
Exception e = new IOException("File length is zero: " + path);
System.out.println(e);
throw new IOException("File length is zero: " + path);
} else {
FileInputStream fin = new FileInputStream(f);
DataInputStream in = new DataInputStream(fin);

byte[] bytecodes = new byte[length];
in.readFully(bytecodes);
return bytecodes;
}
}

/**
* Main method to create the class server that reads class files. This takes
* two command line arguments, the port on which the server accepts requests
* and the root of the classpath. To start up the server:

*

*
* java ClassFileServer
*


*

*
* The codebase of an RMI server using this webserver would simply contain a
* URL with the host and port of the web server (if the webserver's
* classpath is the same as the RMI server's classpath):

*

*
* java -Djava.rmi.server.codebase=http://zaphod:2001/ RMIServer
*

*

*

*
* You can create your own class server inside your RMI server application
* instead of running one separately. In your server main simply create a
* ClassFileServer:

*

*
* new ClassFileServer(port, classpath);
*

*/
public static void main(String args[]) {
int port = DefaultServerPort;
String classpath = "";

if (args.length >= 1) {
port = Integer.parseInt(args[0]);
}

if (args.length >= 2) {
classpath = args[1];
}

try {
new ClassFileServer(port, classpath);
} catch (IOException e) {
System.out
.println("Unable to start ClassServer: " + e.getMessage());
e.printStackTrace();
}
}
}




and compile the scala and java with


fsc *.scala
javac *.java


Run the java class server with

java ClassFileServer

This will serve up the Scala classes that the server requests. This doesn't terminate so you'll have to open up another console/command window to run the Scala client. (Ideally this could be incorporated into the client.)

Then run the client with

scala Client

Which will run a simple function on the server and print the results.

2 comments:

Daniel Green said...
This comment has been removed by the author.
Daniel Green said...

I rewrote the Client, and while everything else works, I get a NonSerializableException when I try to send Execute(f)

http://www.nabble.com/NotSerializableException-with-Remote-actor-td15644079.html

Hope to hear from someone! I think I might have stumbled across some kind of bug...