Форум программистов
 

Восстановите пароль или Зарегистрируйтесь на форуме, о проблемах и с заказом рекламы пишите сюда - alarforum@yandex.ru, проверяйте папку спам!

Вернуться   Форум программистов > C/C++ программирование > C/C++ Сетевое программирование
Регистрация

Восстановить пароль
Повторная активизация e-mail

Купить рекламу на форуме - 42 тыс руб за месяц

Ответ
 
Опции темы Поиск в этой теме
Старый 14.02.2018, 01:11   #1
Андрей Цапко
Пользователь
 
Регистрация: 10.04.2017
Сообщений: 66
По умолчанию Узнать очередь на сервере

Здравствуйте. Начал использовать epoll api для работы с сокетами. Возник вопрос. Один поток отлавливает события через epoll и передает другим задания. Если на сокет сервере появляется события, epoll их улавливает, но после accepta, пока я не закрою первый сокет я не получаю новые события. Epoll думает что я знаю о них. Подскажите, как узнать сколько сокетов весит в очереди на подключение (backlog функции listen). Для длины чтения я нашел нужные функции, а для длины подключения не могу. Заранее спасибо.

На всякий случай, что бы не возникало вопросов.
Код:
#include <iostream> //cout, endl,
#include <string> //class string,
#include <thread> //class thread, ref(),
#include <vector> //class vector,
#include <queue> //class queue,

#include <sys/socket.h> //socket(), bind(), listen(), setsockopt(),
#include <netinet/in.h> //struct sockaddr_in,
#include <arpa/inet.h> //inet_addr(),
#include <unistd.h> //close(),
#include <sys/epoll.h> //struct epoll_event, epoll_create1(), epoll_wait(), epoll_ctl(),
#include <sys/ioctl.h> //ioctl(),
#include <semaphore.h> //type sem_t, sem_open(), sem_wait(), sem_post(), sem_close(), sem_unlink(),
#include <fcntl.h> //O_CREAT,

using std::cout;
using std::endl;
class Server{

	#define EPOLL_MAX_EVENTS SOMAXCONN

public:

	Server(const std::string &ip, const unsigned short int &port, const unsigned short int &therads, const std::string &_name="noname_HTTP&WS_server"):workers_count(therads),semaphore_name("/"+_name){
		fd=socket(AF_INET, SOCK_STREAM, 0);
		if(fd<0){
			std::cout<<"SOCKET_ERROR"<<std::endl;
			exit(1);
		}

		int value=1;
		setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&value, sizeof(int));

		config.sin_family=AF_INET;
		config.sin_port=htons(port);
		config.sin_addr.s_addr=inet_addr(ip.c_str());
		if(bind(fd, (struct sockaddr*)&config, sizeof(config))<0){
			std::cout<<"BIND_ERROR"<<std::endl;
			exit(1);
		}

		if(listen(fd, SOMAXCONN)<0){
			std::cout<<"LISTEN_ERROR"<<std::endl;
			exit(1);
		}

		epollfd=epoll_create1(0);
		if(epollfd<0){
			std::cout<<"EPOLL_CREATE1_ERROR"<<std::endl;
			exit(1);
		}

		struct epoll_event ev;
		ev.data.fd=fd;
		ev.events=EPOLLIN|EPOLLET;
		if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)<0){
			std::cout<<"EPOLL_CTL_ERROR"<<std::endl;
			exit(1);
		}

		main_thread=std::thread(Server::event_handler, std::ref(*this));

	}

	~Server(){
		main_thread.join();
		close(epollfd);
		close(fd);
	}

private:

	static void event_handler(Server &server){
		int epoll_count=0;
		unsigned short int next_thread=0;
		const unsigned short int workers_count=server.workers_count;
		std::string used_name;

		server.workers=new std::thread[workers_count];
		server.semaphore=new sem_t*[workers_count];
		server.threads_queue=new std::queue<struct epoll_event>[workers_count];

		for(unsigned short int i=0; i<workers_count; i++){
			used_name=server.semaphore_name+"_"+std::to_string(i);
			sem_unlink(used_name.c_str());
			if((server.semaphore[i]=sem_open((server.semaphore_name+"_"+std::to_string(i)).c_str(), O_CREAT, 0700, 0))==SEM_FAILED){
				std::cout<<"SEM_OPEN_ERROR"<<std::endl;
				exit(1);
			}
			server.workers[i]=std::thread(Server::worker, std::ref(server), i);
		}

		used_name.clear();

		while(true){
			epoll_count=epoll_wait(server.epollfd, server.events, EPOLL_MAX_EVENTS, 60000);
			if(epoll_count==0){
				continue;
			}
			if(epoll_count<1){
				std::cout<<"EPOLL_WAIT_ERROR"<<std::endl;
				exit(1);
			}
			for(unsigned short int i=0; i<epoll_count; i++){
				server.threads_queue[next_thread].push(server.events[i]);
				sem_post(server.semaphore[next_thread]);
				next_thread++;
				if(next_thread==workers_count){
					next_thread=0;
				}
			}
		}

		for(unsigned short int i=0; i<workers_count; i++){
			server.workers[i].join();
			if(sem_close(server.semaphore[i])<0){
				std::cout<<"SEM_CLOSE_ERROR"<<std::endl;
				exit(1);
			}
			if(sem_unlink((server.semaphore_name+"_"+std::to_string(i)).c_str())<0){
				std::cout<<"SEM_UNLINK_ERROR"<<std::endl;
				exit(1);
			}
		}
	}

	static void worker(Server &server, const unsigned short int worket_id){
		sem_t *cur_sem=server.semaphore[worket_id];
		std::queue<struct epoll_event> &cur_queue=server.threads_queue[worket_id];
		struct epoll_event cur_event;

		while(true){
			if(sem_wait(cur_sem)<0){
				std::cout<<"SEM_WAIT_ERROR"<<std::endl;
				exit(1);
			}
			cur_event=cur_queue.front();
			cur_queue.pop();
			if(cur_event.data.fd==server.fd){
				cout<<accept(cur_event.data.fd, NULL, NULL)<<endl;
			}
		}
	}

private:

	int fd;
	int epollfd;
	struct sockaddr_in config;
	struct epoll_event *events=new struct epoll_event[EPOLL_MAX_EVENTS];

	const unsigned short int workers_count;
	std::thread *workers;
	std::queue<struct epoll_event> *threads_queue;
	sem_t **semaphore;

	const std::string semaphore_name;
	std::thread main_thread;

};

int main(){
	Server test("127.0.1.1", 8080, 1);
}
Андрей Цапко вне форума Ответить с цитированием
Старый 14.02.2018, 08:53   #2
waleri
Старожил
 
Регистрация: 13.07.2012
Сообщений: 6,493
По умолчанию

Я правильно понимаю, что один поток пишет в очередь а другой читает и оба потока не утруждают себя синхронизацией?
Доступ к каждой очереди надо контролировать мьютексом!

Кстати, гораздо эффективней будет если очередь одна и каждый поток будет сам выбирать следующее событие.
waleri вне форума Ответить с цитированием
Старый 14.02.2018, 12:09   #3
Андрей Цапко
Пользователь
 
Регистрация: 10.04.2017
Сообщений: 66
По умолчанию

Кажд
Цитата:
Сообщение от waleri Посмотреть сообщение
Я правильно понимаю, что один поток пишет в очередь а другой читает и оба потока не утруждают себя синхронизацией?
Доступ к каждой очереди надо контролировать мьютексом!

Кстати, гораздо эффективней будет если очередь одна и каждый поток будет сам выбирать следующее событие.
Каждый поток имеет отдельную очередь. Один поток проверяет события и большую часть времени сидит в ожидании epool_wait. По срабатыванию события он пишет события в очереди, одно событие в одну очередь, следующее в другую и так далее. Потоки обработчики ждут своего семафора. После записи события в очередь, главный поток разблокирует семафор потока, в который он отправил события, а поток обработчик после этого разблокируется и получает его, после обработки снова блокируется. Проблемы с взаимодействием потоков нет. Проблема в том, что при запуске сервера срабатывает событие. Пока его не примет один поток, новые события на принятие подключений не произойдет, потому что я использую EPOOLET, а без него получается что событие дублируется и после первого подключение пока его не примет обработчик, событие успечает добавиться 1к раз, соответственно тоже не самое приятное. Есть функция, которая проверяет сколько данных можно прочитать с сокета, а мне нужна такая же только для проверки сервер сокета и не на чтение, а на принятие подключений. Как узнать сколько сокетов весит в очереди на подключение? (использовать не блокирующие сокеты я не хочу)
Андрей Цапко вне форума Ответить с цитированием
Старый 14.02.2018, 12:10   #4
Андрей Цапко
Пользователь
 
Регистрация: 10.04.2017
Сообщений: 66
По умолчанию

Цитата:
Сообщение от waleri Посмотреть сообщение
Доступ к каждой очереди надо контролировать мьютексом!
Мьютекс не подходит. Семафор можно разблокировать 10 раз (добавив 10 событий в очередь) и потом 10 раз подряд получить доступ, (обработать события). У мьютекса может быть только один владелец.
Андрей Цапко вне форума Ответить с цитированием
Старый 14.02.2018, 14:41   #5
waleri
Старожил
 
Регистрация: 13.07.2012
Сообщений: 6,493
По умолчанию

Цитата:
Сообщение от Андрей Цапко Посмотреть сообщение
Проблемы с взаимодействием потоков нет
Что будет если придут 10 событий?
Поток проснется и начнет циклично писать в очередь и семафор.
Как только первый элемент добавиться в очередь проснется рабочий поток и начнет из этой очереди читать но в этот момент первый поток все еще туда пишет.

Доступ к очереди надо защищать мьютексом!

Цитата:
Сообщение от Андрей Цапко Посмотреть сообщение
Как узнать сколько сокетов весит в очереди на подключение? (использовать не блокирующие сокеты я не хочу)
Никак.
Надо либо получать на каждый входящий свое событие, либо делать accept пока не скажет wouldblock (но для этого надо неблокирующий, да).
Если особо извращаться можно делать дополнительно select().

Последний раз редактировалось waleri; 14.02.2018 в 14:50.
waleri вне форума Ответить с цитированием
Старый 15.02.2018, 01:45   #6
Андрей Цапко
Пользователь
 
Регистрация: 10.04.2017
Сообщений: 66
По умолчанию

Цитата:
Сообщение от waleri Посмотреть сообщение
Что будет если придут 10 событий?
Поток проснется и начнет циклично писать в очередь и семафор.
Как только первый элемент добавиться в очередь проснется рабочий поток и начнет из этой очереди читать но в этот момент первый поток все еще туда пишет.
Происходит и по 100 событий, ничего не случается. очередь имеет возможность добавлять в конец и получать из начала без проблем с потоками. Пока в очереди пусто worker работать не будет.

В общем с потоками я разберусь. Если сервер сокет будет не блокирующим, то и все подключенные тоже будут не блокирующими, если я не ошибаюсь. Не скажется ли это на производительности? И что лучше? неблокирующие сокеты или дополнительное использование select с таймером в 0?
Андрей Цапко вне форума Ответить с цитированием
Старый 15.02.2018, 08:45   #7
waleri
Старожил
 
Регистрация: 13.07.2012
Сообщений: 6,493
По умолчанию

неблокирующий сокет.
Желаю удачи с потоками.
waleri вне форума Ответить с цитированием
Ответ


Купить рекламу на форуме - 42 тыс руб за месяц

Опции темы Поиск в этой теме
Поиск в этой теме:

Расширенный поиск


Похожие темы
Тема Автор Раздел Ответов Последнее сообщение
вводим 15 элементов в очередь, затем выводим на экран эту очередь и добавил в очередь еще один элемент. Потом удаляем любой элемен Xumera C++ Builder 2 07.12.2013 13:56
Как узнать путь до общего ресурса, находясь на сервере? Izhic Windows 2 16.06.2011 19:44
очередь blacktener Общие вопросы C/C++ 1 18.04.2011 02:49
Очередь sw47 Паскаль, Turbo Pascal, PascalABC.NET 5 30.03.2011 20:23