diff --git a/src/Socket2.cpp b/src/Socket2.cpp index 7cb26fb153be7f92d308a623808da69a37fb8070..7f723794fc5982ace2d3a103611efc2365a1f999 100644 --- a/src/Socket2.cpp +++ b/src/Socket2.cpp @@ -42,11 +42,17 @@ #include <Socket2.h> #include <Socket2Class.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> #include <fcntl.h> #include <sys/ioctl.h> -#include <csignal> #include <arpa/inet.h> #include <netinet/tcp.h> +#include <errno.h> +#include <algorithm> +#include <ctime> +#include <csignal> /*----- PROTECTED REGION END -----*/ // Socket2.cpp @@ -93,8 +99,6 @@ Socket2::Socket2(Tango::DeviceClass *cl, std::string &s) : TANGO_BASE_CLASS(cl, s.c_str()) { /*----- PROTECTED REGION ID(Socket2::constructor_1) ENABLED START -----*/ - reconnections = -1; - connecting = false; init_device(); /*----- PROTECTED REGION END -----*/ // Socket2::constructor_1 @@ -104,8 +108,6 @@ Socket2::Socket2(Tango::DeviceClass *cl, const char *s) : TANGO_BASE_CLASS(cl, s) { /*----- PROTECTED REGION ID(Socket2::constructor_2) ENABLED START -----*/ - reconnections = -1; - connecting = false; init_device(); /*----- PROTECTED REGION END -----*/ // Socket2::constructor_2 @@ -115,8 +117,6 @@ Socket2::Socket2(Tango::DeviceClass *cl, const char *s, const char *d) : TANGO_BASE_CLASS(cl, s, d) { /*----- PROTECTED REGION ID(Socket2::constructor_3) ENABLED START -----*/ - reconnections = -1; - connecting = false; init_device(); /*----- PROTECTED REGION END -----*/ // Socket2::constructor_3 @@ -159,8 +159,8 @@ void Socket2::init_device() /*----- PROTECTED REGION ID(Socket2::init_device_before) ENABLED START -----*/ // Initialization before get_device_property() call + reconnections = 0; init_error.clear(); - /*----- PROTECTED REGION END -----*/ // Socket2::init_device_before @@ -176,34 +176,16 @@ void Socket2::init_device() /*----- PROTECTED REGION ID(Socket2::init_device) ENABLED START -----*/ - // Initialize device - try - { - set_state( Tango::INIT ); - set_status( "Connecting..." ); - - if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) - { - ERROR_STREAM << "Fail to ignore SIGPIPE" << endl; - } + // Disabling SIGPIPE signal + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) + init_error = "Fail to ignore SIGPIPE signal"; + // Initialize device + if (init_error.empty()) { resolve(); open(); + check_state(true); } - catch (Tango::DevFailed &e) - { - init_error = "Initialization failed: " + string(e.errors[0].desc); - } - catch (...) - { - init_error = "Initialization failed: Unknown error"; - } - - if( init_error.empty() ) - { - check_connection( ); - } - /*----- PROTECTED REGION END -----*/ // Socket2::init_device } @@ -310,29 +292,34 @@ void Socket2::get_device_property() /*----- PROTECTED REGION ID(Socket2::get_device_property_after) ENABLED START -----*/ // Check device property data members init - transform(protocol.begin(), protocol.end(), protocol.begin(), ::tolower); - if (protocol == "udp") - { + if (protocol == "udp") { proto = UDP; - DEBUG_STREAM << "Using UDP protocol" << endl; } else { proto = TCP; - DEBUG_STREAM << "Using TCP protocol" << endl; } transform(iOMultiplexing.begin(), iOMultiplexing.end(), iOMultiplexing.begin(), ::tolower); - if (iOMultiplexing == "sleep") - { + if (iOMultiplexing == "sleep") { multiplexing = SLEEP; - DEBUG_STREAM << "Using sleep IO multiplexing type" << endl; } else { multiplexing = SELECT; - DEBUG_STREAM << "Using select IO multiplexing type" << endl; } - if (port > 0xFFFF) - init_error = "Invalit port number"; + if (port <= 0 || port > 65535) + init_error = "Invalid port"; + + if (timeout <= 0) + timeout = 1000; + + DEBUG_STREAM << "Connecting to " << hostname + << " on port " << port << " using " << protocol << " protocol" + << " and " << iOMultiplexing << " IO multiplexing type" + << " with a timeout of " << timeout << " ms "<< endl; + + timeout_timeval.tv_sec = timeout / 1000; + timeout_timeval.tv_usec = timeout % 1000 * 1000; + /*----- PROTECTED REGION END -----*/ // Socket2::get_device_property_after } //-------------------------------------------------------- @@ -381,22 +368,13 @@ void Socket2::always_executed_hook() /*----- PROTECTED REGION ID(Socket2::always_executed_hook) ENABLED START -----*/ // code always executed before all requests + tout = timeout_timeval; - if (! init_error.empty()) - { + if (! init_error.empty()) { set_state(Tango::FAULT); set_status(init_error); - return; - } - - tout.tv_sec = timeout / 1000; - tout.tv_usec = timeout % 1000 * 1000; - if ( ! timerisset( &tout ) ) - { - set_state(Tango::FAULT); - set_status("Invalid timeout"); } else { - check_connection( ); + check_state(true); } /*----- PROTECTED REGION END -----*/ // Socket2::always_executed_hook @@ -414,7 +392,7 @@ void Socket2::read_attr_hardware(TANGO_UNUSED(std::vector<long> &attr_list)) /*----- PROTECTED REGION ID(Socket2::read_attr_hardware) ENABLED START -----*/ // Add your own code - + /*----- PROTECTED REGION END -----*/ // Socket2::read_attr_hardware } @@ -432,9 +410,16 @@ void Socket2::read_InputLength(Tango::Attribute &attr) DEBUG_STREAM << "Socket2::read_InputLength(Tango::Attribute &attr) entering... " << std::endl; /*----- PROTECTED REGION ID(Socket2::read_InputLength) ENABLED START -----*/ // Set the attribute value - attr_InputLength_read[0] = input_queue_length() + data.size(); - attr.set_value(attr_InputLength_read); - + Tango::AttrQuality qual; + long len = input_queue_length(); + if (len >= 0) { + qual = Tango::ATTR_VALID; + attr_InputLength_read[0] = len + data.size(); + } else { + qual = Tango::ATTR_INVALID; + attr_InputLength_read[0] = data.size(); + } + attr.set_value_date_quality(attr_InputLength_read, time(NULL), qual); /*----- PROTECTED REGION END -----*/ // Socket2::read_InputLength } //-------------------------------------------------------- @@ -451,9 +436,16 @@ void Socket2::read_OutputLength(Tango::Attribute &attr) DEBUG_STREAM << "Socket2::read_OutputLength(Tango::Attribute &attr) entering... " << std::endl; /*----- PROTECTED REGION ID(Socket2::read_OutputLength) ENABLED START -----*/ // Set the attribute value - attr_OutputLength_read[0] = output_queue_length(); - attr.set_value(attr_OutputLength_read); - + Tango::AttrQuality qual; + long len = output_queue_length(); + if (len >= 0) { + qual = Tango::ATTR_VALID; + attr_OutputLength_read[0] = len; + } else { + qual = Tango::ATTR_INVALID; + attr_OutputLength_read[0] = 0; + } + attr.set_value_date_quality(attr_OutputLength_read, time(NULL), qual); /*----- PROTECTED REGION END -----*/ // Socket2::read_OutputLength } //-------------------------------------------------------- @@ -504,107 +496,46 @@ void Socket2::write(const Tango::DevVarCharArray *argin) { DEBUG_STREAM << "Socket2::Write() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::write) ENABLED START -----*/ - check_init(); - - char *argin_data = new char[ argin->length() ]; - for( unsigned int i=0; i<argin->length(); ++i ) - { - argin_data[i] = (*argin)[i]; + if (! init_error.empty()) { + DEBUG_STREAM << init_error << endl; + sleep(tout); + Tango::Except::throw_exception("", + init_error.c_str(), + "Socket2::write()"); } - int bytes_total = 0, bytes_to_write = argin->length(); - while (bytes_total != bytes_to_write && wait_for( WRITE, &tout ) ) - { - int bytes_written; - bytes_written = proto == UDP? sendto(fd, argin_data + bytes_total, - bytes_to_write - bytes_total, 0, - (struct sockaddr*) &sa, sa_len) : - ::write(fd, argin_data + bytes_total, bytes_to_write - bytes_total); - if (bytes_written < 0) - { - DEBUG_STREAM << strerror( errno ) << " (" << errno << ")" << endl; - if( errno == EINTR ) - { - continue; - } - - if( errno == ECONNREFUSED || errno == EHOSTUNREACH) - { - delete argin_data; - - close(); - open(); - - string error_mesg = "Connection refused"; - DEBUG_STREAM << error_mesg << endl; - - set_state( Tango::FAULT ); - set_status( error_mesg ); - - sleep( tout.tv_sec ); - usleep( tout.tv_usec ); - timerclear( &tout ); + vector<unsigned char> argin_data; + argin_data << *argin; + size_t bytes_total = 0, bytes_to_write = argin_data.size(); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::write()"); - } - - ERROR_STREAM << "write() error not handled:" << endl; - assert( false ); - } - else if( bytes_written == 0 ) - { - assert( false ); - } - else /* bytes_written > 0 */ - { + while (bytes_total < bytes_to_write && wait_for(WRITE)) { + ssize_t bytes_written = _write(fd, argin_data.data() + bytes_total, + bytes_to_write - bytes_total); + if ( bytes_written > 0) { bytes_total += bytes_written; - } - } - delete argin_data; - - timeval time_to_wait; - time_to_wait.tv_sec = 0; - time_to_wait.tv_usec = 1000; - - while ( output_queue_length() ) - { - timeval newtimeout; - timersub( &tout, &time_to_wait, &newtimeout ); - if( newtimeout.tv_sec >= 0 && newtimeout.tv_usec >= 0 ) - { - sleep( time_to_wait.tv_sec ); - usleep( time_to_wait.tv_usec ); - - tout = newtimeout; - timeradd( &time_to_wait, &time_to_wait, &time_to_wait ); - } - else - { - sleep( tout.tv_sec ); - usleep( tout.tv_usec ); - timerclear( &tout ); + } else if (bytes_written == 0 && multiplexing == SELECT) { + break; + } else if (bytes_written == 0 && multiplexing == SLEEP) { + /* Ignore */ + } else { /* bytes_written < 0 */ + check_state(false); break; } } - if( (bytes_total - output_queue_length()) != bytes_to_write ) - { - close(); - open(); - - string error_mesg = "Unable to send request to device"; - DEBUG_STREAM << error_mesg << endl; - - set_state( Tango::FAULT ); - set_status( error_mesg ); + timeval twait; + timerclear(&twait); + twait.tv_usec = 1000; + int olength = max(output_queue_length(), 0); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::write()"); + while ((bytes_total - olength) != bytes_to_write) { + if (! sleep(twait)) + Tango::Except::throw_exception("", + "Timeout expired", + "Socket2::write()"); + timeradd(&twait, &twait, &twait); + olength = max(output_queue_length(), 0); } - /*----- PROTECTED REGION END -----*/ // Socket2::write } //-------------------------------------------------------- @@ -621,34 +552,34 @@ Tango::DevVarCharArray *Socket2::read(Tango::DevLong argin) Tango::DevVarCharArray *argout; DEBUG_STREAM << "Socket2::Read() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::read) ENABLED START -----*/ - check_init(); - - if (argin < 0) - { - Tango::Except::throw_exception("", - "Input has to be in positive range", - "Socket2::read()"); - } - - while( (size_t)argin > data.size() ) - { - if ( ! wait_for( READ, &tout ) ) - { - string mesg( "No response from device" ); - DEBUG_STREAM << mesg << endl; + try { + if (! init_error.empty()) { + DEBUG_STREAM << init_error << endl; Tango::Except::throw_exception("", - mesg, "Socket2::read()"); + init_error.c_str(), + "Socket2::read()"); } - } - argout = new Tango::DevVarCharArray(); - argout->length(argin); - for( int i=0; i<argin; ++i ) - { - (*argout)[i] = data[i]; + if (argin < 0) { + Tango::Except::throw_exception("", + "Out of limit", + "Socket2::read()"); + } + + if (common_read(argin) < (size_t)argin) { + Tango::Except::throw_exception("", + "Timeout expired", + "Socket2::read()"); + } + + argout = new Tango::DevVarCharArray(); + vector<unsigned char> transfer(data.begin(), data.begin() + argin); + data.erase(data.begin(), data.begin() + argin); + *argout << transfer; + } catch(...) { + sleep(tout); + throw; } - data.erase( data.begin(), data.begin() + argin ); - /*----- PROTECTED REGION END -----*/ // Socket2::read return argout; } @@ -666,47 +597,47 @@ Tango::DevVarCharArray *Socket2::read_until(const Tango::DevVarCharArray *argin) Tango::DevVarCharArray *argout; DEBUG_STREAM << "Socket2::ReadUntil() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::read_until) ENABLED START -----*/ - check_init(); + try { + if (! init_error.empty()) { + DEBUG_STREAM << init_error << endl; + Tango::Except::throw_exception("", + init_error.c_str(), + "Socket2::read_until()"); + } - if (argin->length() != 1) - { - Tango::Except::throw_exception("", - "Delimiter has to be one byte length", - "Socket2::read_until()"); - } + if (argin->length() != 1) { + Tango::Except::throw_exception("", + "Delimiter has to be exactly one byte", + "Socket2::read_until()"); + } - char delim = (*argin)[0]; - bool found = false; - size_t pos; + char delim = (*argin)[0]; + bool found = false; + size_t pos = 0; - while( ! found ) - { - for( pos = 0; pos < data.size(); ++pos ) - { - if (memcmp(&data[pos], &delim, 1) == 0) - { - found = true; - break; + do { + for (; pos < data.size(); ++pos) { + if (memcmp(&data[pos], &delim, 1) == 0) { + found = true; + break; + } } - } - if ( ! found && ! wait_for( READ, &tout ) ) - { - string mesg( "No response from device" ); - DEBUG_STREAM << mesg << endl; + } while (! found && common_read(1)); + + if (! found) { Tango::Except::throw_exception("", - mesg, "Socket2::read_until()"); + "Timeout expired", + "Socket2::read_until()"); } - } - - argout = new Tango::DevVarCharArray(); - argout->length( pos+1 ); - for( size_t i = 0; i < pos + 1; ++i ) - { - (*argout)[i] = data[i]; + argout = new Tango::DevVarCharArray(); + vector<unsigned char> transfer(data.begin(), data.begin() + pos +1); + data.erase(data.begin(), data.begin() + pos + 1); + *argout << transfer; + } catch(...) { + sleep(tout); + throw; } - data.erase( data.begin(), data.begin() + pos + 1 ); - /*----- PROTECTED REGION END -----*/ // Socket2::read_until return argout; } @@ -729,92 +660,89 @@ void Socket2::add_dynamic_commands() /*----- PROTECTED REGION ID(Socket2::namespace_ending) ENABLED START -----*/ // Additional Methods -void Socket2::check_init() +bool Socket2::sleep(timeval tv) { - if (! init_error.empty() ) - { - DEBUG_STREAM << init_error << endl; - Tango::Except::throw_exception( "", - init_error.c_str(), - "Socket2::check_init()"); + if (! timerisset(&tout)) + return false; + + if (timercmp(&tout, &tv, <)) { + ::sleep(tout.tv_sec); + ::usleep(tout.tv_usec); + timerclear(&tout); + } else { // tout >= tv + ::sleep(tv.tv_sec); + ::usleep(tv.tv_usec); + timersub(&tout, &tv, &tout); + assert(tout.tv_sec >= 0 && tout.tv_usec >= 0); } + return true; } void Socket2::open() { - DEBUG_STREAM << "Creating the file descriptor..." << endl; + DEBUG_STREAM << "Opening the file descriptor..." << endl; - if ((fd = socket(PF_INET, proto == UDP? SOCK_DGRAM:SOCK_STREAM, 0)) == -1) - { - string error_mesg = "Socket creation failed: " - + string(strerror( errno )); - ERROR_STREAM << error_mesg << endl; - assert( false); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::open()"); + if ((fd = ::socket(PF_INET, proto == UDP? SOCK_DGRAM:SOCK_STREAM, 0)) == -1) { + ERROR_STREAM << "Socket creation failed: " + << string(strerror(errno)) << endl; + return; } if (proto == TCP) { int flag = 1; - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) == -1) - { - ::close( fd ); - - string error_mesg = "Disabling Nagle failed: " - + string(strerror( errno )); - ERROR_STREAM << error_mesg << endl; - assert( false); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::open()"); + if (::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, + sizeof(flag)) == -1) { + ::close(fd); + ERROR_STREAM << "Disabling Nagle failed: " + << string(strerror(errno)) << endl; + return; } flag = 1; - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof(flag)) == -1) - { - ::close( fd ); - - string error_mesg = "Enabling reuseaddr flag failed: " - + string(strerror( errno )); - ERROR_STREAM << error_mesg << endl; - assert( false); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::open()"); + if (::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, + sizeof(flag)) == -1) { + ::close(fd); + ERROR_STREAM << "Enabling reuseaddr flag failed: " + << string(strerror(errno)) << endl; + return; } } int flags = fcntl(fd, F_GETFL, 0); - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) - { - ::close( fd ); - - string error_mesg = "Enabling O_NONBLOCK failed: " - + string(strerror( errno )); - ERROR_STREAM << error_mesg << endl; - assert( false); - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::open()"); + if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + ::close(fd); + ERROR_STREAM << "Enabling O_NONBLOCK failed: " + << string(strerror(errno)) << endl; + return; } - DEBUG_STREAM << "Connecting..." << endl; ::connect(fd, (sockaddr*)&sa, sizeof(sockaddr)); +} + +int Socket2::input_queue_length() +{ + int len; + if (::ioctl(fd, TIOCINQ, &len) == -1) + return -1; + return len; +} - connecting = true; +int Socket2::output_queue_length() +{ + int len; + if (::ioctl(fd, TIOCOUTQ, &len) == -1) + return -1; + return len; } void Socket2::close() { DEBUG_STREAM << "Closing the file descriptor..." << endl; - connecting = false; - - int input_len = input_queue_length() + data.size(); - int output_len = output_queue_length(); + int output_len = max(output_queue_length(), 0); + int input_len = max(input_queue_length(), 0) + data.size(); - if( input_len + output_len) + if(input_len + output_len) { WARN_STREAM << " Bytes dropped: " << input_len << " input, " << output_len << " output" << endl; @@ -825,30 +753,7 @@ void Socket2::close() ERROR_STREAM << "Error closing file descriptor: " << strerror(errno) << endl; } - data.clear(); - - DEBUG_STREAM << "File descriptor closed" << endl; -} - -int Socket2::input_queue_length() -{ - int len; - if (ioctl(fd, FIONREAD, &len) == -1) - { - len = 0; - } - return len; -} - -int Socket2::output_queue_length() -{ - int len; - if (ioctl(fd, TIOCOUTQ, &len) == -1) - { - len = 0; - } - return len; } void Socket2::resolve() @@ -857,77 +762,81 @@ void Socket2::resolve() char ipstr[INET6_ADDRSTRLEN]; sa_len = sizeof(sa); - memset(&sa, 0, sa_len); + ::memset(&sa, 0, sa_len); addrinfo hints; - memset(&hints, 0, sizeof(hints)); + ::memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; addrinfo *res, *p; - if (inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) - { + if (::inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) { sa.sin_family = AF_INET; - sa.sin_port = htons(port); - } - else if (getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) - { - for (p=res; p!=NULL; p=p->ai_next) - { - if (p->ai_family == AF_INET) - { - inet_ntop(p->ai_family, - &(((sockaddr_in *)p->ai_addr)->sin_addr), - ipstr, sizeof ipstr); + sa.sin_port = ::htons(port); + } else if (::getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) { + for (p=res; p!=NULL; p=p->ai_next) { + if (p->ai_family == AF_INET) { + ::inet_ntop(p->ai_family, + &(((sockaddr_in *)p->ai_addr)->sin_addr), + ipstr, sizeof ipstr); sa.sin_addr = ((sockaddr_in *)p->ai_addr)->sin_addr; sa.sin_family = ((sockaddr_in *)p->ai_addr)->sin_family; - sa.sin_port = htons(port); + sa.sin_port = ::htons(port); } } - freeaddrinfo(res); - } - else - { - string error_mesg = "Name resolution failed"; - ERROR_STREAM << error_mesg << endl; - Tango::Except::throw_exception( "", - error_mesg, - "Socket2::resolve()"); + ::freeaddrinfo(res); + } else { + ERROR_STREAM << "Name resolution failed" << endl; } } -void Socket2::check_connection( ) +ssize_t Socket2::_write(int fd, const void *buf, size_t count) +{ + errno = 0; + int ret = proto == UDP? + ::sendto(fd, buf, count, 0, (sockaddr*) &sa, sa_len) : + ::write(fd, buf, count); + conn_state = errno; + return ret; +} + +ssize_t Socket2::_read(int fd, void *buf, size_t count) { - int so_error; - socklen_t len = sizeof so_error; - getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len); -#ifndef NDEBUG - DEBUG_STREAM << "getsockopt(): " << strerror( so_error ) << " (" << so_error << ")" << endl; -#endif - - bool do_reconnect = false; - string error_mesg; - switch( so_error ) + int ret = proto == UDP? + ::recvfrom(fd, buf, count, 0, (sockaddr*) &sa, &sa_len): + ::read(fd, buf, count); + return ret; +} + +void Socket2::check_state(bool forcing) +{ + string mesg; + + if (forcing) + (void)_write(fd, NULL, 0); + + switch(conn_state) { + case 0: /* Success */ + set_state(Tango::ON); + set_status("Connected"); + case EINTR: + case EAGAIN: + return; case EHOSTUNREACH: - error_mesg = "No route to host"; - do_reconnect = true; + mesg = "No route to host"; break; case ECONNREFUSED: - error_mesg = "Connection refused"; - do_reconnect = true; + mesg = "Connection refused"; break; case EPIPE: - error_mesg = "Broken pipe"; - do_reconnect = true; + mesg = "Broken pipe"; break; case ETIMEDOUT: - error_mesg = "Connection timed out"; - do_reconnect = true; + mesg = "Connection timed out"; break; case ECONNRESET: - error_mesg = "Connection reset by peer"; - do_reconnect = true; + mesg = "Connection reset by peer"; break; case EISCONN: case EALREADY: @@ -935,7 +844,6 @@ void Socket2::check_connection( ) case EADDRNOTAVAIL: case EAFNOSUPPORT: case EBADF: - case EINTR: case ENOTSOCK: case EPROTOTYPE: case EIO: @@ -944,149 +852,41 @@ void Socket2::check_connection( ) case ENOENT: case ENOTDIR: default: - ERROR_STREAM << "Socket error " << so_error - << " not handled!" << endl; - abort( ); - break; - case 0 /* SUCCESS */: + ERROR_STREAM << "Socket error " << conn_state + << " not handled!" << endl; + abort(); break; } - if( do_reconnect ) - { - close(); - open(); - DEBUG_STREAM << error_mesg << endl; - set_state( Tango::FAULT ); - set_status( error_mesg ); - } - else - { - timeval tv; - timerclear( &tv ); - if( wait_for( WRITE, &tv ) ) - { - if( connecting ) - { - reconnections++; - connecting = false; - } - - set_state( Tango::ON ); - set_status( "Connected" ); - } - } -} - -bool Socket2::read(timeval *tv) -{ - int bytes_total = 0, bytes_to_read = input_queue_length(); - do - { - int len = (bytes_to_read - bytes_total) > BUFFER_SIZE? - BUFFER_SIZE : (bytes_to_read - bytes_total); - - int bytes_readed = proto == UDP? - recvfrom(fd, buffer, len, 0, (struct sockaddr*) &sa, &sa_len) : - ::read(fd, buffer, len); - - if( bytes_readed < 0 ) - { - DEBUG_STREAM << strerror( errno ) << " (" << errno << ")" << endl; - if( errno == EINTR) - { - continue; - } - } - else if( bytes_readed == 0 ) - { - close(); - open(); - - string error_mesg = "Server shutting down"; - DEBUG_STREAM << error_mesg << endl; - - set_state(Tango::FAULT); - set_status(error_mesg); - - sleep( tv->tv_sec ); - usleep( tv->tv_usec ); - timerclear( tv ); - - return false; - } - else /* bytes_readed > 0 */ - { - data.insert(data.end(), &buffer[0], &buffer[bytes_readed]); - bytes_total += bytes_readed; - } - } - while (bytes_total != bytes_to_read); + set_state(Tango::INIT); + set_status("Reconnecting due: " + mesg); - return true; + close(); + resolve(); + open(); + reconnections += 1; } -bool Socket2::wait_for( event_type et, timeval *tv ) +bool Socket2::wait_for(event_type et) { - switch(multiplexing) { - case SLEEP: - return wait_for_with_sleep(et, tv); - default: - return wait_for_with_select(et, tv); + if (multiplexing == SLEEP) { + timeval twait; + timerclear(&twait); + twait.tv_usec = 10000; + return sleep(twait); } -} - -bool Socket2::wait_for_with_sleep( event_type et, timeval *tv ) -{ - struct timeval sleep_time; - sleep_time.tv_sec = 0; - sleep_time.tv_usec = 10000; /* 10 ms */ - - do { - switch( et ) - { - case WRITE: - if( output_queue_length() == 0 ) - { -#ifndef NDEBUG - DEBUG_STREAM << "Ready to write" << endl; -#endif - return true; - } - break; - case READ: - if( input_queue_length() != 0 ) - { -#ifndef NDEBUG - DEBUG_STREAM << "Ready to read" << endl; -#endif - return read(tv); - } - break; - } - int usleep_ret = usleep(sleep_time.tv_usec); -#ifndef NDEBUG - DEBUG_STREAM << "usleep(): " << usleep_ret << endl; -#else - (void)usleep_ret; -#endif - timersub(tv, &sleep_time, tv); - } while (timerisset(tv)); + // multiplexing == SELECT + fd_set readfds; + fd_set writefds; + fd_set errorfds; - return false; -} - -bool Socket2::wait_for_with_select( event_type et, timeval *tv ) -{ FD_ZERO(&errorfds); FD_ZERO(&readfds); FD_ZERO(&writefds); - FD_SET(fd, &errorfds); - switch( et ) - { + switch (et) { case WRITE: FD_SET(fd, &writefds); break; @@ -1094,45 +894,46 @@ bool Socket2::wait_for_with_select( event_type et, timeval *tv ) FD_SET(fd, &readfds); break; } - int select_ret = select( fd + 1, &readfds, &writefds, &errorfds, tv ); -#ifndef NDEBUG - DEBUG_STREAM << "select(): " << select_ret << endl; -#endif + int select_ret = select(fd + 1, &readfds, &writefds, + &errorfds, &tout); - if( select_ret == -1 ) - { - ERROR_STREAM << "Select() error " << select_ret << " not handled:" - << strerror( errno ) << endl; - assert( false ); - } - else if( select_ret == 0 ) - { + if (select_ret == -1 || FD_ISSET(fd, &errorfds)) { + ERROR_STREAM << "Select() error " << select_ret + << " not handled:" << strerror(errno) << endl; return false; - } - - if (FD_ISSET(fd, &errorfds)) - { - ERROR_STREAM << "Select() error event not handled!" << endl; - assert( false ); - } - - if (FD_ISSET(fd, &writefds)) - { -#ifndef NDEBUG - DEBUG_STREAM << "Ready to write" << endl; -#endif - } + } else if (select_ret == 0) { + return false; + } else if (et == WRITE && FD_ISSET(fd, &writefds)) { + return true; + } else if (et == READ && FD_ISSET(fd, &readfds)) { + return true; + } else {} + assert(false); +} - if (FD_ISSET(fd, &readfds)) - { -#ifndef NDEBUG - DEBUG_STREAM << "Ready to read" << endl; -#endif - return read(tv); +size_t Socket2::common_read(size_t bytes_to_read) +{ + unsigned char buffer[10000]; + size_t bytes_total = data.size(); + + while (bytes_total < bytes_to_read && wait_for(READ)) { + ssize_t bytes_readed = _read(fd, buffer, + min((size_t)max(input_queue_length(), 0), + sizeof(buffer))); + if (bytes_readed > 0) { + data.insert(data.end(), &buffer[0], &buffer[bytes_readed]); + bytes_total += bytes_readed; + } else if (bytes_readed == 0 && multiplexing == SELECT) { + break; + } else if (bytes_readed == 0 && multiplexing == SLEEP) { + /* Ignore */ + } else { /* bytes_readed < 0 */ + check_state(true); + break; + } } - - return true; + return bytes_total; } /*----- PROTECTED REGION END -----*/ // Socket2::namespace_ending diff --git a/src/Socket2.h b/src/Socket2.h index 569a9ace582ca84038673a69a67fa0d7c0af73db..2744a715a7d2fd4a62905550747dc18c72d8b69b 100644 --- a/src/Socket2.h +++ b/src/Socket2.h @@ -41,8 +41,6 @@ #include <tango.h> #include <netdb.h> -#define BUFFER_SIZE 1000 - /*----- PROTECTED REGION END -----*/ // Socket2.h #ifdef TANGO_LOG @@ -77,28 +75,22 @@ class Socket2 : public TANGO_BASE_CLASS // Add your own data members string init_error; + enum {TCP, UDP} proto; + + int fd; + int conn_state; sockaddr_in sa; socklen_t sa_len; - - enum { TCP, UDP } proto; - - int fd; - fd_set readfds; - fd_set writefds; - fd_set errorfds; - - enum event_type { READ, WRITE }; - unsigned char buffer[ BUFFER_SIZE ]; - vector< unsigned char > data; + enum event_type {READ, WRITE}; - bool connecting; - long long reconnections; + vector<unsigned char> data; - enum { SLEEP, SELECT } multiplexing; + int reconnections; - timeval tout; + enum {SLEEP, SELECT} multiplexing; + timeval timeout_timeval, tout; /*----- PROTECTED REGION END -----*/ // Socket2::Data Members // Device property data members @@ -106,7 +98,7 @@ public: // Hostname: std::string hostname; // Port: - Tango::DevUShort port; + Tango::DevLong port; // Protocol: std::string protocol; // Timeout: @@ -267,22 +259,17 @@ public: /*----- PROTECTED REGION ID(Socket2::Additional Method prototypes) ENABLED START -----*/ // Additional Method prototypes - void check_init(); - - void resolve(); - + bool sleep(timeval); void open(); - void close(); - void check_connection( ); - bool wait_for( event_type et, timeval *tv ); - bool wait_for_with_sleep( event_type et, timeval *tv ); - bool wait_for_with_select( event_type et, timeval *tv ); - int input_queue_length(); int output_queue_length(); - - bool read( struct timeval *tv ); - + void close(); + void resolve(); + ssize_t _write(int, const void*, size_t); + ssize_t _read(int, void*, size_t); + void check_state(bool); + bool wait_for(event_type); + size_t common_read(size_t); /*----- PROTECTED REGION END -----*/ // Socket2::Additional Method prototypes }; diff --git a/src/Socket2.xmi b/src/Socket2.xmi index 43d71478e087e045eaf7a04c043de226cd993a8c..e8453a8383116f1f686be4a87839ec35bda32258 100644 --- a/src/Socket2.xmi +++ b/src/Socket2.xmi @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="ASCII"?> <pogoDsl:PogoSystem xmi:version="2.0" xmlns:xmi="http://www.omg.org/XMI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:pogoDsl="http://www.esrf.fr/tango/pogo/PogoDsl"> <classes name="Socket2" pogoRevision="9.7"> - <description description="" title="" sourcePath="/home/alessio/Sources/git-trees/socket2/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions" license="GPL" hasMandatoryProperty="true" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false"> + <description description="" title="" sourcePath="/home/alessio.bogani/Sources/git-trees/linkstabilizer/deps/socket2/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions" license="GPL" hasMandatoryProperty="true" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false"> <inheritances classname="Device_Impl" sourcePath=""/> <identification contact="at elettra.eu> - Alessio Igor Bogani <alessio.bogani" author="Alessio Igor Bogani <alessio.bogani" emailDomain="elettra.eu>" classFamily="Communication" siteSpecific="" platform="Unix Like" bus="Socket" manufacturer="none" reference=""/> </description> @@ -10,7 +10,7 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </deviceProperties> <deviceProperties name="Port" mandatory="true" description=""> - <type xsi:type="pogoDsl:UShortType"/> + <type xsi:type="pogoDsl:IntType"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </deviceProperties> <deviceProperties name="Protocol" description=""> @@ -25,8 +25,9 @@ <deviceProperties name="IOMultiplexing" description="Use `sleep` for fixed sleep and `select` for the syscall of the same name"> <type xsi:type="pogoDsl:StringType"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> + <DefaultPropValue>SELECT</DefaultPropValue> </deviceProperties> - <commands name="Write" description="" execMethod="write" displayLevel="OPERATOR" polledPeriod="0"> + <commands name="Write" description="" execMethod="write" displayLevel="OPERATOR" polledPeriod="0" isDynamic="false"> <argin description=""> <type xsi:type="pogoDsl:CharArrayType"/> </argin> @@ -34,7 +35,6 @@ <type xsi:type="pogoDsl:VoidType"/> </argout> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> - <excludedStates>INIT</excludedStates> </commands> <commands name="Read" description="" execMethod="read" displayLevel="OPERATOR" polledPeriod="0"> <argin description=""> @@ -44,7 +44,6 @@ <type xsi:type="pogoDsl:CharArrayType"/> </argout> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> - <excludedStates>INIT</excludedStates> </commands> <commands name="ReadUntil" description="" execMethod="read_until" displayLevel="OPERATOR" polledPeriod="0"> <argin description=""> @@ -54,7 +53,6 @@ <type xsi:type="pogoDsl:CharArrayType"/> </argout> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> - <excludedStates>INIT</excludedStates> </commands> <attributes name="InputLength" attType="Scalar" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:IntType"/> @@ -83,10 +81,16 @@ <states name="ON" description="Connected"> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </states> - <states name="FAULT" description="Connection failed"> + <states name="INIT" description=""> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </states> - <states name="INIT" description=""> + <states name="UNKNOWN" description=""> + <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> + </states> + <states name="ALARM" description=""> + <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> + </states> + <states name="FAULT" description=""> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </states> <preferences docHome="./doc_html" makefileHome="$(TANGO_HOME)"/> diff --git a/src/Socket2Class.cpp b/src/Socket2Class.cpp index 0e5e6d8d0872c3c89cea0f2d84462760474243f8..477b4182a859468b3a37ada3ba069dea7902c634 100644 --- a/src/Socket2Class.cpp +++ b/src/Socket2Class.cpp @@ -338,8 +338,9 @@ void Socket2Class::set_default_property() add_wiz_dev_prop(prop_name, prop_desc); prop_name = "IOMultiplexing"; prop_desc = "Use `sleep` for fixed sleep and `select` for the syscall of the same name"; - prop_def = ""; + prop_def = "SELECT"; vect_data.clear(); + vect_data.push_back("SELECT"); if (prop_def.length()>0) { Tango::DbDatum data(prop_name); diff --git a/src/Socket2StateMachine.cpp b/src/Socket2StateMachine.cpp index fbfeb043515278c7b81ac6b32046ba5030a2d331..39a857c88daa241cfb901b4f0ac5c3c556f8a3c4 100644 --- a/src/Socket2StateMachine.cpp +++ b/src/Socket2StateMachine.cpp @@ -39,11 +39,13 @@ /*----- PROTECTED REGION END -----*/ // Socket2::Socket2StateMachine.cpp //================================================================ -// States | Description +// States | Description //================================================================ -// ON | Connected -// FAULT | Connection failed -// INIT | +// ON | Connected +// INIT | +// UNKNOWN | +// ALARM | +// FAULT | namespace Socket2_ns @@ -113,14 +115,10 @@ bool Socket2::is_Reconnections_allowed(TANGO_UNUSED(Tango::AttReqType type)) //-------------------------------------------------------- bool Socket2::is_Write_allowed(TANGO_UNUSED(const CORBA::Any &any)) { - // Compare device state with not allowed states. - if (get_state()==Tango::INIT) - { + // Not any excluded states for Write command. /*----- PROTECTED REGION ID(Socket2::WriteStateAllowed) ENABLED START -----*/ /*----- PROTECTED REGION END -----*/ // Socket2::WriteStateAllowed - return false; - } return true; } @@ -132,14 +130,10 @@ bool Socket2::is_Write_allowed(TANGO_UNUSED(const CORBA::Any &any)) //-------------------------------------------------------- bool Socket2::is_Read_allowed(TANGO_UNUSED(const CORBA::Any &any)) { - // Compare device state with not allowed states. - if (get_state()==Tango::INIT) - { + // Not any excluded states for Read command. /*----- PROTECTED REGION ID(Socket2::ReadStateAllowed) ENABLED START -----*/ /*----- PROTECTED REGION END -----*/ // Socket2::ReadStateAllowed - return false; - } return true; } @@ -151,14 +145,10 @@ bool Socket2::is_Read_allowed(TANGO_UNUSED(const CORBA::Any &any)) //-------------------------------------------------------- bool Socket2::is_ReadUntil_allowed(TANGO_UNUSED(const CORBA::Any &any)) { - // Compare device state with not allowed states. - if (get_state()==Tango::INIT) - { + // Not any excluded states for ReadUntil command. /*----- PROTECTED REGION ID(Socket2::ReadUntilStateAllowed) ENABLED START -----*/ /*----- PROTECTED REGION END -----*/ // Socket2::ReadUntilStateAllowed - return false; - } return true; }