Subject: Overpass API development
- From: Roland Olbricht <>
- To:
- Subject: Sockaddr on OS X (was Re: [overpass] Re: Where is bin/init_osm3s.sh?)
- Date: Sat, 13 Jun 2015 13:42:11 +0200
Hi all,
In the meantime, please check whether the name really is
"osm3s_v0.7.52_osm_bas" and not "osm3s_v0.7.52_osm_base".
I'm sorry for the slow reply. It looks like Apple implements this system call slightly differently from the rest of the Unix world. The man page
https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man4/unix.4.html
refers to a field "sun_len" which doesn't exist elsewhere. I've tried to add it conditionally to the source code.
Please replace the files
src/template_db/dispatcher.cc
src/template_db/dispatcher_client.cc
with the attached files, then recompile (with "make").
The only difference in these files should be the added block
#ifdef __APPLE__
local.sun_len = socket_name.size() + 1;
#endif
which hopefully compiles. It explicitly sets the correct length for the socket filename.
Best regards,
Roland
// ---- Attachment: src/template_db/dispatcher.cc ----

/** Copyright 2008, 2009, 2010, 2011, 2012 Roland Olbricht
 *
 * This file is part of Template_DB.
 *
 * Template_DB is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * Template_DB is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with Template_DB. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dispatcher.h"

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <vector>


Dispatcher_Socket::Dispatcher_Socket
    (const std::string& dispatcher_share_name,
     const std::string& shadow_name_, const std::string& db_dir_,
     uint max_num_reading_processes)
{
  signal(SIGPIPE, SIG_IGN);

  std::string shadow_name = shadow_name_;
  std::string db_dir = db_dir_;

  // get the absolute pathname of the current directory
  if (db_dir.substr(0, 1) != "/")
    db_dir = getcwd() + db_dir_;
  if (shadow_name.substr(0, 1) != "/")
    shadow_name = getcwd() + shadow_name_;

  // initialize the socket for the server
  socket_name = db_dir + dispatcher_share_name;
  socket_descriptor = socket(AF_UNIX, SOCK_STREAM, 0);
  if (socket_descriptor == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Server::2");
  if (fcntl(socket_descriptor, F_SETFL, O_RDWR|O_NONBLOCK) == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Server::3");

  struct sockaddr_un local;
  local.sun_family = AF_UNIX;
  strcpy(local.sun_path, socket_name.c_str());
#ifdef __APPLE__
  local.sun_len = socket_name.size() + 1;
#endif
  if (bind(socket_descriptor, (struct sockaddr*)&local,
      sizeof(local.sun_family) + strlen(local.sun_path)) == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Server::4");
  if (chmod(socket_name.c_str(), S_666) == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Server::8");
  if (listen(socket_descriptor, max_num_reading_processes) == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Server::5");
}


Dispatcher_Socket::~Dispatcher_Socket()
{
  close(socket_descriptor);
  remove(socket_name.c_str());
}


void Dispatcher_Socket::look_for_a_new_connection(Connection_Per_Pid_Map& connection_per_pid)
{
  struct sockaddr_un sockaddr_un_dummy;
  uint sockaddr_un_dummy_size = sizeof(sockaddr_un_dummy);
  int socket_fd = accept(socket_descriptor, (sockaddr*)&sockaddr_un_dummy,
                         (socklen_t*)&sockaddr_un_dummy_size);
  if (socket_fd == -1)
  {
    if (errno != EAGAIN && errno != EWOULDBLOCK)
      throw File_Error(errno, "(socket)", "Dispatcher_Server::6");
  }
  else
  {
    if (fcntl(socket_fd, F_SETFL, O_RDWR|O_NONBLOCK) == -1)
      throw File_Error(errno, "(socket)", "Dispatcher_Server::7");
    started_connections.push_back(socket_fd);
  }

  // associate to a new connection the pid of the sender
  for (std::vector< int >::iterator it = started_connections.begin();
      it != started_connections.end(); ++it)
  {
    pid_t pid;
    int bytes_read = recv(*it, &pid, sizeof(pid_t), 0);
    if (bytes_read == -1)
      ;
    else
    {
      if (bytes_read != 0)
        connection_per_pid.set(pid, new Blocking_Client_Socket(*it));
      else
        close(*it);
      *it = started_connections.back();
      started_connections.pop_back();
      break;
    }
  }
}


int Global_Resource_Planner::probe(uint32 pid, uint32 client_token, uint32 time_units, uint64 max_space)
{
  if (rate_limit > 0 && client_token > 0)
  {
    uint32 token_count = 0;
    for (std::vector< Reader_Entry >::const_iterator it = active.begin(); it != active.end(); ++it)
    {
      if (it->client_token == client_token)
        ++token_count;
    }
    if (token_count >= rate_limit)
      return Dispatcher::RATE_LIMITED;

    uint32 current_time = time(0);
    for (std::vector< Quota_Entry >::iterator it = afterwards.begin(); it != afterwards.end(); )
    {
      if (it->expiration_time < current_time)
      {
        *it = afterwards.back();
        afterwards.pop_back();
      }
      else
      {
        if (it->client_token == client_token)
          ++token_count;
        ++it;
      }
    }
    if (token_count >= rate_limit)
      return Dispatcher::RATE_LIMITED;
  }

  // Simple checks: is the query acceptable from a global point of view?
  if (time_units > (global_available_time - global_used_time)/2
      || max_space > (global_available_space - global_used_space)/2)
    return 0;

  active.push_back(Reader_Entry(pid, max_space, time_units, client_token, time(0)));
  global_used_space += max_space;
  global_used_time += time_units;

  return Dispatcher::REQUEST_READ_AND_IDX;
}


void Global_Resource_Planner::remove_entry(std::vector< Reader_Entry >::iterator& it)
{
  uint32 end_time = time(0);
  if (last_update_time < end_time && last_counted > 0)
  {
    if (end_time - last_update_time < 15)
    {
      for (uint32 i = last_update_time; i < end_time; ++i)
      {
        recent_average_used_space[i % 15] = last_used_space / last_counted;
        recent_average_used_time[i % 15] = last_used_time / last_counted;
      }
    }
    else
    {
      for (uint32 i = 0; i < 15; ++i)
      {
        recent_average_used_space[i] = last_used_space / last_counted;
        recent_average_used_time[i] = last_used_time / last_counted;
      }
    }
    average_used_space = 0;
    average_used_time = 0;
    for (uint32 i = 0; i < 15; ++i)
    {
      average_used_space += recent_average_used_space[i];
      average_used_time += recent_average_used_time[i];
    }
    average_used_space = average_used_space / 15;
    average_used_time = average_used_time / 15;

    last_used_space = 0;
    last_used_time = 0;
    last_counted = 0;
    last_update_time = end_time;
  }
  last_used_space += global_used_space;
  last_used_time += global_used_time;
  ++last_counted;

  // Adjust global counters
  global_used_space -= it->max_space;
  global_used_time -= it->max_time;

  if (rate_limit > 0 && it->client_token > 0)
  {
    // Calculate afterwards blocking time
    uint32 penalty_time = std::max(
        global_available_space * (end_time - it->start_time + 1)
            / (global_available_space - average_used_space),
        uint64(global_available_time) * (end_time - it->start_time + 1)
            / (global_available_time - average_used_time))
        - (end_time - it->start_time + 1);
    afterwards.push_back(Quota_Entry(it->client_token, penalty_time + end_time));
  }

  // Really remove the element
  *it = active.back();
  active.pop_back();
}


void Global_Resource_Planner::remove(uint32 pid)
{
  for (std::vector< Reader_Entry >::iterator it = active.begin(); it != active.end(); ++it)
  {
    if (it->client_pid == pid)
    {
      remove_entry(it);
      break;
    }
  }
}


void Global_Resource_Planner::purge(Connection_Per_Pid_Map& connection_per_pid)
{
  for (std::vector< Reader_Entry >::iterator it = active.begin(); it != active.end(); )
  {
    if (connection_per_pid.get(it->client_pid) == 0)
      remove_entry(it);
    else
      ++it;
  }
}


Dispatcher::Dispatcher
    (std::string dispatcher_share_name_,
     std::string index_share_name,
     std::string shadow_name_,
     std::string db_dir_,
     uint max_num_reading_processes_,
     uint purge_timeout_,
     uint64 total_available_space_,
     uint64 total_available_time_units_,
     const std::vector< File_Properties* >& controlled_files_,
     Dispatcher_Logger* logger_)
    : socket(dispatcher_share_name_, shadow_name_, db_dir_, max_num_reading_processes_),
      controlled_files(controlled_files_),
      data_footprints(controlled_files_.size()),
      map_footprints(controlled_files_.size()),
      shadow_name(shadow_name_), db_dir(db_dir_),
      dispatcher_share_name(dispatcher_share_name_),
      logger(logger_),
      pending_commit(false),
      requests_started_counter(0),
      requests_finished_counter(0),
      global_resource_planner(total_available_time_units_, total_available_space_, 0)
{
  signal(SIGPIPE, SIG_IGN);

  // get the absolute pathname of the current directory
  if (db_dir.substr(0, 1) != "/")
    db_dir = getcwd() + db_dir_;
  if (shadow_name.substr(0, 1) != "/")
    shadow_name = getcwd() + shadow_name_;

  // open dispatcher_share
#ifdef __APPLE__
  dispatcher_shm_fd = shm_open
      (dispatcher_share_name.c_str(), O_RDWR|O_CREAT, S_666);
  if (dispatcher_shm_fd < 0)
    throw File_Error(errno, dispatcher_share_name, "Dispatcher_Server::APPLE::1");
#else
  dispatcher_shm_fd = shm_open
      (dispatcher_share_name.c_str(), O_RDWR|O_CREAT|O_TRUNC|O_EXCL, S_666);
  if (dispatcher_shm_fd < 0)
    throw File_Error(errno, dispatcher_share_name, "Dispatcher_Server::1");
  fchmod(dispatcher_shm_fd, S_666);
#endif
  int foo = ftruncate(dispatcher_shm_fd,
                      SHM_SIZE + db_dir.size() + shadow_name.size()); foo = foo;
  dispatcher_shm_ptr = (uint8*)mmap
      (0, SHM_SIZE + db_dir.size() + shadow_name.size(),
       PROT_READ|PROT_WRITE, MAP_SHARED, dispatcher_shm_fd, 0);

  // copy db_dir and shadow_name
  *(uint32*)(dispatcher_shm_ptr + 3*sizeof(uint32)) = db_dir.size();
  memcpy((uint8*)dispatcher_shm_ptr + 4*sizeof(uint32), db_dir.data(), db_dir.size());
  *(uint32*)(dispatcher_shm_ptr + 4*sizeof(uint32) + db_dir.size()) = shadow_name.size();
  memcpy((uint8*)dispatcher_shm_ptr + 5*sizeof(uint32) + db_dir.size(),
         shadow_name.data(), shadow_name.size());

  // Set command state to zero.
  *(uint32*)dispatcher_shm_ptr = 0;

  if (file_exists(shadow_name))
  {
    copy_shadows_to_mains();
    remove(shadow_name.c_str());
  }
  remove_shadows();
  remove((shadow_name + ".lock").c_str());
  set_current_footprints();
}


Dispatcher::~Dispatcher()
{
  munmap((void*)dispatcher_shm_ptr, SHM_SIZE + db_dir.size() + shadow_name.size());
  shm_unlink(dispatcher_share_name.c_str());
}


void Dispatcher::write_start(pid_t pid)
{
  // Lock the writing lock file for the client.
  try
  {
    Raw_File shadow_file(shadow_name + ".lock", O_RDWR|O_CREAT|O_EXCL, S_666, "write_start:1");

    copy_mains_to_shadows();
    std::vector< pid_t > registered = write_index_of_empty_blocks();
    if (logger)
      logger->write_start(pid, registered);
  }
  catch (File_Error e)
  {
    if ((e.error_number == EEXIST) && (e.filename == (shadow_name + ".lock")))
    {
      pid_t locked_pid;
      std::ifstream lock((shadow_name + ".lock").c_str());
      lock>>locked_pid;
      if (locked_pid == pid)
        return;
    }
    std::cerr<<"File_Error "<<e.error_number<<' '<<strerror(e.error_number)<<' '<<e.filename<<' '<<e.origin<<'\n';
    return;
  }

  try
  {
    std::ofstream lock((shadow_name + ".lock").c_str());
    lock<<pid;
  }
  catch (...) {}
}


void Dispatcher::write_rollback(pid_t pid)
{
  if (logger)
    logger->write_rollback(pid);
  remove_shadows();
  remove((shadow_name + ".lock").c_str());
}


void Dispatcher::write_commit(pid_t pid)
{
  if (!processes_reading_idx.empty())
  {
    pending_commit = true;
    return;
  }
  pending_commit = false;

  if (logger)
    logger->write_commit(pid);
  try
  {
    Raw_File shadow_file(shadow_name, O_RDWR|O_CREAT|O_EXCL, S_666, "write_commit:1");

    copy_shadows_to_mains();
  }
  catch (File_Error e)
  {
    std::cerr<<"File_Error "<<e.error_number<<' '<<strerror(e.error_number)<<' '<<e.filename<<' '<<e.origin<<'\n';
    return;
  }

  remove(shadow_name.c_str());
  remove_shadows();
  remove((shadow_name + ".lock").c_str());
  set_current_footprints();
}


void Dispatcher::request_read_and_idx(pid_t pid, uint32 max_allowed_time, uint64 max_allowed_space,
                                      uint32 client_token)
{
  if (logger)
    logger->request_read_and_idx(pid, max_allowed_time, max_allowed_space);
  ++requests_started_counter;

  for (std::vector< Idx_Footprints >::iterator it(data_footprints.begin());
      it != data_footprints.end(); ++it)
    it->register_pid(pid);
  for (std::vector< Idx_Footprints >::iterator it(map_footprints.begin());
      it != map_footprints.end(); ++it)
    it->register_pid(pid);
  processes_reading_idx.insert(pid);
}


void Dispatcher::read_idx_finished(pid_t pid)
{
  if (logger)
    logger->read_idx_finished(pid);
  processes_reading_idx.erase(pid);
}


void Dispatcher::read_finished(pid_t pid)
{
  if (logger)
    logger->read_finished(pid);
  ++requests_finished_counter;

  for (std::vector< Idx_Footprints >::iterator it(data_footprints.begin());
      it != data_footprints.end(); ++it)
    it->unregister_pid(pid);
  for (std::vector< Idx_Footprints >::iterator it(map_footprints.begin());
      it != map_footprints.end(); ++it)
    it->unregister_pid(pid);
  processes_reading_idx.erase(pid);
  disconnected.erase(pid);
  global_resource_planner.remove(pid);
}


void Dispatcher::read_aborted(pid_t pid)
{
  if (logger)
    logger->read_aborted(pid);

  for (std::vector< Idx_Footprints >::iterator it(data_footprints.begin());
      it != data_footprints.end(); ++it)
    it->unregister_pid(pid);
  for (std::vector< Idx_Footprints >::iterator it(map_footprints.begin());
      it != map_footprints.end(); ++it)
    it->unregister_pid(pid);
  processes_reading_idx.erase(pid);
  disconnected.erase(pid);
  global_resource_planner.remove(pid);
}


void Dispatcher::copy_shadows_to_mains()
{
  for (std::vector< File_Properties* >::const_iterator it(controlled_files.begin());
      it != controlled_files.end(); ++it)
  {
    copy_file(db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
              + (*it)->get_index_suffix() + (*it)->get_shadow_suffix(),
              db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
              + (*it)->get_index_suffix());
    copy_file(db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
              + (*it)->get_index_suffix() + (*it)->get_shadow_suffix(),
              db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
              + (*it)->get_index_suffix());
  }
}


void Dispatcher::copy_mains_to_shadows()
{
  for (std::vector< File_Properties* >::const_iterator it(controlled_files.begin());
      it != controlled_files.end(); ++it)
  {
    copy_file(db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
              + (*it)->get_index_suffix(),
              db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
              + (*it)->get_index_suffix() + (*it)->get_shadow_suffix());
    copy_file(db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
              + (*it)->get_index_suffix(),
              db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
              + (*it)->get_index_suffix() + (*it)->get_shadow_suffix());
  }
}


void Dispatcher::remove_shadows()
{
  for (std::vector< File_Properties* >::const_iterator it(controlled_files.begin());
      it != controlled_files.end(); ++it)
  {
    remove((db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
        + (*it)->get_index_suffix() + (*it)->get_shadow_suffix()).c_str());
    remove((db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
        + (*it)->get_index_suffix() + (*it)->get_shadow_suffix()).c_str());
    remove((db_dir + (*it)->get_file_name_trunk() + (*it)->get_data_suffix()
        + (*it)->get_shadow_suffix()).c_str());
    remove((db_dir + (*it)->get_file_name_trunk() + (*it)->get_id_suffix()
        + (*it)->get_shadow_suffix()).c_str());
  }
}


void Dispatcher::set_current_footprints()
{
  for (std::vector< File_Properties* >::size_type i = 0; i < controlled_files.size(); ++i)
  {
    try
    {
      data_footprints[i].set_current_footprint
          (controlled_files[i]->get_data_footprint(db_dir));
    }
    catch (File_Error e)
    {
      std::cerr<<"File_Error "<<e.error_number<<' '<<strerror(e.error_number)<<' '<<e.filename<<' '<<e.origin<<'\n';
    }
    catch (...) {}

    try
    {
      map_footprints[i].set_current_footprint
          (controlled_files[i]->get_map_footprint(db_dir));
    }
    catch (File_Error e)
    {
      std::cerr<<"File_Error "<<e.error_number<<' '<<strerror(e.error_number)<<' '<<e.filename<<' '<<e.origin<<'\n';
    }
    catch (...) {}
  }
}


void write_to_index_empty_file_data(const std::vector< bool >& footprint, const std::string& filename)
{
  Void_Pointer< std::pair< uint32, uint32 > > buffer(footprint.size() * 8);
  std::pair< uint32, uint32 >* pos = buffer.ptr;
  uint32 last_start = 0;
  for (uint32 i = 0; i < footprint.size(); ++i)
  {
    if (footprint[i])
    {
      if (last_start < i)
      {
        *pos = std::make_pair(i - last_start, last_start);
        ++pos;
      }
      last_start = i+1;
    }
  }
  if (last_start < footprint.size())
    *pos = std::make_pair(footprint.size() - last_start, last_start);

  Raw_File file(filename, O_RDWR|O_CREAT|O_TRUNC, S_666, "write_to_index_empty_file_data:1");
  file.write((uint8*)buffer.ptr, ((uint8*)pos) - ((uint8*)buffer.ptr), "Dispatcher:26");
}


void write_to_index_empty_file_ids(const std::vector< bool >& footprint, const std::string& filename)
{
  Void_Pointer< uint32 > buffer(footprint.size() * 4);
  uint32* pos = buffer.ptr;
  for (uint32 i = 0; i < footprint.size(); ++i)
  {
    if (!footprint[i])
    {
      *pos = i;
      ++pos;
    }
  }

  Raw_File file(filename, O_RDWR|O_CREAT|O_TRUNC, S_666, "write_to_index_empty_file_ids:1");
  file.write((uint8*)buffer.ptr, ((uint8*)pos) - ((uint8*)buffer.ptr), "Dispatcher:36");
}


std::vector< Dispatcher::pid_t > Dispatcher::write_index_of_empty_blocks()
{
  std::set< pid_t > registered;
  for (std::vector< Idx_Footprints >::iterator it(data_footprints.begin());
      it != data_footprints.end(); ++it)
  {
    std::vector< Idx_Footprints::pid_t > registered_processes = it->registered_processes();
    for (std::vector< Idx_Footprints::pid_t >::const_iterator it = registered_processes.begin();
        it != registered_processes.end(); ++it)
      registered.insert(*it);
  }
  for (std::vector< Idx_Footprints >::iterator it(map_footprints.begin());
      it != map_footprints.end(); ++it)
  {
    std::vector< Idx_Footprints::pid_t > registered_processes = it->registered_processes();
    for (std::vector< Idx_Footprints::pid_t >::const_iterator it = registered_processes.begin();
        it != registered_processes.end(); ++it)
      registered.insert(*it);
  }

  for (std::vector< File_Properties* >::size_type i = 0; i < controlled_files.size(); ++i)
  {
    if (file_exists(db_dir + controlled_files[i]->get_file_name_trunk()
        + controlled_files[i]->get_data_suffix()
        + controlled_files[i]->get_index_suffix()
        + controlled_files[i]->get_shadow_suffix()))
    {
      write_to_index_empty_file_data
          (data_footprints[i].total_footprint(),
           db_dir + controlled_files[i]->get_file_name_trunk()
           + controlled_files[i]->get_data_suffix()
           + controlled_files[i]->get_shadow_suffix());
    }
    if (file_exists(db_dir + controlled_files[i]->get_file_name_trunk()
        + controlled_files[i]->get_id_suffix()
        + controlled_files[i]->get_index_suffix()
        + controlled_files[i]->get_shadow_suffix()))
    {
      write_to_index_empty_file_ids
          (map_footprints[i].total_footprint(),
           db_dir + controlled_files[i]->get_file_name_trunk()
           + controlled_files[i]->get_id_suffix()
           + controlled_files[i]->get_shadow_suffix());
    }
  }

  std::vector< pid_t > registered_v;
  registered_v.assign(registered.begin(), registered.end());
  return registered_v;
}


void Dispatcher::standby_loop(uint64 milliseconds)
{
  uint32 counter = 0;
  uint32 idle_counter = 0;
  while ((milliseconds == 0) || (counter < milliseconds/100))
  {
    socket.look_for_a_new_connection(connection_per_pid);

    uint32 command = 0;
    uint32 client_pid = 0;
    connection_per_pid.poll_command_round_robin(command, client_pid);
    if (command == HANGUP)
      command = READ_ABORTED;

    if (command == 0)
    {
      ++counter;
      ++idle_counter;
      millisleep(idle_counter < 10 ? idle_counter*10 : 100);
      continue;
    }

    if (idle_counter > 0)
    {
      if (logger)
        logger->idle_counter(idle_counter);
      idle_counter = 0;
    }

    try
    {
      if (command == TERMINATE || command == OUTPUT_STATUS)
      {
        if (command == OUTPUT_STATUS)
          output_status();

        connection_per_pid.get(client_pid)->send_result(command);
        connection_per_pid.set(client_pid, 0);

        if (command == TERMINATE)
          break;
      }
      else if (command == WRITE_START || command == WRITE_ROLLBACK || command == WRITE_COMMIT)
      {
        if (command == WRITE_START)
        {
          check_and_purge();
          write_start(client_pid);
        }
        else if (command == WRITE_COMMIT)
        {
          check_and_purge();
          write_commit(client_pid);
        }
        else if (command == WRITE_ROLLBACK)
          write_rollback(client_pid);

        connection_per_pid.get(client_pid)->send_result(command);
      }
      else if (command == HANGUP || command == READ_ABORTED || command == READ_FINISHED)
      {
        if (command == READ_ABORTED)
          read_aborted(client_pid);
        else if (command == READ_FINISHED)
        {
          read_finished(client_pid);
          connection_per_pid.get(client_pid)->send_result(command);
        }
        connection_per_pid.set(client_pid, 0);
      }
      else if (command == READ_IDX_FINISHED)
      {
        read_idx_finished(client_pid);
        if (connection_per_pid.get(client_pid) != 0)
          connection_per_pid.get(client_pid)->send_result(command);
      }
      else if (command == REQUEST_READ_AND_IDX)
      {
        std::vector< uint32 > arguments = connection_per_pid.get(client_pid)->get_arguments(4);
        if (arguments.size() < 4)
        {
          connection_per_pid.get(client_pid)->send_result(0);
          continue;
        }

        uint32 max_allowed_time = arguments[0];
        uint64 max_allowed_space = (((uint64)arguments[2])<<32 | arguments[1]);
        uint32 client_token = arguments[3];

        if (pending_commit)
        {
          connection_per_pid.get(client_pid)->send_result(0);
          continue;
        }

        command = global_resource_planner.probe(client_pid, client_token,
                                                max_allowed_time, max_allowed_space);
        if (command == REQUEST_READ_AND_IDX)
          request_read_and_idx(client_pid, max_allowed_time, max_allowed_space, client_token);
        connection_per_pid.get(client_pid)->send_result(command);
      }
      else if (command == PURGE)
      {
        std::vector< uint32 > arguments = connection_per_pid.get(client_pid)->get_arguments(1);
        if (arguments.size() < 1)
          continue;
        uint32 target_pid = arguments[0];

        read_aborted(target_pid);
        if (connection_per_pid.get(target_pid) != 0)
        {
          connection_per_pid.get(target_pid)->send_result(READ_FINISHED);
          connection_per_pid.set(target_pid, 0);
        }
        connection_per_pid.get(client_pid)->send_result(command);
      }
      else if (command == QUERY_BY_TOKEN)
      {
        std::vector< uint32 > arguments = connection_per_pid.get(client_pid)->get_arguments(1);
        if (arguments.size() < 1)
          continue;
        uint32 target_token = arguments[0];

        pid_t target_pid = 0;
        for (std::vector< Reader_Entry >::const_iterator
            it = global_resource_planner.get_active().begin();
            it != global_resource_planner.get_active().end(); ++it)
        {
          if (it->client_token == target_token)
            target_pid = it->client_pid;
        }
        connection_per_pid.get(client_pid)->send_result(target_pid);
      }
      else if (command == SET_GLOBAL_LIMITS)
      {
        std::vector< uint32 > arguments = connection_per_pid.get(client_pid)->get_arguments(5);
        if (arguments.size() < 5)
          continue;
        uint64 new_total_available_space = (((uint64)arguments[1])<<32 | arguments[0]);
        uint64 new_total_available_time_units = (((uint64)arguments[3])<<32 | arguments[2]);
        int rate_limit_ = arguments[4];

        if (new_total_available_space > 0)
          global_resource_planner.set_total_available_space(new_total_available_space);
        if (new_total_available_time_units > 0)
          global_resource_planner.set_total_available_time(new_total_available_time_units);
        if (rate_limit_ > -1)
          global_resource_planner.set_rate_limit(rate_limit_);
        connection_per_pid.get(client_pid)->send_result(command);
      }
    }
    catch (File_Error e)
    {
      std::cerr<<"File_Error "<<e.error_number<<' '<<strerror(e.error_number)<<' '<<e.filename<<' '<<e.origin<<'\n';

      counter += 30;
      millisleep(3000);

      // Set command state to zero.
      *(uint32*)dispatcher_shm_ptr = 0;
    }
  }
}


void Dispatcher::output_status()
{
  try
  {
    std::ofstream status((shadow_name + ".status").c_str());
    status<<"Number of not yet opened connections: "<<socket.num_started_connections()<<'\n'
        <<"Number of connected clients: "<<connection_per_pid.base_map().size()<<'\n'
        <<"Rate limit: "<<global_resource_planner.get_rate_limit()<<'\n'
        <<"Total available space: "<<global_resource_planner.get_total_available_space()<<'\n'
        <<"Total claimed space: "<<global_resource_planner.get_total_claimed_space()<<'\n'
        <<"Average claimed space: "<<global_resource_planner.get_average_claimed_space()<<'\n'
        <<"Total available time units: "<<global_resource_planner.get_total_available_time()<<'\n'
        <<"Total claimed time units: "<<global_resource_planner.get_total_claimed_time()<<'\n'
        <<"Average claimed time units: "<<global_resource_planner.get_average_claimed_time()<<'\n'
        <<"Counter of started requests: "<<requests_started_counter<<'\n'
        <<"Counter of finished requests: "<<requests_finished_counter<<'\n';

    std::set< pid_t > collected_pids;
    for (std::vector< Reader_Entry >::const_iterator
        it = global_resource_planner.get_active().begin();
        it != global_resource_planner.get_active().end(); ++it)
    {
      if (processes_reading_idx.find(it->client_pid) != processes_reading_idx.end())
        status<<REQUEST_READ_AND_IDX;
      else
        status<<READ_IDX_FINISHED;
      status<<' '<<it->client_pid<<' '<<it->client_token<<' '
          <<it->max_space<<' '<<it->max_time<<' '<<it->start_time<<'\n';
      collected_pids.insert(it->client_pid);
    }
    for (std::vector< Idx_Footprints >::iterator it(data_footprints.begin());
        it != data_footprints.end(); ++it)
    {
      std::vector< Idx_Footprints::pid_t > registered_processes = it->registered_processes();
      for (std::vector< Idx_Footprints::pid_t >::const_iterator it = registered_processes.begin();
          it != registered_processes.end(); ++it)
        collected_pids.insert(*it);
    }
    for (std::vector< Idx_Footprints >::iterator it(map_footprints.begin());
        it != map_footprints.end(); ++it)
    {
      std::vector< Idx_Footprints::pid_t > registered_processes = it->registered_processes();
      for (std::vector< Idx_Footprints::pid_t >::const_iterator it = registered_processes.begin();
          it != registered_processes.end(); ++it)
        collected_pids.insert(*it);
    }

    for (std::map< pid_t, Blocking_Client_Socket* >::const_iterator
        it = connection_per_pid.base_map().begin();
        it != connection_per_pid.base_map().end(); ++it)
    {
      if (processes_reading_idx.find(it->first) == processes_reading_idx.end()
          && collected_pids.find(it->first) == collected_pids.end())
        status<<"pending\t"<<it->first<<'\n';
    }

    for (std::vector< Quota_Entry >::const_iterator
        it = global_resource_planner.get_afterwards().begin();
        it != global_resource_planner.get_afterwards().end(); ++it)
      status<<"quota\t"<<it->client_token<<' '<<it->expiration_time<<'\n';
  }
  catch (...) {}
}


void Idx_Footprints::set_current_footprint(const std::vector< bool >& footprint)
{
  current_footprint = footprint;
}


void Idx_Footprints::register_pid(pid_t pid)
{
  footprint_per_pid[pid] = current_footprint;
}


void Idx_Footprints::unregister_pid(pid_t pid)
{
  footprint_per_pid.erase(pid);
}


std::vector< Idx_Footprints::pid_t > Idx_Footprints::registered_processes() const
{
  std::vector< pid_t > result;
  for (std::map< pid_t, std::vector< bool > >::const_iterator it(footprint_per_pid.begin());
      it != footprint_per_pid.end(); ++it)
    result.push_back(it->first);
  return result;
}


std::vector< bool > Idx_Footprints::total_footprint() const
{
  std::vector< bool > result = current_footprint;
  for (std::map< pid_t, std::vector< bool > >::const_iterator it(footprint_per_pid.begin());
      it != footprint_per_pid.end(); ++it)
  {
    // By construction, it->second.size() <= result.size()
    for (std::vector< bool >::size_type i = 0; i < it->second.size(); ++i)
      result[i] = result[i] | (it->second)[i];
  }
  return result;
}


void Dispatcher::check_and_purge()
{
  global_resource_planner.purge(connection_per_pid);
}
// ---- Attachment: src/template_db/dispatcher_client.cc ----

/** Copyright 2008, 2009, 2010, 2011, 2012 Roland Olbricht
 *
 * This file is part of Template_DB.
 *
 * Template_DB is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * Template_DB is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with Template_DB. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dispatcher_client.h"
#include "dispatcher.h"

#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>

#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>


Dispatcher_Client::Dispatcher_Client
    (const std::string& dispatcher_share_name_)
    : dispatcher_share_name(dispatcher_share_name_)
{
  signal(SIGPIPE, SIG_IGN);

  // open dispatcher_share
  dispatcher_shm_fd = shm_open
      (dispatcher_share_name.c_str(), O_RDWR, S_666);
  if (dispatcher_shm_fd < 0)
    throw File_Error(errno, dispatcher_share_name, "Dispatcher_Client::1");
  struct stat stat_buf;
  fstat(dispatcher_shm_fd, &stat_buf);
  dispatcher_shm_ptr = (uint8*)mmap
      (0, stat_buf.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, dispatcher_shm_fd, 0);

  // get db_dir and shadow_name
  db_dir = std::string((const char *)(dispatcher_shm_ptr + 4*sizeof(uint32)),
                       *(uint32*)(dispatcher_shm_ptr + 3*sizeof(uint32)));
  shadow_name = std::string((const char *)(dispatcher_shm_ptr + 5*sizeof(uint32) + db_dir.size()),
                            *(uint32*)(dispatcher_shm_ptr + db_dir.size() + 4*sizeof(uint32)));

  // initialize the socket for the client
  std::string socket_name = db_dir + dispatcher_share_name_;
  socket_descriptor = socket(AF_UNIX, SOCK_STREAM, 0);
  if (socket_descriptor == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Client::2");
  struct sockaddr_un local;
  local.sun_family = AF_UNIX;
  strcpy(local.sun_path, socket_name.c_str());
#ifdef __APPLE__
  local.sun_len = socket_name.size() + 1;
#endif
  if (connect(socket_descriptor, (struct sockaddr*)&local,
      sizeof(local.sun_family) + strlen(local.sun_path)) == -1)
    throw File_Error(errno, socket_name, "Dispatcher_Client::3");

  pid_t pid = getpid();
  if (send(socket_descriptor, &pid, sizeof(pid_t), 0) == -1)
    throw File_Error(errno, dispatcher_share_name, "Dispatcher_Client::4");
}


Dispatcher_Client::~Dispatcher_Client()
{
  close(socket_descriptor);

  munmap((void*)dispatcher_shm_ptr, Dispatcher::SHM_SIZE + db_dir.size() + shadow_name.size());
  close(dispatcher_shm_fd);
}


template< class TObject >
void Dispatcher_Client::send_message(TObject message, const std::string& source_pos)
{
  if (send(socket_descriptor, &message, sizeof(TObject), 0) == -1)
    throw File_Error(errno, dispatcher_share_name, source_pos);
}


uint32 Dispatcher_Client::ack_arrived()
{
  uint32 answer = 0;
  int bytes_read = recv(socket_descriptor, &answer, sizeof(uint32), 0);
  while (bytes_read == -1)
  {
    millisleep(50);
    bytes_read = recv(socket_descriptor, &answer, sizeof(uint32), 0);
  }
  if (bytes_read == sizeof(uint32))
    return answer;
  return 0;

//   uint32 pid = getpid();
//   if (*(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) == pid)
//     return 1;
//   millisleep(50);
//   return (*(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) == pid ? 1 : 0);
}


void Dispatcher_Client::write_start()
{
  pid_t pid = getpid();

  send_message(Dispatcher::WRITE_START, "Dispatcher_Client::write_start::socket");

  while (true)
  {
    if (ack_arrived() && file_exists(shadow_name + ".lock"))
    {
      try
      {
        pid_t locked_pid = 0;
        std::ifstream lock((shadow_name + ".lock").c_str());
        lock>>locked_pid;
        if (locked_pid == pid)
          return;
      }
      catch (...) {}
    }
    millisleep(500);
  }
}


void Dispatcher_Client::write_rollback()
{
  pid_t pid = getpid();

  send_message(Dispatcher::WRITE_ROLLBACK, "Dispatcher_Client::write_rollback::socket");

  while (true)
  {
    if (ack_arrived())
    {
      if (file_exists(shadow_name + ".lock"))
      {
        try
        {
          pid_t locked_pid;
          std::ifstream lock((shadow_name + ".lock").c_str());
          lock>>locked_pid;
          if (locked_pid != pid)
            return;
        }
        catch (...) {}
      }
      else
        return;
    }
    millisleep(500);
  }
}


void Dispatcher_Client::write_commit()
{
  pid_t pid = getpid();

  send_message(Dispatcher::WRITE_COMMIT, "Dispatcher_Client::write_commit::socket");
  millisleep(200);

  while (true)
  {
    if (ack_arrived())
    {
      if (file_exists(shadow_name + ".lock"))
      {
        try
        {
          pid_t locked_pid;
          std::ifstream lock((shadow_name + ".lock").c_str());
          lock>>locked_pid;
          if (locked_pid != pid)
            return;
        }
        catch (...) {}
      }
      else
        return;
    }
    send_message(Dispatcher::WRITE_COMMIT, "Dispatcher_Client::write_commit::socket");
    millisleep(200);
  }
}


void Dispatcher_Client::request_read_and_idx(uint32 max_allowed_time, uint64 max_allowed_space,
                                             uint32 client_token)
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  uint counter = 0;
  uint32 ack = 0;
  while (++counter <= 50)
  {
    send_message(Dispatcher::REQUEST_READ_AND_IDX, "Dispatcher_Client::request_read_and_idx::socket::1");
    send_message(max_allowed_time, "Dispatcher_Client::request_read_and_idx::socket::2");
    send_message(max_allowed_space, "Dispatcher_Client::request_read_and_idx::socket::3");
    send_message(client_token, "Dispatcher_Client::request_read_and_idx::socket::4");

    ack = ack_arrived();
    if (ack != 0 && ack != Dispatcher::RATE_LIMITED)
      return;
    millisleep(300);
  }
  if (ack == Dispatcher::RATE_LIMITED)
    throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::request_read_and_idx::rate_limited");
  else
    throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::request_read_and_idx::timeout");
}


void Dispatcher_Client::read_idx_finished()
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  uint counter = 0;
  while (++counter <= 300)
  {
    send_message(Dispatcher::READ_IDX_FINISHED, "Dispatcher_Client::read_idx_finished::socket");
    if (ack_arrived())
      return;
  }
  throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::read_idx_finished::timeout");
}


void Dispatcher_Client::read_finished()
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  uint counter = 0;
  while (++counter <= 300)
  {
    send_message(Dispatcher::READ_FINISHED, "Dispatcher_Client::read_finished::socket");
    if (ack_arrived())
      return;
  }
  throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::read_finished::timeout");
}


void Dispatcher_Client::purge(uint32 pid)
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  while (true)
  {
    send_message(Dispatcher::PURGE, "Dispatcher_Client::purge::socket::1");
    send_message(pid, "Dispatcher_Client::purge::socket::2");
    if (ack_arrived())
      return;
  }
}


pid_t Dispatcher_Client::query_by_token(uint32 token)
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  send_message(Dispatcher::QUERY_BY_TOKEN, "Dispatcher_Client::query_by_token::socket::1");
  send_message(token, "Dispatcher_Client::query_by_token::socket::2");
  return ack_arrived();
}


void Dispatcher_Client::set_global_limits(uint64 max_allowed_space, uint64 max_allowed_time_units,
                                          int rate_limit)
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  while (true)
  {
    send_message(Dispatcher::SET_GLOBAL_LIMITS, "Dispatcher_Client::set_global_limits::1");
    send_message(max_allowed_space, "Dispatcher_Client::set_global_limits::2");
    send_message(max_allowed_time_units, "Dispatcher_Client::set_global_limits::3");
    send_message(rate_limit, "Dispatcher_Client::set_global_limits::4");
    if (ack_arrived())
      return;
  }
}


void Dispatcher_Client::ping()
{
  // Ping-Feature removed. The concept of unassured messages doesn't fit in the context of strict
  // two-directional communication.
  // send_message(Dispatcher::PING, "Dispatcher_Client::ping::socket");
}


void Dispatcher_Client::terminate()
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  while (true)
  {
    send_message(Dispatcher::TERMINATE, "Dispatcher_Client::terminate::socket");
    if (ack_arrived())
      return;
  }
}


void Dispatcher_Client::output_status()
{
//   *(uint32*)(dispatcher_shm_ptr + 2*sizeof(uint32)) = 0;
  while (true)
  {
    send_message(Dispatcher::OUTPUT_STATUS, "Dispatcher_Client::output_status::socket");
    if (ack_arrived())
      break;
  }

  std::ifstream status((shadow_name + ".status").c_str());
  std::string buffer;
  std::getline(status, buffer);
  while (status.good())
  {
    std::cout<<buffer<<'\n';
    std::getline(status, buffer);
  }
}
- [overpass] Where is bin/init_osm3s.sh?, Malcolm Herring, 06/09/2015
- Re: [overpass] Where is bin/init_osm3s.sh?, Donal Diamond, 06/09/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, Malcolm Herring, 06/09/2015
- Re: [overpass] Re: Where is bin/init_osm3s.sh?, Donal Diamond, 06/09/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, mmd, 06/09/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, Malcolm Herring, 06/12/2015
- Re: [overpass] Re: Where is bin/init_osm3s.sh?, Roland Olbricht, 06/12/2015
- Re: [overpass] Re: Where is bin/init_osm3s.sh?, Malcolm Herring, 06/12/2015
- Sockaddr on OS X (was Re: [overpass] Re: Where is bin/init_osm3s.sh?), Roland Olbricht, 06/13/2015
- Re: Sockaddr on OS X (was Re: [overpass] Re: Where is bin/init_osm3s.sh?), Malcolm Herring, 06/13/2015
- Re: Sockaddr on OS X (was Re: [overpass] Re: Where is bin/init_osm3s.sh?), Roland Olbricht, 06/13/2015
- Re: Sockaddr on OS X (was Re: [overpass] Re: Where is bin/init_osm3s.sh?), Malcolm Herring, 06/13/2015
- [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), mmd, 06/15/2015
- [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), Malcolm Herring, 06/15/2015
- Re: [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), Roland Olbricht, 06/16/2015
- [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), mmd, 06/20/2015
- Re: [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), Malcolm Herring, 06/20/2015
- Re: [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), Roland Olbricht, 06/21/2015
- [overpass] Re: Sockaddr on OS X (was Re: Re: Where is bin/init_osm3s.sh?), Malcolm Herring, 06/25/2015
- Re: [overpass] Re: Where is bin/init_osm3s.sh?, Roland Olbricht, 06/12/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, Malcolm Herring, 06/12/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, mmd, 06/09/2015
- Re: [overpass] Re: Where is bin/init_osm3s.sh?, Donal Diamond, 06/09/2015
- [overpass] Re: Where is bin/init_osm3s.sh?, Malcolm Herring, 06/09/2015
- Re: [overpass] Where is bin/init_osm3s.sh?, Donal Diamond, 06/09/2015