from collections import deque
from concurrent.futures import ThreadPoolExecutor
def load_proxies(path='proxy.txt'):
    """Read the proxy list from a text file, one proxy per line.

    Args:
        path: File to read; defaults to ``proxy.txt`` in the working directory.

    Returns:
        A ``deque`` of the stripped, non-empty lines in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(path, encoding='utf-8') as f:
        stripped = (row.strip() for row in f)
        # Skip blank lines (e.g. a trailing newline) so that an empty string
        # is never handed out as a proxy.
        return deque(row for row in stripped if row)
def get_article(proxies, link):
    """Borrow a proxy from the head of ``proxies`` while handling ``link``.

    The proxy is taken with ``popleft`` and appended back to the tail once
    the work (currently only a ``print``) is done, even if that work raises.

    Args:
        proxies: Deque of proxies to borrow from.
        link: The link being processed; only printed here.
    """
    borrowed = proxies.popleft()
    try:
        print("working with proxy", borrowed, link)
    finally:
        # Return the proxy to the tail of the deque whatever happened above.
        proxies.append(borrowed)
def main():
    """Load the proxies and process every link on a thread pool."""
    proxies = load_proxies()
    links = ["link1", "link2"]
    with ThreadPoolExecutor(max_workers=50) as pool:
        outcomes = pool.map(lambda link: get_article(proxies, link), links)
        # Executor.map only re-raises a worker's exception when its result is
        # retrieved from the iterator. Drain it so failures (for example an
        # IndexError from an exhausted proxy deque) are not silently dropped.
        for _ in outcomes:
            pass


if __name__ == "__main__":
    main()
from threading import *
def work(i):
    """Print a greeting tagged with thread number ``i`` one hundred times.

    No synchronisation is used around the ``print`` calls.
    """
    greeting = f"hello i'm a thread #{i}"
    for _ in range(100):
        print(greeting)
# Run two `work` threads side by side and wait for both to finish.
t1, t2 = (Thread(target=work, args=(number,)) for number in (1, 2))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
from threading import *
# Module-level lock held around every print call made by `work`.
lock = Lock()


def work(i):
    """Print a greeting tagged with thread number ``i`` one hundred times.

    Each ``print`` call is made while holding the module-level ``lock``.
    """
    greeting = f"hello i'm a thread #{i}"
    for _ in range(100):
        with lock:
            print(greeting)
# Run two lock-protected `work` threads and wait for both to finish.
t1, t2 = (Thread(target=work, args=(number,)) for number in (1, 2))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
from threading import *
from time import sleep
class GlobalState:
    """Mutable holder for a single value ``x``; it does no locking itself."""

    def __init__(self, x):
        # Go through the setter so there is a single place that writes x.
        self.set_x(x)

    def set_x(self, x):
        """Replace the stored value with ``x``."""
        self.x = x
def reader(state: GlobalState):
    """Report whether ``state.x`` is even or odd.

    On the even path ``state.x`` is read twice (once for the check, once for
    the message) with a short sleep in between and no locking, so another
    thread can change the value between the two reads.
    """
    if state.x % 2 != 0:
        print(f"{state.x=} is odd")
        return
    sleep(0.01)  # simulate OS context switch
    print(f"{state.x=} is even")
def changer(state: GlobalState):
    """Increment ``state.x`` by one through ``set_x``, without any locking."""
    bumped = state.x + 1
    state.set_x(bumped)
# Start one reader and one changer against the same state, then wait for both.
state = GlobalState(2)
t1, t2 = (Thread(target=job, args=(state,)) for job in (reader, changer))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
from threading import *
from time import sleep
class GlobalState:
    """Mutable holder for a value ``x`` together with a ``Lock`` guarding it."""

    def __init__(self, x):
        self.lock = Lock()
        # Go through the setter so there is a single place that writes x.
        self.set_x(x)

    def set_x(self, x):
        """Replace the stored value with ``x``.

        This method does not acquire ``self.lock`` itself.
        """
        self.x = x
def reader(state: GlobalState):
    """Report whether ``state.x`` is even or odd while holding ``state.lock``.

    The lock is held across both the check and the message, including the
    short sleep on the even path.
    """
    with state.lock:
        if state.x % 2 != 0:
            print(f"{state.x=} is odd")
            return
        sleep(0.01)  # simulate OS context switch
        print(f"{state.x=} is even")
def changer(state: GlobalState):
    """Increment ``state.x`` by one while holding ``state.lock``."""
    with state.lock:
        bumped = state.x + 1
        state.set_x(bumped)
# Start one reader and one changer against the same locked state, then wait.
state = GlobalState(2)
t1, t2 = (Thread(target=job, args=(state,)) for job in (reader, changer))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
from threading import *
from random import *
class GlobalState:
    """Holder for a list ``x`` that is randomly grown or shrunk in place."""

    def __init__(self):
        self.x = []

    def do_something_changing(self):
        """Append ``1`` with probability 0.5; otherwise pop if non-empty."""
        grow = random() < 0.5
        if grow:
            self.x.append(1)
            return
        if self.x:
            self.x.pop()
def reader(state: GlobalState):
    """Repeatedly check the parity of ``len(state.x)`` twice in a row.

    The length is read separately for each check and no lock is taken, so a
    message is printed whenever the second read disagrees with the first,
    i.e. the list was modified between the two reads.
    """
    for _ in range(10000000):
        # Two independent len() reads: the second can see a different list.
        if len(state.x) % 2 == 0 and len(state.x) % 2 != 0:
            print(f"length {len(state.x)} was even before context switch")
def changer(state: GlobalState):
    """Call ``state.do_something_changing()`` ten million times."""
    remaining = 10000000
    while remaining > 0:
        state.do_something_changing()
        remaining -= 1
# Start one reader and one changer against the same list state, then wait.
state = GlobalState()
t1, t2 = (Thread(target=job, args=(state,)) for job in (reader, changer))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
from concurrent.futures import ProcessPoolExecutor
# Sample input: each key maps to a list of numbers to be scaled.
D = {
    'x': [1, 2, 3],
    'y': [4, 5, 6],
    'z': [7, 8, 9],
}


def process(arg):
    """Multiply every value of one ``(key, values)`` pair by ten.

    Args:
        arg: A two-item ``(key, values)`` pair.

    Returns:
        A ``(key, scaled_values)`` tuple where ``scaled_values`` is a new list.
    """
    name, numbers = arg
    scaled = []
    for number in numbers:
        scaled.append(number * 10)
    return name, scaled
if __name__ == "__main__":
    # Fan the (key, values) pairs out to worker processes and print the
    # scaled results as a dict.
    with ProcessPoolExecutor() as executor:
        result = executor.map(process, D.items())
        scaled_by_key = dict(result)
    print(scaled_by_key)
from concurrent.futures import ProcessPoolExecutor
from time import sleep
# Sample input: each key maps to a list of numbers to be scaled.
D = {
    'x': [1, 2, 3],
    'y': [4, 5, 6],
    'z': [7, 8, 9],
}


def process(arg):
    """Announce one ``(key, values)`` pair, wait a second, and scale it by ten.

    Args:
        arg: A two-item ``(key, values)`` pair.

    Returns:
        A ``(key, scaled_values)`` tuple where ``scaled_values`` is a new list.
    """
    name, numbers = arg
    print("executing", name, numbers)
    sleep(1)  # stand-in for one second of work per item
    scaled = list(map(lambda number: number * 10, numbers))
    return name, scaled
if __name__ == "__main__":
    # Same fan-out as before, but limited to two worker processes.
    with ProcessPoolExecutor(2) as executor:
        result = executor.map(process, D.items())
        scaled_by_key = dict(result)
    print(scaled_by_key)
Ведь, используя 4 ядра вместо 2-х, можно получить и более высокую производительность, верно?
Но как это реализуется?
И правильно ли я вообще всё делаю?
from concurrent.futures import ThreadPoolExecutor
from requests import Session
urls = [
    'https://toster.ru/q/372757',
    'https://toster.ru/',
]

# Session is used as a context manager so that it is closed once every
# response has been handled; the original left it open.
# NOTE(review): one Session is shared by all worker threads - confirm this is
# acceptable for the requests version in use.
# NOTE(review): do_something_with_response is not defined in this snippet;
# presumably it is supplied by the caller.
with Session() as session, ThreadPoolExecutor(7) as pool:  # 7 is the number of worker threads
    for response in pool.map(session.get, urls):
        do_something_with_response(response)
Потоки в Python работают шустро.
Уже не знаю, что сделать для оптимизации.
class Retry(object):
    """
    Decorator that retries a function with exponentially growing delay when
    it raises one of the configured exceptions.

    Args:
        tries: Maximum number of calls to make; must be at least 1.
        exceptions: Exception class, or iterable of classes, that triggers a
            retry. Defaults to ``Exception`` when omitted or empty.
        delay: Seconds to sleep after the first failed attempt.
        exponent: Factor the delay is multiplied by after each failed attempt.

    Raises:
        ValueError: If ``tries`` is less than 1.
    """

    def __init__(self, tries, exceptions=None, delay=1, exponent=1.5):
        if tries < 1:
            raise ValueError("tries must be at least 1")
        self.tries = tries
        if exceptions:
            # Accept a bare exception class as well as an iterable of them.
            if isinstance(exceptions, type):
                exceptions = (exceptions,)
            self.exceptions = tuple(exceptions)
        else:
            self.exceptions = (Exception,)
        self.delay = delay
        self.exponent = exponent

    def __call__(self, f):
        """Wrap ``f`` so that matching failures are retried.

        The wrapper returns the first successful result. If the final
        attempt fails, that attempt's exception propagates. Exceptions that
        do not match ``self.exceptions`` are never retried.
        """
        from functools import wraps

        @wraps(f)
        def fn(*args, **kwargs):
            for i in range(self.tries):
                try:
                    return f(*args, **kwargs)
                except self.exceptions:
                    if i + 1 >= self.tries:
                        # Out of attempts: re-raise the current exception,
                        # not an earlier one.
                        raise
                    sleep(self.delay * self.exponent ** i)
        return fn
@Retry(3)
def f():
    """Placeholder showing how the ``Retry`` decorator is applied."""