diff --git a/src/Socket2.cpp b/src/Socket2.cpp index 9180ba3aea2d82ab509875c42cb1817d24ad2f9c..863b251bf001ccde8a69366eb6a762a53e5790e1 100644 --- a/src/Socket2.cpp +++ b/src/Socket2.cpp @@ -180,11 +180,11 @@ void Socket2::init_device() if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) init_error = "Fail to ignore SIGPIPE signal"; - // Initialize device + // Initialize device if (init_error.empty()) { resolve(); open(); - check_state(true); + check_state(true); } /*----- PROTECTED REGION END -----*/ // Socket2::init_device } @@ -312,10 +312,9 @@ void Socket2::get_device_property() if (timeout <= 0) timeout = 1000; - DEBUG_STREAM << "Connecting to " << hostname - << " on port " << port << " using " << protocol << " protocol" - << " and " << iOMultiplexing << " IO multiplexing type" - << " with a timeout of " << timeout << " ms "<< endl; + DEBUG_STREAM << "Connecting to " << hostname << " on port " << port + << " using " << protocol << " protocol" << " and " << iOMultiplexing + << " IO multiplexing type" << " with a timeout of " << timeout << " ms "<< endl; timeout_timeval.tv_sec = timeout / 1000; timeout_timeval.tv_usec = timeout % 1000 * 1000; @@ -373,6 +372,7 @@ void Socket2::always_executed_hook() if (! init_error.empty()) { set_state(Tango::FAULT); set_status(init_error); + DEBUG_STREAM << init_error << endl; } else { check_state(true); } @@ -497,45 +497,54 @@ void Socket2::write(const Tango::DevVarCharArray *argin) DEBUG_STREAM << "Socket2::Write() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::write) ENABLED START -----*/ if (! init_error.empty()) { - DEBUG_STREAM << init_error << endl; sleep(tout); - Tango::Except::throw_exception("", - init_error.c_str(), - "Socket2::write()"); + Tango::Except::throw_exception( + "", init_error.c_str(), __PRETTY_FUNCTION__); } vector<unsigned char> argin_data; argin_data << *argin; - size_t bytes_total = 0, bytes_to_write = argin_data.size(); + size_t bytes_total = 0, bytes_to_write = argin_data.size(); + int olength; + + while (bytes_total < bytes_to_write) { + if (! wait_for(WRITE)) + goto timeout; + + ssize_t bytes_written = _write( + fd, argin_data.data() + bytes_total, + bytes_to_write - bytes_total); - while (bytes_total < bytes_to_write && wait_for(WRITE)) { - ssize_t bytes_written = _write(fd, argin_data.data() + bytes_total, - bytes_to_write - bytes_total); if ( bytes_written > 0) { bytes_total += bytes_written; - } else if (bytes_written == 0 && multiplexing == SELECT) { - break; - } else if (bytes_written == 0 && multiplexing == SLEEP) { - /* Ignore */ + } else if (bytes_written == 0) { + if (multiplexing == SELECT) + goto error; + /* Continue if multiplexing == SLEEP */ } else { /* bytes_written < 0 */ check_state(false); - break; + goto error; } } timeval twait; timerclear(&twait); twait.tv_usec = 1000; - int olength = max(output_queue_length(), 0); - + olength = max(output_queue_length(), 0); while ((bytes_total - olength) != bytes_to_write) { if (! sleep(twait)) - Tango::Except::throw_exception("", - "Timeout expired", - "Socket2::write()"); + goto timeout; timeradd(&twait, &twait, &twait); olength = max(output_queue_length(), 0); } + + return; + +error: + sleep(tout); +timeout: + Tango::Except::throw_exception( + "", "Timeout expired", __PRETTY_FUNCTION__); /*----- PROTECTED REGION END -----*/ // Socket2::write } //-------------------------------------------------------- @@ -552,34 +561,24 @@ Tango::DevVarCharArray *Socket2::read(Tango::DevLong argin) Tango::DevVarCharArray *argout; DEBUG_STREAM << "Socket2::Read() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::read) ENABLED START -----*/ - try { - if (! init_error.empty()) { - DEBUG_STREAM << init_error << endl; - Tango::Except::throw_exception("", - init_error.c_str(), - "Socket2::read()"); - } + if (! init_error.empty()) { + sleep(tout); + Tango::Except::throw_exception( + "", init_error.c_str(), __PRETTY_FUNCTION__); + } - if (argin < 0) { - Tango::Except::throw_exception("", - "Out of limit", - "Socket2::read()"); - } - - if (common_read(argin) < (size_t)argin) { - Tango::Except::throw_exception("", - "Timeout expired", - "Socket2::read()"); - } - - argout = new Tango::DevVarCharArray(); - vector<unsigned char> transfer(data.begin(), data.begin() + argin); - data.erase(data.begin(), data.begin() + argin); - *argout << transfer; - } catch(...) { + if (argin < 0) { sleep(tout); - throw; + Tango::Except::throw_exception( + "", "Out of limit", __PRETTY_FUNCTION__); } + + common_read(argin); + + argout = new Tango::DevVarCharArray(); + vector<unsigned char> transfer(data.begin(), data.begin() + argin); + data.erase(data.begin(), data.begin() + argin); + *argout << transfer; /*----- PROTECTED REGION END -----*/ // Socket2::read return argout; } @@ -597,47 +596,35 @@ Tango::DevVarCharArray *Socket2::read_until(const Tango::DevVarCharArray *argin) Tango::DevVarCharArray *argout; DEBUG_STREAM << "Socket2::ReadUntil() - " << device_name << std::endl; /*----- PROTECTED REGION ID(Socket2::read_until) ENABLED START -----*/ - try { - if (! init_error.empty()) { - DEBUG_STREAM << init_error << endl; - Tango::Except::throw_exception("", - init_error.c_str(), - "Socket2::read_until()"); - } + if (! init_error.empty()) { + sleep(tout); + Tango::Except::throw_exception( + "", init_error.c_str(), __PRETTY_FUNCTION__); + } - if (argin->length() != 1) { - Tango::Except::throw_exception("", - "Delimiter has to be exactly one byte", - "Socket2::read_until()"); - } + if (argin->length() != 1) { + sleep(tout); + Tango::Except::throw_exception( + "", "Delimiter has to be exactly one byte", __PRETTY_FUNCTION__); + } - char delim = (*argin)[0]; - bool found = false; - size_t pos = 0; + char delim = (*argin)[0]; + size_t pos = 0, dsize; - do { - for (; pos < data.size(); ++pos) { - if (memcmp(&data[pos], &delim, 1) == 0) { - found = true; - break; - } + do { + dsize = data.size(); + for (; pos < dsize; ++pos) { + if (memcmp(&data[pos], &delim, 1) == 0) { + break; } - } while (! found && common_read(1)); - - if (! found) { - Tango::Except::throw_exception("", - "Timeout expired", - "Socket2::read_until()"); } + common_read(dsize + 1); + } while (true); - argout = new Tango::DevVarCharArray(); - vector<unsigned char> transfer(data.begin(), data.begin() + pos +1); - data.erase(data.begin(), data.begin() + pos + 1); - *argout << transfer; - } catch(...) { - sleep(tout); - throw; - } + argout = new Tango::DevVarCharArray(); + vector<unsigned char> transfer(data.begin(), data.begin() + pos +1); + data.erase(data.begin(), data.begin() + pos + 1); + *argout << transfer; /*----- PROTECTED REGION END -----*/ // Socket2::read_until return argout; } @@ -663,7 +650,7 @@ void Socket2::add_dynamic_commands() bool Socket2::sleep(timeval tv) { if (! timerisset(&tout)) - return false; + return false; if (timercmp(&tout, &tv, <)) { ::sleep(tout.tv_sec); @@ -691,7 +678,7 @@ void Socket2::open() if (proto == TCP) { int flag = 1; if (::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, - sizeof(flag)) == -1) { + sizeof(flag)) == -1) { ::close(fd); ERROR_STREAM << "Disabling Nagle failed: " << string(strerror(errno)) << endl; @@ -700,7 +687,7 @@ void Socket2::open() flag = 1; if (::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, - sizeof(flag)) == -1) { + sizeof(flag)) == -1) { ::close(fd); ERROR_STREAM << "Enabling reuseaddr flag failed: " << string(strerror(errno)) << endl; @@ -756,40 +743,6 @@ void Socket2::close() data.clear(); } -void Socket2::resolve() -{ - DEBUG_STREAM << "Resolving " << hostname << "... " << endl; - char ipstr[INET6_ADDRSTRLEN]; - - sa_len = sizeof(sa); - ::memset(&sa, 0, sa_len); - - addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - addrinfo *res, *p; - - if (::inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) { - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - } else if (::getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) { - for (p=res; p!=NULL; p=p->ai_next) { - if (p->ai_family == AF_INET) { - ::inet_ntop(p->ai_family, - &(((sockaddr_in *)p->ai_addr)->sin_addr), - ipstr, sizeof ipstr); - sa.sin_addr = ((sockaddr_in *)p->ai_addr)->sin_addr; - sa.sin_family = ((sockaddr_in *)p->ai_addr)->sin_family; - sa.sin_port = htons(port); - } - } - ::freeaddrinfo(res); - } else { - ERROR_STREAM << "Name resolution failed" << endl; - } -} - ssize_t Socket2::_write(int fd, const void *buf, size_t count) { errno = 0; @@ -817,11 +770,11 @@ void Socket2::check_state(bool forcing) switch(conn_state) { - case 0: /* Success */ - set_state(Tango::ON); - set_status("Connected"); + case 0: /* Success */ + set_state(Tango::ON); + set_status("Connected"); case EINTR: - case EAGAIN: + case EAGAIN: return; case EHOSTUNREACH: mesg = "No route to host"; @@ -853,8 +806,8 @@ void Socket2::check_state(bool forcing) case ENOTDIR: default: ERROR_STREAM << "Socket error " << conn_state - << " not handled!" << endl; - abort(); + << " not handled!" << endl; + assert(false); break; } @@ -895,46 +848,94 @@ bool Socket2::wait_for(event_type et) break; } - int select_ret = select(fd + 1, &readfds, &writefds, - &errorfds, &tout); - - if (select_ret == -1 || FD_ISSET(fd, &errorfds)) { + int select_ret = select(fd + 1, &readfds, + &writefds, &errorfds, &tout); + + if (select_ret > 0) { + if (FD_ISSET(fd, &errorfds)) + return false; + if (et == READ && FD_ISSET(fd, &readfds)) + return true; + if (et == WRITE && FD_ISSET(fd, &writefds)) + return true; + assert(false); + return false; + } else if (select_ret == 0) { + return false; + } else { // select_ret < 0 ERROR_STREAM << "Select() error " << select_ret << " not handled:" << strerror(errno) << endl; return false; - } else if (select_ret == 0) { - return false; - } else if (et == WRITE && FD_ISSET(fd, &writefds)) { - return true; - } else if (et == READ && FD_ISSET(fd, &readfds)) { - return true; - } else {} + } assert(false); return false; } -size_t Socket2::common_read(size_t bytes_to_read) +void Socket2::common_read(size_t bytes_to_read) { unsigned char buffer[10000]; size_t bytes_total = data.size(); - while (bytes_total < bytes_to_read && wait_for(READ)) { + while (bytes_total < bytes_to_read) { + if (! wait_for(READ)) + goto timeout; + ssize_t bytes_readed = _read(fd, buffer, - min((size_t)max(input_queue_length(), 0), - sizeof(buffer))); + min((size_t)max(input_queue_length(), 0), + sizeof(buffer))); + if (bytes_readed > 0) { data.insert(data.end(), &buffer[0], &buffer[bytes_readed]); bytes_total += bytes_readed; - } else if (bytes_readed == 0 && multiplexing == SELECT) { - break; - } else if (bytes_readed == 0 && multiplexing == SLEEP) { - /* Ignore */ + } else if (bytes_readed == 0) { + if (multiplexing == SELECT) + goto error; + /* Continue if multiplexing == SLEEP */ } else { /* bytes_readed < 0 */ check_state(true); - break; + goto error; } } - return bytes_total; + return; +error: + sleep(tout); +timeout: + Tango::Except::throw_exception( + "", "Timeout expired", __PRETTY_FUNCTION__); +} + +void Socket2::resolve() +{ + DEBUG_STREAM << "Resolving " << hostname << "... " << endl; + char ipstr[INET6_ADDRSTRLEN]; + + sa_len = sizeof(sa); + ::memset(&sa, 0, sa_len); + + addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + addrinfo *res, *p; + + if (::inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) { + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + } else if (::getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) { + for (p=res; p!=NULL; p=p->ai_next) { + if (p->ai_family == AF_INET) { + ::inet_ntop(p->ai_family, + &(((sockaddr_in *)p->ai_addr)->sin_addr), + ipstr, sizeof ipstr); + sa.sin_addr = ((sockaddr_in *)p->ai_addr)->sin_addr; + sa.sin_family = ((sockaddr_in *)p->ai_addr)->sin_family; + sa.sin_port = htons(port); + } + } + ::freeaddrinfo(res); + } else { + ERROR_STREAM << "Name resolution failed" << endl; + } } /*----- PROTECTED REGION END -----*/ // Socket2::namespace_ending diff --git a/src/Socket2.h b/src/Socket2.h index 2744a715a7d2fd4a62905550747dc18c72d8b69b..1912cb8cc7d4410c360845cc023729d0368ee576 100644 --- a/src/Socket2.h +++ b/src/Socket2.h @@ -264,12 +264,12 @@ public: int input_queue_length(); int output_queue_length(); void close(); - void resolve(); ssize_t _write(int, const void*, size_t); ssize_t _read(int, void*, size_t); void check_state(bool); bool wait_for(event_type); - size_t common_read(size_t); + void common_read(size_t); + void resolve(); /*----- PROTECTED REGION END -----*/ // Socket2::Additional Method prototypes };