Skip to content
Snippets Groups Projects
Commit a3e1eee0 authored by Alessio Igor Bogani's avatar Alessio Igor Bogani
Browse files

Simplify the code

parent 74c5dc93
No related branches found
No related tags found
No related merge requests found
......@@ -180,11 +180,11 @@ void Socket2::init_device()
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
init_error = "Fail to ignore SIGPIPE signal";
// Initialize device
// Initialize device
if (init_error.empty()) {
resolve();
open();
check_state(true);
check_state(true);
}
/*----- PROTECTED REGION END -----*/ // Socket2::init_device
}
......@@ -312,10 +312,9 @@ void Socket2::get_device_property()
if (timeout <= 0)
timeout = 1000;
DEBUG_STREAM << "Connecting to " << hostname
<< " on port " << port << " using " << protocol << " protocol"
<< " and " << iOMultiplexing << " IO multiplexing type"
<< " with a timeout of " << timeout << " ms "<< endl;
DEBUG_STREAM << "Connecting to " << hostname << " on port " << port
<< " using " << protocol << " protocol" << " and " << iOMultiplexing
<< " IO multiplexing type" << " with a timeout of " << timeout << " ms "<< endl;
timeout_timeval.tv_sec = timeout / 1000;
timeout_timeval.tv_usec = timeout % 1000 * 1000;
......@@ -373,6 +372,7 @@ void Socket2::always_executed_hook()
if (! init_error.empty()) {
set_state(Tango::FAULT);
set_status(init_error);
DEBUG_STREAM << init_error << endl;
} else {
check_state(true);
}
......@@ -497,45 +497,54 @@ void Socket2::write(const Tango::DevVarCharArray *argin)
DEBUG_STREAM << "Socket2::Write() - " << device_name << std::endl;
/*----- PROTECTED REGION ID(Socket2::write) ENABLED START -----*/
if (! init_error.empty()) {
DEBUG_STREAM << init_error << endl;
sleep(tout);
Tango::Except::throw_exception("",
init_error.c_str(),
"Socket2::write()");
Tango::Except::throw_exception(
"", init_error.c_str(), __PRETTY_FUNCTION__);
}
vector<unsigned char> argin_data;
argin_data << *argin;
size_t bytes_total = 0, bytes_to_write = argin_data.size();
size_t bytes_total = 0, bytes_to_write = argin_data.size();
int olength;
while (bytes_total < bytes_to_write) {
if (! wait_for(WRITE))
goto timeout;
ssize_t bytes_written = _write(
fd, argin_data.data() + bytes_total,
bytes_to_write - bytes_total);
while (bytes_total < bytes_to_write && wait_for(WRITE)) {
ssize_t bytes_written = _write(fd, argin_data.data() + bytes_total,
bytes_to_write - bytes_total);
if ( bytes_written > 0) {
bytes_total += bytes_written;
} else if (bytes_written == 0 && multiplexing == SELECT) {
break;
} else if (bytes_written == 0 && multiplexing == SLEEP) {
/* Ignore */
} else if (bytes_written == 0) {
if (multiplexing == SELECT)
goto error;
/* Continue if multiplexing == SLEEP */
} else { /* bytes_written < 0 */
check_state(false);
break;
goto error;
}
}
timeval twait;
timerclear(&twait);
twait.tv_usec = 1000;
int olength = max(output_queue_length(), 0);
olength = max(output_queue_length(), 0);
while ((bytes_total - olength) != bytes_to_write) {
if (! sleep(twait))
Tango::Except::throw_exception("",
"Timeout expired",
"Socket2::write()");
goto timeout;
timeradd(&twait, &twait, &twait);
olength = max(output_queue_length(), 0);
}
return;
error:
sleep(tout);
timeout:
Tango::Except::throw_exception(
"", "Timeout expired", __PRETTY_FUNCTION__);
/*----- PROTECTED REGION END -----*/ // Socket2::write
}
//--------------------------------------------------------
......@@ -552,34 +561,24 @@ Tango::DevVarCharArray *Socket2::read(Tango::DevLong argin)
Tango::DevVarCharArray *argout;
DEBUG_STREAM << "Socket2::Read() - " << device_name << std::endl;
/*----- PROTECTED REGION ID(Socket2::read) ENABLED START -----*/
try {
if (! init_error.empty()) {
DEBUG_STREAM << init_error << endl;
Tango::Except::throw_exception("",
init_error.c_str(),
"Socket2::read()");
}
if (! init_error.empty()) {
sleep(tout);
Tango::Except::throw_exception(
"", init_error.c_str(), __PRETTY_FUNCTION__);
}
if (argin < 0) {
Tango::Except::throw_exception("",
"Out of limit",
"Socket2::read()");
}
if (common_read(argin) < (size_t)argin) {
Tango::Except::throw_exception("",
"Timeout expired",
"Socket2::read()");
}
argout = new Tango::DevVarCharArray();
vector<unsigned char> transfer(data.begin(), data.begin() + argin);
data.erase(data.begin(), data.begin() + argin);
*argout << transfer;
} catch(...) {
if (argin < 0) {
sleep(tout);
throw;
Tango::Except::throw_exception(
"", "Out of limit", __PRETTY_FUNCTION__);
}
common_read(argin);
argout = new Tango::DevVarCharArray();
vector<unsigned char> transfer(data.begin(), data.begin() + argin);
data.erase(data.begin(), data.begin() + argin);
*argout << transfer;
/*----- PROTECTED REGION END -----*/ // Socket2::read
return argout;
}
......@@ -597,47 +596,35 @@ Tango::DevVarCharArray *Socket2::read_until(const Tango::DevVarCharArray *argin)
Tango::DevVarCharArray *argout;
DEBUG_STREAM << "Socket2::ReadUntil() - " << device_name << std::endl;
/*----- PROTECTED REGION ID(Socket2::read_until) ENABLED START -----*/
try {
if (! init_error.empty()) {
DEBUG_STREAM << init_error << endl;
Tango::Except::throw_exception("",
init_error.c_str(),
"Socket2::read_until()");
}
if (! init_error.empty()) {
sleep(tout);
Tango::Except::throw_exception(
"", init_error.c_str(), __PRETTY_FUNCTION__);
}
if (argin->length() != 1) {
Tango::Except::throw_exception("",
"Delimiter has to be exactly one byte",
"Socket2::read_until()");
}
if (argin->length() != 1) {
sleep(tout);
Tango::Except::throw_exception(
"", "Delimiter has to be exactly one byte", __PRETTY_FUNCTION__);
}
char delim = (*argin)[0];
bool found = false;
size_t pos = 0;
char delim = (*argin)[0];
size_t pos = 0, dsize;
do {
for (; pos < data.size(); ++pos) {
if (memcmp(&data[pos], &delim, 1) == 0) {
found = true;
break;
}
do {
dsize = data.size();
for (; pos < dsize; ++pos) {
if (memcmp(&data[pos], &delim, 1) == 0) {
break;
}
} while (! found && common_read(1));
if (! found) {
Tango::Except::throw_exception("",
"Timeout expired",
"Socket2::read_until()");
}
common_read(dsize + 1);
} while (true);
argout = new Tango::DevVarCharArray();
vector<unsigned char> transfer(data.begin(), data.begin() + pos +1);
data.erase(data.begin(), data.begin() + pos + 1);
*argout << transfer;
} catch(...) {
sleep(tout);
throw;
}
argout = new Tango::DevVarCharArray();
vector<unsigned char> transfer(data.begin(), data.begin() + pos +1);
data.erase(data.begin(), data.begin() + pos + 1);
*argout << transfer;
/*----- PROTECTED REGION END -----*/ // Socket2::read_until
return argout;
}
......@@ -663,7 +650,7 @@ void Socket2::add_dynamic_commands()
bool Socket2::sleep(timeval tv)
{
if (! timerisset(&tout))
return false;
return false;
if (timercmp(&tout, &tv, <)) {
::sleep(tout.tv_sec);
......@@ -691,7 +678,7 @@ void Socket2::open()
if (proto == TCP) {
int flag = 1;
if (::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag,
sizeof(flag)) == -1) {
sizeof(flag)) == -1) {
::close(fd);
ERROR_STREAM << "Disabling Nagle failed: "
<< string(strerror(errno)) << endl;
......@@ -700,7 +687,7 @@ void Socket2::open()
flag = 1;
if (::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag,
sizeof(flag)) == -1) {
sizeof(flag)) == -1) {
::close(fd);
ERROR_STREAM << "Enabling reuseaddr flag failed: "
<< string(strerror(errno)) << endl;
......@@ -756,40 +743,6 @@ void Socket2::close()
data.clear();
}
void Socket2::resolve()
{
DEBUG_STREAM << "Resolving " << hostname << "... " << endl;
char ipstr[INET6_ADDRSTRLEN];
sa_len = sizeof(sa);
::memset(&sa, 0, sa_len);
addrinfo hints;
::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
addrinfo *res, *p;
if (::inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) {
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
} else if (::getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) {
for (p=res; p!=NULL; p=p->ai_next) {
if (p->ai_family == AF_INET) {
::inet_ntop(p->ai_family,
&(((sockaddr_in *)p->ai_addr)->sin_addr),
ipstr, sizeof ipstr);
sa.sin_addr = ((sockaddr_in *)p->ai_addr)->sin_addr;
sa.sin_family = ((sockaddr_in *)p->ai_addr)->sin_family;
sa.sin_port = htons(port);
}
}
::freeaddrinfo(res);
} else {
ERROR_STREAM << "Name resolution failed" << endl;
}
}
ssize_t Socket2::_write(int fd, const void *buf, size_t count)
{
errno = 0;
......@@ -817,11 +770,11 @@ void Socket2::check_state(bool forcing)
switch(conn_state)
{
case 0: /* Success */
set_state(Tango::ON);
set_status("Connected");
case 0: /* Success */
set_state(Tango::ON);
set_status("Connected");
case EINTR:
case EAGAIN:
case EAGAIN:
return;
case EHOSTUNREACH:
mesg = "No route to host";
......@@ -853,8 +806,8 @@ void Socket2::check_state(bool forcing)
case ENOTDIR:
default:
ERROR_STREAM << "Socket error " << conn_state
<< " not handled!" << endl;
abort();
<< " not handled!" << endl;
assert(false);
break;
}
......@@ -895,46 +848,94 @@ bool Socket2::wait_for(event_type et)
break;
}
int select_ret = select(fd + 1, &readfds, &writefds,
&errorfds, &tout);
if (select_ret == -1 || FD_ISSET(fd, &errorfds)) {
int select_ret = select(fd + 1, &readfds,
&writefds, &errorfds, &tout);
if (select_ret > 0) {
if (FD_ISSET(fd, &errorfds))
return false;
if (et == READ && FD_ISSET(fd, &readfds))
return true;
if (et == WRITE && FD_ISSET(fd, &writefds))
return true;
assert(false);
return false;
} else if (select_ret == 0) {
return false;
} else { // select_ret < 0
ERROR_STREAM << "Select() error " << select_ret
<< " not handled:" << strerror(errno) << endl;
return false;
} else if (select_ret == 0) {
return false;
} else if (et == WRITE && FD_ISSET(fd, &writefds)) {
return true;
} else if (et == READ && FD_ISSET(fd, &readfds)) {
return true;
} else {}
}
assert(false);
return false;
}
size_t Socket2::common_read(size_t bytes_to_read)
void Socket2::common_read(size_t bytes_to_read)
{
unsigned char buffer[10000];
size_t bytes_total = data.size();
while (bytes_total < bytes_to_read && wait_for(READ)) {
while (bytes_total < bytes_to_read) {
if (! wait_for(READ))
goto timeout;
ssize_t bytes_readed = _read(fd, buffer,
min((size_t)max(input_queue_length(), 0),
sizeof(buffer)));
min((size_t)max(input_queue_length(), 0),
sizeof(buffer)));
if (bytes_readed > 0) {
data.insert(data.end(), &buffer[0], &buffer[bytes_readed]);
bytes_total += bytes_readed;
} else if (bytes_readed == 0 && multiplexing == SELECT) {
break;
} else if (bytes_readed == 0 && multiplexing == SLEEP) {
/* Ignore */
} else if (bytes_readed == 0) {
if (multiplexing == SELECT)
goto error;
/* Continue if multiplexing == SLEEP */
} else { /* bytes_readed < 0 */
check_state(true);
break;
goto error;
}
}
return bytes_total;
return;
error:
sleep(tout);
timeout:
Tango::Except::throw_exception(
"", "Timeout expired", __PRETTY_FUNCTION__);
}
void Socket2::resolve()
{
DEBUG_STREAM << "Resolving " << hostname << "... " << endl;
char ipstr[INET6_ADDRSTRLEN];
sa_len = sizeof(sa);
::memset(&sa, 0, sa_len);
addrinfo hints;
::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
addrinfo *res, *p;
if (::inet_pton(AF_INET, hostname.c_str(), &(sa.sin_addr)) > 0) {
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
} else if (::getaddrinfo(hostname.c_str(), NULL, &hints, &res) == 0) {
for (p=res; p!=NULL; p=p->ai_next) {
if (p->ai_family == AF_INET) {
::inet_ntop(p->ai_family,
&(((sockaddr_in *)p->ai_addr)->sin_addr),
ipstr, sizeof ipstr);
sa.sin_addr = ((sockaddr_in *)p->ai_addr)->sin_addr;
sa.sin_family = ((sockaddr_in *)p->ai_addr)->sin_family;
sa.sin_port = htons(port);
}
}
::freeaddrinfo(res);
} else {
ERROR_STREAM << "Name resolution failed" << endl;
}
}
/*----- PROTECTED REGION END -----*/ // Socket2::namespace_ending
......
......@@ -264,12 +264,12 @@ public:
int input_queue_length();
int output_queue_length();
void close();
void resolve();
ssize_t _write(int, const void*, size_t);
ssize_t _read(int, void*, size_t);
void check_state(bool);
bool wait_for(event_type);
size_t common_read(size_t);
void common_read(size_t);
void resolve();
/*----- PROTECTED REGION END -----*/ // Socket2::Additional Method prototypes
};
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment