Distributed programming

From Rosetta Code
Revision as of 19:25, 7 March 2009 by rosettacode>ShinTakezou (+lang tag massively)
Task
Distributed programming
You are encouraged to solve this task according to the task description, using any language you may know.

Given two computers on a network, send messages between them. The protocol used may be language-specific or not, and should be suitable for general distributed programming.

D

Socket

Works with: Tango

Server

<lang d>module distributedserver ; import tango.net.ServerSocket, tango.text.convert.Integer,

      tango.text.Util, tango.io.Stdout ;

void main() {

 auto Ip = new InternetAddress("localhost", 12345) ;    
 auto server = new ServerSocket(Ip) ;
 auto socket = server.accept ;     
 auto buffer = new char[socket.bufferSize] ;
 bool quit = false ;
 
 while(!quit) {
   bool error = false ;    
   
   try {
     auto len = socket.input.read(buffer) ;
     auto cmd = (len > 0) ? delimit(buffer[0..len], " ") : [""] ;              
     Stdout(cmd).newline.flush ;
     switch (cmd[0]) {
       case "square":
         socket.output.write(toString(toInt(cmd[1]) * toInt(cmd[1]))) ; break ;
       case"add":
         socket.output.write(toString(toInt(cmd[1]) + toInt(cmd[2]))) ; break ;
       case "quit": 
         socket.output.write("Server Shut down") ;           
         quit = true ; break ;
       default: error = true ;
     }
   } catch (Exception e) 
     error = true ;
   if(error) socket.output.write("<Error>") ;    
   if(socket) socket.close ;
   if(!quit) socket = server.accept ;     
 } 
 if(socket) socket.close ;

}</lang>

Client

<lang d>module distributedclient ; import tango.net.SocketConduit, tango.net.InternetAddress,

      tango.text.Util, tango.io.Stdout ;

void main(char[][] args) {

 if(args.length> 1) {
   try {
     auto Ip = new InternetAddress("localhost", 12345) ;    
     auto socket = new SocketConduit ;     
     socket.connect(Ip) ;
     auto buffer = new char[socket.bufferSize] ;
 
     socket.output.write(join(args[1..$]," ")) ;
     auto len = socket.input.read(buffer) ;    
     if(len > 0) Stdout(buffer[0..len]).newline ;
   
     if(socket) socket.close ;
   } catch(Exception e) 
     Stdout(e.msg).newline ;
 } else
   Stdout("usage: supply argument as,\n\tquit\n"
     "\tsquare <number>\n\tadd <number> <number>").newline ;

}</lang>

E

Protocol: Pluribus

This service cannot be used except by clients which know the URL designating it, messages are encrypted, and the client authenticates the server. However, it is vulnerable to denial-of-service by any client knowing the URL.

Server

(The protocol is symmetric; this program is the server only in that it is the one which is started first and exports an object.)

<lang e> def storage := [].diverge()

def logService {
  to log(line :String) {
    storage.push([timer.now(), line])
  }
  to search(substring :String) {
    var matches := []
    for [time, line] ? (line.startOf(substring) != -1) in storage {
      matches with= [time, line]
    }
    return matches
  }
}

introducer.onTheAir()
def sturdyRef := makeSturdyRef.temp(logService)
println(<captp>.sturdyToURI(sturdyRef))
interp.blockAtTop()</lang>

This will print the URL of the service and run it until aborted.

Client

The URL provided by the server is given as the argument to this program.

<lang e> def [uri] := interp.getArgs()

introducer.onTheAir()
def sturdyRef := <captp>.sturdyFromURI(uri)
def logService := sturdyRef.getRcvr()

logService <- log("foot")
logService <- log("shoe")

println("Searching...")
when (def result := logService <- search("foo")) -> {
  for [time, line] in result {
    println(`At $time: $line`)
  }
}</lang>

Erlang

The protocol is erlang's own

Server

srv.erl

<lang erlang> -module(srv).

-export([start/0, wait/0]).

start() ->
   net_kernel:start([srv,shortnames]),
   erlang:set_cookie(node(), rosetta),
   Pid = spawn(srv,wait,[]),
   register(srv,Pid),
   io:fwrite("~p ready~n",[node(Pid)]),
   ok.

wait() ->
   receive
       {echo, Pid, Any} ->
           io:fwrite("-> ~p from ~p~n", [Any, node(Pid)]),
           Pid ! {hello, Any},
           wait();
       Any -> io:fwrite("Error ~p~n", [Any])
   end.</lang>

Client

client.erl

<lang erlang> -module(client).

-export([start/0, wait/0]).

start() ->
   net_kernel:start([client,shortnames]),
   erlang:set_cookie(node(), rosetta),
   {ok,Srv} = init:get_argument(server),
   io:fwrite("conencting to ~p~n", [Srv]),
   {srv, list_to_atom(Srv)} ! {echo,self(), hi},
   wait(),
   ok.

wait() ->
   receive
       {hello, Any} -> io:fwrite("Received ~p~n", [Any]);
       Any -> io:fwrite("Error ~p~n", [Any])
   end.</lang>

running it (*comes later)

|erlc srv.erl
|erl -run srv start -noshell
 srv@agneyam ready
*-> hi from client@agneyam
|erlc client.erl
|erl -run client start -run init stop -noshell -server srv@agneyam
 conencting to "srv@agneyam"
 Received hi

OCaml

Works with: JoCaml

Minimalistic distributed logger with synchronous channels using the join calculus on top of OCaml.

Server

<lang ocaml> open Printf

 let create_logger () =
   def log(text) & logs(l) =
       printf "Logged: %s\n%!" text;
       logs((text, Unix.gettimeofday ())::l) & reply to log
 
    or search(text) & logs(l) =
       logs(l) & reply List.filter (fun (line, _) -> line = text) l to search
   in
     spawn logs([]);
     (log, search)
 
 def wait() & finished() = reply to wait
 
 let register name service = Join.Ns.register Join.Ns.here name service

 let () =
   let log, search = create_logger () in
     register "log" log;
     register "search" search;
     Join.Site.listen (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345));
     wait ()</lang>

Client

<lang ocaml> open Printf

 let ns_there = Join.Ns.there (Unix.ADDR_INET (Join.Site.get_local_addr(), 12345))

 let lookup name = Join.Ns.lookup ns_there name
 
 let log : string -> unit = lookup "log"
 let search : string -> (string * float) list = lookup "search"

 let find txt =
   printf "Looking for %s...\n" txt;
   List.iter (fun (line, time) ->
                printf "Found: '%s' at t = %f\n%!" (String.escaped line) time)
     (search txt)

 let () =
   log "bar";
   find "foo";
   log "foo";
   log "shoe";
   find "foo"</lang>

Python

Works with: Python version 2.4 and 2.6

XML-RPC

Protocol: XML-RPC

Server

<lang python>

  1. !/usr/bin/env python
  2. -*- coding: utf-8 -*-

import SimpleXMLRPCServer

class MyHandlerInstance:

   def echo(self, data):
       Method for returning data got from client
       return 'Server responded: %s' % data
   def div(self, num1, num2):
       Method for divide 2 numbers
       return num1/num2

def foo_function():

   A function (not an instance method)
   return True

HOST = "localhost" PORT = 8000

server = SimpleXMLRPCServer.SimpleXMLRPCServer((HOST, PORT))

  1. register built-in system.* functions.

server.register_introspection_functions()

  1. register our instance

server.register_instance(MyHandlerInstance())

  1. register our function as well

server.register_function(foo_function)

try:

   # serve forever
   server.serve_forever()

except KeyboardInterrupt:

   print 'Exiting...'
   server.server_close()

</lang>

Client

<lang python>

  1. !/usr/bin/env python
  2. -*- coding: utf-8 -*-

import xmlrpclib

HOST = "localhost" PORT = 8000

rpc = xmlrpclib.ServerProxy("http://%s:%d" % (HOST, PORT))

  1. print what functions does server support

print 'Server supports these functions:', print ' '.join(rpc.system.listMethods())

  1. echo something

rpc.echo("We sent this data to server")

  1. div numbers

print 'Server says: 8 / 4 is: %d' % rpc.div(8, 4)

  1. control if foo_function returns True

if rpc.foo_function():

   print 'Server says: foo_function returned True'

</lang>

HTTP

Protocol: HTTP

Server

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import BaseHTTPServer

HOST = "localhost" PORT = 8000

  1. we just want to write own class, we replace do_GET method. This could be extended, I just added basics
  2. see; http://docs.python.org/lib/module-BaseHTTPServer.html

class MyHTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):

   def do_GET(self):
       # send 200 (OK) message
       self.send_response(200)
       # send header
       self.send_header("Content-type", "text/html")
       self.end_headers()
       # send context
       self.wfile.write("<html><head><title>Our Web Title</title></head>")

self.wfile.write("<body>

This is our body. You wanted to visit %s page

</body>" % self.path)

       self.wfile.write("</html>")

if __name__ == '__main__':

   server = BaseHTTPServer.HTTPServer((HOST, PORT), MyHTTPHandler)
   try:
       server.serve_forever()
   except KeyboardInterrupt:
       print 'Exiting...'
       server.server_close()

</lang>

Client

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import httplib

HOST = "localhost" PORT = 8000

conn = httplib.HTTPConnection(HOST, PORT) conn.request("GET", "/somefile")

response = conn.getresponse() print 'Server Status: %d' % response.status

print 'Server Message: %s' % response.read() </lang>

Socket, Plain Text

Protocol: Plain Text

Server

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import SocketServer

HOST = "localhost" PORT = 8000

  1. our instance that will upper whatever it gets and send back to client

class UpperCaseHandler(SocketServer.StreamRequestHandler):

   def handle(self):
       print '%s connected' % self.client_address[0]
       # get what client sends
       get = self.rfile.readline()
       # write back to client
       self.wfile.write(get.upper())

if __name__ == '__main__':

   tcpserver = SocketServer.TCPServer((HOST, PORT), UpperCaseHandler)
   try:
       tcpserver.serve_forever()
   except KeyboardInterrupt:
       print 'Exiting...'
       tcpserver.server_close()

</lang>

Client

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import socket

HOST = "localhost" PORT = 8000

DATA = "my name is eren"

  1. connect to server and send data

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((HOST, PORT)) sock.send("%s\n" % DATA)

  1. get

response = sock.recv(256) sock.close()

print "We sent: %s" % DATA print 'Server responded: %s' % response </lang>


Pyro

Note: You should install Pyro (http://pyro.sourceforge.net) first and run pyro-ns binary to run code below.

Server

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import Pyro.core import Pyro.naming

  1. create instance that will return upper case

class StringInstance(Pyro.core.ObjBase):

   def makeUpper(self, data):
       return data.upper()

class MathInstance(Pyro.core.ObjBase):

   def div(self, num1, num2):
       return num1/num2

if __name__ == '__main__':

   server = Pyro.core.Daemon()
   name_server = Pyro.naming.NameServerLocator().getNS()
   server.useNameServer(name_server)
   server.connect(StringInstance(), 'string')
   server.connect(MathInstance(), 'math')
   try:
       server.requestLoop()
   except KeyboardInterrupt:
       print 'Exiting...'
       server.shutdown()

</lang>

Client

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import Pyro.core

DATA = "my name is eren" NUM1 = 10 NUM2 = 5

string = Pyro.core.getProxyForURI("PYRONAME://string") math = Pyro.core.getProxyForURI("PYRONAME://math")

print 'We sent: %s' % DATA print 'Server responded: %s\n' % string.makeUpper(DATA)

print 'We sent two numbers to divide: %d and %d' % (NUM1, NUM2) print 'Server responded the result: %s' % math.div(NUM1, NUM2) </lang>

Spread

Note: You should install Spread (http://www.spread.org) and its python bindings (http://www.python.org/other/spread/)

Server

You don't need any code for server. You should start "spread" daemon by typing "spread -c /etc/spread.conf -n localhost". If you want more configuration, look at /etc/spread.conf.

After starting daemon, if you want to make sure that it is running, enter spuser -s 4803 command where 4803 is your port set in spread.conf, you will see prompt, type j user, you should see something like this message: Received REGULAR membership for group test with 3 members, where I am member 2

Client (Listener)

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import spread

PORT = '4803'

  1. connect spread daemon

conn = spread.connect(PORT)

  1. join the room

conn.join('test')

print 'Waiting for messages... If you want to stop this script, please stop spread daemon' while True:

   recv = conn.receive()
   if hasattr(recv, 'sender') and hasattr(recv, 'message'):
       print 'Sender: %s' % recv.sender
       print 'Message: %s' % recv.message

</lang>

Client (Sender)

<lang python>

  1. !/usr/bin/python
  2. -*- coding: utf-8 -*-

import spread

PORT = '4803'

conn = spread.connect(PORT) conn.join('test')

conn.multicast(spread.RELIABLE_MESS, 'test', 'hello, this is message sent from python') conn.disconnect() </lang>

UnixPipes

Uses netcat

server

(echo 1; echo 2; echo 3) | nc -l 1024

client

nc 192.168.0.1 1024 | wc -l