def flatten(sequence):
for item in sequence:
try:
yield from item # Или yield from flatten(item) для более общего случая
except TypeError:
yield item
from itertools import chain
def flatten(sequence):
list_of_lists = ([item] if type(item) is int else item for item in sequence)
return list(chain.from_iterable(list_of_lists))
flatten([1, 2, 3, [4, 5, 6], 7, 8])
[int(e) for e in re.findall('\d+', str(a))]
from collections import Counter
counter = Counter()
with open(r'z:\test.txt', 'rt') as file:
for line in file:
for word in line.strip().split():
print(counter[word], end=' ')
counter[word] += 1
x = float('40.80')
a = int(x)
b = int(100 * (x - a))
print(a, b) # => 40 79
from decimal import Decimal
x = Decimal('40.80')
a = int(x)
b = int(100 * (x - a))
print(a, b) # => 40 80
a, b = map(int, '40.80'.split('.'))
print(a, b) # => 40 80
text = "Тестовая строка" # Unicode в Python 3.x
assert type(text) is str
octetstring1 = bytes(text, encoding='utf-8')
assert type(octetstring1) is bytes
octetstring2 = text.encode('utf-8')
assert type(octetstring1) is bytes
assert octetstring1 == octetstring2
text = octetstring2.decode('utf-8')
assert type(text) is str
def user_input():
# n, m = int(input()), int(input())
n = 2
m = 4
# a = [[int(i) for i in 'input()'.split()] for w in range(m)]
first = [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10, 11]]
# input()
# b = [int(i) for i in input().split()]
second = [2, 3]
return first, second, n
def process(list_of_lists, n):
# Можно ли использовать множества? А вместо вложенных списков - tuple или set.
seen_lists, seen_numbers = [], []
has_at_least_two_items = lambda lst: lst and len(lst) >= 2
for sublist in filter(has_at_least_two_items, list_of_lists):
number0, number1 = sublist[:2]
numbers = {number0, number1} # Для ускорения проверок <x in Y>
# Ни одно из чисел не встречалось ранее
if not numbers.issubset(seen_numbers):
seen_lists.append(sublist)
seen_numbers.extend(sublist)
# Здесь бы вставить continue и уменьшить вложенность блока ниже, но зачистка seen_lists перед return мешает
else: # Обработка списка, напоминающая сортировку пузырьком
# Почему перебор до конца списка, а не до seen_lists[:-1]? Поправил.
for i, seen_list_i in enumerate(seen_lists[:-1]): # Заметка: слайс возвращает КОПИЮ списка
sl_i = set(seen_list_i) # Для ускорения проверок <x in Y>
common = len(numbers.intersection(seen_numbers))
# Оба числа встречались раньше
if common == 2:
if numbers.issubset(sl_i):
continue
# Что будет, когда i == j == len(seen_lists) - 1?
for j, seen_list_j in enumerate(seen_lists[i:], i): # Заметка: слайс возвращает КОПИЮ списка
sl_j = set(seen_list_j) # Для ускорения проверок <x in Y>
if (number0 in sl_i and number1 in sl_j) or (number1 in sl_i and number0 in sl_j):
seen_list_i.extend(seen_list_j)
seen_list_j.clear()
# Раньше встречалось только одно число из двух
elif common == 1:
# Дальше у одного числа приоритет перед другим;
# Нельзя ли сделать nx = numbers - sl_i? Но проверить, что nx - это одно число, а не {number1, n2}
if number0 in sl_i:
nx = number1
elif number1 in sl_i:
nx = number0
else:
continue
seen_list_i.append(nx)
seen_numbers.append(nx)
# Эта зачистка точно на своём месте? Можно её сделать перед return? Или удалять пустые сразу (выше)?
seen_lists = [sublist for sublist in seen_lists if sublist]
# В чём смысл этого результата?
return len(seen_lists) - len(seen_numbers) + n
def main(first, second, n):
for value in second:
first[value-1].clear() # В чём смысл?
print(process(first, n), end=' ')
if __name__ == '__main__':
main(*user_input())
cx_Freeze is a set of scripts and modules for freezing Python scripts
into executables, in much the same way that py2exe and py2app do.
Unlike these two tools, cx_Freeze is cross platform and should work
on any platform that Python itself works on. It supports Python 2.7
or higher (including Python 3).
if int(x) == x:
...
assert int(1.0) == 1.0
if type(x) is int:
...
data = [1.0, 1.23, 2.0, 2.71, 3.0, 3.14]
for x in filter(float.is_integer, data):
print(x)
class Match:
@classmethod
def from_coords(cls, coords):
yield cls(coords[0:2])
yield cls(coords[2:4])
yield cls(coords[4:6])
def __init__(self, endpoints):
self.x1, self.x2 = sorted(endpoints)
def __invert__(self):
return self.x2 - self.x1
def __xor__(m1, m2):
return max(m1.x2, m2.x2) - min(m1.x1, m2.x1) - ~m1 - ~m2
def __and__(m1, m2):
return m1 ^ m2 <= 0
def solve(*coords):
m1, m2, m3 = Match.from_coords(coords)
if (m1 & m2) + (m1 & m3) + (m2 & m3) >= 2:
return 0
if ~m1 >= m2 ^ m3:
return 1
if ~m2 >= m1 ^ m3:
return 2
if ~m3 >= m1 ^ m2:
return 3
return -1
if __name__ == '__main__':
assert solve(0, 2, 4, 5, 3, 6) == 1
assert solve(1, 2, 9, 10, 12, 20) == 3
assert solve(1, 5, 0, 1, 4, 8) == 0
assert solve(1, 3, 3, 4, 5, 6) == 1
assert solve(0, 1, 3, 5, 9, 10) == -1
# Python 3.6
"привет \\n как дела" == r"привет \n как дела" # => True
"привет \\n как дела" is r"привет \n как дела" # => True, но это не точно (С)
"привет \\n как дела" == "привет \n как дела" # => False
pd.options.display.max_columns = None
from time import time, sleep
from operator import attrgetter
class Article:
def __init__(self, timestamp, text):
self.timestamp, self.text = timestamp, text
def post(self):
print(f"planned={self.timestamp:.0f}, posted={time():.0f}, text={self.text}")
class Scheduler:
def __init__(self, *articles):
self.articles = sorted(articles, key=attrgetter("timestamp"))
def execute(self):
for article in self.articles:
sleep(max(article.timestamp - time(), 0))
article.post()
if __name__ == "__main__":
now = time()
Scheduler(
Article(now + 7, "post3"),
Article(now + 2, "post1"),
Article(now + 3, "post2"),
).execute()
from collections import defaultdict
keys = ['1', '1', '1', '2', '2', '3']
vals = ['q', 'w', 'e', 'r', 't', 'y']
d = defaultdict(list)
for key, value in zip(keys, vals):
d[key].append(value)
d = dict()
for key, value in zip(a, b):
d.setdefault(key, []).append(value)
import asyncio
from contextlib import closing
async def get_some_data():
# Обработка I/O - например, получение данных через HTTP
await asyncio.sleep(2)
data = 'ping'
print(data)
return data
async def process(data):
# Обработка данных на CPU
await asyncio.sleep(1)
print('pong')
async def main():
while True:
data = await get_some_data() # Блокирующий вызов - ожидание поступления новой порции данных
asyncio.ensure_future(process(data)) # Неблокирующий вызов - обработка данных на CPU
if __name__ == "__main__":
with closing(asyncio.get_event_loop()) as event_loop:
event_loop.run_until_complete(main())
async def run(self):
if not self.chains:
return
command = f'{{"command": "subscribe", "channel": "{self.name}"}}'
while True:
await asyncio.sleep(len(self.exchange.markets)*random())
try:
async with websockets.connect(self.wss, max_queue=0) as self.websocket:
await asyncio.wait_for(self.websocket.send(command), timeout=10)
while True:
data = await asyncio.wait_for(self.websocket.recv(), timeout=20)
data = json.loads(data, cls=DeepDecoder)
asyncio.ensure_future(self.process_websocket_message(data))
except asyncio.TimeoutError as error:
print(f'{self.name}: WEBSOCKET_TIMEOUT: {error}')
except websockets.ConnectionClosed as error:
print(f'{self.name}: WEBSOCKET_CLOSED: {error}')
except (asyncio.CancelledError, KeyboardInterrupt, SystemExit) as error:
print(f'{self.name}: TASK_CANCELLED: {error}')
self.clear()
return
except Exception as error:
print(f'{self.name}: {error}')
finally:
self.clear()
def intersect(a1, a2):
common = []
i1, i2 = iter(a1), iter(a2)
try:
e1, e2 = next(i1), next(i2)
while True:
if e1 < e2:
e1 = next(i1)
elif e1 > e2:
e2 = next(i2)
else:
common.append(e1)
e1, e2 = next(i1), next(i2)
except StopIteration:
return common
intersect([1,2,3,4,5,10,12], [3,4,5,6,7,11,12]) # => [3, 4, 5, 12]
from itertools import groupby
def squeeze(text):
return ''.join(key for key, group in groupby(text))
squeeze('aAaaabbccdcc') # => 'aAabcdc'
def squeeze(text):
for c1, c2 in zip(text[:-1], text[1:]):
if c1 != c2:
yield c1
yield c2
''.join(squeeze('aAaaabbccdcc')) # => 'aAabcdc'
from operator import ne
from itertools import compress, chain
def squeeze(text):
return ''.join(chain(compress(text, map(ne, text[:-1], text[1:])), text[-1]))
squeeze('aAaaabbccdcc') # => 'aAabcdc'