import asyncio
from aiohttp import ClientSession
class Scrapper:
def __init__(self) -> None:
self.session = ClientSession()
async def get_access(self) -> None:
response = await self.session.get("https://google.com")
scrapper = Scrapper()
async def main():
await scrapper.get_access()
asyncio.run(main())
DeprecationWarning: The object should be created within an async function
self.session = ClientSession()
Traceback (most recent call last):
File "E:\default\botss\bot-aiog3\main.py", line 33, in <module>
asyncio.run(main())
File "C:\Users\artem\AppData\Local\Programs\Python\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\artem\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 646, in run_until_complete
return future.result()
File "E:\default\botss\bot-aiog3\main.py", line 31, in main
await scrapper.get_access()
File "E:\default\botss\bot-aiog3\main.py", line 25, in get_access
response = await self.session.get("https://google.com")
File "E:\default\botss\bot-aiog3\venv\lib\site-packages\aiohttp\client.py", line 466, in _request
with timer:
File "E:\default\botss\bot-aiog3\venv\lib\site-packages\aiohttp\helpers.py", line 701, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001CEDEAFFEB0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001CEDCA846A0>
import asyncio
import ssl
from typing import Optional, Type
import aiohttp
import certifi
import ujson as json
class Scrapper:
def __init__(self, connections_limit: int = None) -> None:
self._session: Optional[aiohttp.ClientSession] = None
self._connector_class: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
ssl_context = ssl.create_default_context(cafile=certifi.where())
self._connector_init = dict(limit=connections_limit, ssl=ssl_context)
async def get_new_session(self) -> aiohttp.ClientSession:
return aiohttp.ClientSession(
connector=self._connector_class(**self._connector_init),
json_serialize=json.dumps
)
async def get_session(self) -> Optional[aiohttp.ClientSession]:
if self._session is None or self._session.closed:
self._session = await self.get_new_session()
if not self._session._loop.is_running():
await self._session.close()
self._session = await self.get_new_session()
return self._session
async def make_request(self, session, url, post, **kwargs):
try:
if post:
async with session.post(url, data=None, headers=None, **kwargs) as response:
try:
return await response.json()
except:
return response.text
else:
async with session.get(url, params=None, headers=None, **kwargs) as response:
try:
return await response.json()
except:
return response.text
except aiohttp.ClientError as e:
print(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
async def request(self, url: str, post: bool = False, **kwargs):
return await self.make_request(await self.get_session(), url, post, **kwargs)
async def get_access(self) -> None:
return await self.request("https://google.com", False)
scrapper = Scrapper()
async def main():
await scrapper.get_access()
asyncio.run(main())