$date = '2024-05-20 12:45:53';
$sql = "SELECT * FROM your_table WHERE your_datetime_column < ?";
$stmt = $conn->prepare($sql);
$stmt->bind_param('s', $date);
$stmt->execute();
$result = $stmt->get_result();
if ($result->num_rows > 0) {
while ($row = $result->fetch_assoc()) {
echo "id: " . $row["id"] . " - Дата: " . $row["your_datetime_column"] . "<br>";
}
} else {
echo "Пусто";
}
from urllib.request import urlretrieve
url = a.attr("href")
filename=url.split("/")[-1].split("?")[0]
urlretrieve(url, filename)
import os
import os.path
import re
import sys
from glob import glob
from time import sleep
from urllib.request import urlretrieve
import urllib.error
packages_to_install = "pyquery==2.0.0 tqdm==4.66.1"
cmd_for_install = sys.executable + " -m pip install "
for i in range(3):
try:
from pyquery import PyQuery as pq # type: ignore
from tqdm import trange, tqdm
from tqdm.contrib.concurrent import thread_map
break
except ImportError:
if os.system(cmd_for_install + packages_to_install) != 0:
os.system(cmd_for_install + " --user " + packages_to_install)
links_to_files: list = []
class ArchiveNotFound(Exception):
pass
class patterns:
RN = re.compile(r"[\r\n]{2,}")
TIME = re.compile(r" в [\d]{1,2}:[\d]{1,2}:[\d]{1,2}")
DELMSG = re.compile("Сообщение удалено[\n]?")
EDITEDMSG = re.compile(r"\(ред\.\)")
SLUG = re.compile(r"[^\w\s-]")
ONLY_DIGITS = r"[\D]+"
def get_user_aaction(prompt: str) -> str:
action = ""
while len(action) > 0:
action = re.sub(patterns.ONLY_DIGITS, "", input(prompt)).strip()
return action
def isInt(value):
try:
int(value)
except ValueError:
return False
return True
def norm(value: str) -> str:
for pattern in [patterns.RN, patterns.TIME, patterns.EDITEDMSG, patterns.DELMSG]:
value = re.sub(pattern, "\n" if pattern == patterns.RN else "", value)
return value.strip() + "\n\n"
def slugify(value: str) -> str:
value = re.sub(patterns.SLUG, "", value.lower())
return re.sub(r"[-\s]+", "-", value).strip("-_")
def get_dir_with_messages() -> str:
cwd = os.getcwd()
cwd2 = ""
for a in sys.argv:
if os.path.isdir(a):
cwd = a
cwd2 = os.path.join(cwd, "messages")
if os.path.isdir(cwd2):
cwd = cwd2
if not os.path.isfile(os.path.join(cwd, "index-messages.html")):
raise ArchiveNotFound("")
return cwd
def export_to_txt(files_with_messages: list, results_folder: str):
with open(files_with_messages[-1], "r", encoding="CP1251") as fp:
d = pq(fp.read())
chat_title = d("div.ui_crumb:last").text().strip()
txtfile = slugify(chat_title)
txtfile = os.path.join(results_folder, txtfile + ".txt")
with open(txtfile, "w", encoding="UTF-8") as txtfp:
txtfp.write("\ufeff")
for i in trange(len(files_with_messages), leave=None, delay=1.5, unit="pg"):
f = files_with_messages[i]
with open(f, "r", encoding="CP1251") as fp:
d = pq(fp.read())
messages: list = list(d("div.item").items())
messages.reverse()
txtfp.writelines([norm(m.text()) for m in messages])
attachments_links = d("a.attachment__link").items()
for attachment_link in attachments_links:
links_to_files.append(attachment_link.attr("href"))
def process_chat(path_to_chat: str, results_folder: str = "."):
d = None
files_with_messages: list = glob(
os.path.join(path_to_chat, "messages*.html"), recursive=False
)
if not files_with_messages:
return
files_with_messages.sort(
key=lambda x: int(os.path.basename(x).split("s")[-1][:-5]),
reverse=True,
)
export_to_txt(files_with_messages, results_folder)
def main():
try:
p = get_dir_with_messages()
except ArchiveNotFound:
print(
"""
Неверно указан путь к папке с архивом.
Измените рабочую директорию
или передайте нужный путь в аргументе командной строки при вызове этой утилиты.
""".strip()
)
sys.exit(1)
chats = list(
filter(
lambda x: isInt(os.path.basename(x)),
[
c
for c in glob(p + os.path.sep + "*", recursive=False)
if os.path.isdir(p)
],
)
)
print("I'm starting to work. %d chats found." % (len(chats),))
results_folder = os.path.join(p, "mre")
if not os.path.isdir(results_folder):
os.mkdir(results_folder)
results = thread_map(
lambda x: process_chat(x, results_folder),
chats,
leave=None,
miniters=1,
unit="chat",
)
del results
with open("links_to_files.txt", "w", encoding="UTF-8") as fp:
fp.write( "\n".join(links_to_files) )
print("Скачиваю вложения...")
attachments_dir = os.path.join(results_folder, "attachments")
if not os.path.isdir(attachments_dir):
os.mkdir(attachments_dir)
for url in tqdm(links_to_files):
filename = os.path.join(attachments_dir, url.split("/")[-1].split("?")[0])
try:
urlretrieve(url, filename)
except urllib.error.HTTPError:
print(f"Ошибка при скачивании {filename}")
sleep(1)
print(" ")
print("Завершено! Текстовые файлы находятся по пути:\r\n", results_folder)
if __name__ == "__main__":
main()
import base64
import requests
headers = {"X-Goog-Api-Key": "AIzaSyBOti4mM-6x9WDnZIjIeyEU21OpBXqWBgw"}
BASE_URL = "https://ckintersect-pa.googleapis.com/v1/intersect/"
res = requests.post(f"{BASE_URL}langs", headers=headers, verify=False)
print(res.text)
img = open("filename.jpg", "rb")
imgData = img.read()
img.close()
imgBase = base64.b64encode(imgData)
data = {"imageRequests": [{"engineParameters": [{"ocrParameters": {}}, {"descriptionParameters": {"preferredLanguages": ["en"]}}], "imageBytes": imgBase.decode(), "imageId": "somestring"}]}
res = requests.post(f"{BASE_URL}pixels", json=data, headers=headers, verify=False)
print(res.text)
$(document).on('change', 'input[pattern]', function() {
var re = new RegExp($(this).attr('pattern'), 'g');
var newVal = '';
for (i in m = $(this).val().match(re)) {
newVal += m[i];
}
$(this).val(newVal);
});
import json
from urllib.request import urlopen, Request
class Coincap():
"""parser module for the coincap API"""
@staticmethod
def format_float(f, i=3):
return float(("%."+str(i)+"f") % float(f))
def get_info(self, convert_numbers=True):
"""Get the currency table from the api.coincap.io/v2/assets
Returns:
tuple: (headers 1d list, content 2d list with rows and cols)
"""
url = "https://api.coincap.io/v2/assets"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:100.0) Gecko/20100101 Firefox/100.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "ru-RU,ru;q=0.8,en-US;q=0.5,en;q=0.3",
"X-Requested-With": "XMLHttpRequest",
}
opener = Request(url, headers = headers)
resp = urlopen(opener)
content = resp.read().decode("UTF-8")
j = json.loads(content)
data_items = j["data"]
ret_headers = (
"Name",
"Ticker",
"Price",
"Capitalization",
"Change percent in 24 h",
"Value 24 h",
"Value",
"Change 24 h",
"Change 7 d",
)
# attention, I have not found any changes in this API in seven days
ret_content = []
tmp_data = []
for ditem in data_items:
tmp_data.append(ditem["name"])
tmp_data.append(ditem["symbol"])
priceUsd = self.format_float(ditem["priceUsd"])
tmp_data.append(priceUsd if convert_numbers else str(priceUsd) + " $")
tmp_data.append(
self.format_float(ditem["marketCapUsd"], 1)
if convert_numbers
else str(self.format_float(ditem["marketCapUsd"], 1)) + " $"
)
tmp_data.append(
self.format_float(ditem["volumeUsd24Hr"])
if convert_numbers
else str(self.format_float(ditem["volumeUsd24Hr"])) + " $"
)
tmp_data.append(
self.format_float(ditem["supply"])
if convert_numbers
else str(self.format_float(ditem["supply"]))
)
tmp_data.append(
self.format_float(ditem["changePercent24Hr"], 2)
if convert_numbers
else str(self.format_float(ditem["changePercent24Hr"], 2)) + "%"
)
tmp_data.append(0.0 if convert_numbers else "0.0")
ret_content.append(tmp_data)
tmp_data = []
return (ret_headers, tuple(ret_content))
if __name__ == "__main__":
cc = Coincap()
headers, content = cc.get_info()
btc = content[0]
print(btc[1], btc[2])
import logging
import asyncio
q = asyncio.Queue()
async def a_difficult_task(some_arg):
print(f"task {some_arg} started")
await asyncio.sleep(120)
print(f"task {some_arg} ended")
async def worker():
while True:
try:
some_arg = await q.get()
await a_difficult_task(some_arg=some_arg)
except (KeyboardInterrupt, SystemExit, SystemError):
break
except Exception:
logging.exception("...")
finally:
q.task_done()
async def main():
asyncio.create_task(worker())
print("Adding tasks to queue")
for x in range(1, 7):
await q.put( x )
print("Tasks added to queue")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())