@Guerro69

Как постоянно доставать события из LongPoll а VK?

Суть такая, пишу код на python`e для того что-бы прослушивать LongPoll вк и получать от туда события, я написал код, который позволяет вытянуть из LongPoll`a одно событие в течении 25 секунд, но с этим работать не очень удобно когда я взялся за выполнение того, что бы LongPoll прослушивался постоянно у меня ничего хорошего из того не вышло. Вот мой код:
import json
import requests

class LongPoll:
	def __init__(self, session, wait=25):
		self.token = session[0]
		self.grp_id = session[1]
		self.v = session[-1]
		self.wait = wait
		self.url = 'https://api.vk.com/method/'
		self.http = requests.Session()
		self.ts = None
		self.server = None
		self.key = None
		self.__update_data()

	def __update_data(self, updts=True):
		api_data = {
			'access_token': self.token,
			'v': self.v,
			'group_id': self.grp_id
			}
		r = self.http.post(f"{self.url}groups.getLongPollServer?", params=api_data).json()
		self.server = r['response']['server']
		self.key = r['response']['key']
		if updts:
			self.ts = r['response']['ts']

	def _lp_check(self):
		lp_data = {
			'act': 'a_check',
			'key': self.key,
			'wait': self.wait,
			'mode': 2,
			'ts': self.ts
		}
		r = self.http.post(self.server, params=lp_data, timeout=self.wait + 10).json()
		if "failed" in r:
			if r['failed'] == 1:
				self.__update_data()
			elif r['failed'] == 2:
				self.__update_data(updts=False)
			elif r['failed'] == 3:
				self.__update_data()
		else:
			return r['updates']

	def listening(self):
		print('LongPoll listening...\n')
		while True:
			yield [
				i
				for i in self._lp_check()
			]

lp = LongPoll(['Здесь мой токен', 190582509, 5.95])

for event in lp.listening():
	print(event)


В итоге я получаю не каждое событие, а просто цикл в котором одно событие крутится вечно.
  • Вопрос задан
  • 75 просмотров
Решения вопроса 1
@Guerro69 Автор вопроса
Ответ решён, просто нужно было после получения события обновить данные о LongPoll сервере:
def listening(self):
		print('LongPoll listening...\n')
		while True:
			for i in self._lp_check():
				yield i
				self.__update_data()
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы