Distributed programming: Difference between revisions

(→‎UnixPipes: still incorrect)
(→‎{{header|C}}: using PVM)
Line 118:
 
=={{header|C}}==
Using PVM [[http://www.csm.ornl.gov/pvm/pvm_home.html]
{{incorrect|C|The protocol used is not sufficiently general-purpose.}}
This program is in a sense both a server and a client, depending on if its task is spawned with a command-line argument: if yes, it spawns another task of the same executible on the parallel virtual machine and waits for it to transmit data; if no, it transmits data and is done.
{{works with|Win32}} (Visual C/C++ 6.0)
<lang C>#include <stdio.h>
 
The changes needed to work with Linux/Unix should be small.
The example server serves only one client at a time. The protocol has 3 messages. The
first character determines the server action.
:'0' - echo string reversed '1' - echo string '2' - shutdown the server.
===Server===
<lang c>#include <stdio.h>
#include <stdlib.h>
#include <winsockpvm3.h>
//#include <sys/socket.h>
//#include <inet.h>
#include <string.h>
//#include <time.h>
#include <windows.h>
#pragma comment(lib, "wsock32.lib")
 
int main(int c, char **v)
typedef struct sockaddr_in * InetSockAddr;
 
typedef int (*Handler)( InetSockAddr client, int socket );
 
typedef struct sServer {
Handler handler;
int running;
int sock;
struct sockaddr_in addr;
} *Server;
 
Server NewServer( short port, Handler handler)
{
int socktids[10];
int parent, spawn;
struct sockaddr_in svrAddr;
int i_data, i2;
Server srv= malloc(sizeof(struct sServer));
double f_data;
if (!srv) return srv;
srv->handler = handler;
srv->running = 1;
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock <0 ) {
printf("Couldn't open socket. -failed %d \n", sock);
exit(1);
}
srv->sock = sock;
 
if (c > 1) {
memset(&svrAddr, 0, sizeof(struct sockaddr_in));
spawn = pvm_spawn("/tmp/a.out", 0, PvmTaskDefault, 0, 1, tids);
svrAddr.sin_family = AF_INET;
if (spawn <= 0) {
svrAddr.sin_addr.s_addr = htonl(INADDR_ANY); //any incoming addr ok
printf("Can't spawn task\n");
svrAddr.sin_port = htons(port);
return 1;
srv->addr = svrAddr;
}
 
printf("Spawning successful\n");
if (bind(sock, (struct sockaddr *)&svrAddr, sizeof(svrAddr)) <0) {
printf("Bind to port %d failed\n", port);
exit(1);
}
return srv;
}
 
/* pvm_recv(task_id, msgtag). msgtag identifies what kind of data it is,
#define MAXPENDING SOMAXCONN
* for here: 1 = (int, double), 2 = (int, int)
#define close(sock) closesocket(sock)
*/
pvm_recv(-1, 2);
pvm_unpackf("%d %d", &i_data, &i2);
printf("got msg type 1: %d %d\n", i_data, i2);
 
pvm_recv(-1, 1);
int ServerServe( Server svr)
pvm_unpackf("%d %lf", &i_data, &f_data);
{
printf("got msg type 2: %d %f\n", i_data, f_data);
struct sockaddr_in cli_addr;
} else {
int clen = sizeof(cli_addr);
parent = pvm_parent();
if (listen(svr->sock, MAXPENDING) < 0) {
printf("listen() call failed.\n");
return -1;
}
while (svr->running) {
int sock = accept(svr->sock, (struct sockaddr *) &cli_addr, &clen);
if (sock < 0) {
printf("accept call failed in ServerServ.\n");
svr->running = 0;
}
else
svr->running = (*(svr->handler))( &cli_addr, sock );
}
}
 
#define ServerDelete( svr ) \
{ if(svr->sock>0) close(svr->sock); \
free(svr); svr = NULL; }
 
#define BUFR_SIZE 256
/* - return 0 on success, nonzero on done; */
int SimpleHandler( InetSockAddr cli_addr, int cli_sock )
{
char msgBufr[BUFR_SIZE];
int msgSize;
int rplySize;
const char *rplyMsg;
int mcode;
 
do {
msgSize = recv(cli_sock, msgBufr, sizeof(msgBufr), 0) ;
if (msgSize < 0) {
printf("done receiving\n");
return 1;
}
mcode = msgBufr[0];
switch(mcode) {
default:
case '0': {
char *p1 = msgBufr+1;
char *p2 = msgBufr+msgSize-2;
while (p2>p1) {
*p1 = *p1 + *p2; *p2 = *p1 - *p2; *p1 = *p1 - *p2;
p1++; p2--;
}
rplyMsg = msgBufr;
}
break;
case '1':
rplyMsg = msgBufr;
break;
case '2':
rplyMsg = "Server Quitting";
// server should quit
break;
}
rplySize = strlen(rplyMsg)+1;
if (rplySize > 0) {
int bytesSent = send(cli_sock, rplyMsg, rplySize, 0);
if (bytesSent < rplySize) {
printf("Not All bytes sent back from msg.");
msgSize = 0;
}
}
} while ((msgSize > 0) && (mcode !=2));
return (mcode == 2); // true if server should quit
}
 
int main( int argc, char *argv[])
{
short port;
Server server;
WORD sockVrsn;
WSADATA wsaData;
 
sockVrsn = MAKEWORD(1,1);
 
if (argc < 2) {
printf("Usage %s <port number>\n",argv[0]);
exit(1);
}
WSAStartup(sockVrsn, &wsaData);
port = (short)atoi(argv[1]);
server = NewServer( port, &SimpleHandler);
if (server){
ServerServe(server);
ServerDelete(server);
}
return 0;
}</lang>
===Client===
<lang c>#include <stdio.h>
#include <stdlib.h>
#include <winsock.h>
//#include <sys/socket.h>
//#include <inet.h>
#include <string.h>
//#include <time.h>
#include <windows.h>
#pragma comment(lib, "wsock32.lib")
 
typedef struct sockaddr_in * InetSockAddr;
 
typedef struct sClient {
int sock;
struct sockaddr_in addr;
} *Client;
 
Client NewClient( const char *ipAddr, short port )
{
int sock;
struct sockaddr_in svrAddr;
Client clint= malloc(sizeof(struct sClient));
if (!clint) return clint;
 
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock <0 ) {
printf("Couldn't open socket. -failed %d \n", sock);
exit(1);
}
clint->sock = sock;
 
memset(&svrAddr, 0, sizeof(struct sockaddr_in));
svrAddr.sin_family = AF_INET;
svrAddr.sin_addr.s_addr = inet_addr(ipAddr);
printf("IP addr: %x", inet_addr(ipAddr));
svrAddr.sin_port = htons(port);
clint->addr = svrAddr;
 
if (connect(sock, (struct sockaddr *)&svrAddr, sizeof(svrAddr)) <0) {
perror("connect failed");
printf("Connect to Server %s:%d failed\n", ipAddr,port);
exit(1);
}
return clint;
}
 
#define close(sock) closesocket(sock)
 
#define BUFR_SIZE 128
 
int ClientDoProcs( Client clint, const char *mlist[])
{
char rcvBufr[BUFR_SIZE];
const char **msg;
int v;
 
for (msg = mlist; *msg; msg++) {
int mlen = strlen(*msg)+1;
printf("Send: %s\n", *msg);
v = send(clint->sock, *msg, mlen, 0);
if (v != mlen ) {
printf("MessageSend error: %d %d\n", v,mlen);
}
else {
int bytesRcvd = 0;
bytesRcvd = recv( clint->sock, rcvBufr, BUFR_SIZE-1, 0);
if (bytesRcvd < 0) {
printf("Error Rcvg Bytes\n");
break;
}
printf("Recvd: %s\n", rcvBufr);
}
}
return 0;
}
 
#define ClientDelete( cli ) \
{ if(cli->sock>0) close(cli->sock); \
free(cli); cli = NULL; }
 
const char *msglist[] = {
"0Hello World!!!",
"1Hello Teacher!",
"1This should echo back same",
"0This should echo back reversed",
"2 ByeBye",
NULL };
 
int main( int argc, char *argv[])
{
short port;
Client clint;
WORD sockVrsn;
WSADATA wsaData;
 
pvm_initsend(PvmDataDefault);
sockVrsn = MAKEWORD(1,1);
i_data = rand();
WSAStartup(sockVrsn, &wsaData);
f_data = (double)rand() / RAND_MAX;
pvm_packf("%d %lf", i_data, f_data);
pvm_send(parent, 1); /* send msg type 1 */
 
pvm_initsend(PvmDataDefault);
if (argc < 3) {
i2 = rand();
printf("Usage %s <serverIP> <port number>\n",argv[0]);
pvm_packf("%d %d", i_data, i2);
exit(1);
pvm_send(parent, 2); /* send msg type 2 */
}
}
 
pvm_exit();
port = (short)atoi(argv[2]);
return 0;
// argv[1]="127.0.0.1";
}</lang>Running it: (on PVM console, exe is /tmp/a.out)<lang>pvm> spawn -> /tmp/a.out 1
clint = NewClient( argv[1], port );
spawn -> /tmp/a.out 1
if (clint) {
[2]
ClientDoProcs(clint, msglist);
1 successful
ClientDelete(clint);
t40028
}
pvm> [2:t40029] EOF
Sleep(10);
[2:t40028] Spawning successful
return 0;
[2:t40028] got msg type 2: 1804289383 1681692777
}</lang>
[2:t40028] got msg type 1: 1804289383 0.394383
[2:t40028] EOF
[2] finished</lang>
 
=={{header|C sharp|C#}}==
Anonymous user