Skip to main content

7 posts tagged with "Python"

View All Tags

· 6 min read
zaxro

Object-Oriented Programming

物件導向編程(Object-Oriented Programming,OOP)是設計風格,會實作以下幾項

  • 物件:把程式組成一個物件,具有屬性跟方法
  • 封裝:把程式邏輯封裝,只提供公開介面
  • 繼承: 可以繼承其他物件的屬性跟方法
  • 多型: 物件可以根據上下文的不同以不同的方式呈現
  • 抽象化: 抽象類別跟介面結構,並由物件實現

以上東西文字看了會不太懂,所以我這邊用ec2做舉例. aws提供一個api,它提供了很多方法去他的server撈資料,今天我要撈某區的ec2,會先建立連接,然後查詢有哪些ec2,然後把資料匯出並配合他的資訊做管控. 以上過程可以用幾個function串起來,像是connect_api check_ec2_info start_ec2 stop_ec2 reboot_ec2 get_ec2_public_dns(這邊只是舉例,不是實際功能). 那用oop來設計就會把建立check_ec2_info 查到的ec2,封裝到一個定義有start stop reboot get_public_dns方法的物件,你操作就會變成ec2.start(),stop(),reboot()這樣. 我個人是覺得管理跟理解上更好.

實作oop

以下實作項目:

  • 抽象化:就是code開頭在用abc規劃架構
  • 多型: 因為目前只寫aws的,但是要用到gcp那些也都會使用父輩的VM class,畢竟每一台機器,無論是gcp,azure,aws都會需要 開機 關機 重開等功能.
  • 繼承: EC2會繼承至VM架構
  • 封裝: 要做到stop, start, restart並加入一些小東西,並把這些東西集合在各個公開介面.

另外補充,該程式碼功能主要是拿來再起動ec2後,用程式幫我修改ssh config,避免每次都要自己上去查詢並修改. 基本上把ssh config裡面的host拉出來做變數再帶入寫會更好,不過這邊偷懶就先這樣寫囉.

main.py
from abc import ABC, abstractmethod
import boto3
from typing import Iterable
import argparse

# 抽象化去規劃VM結構 會適用於所有vm
class VM(ABC):

@abstractmethod
def start_instance(self):
pass


@abstractmethod
def stop_instance(self):
pass

@abstractmethod
def restart_instance(self):
pass

@abstractmethod
def _show_instance_public_dns(self):
pass

def create_instance(cls, instance_id, instance_type):
pass

# 繼承至VM,for aws vm 示範
class EC2(VM):
def __init__(self, ec2_client, instance_id, instance_type, instance_state, public_dns_name, tags):
self.ec2_client = ec2_client
self.instance_id = instance_id
self.instance_type = instance_type
self.instance_state = instance_state
self.public_dns_name = public_dns_name
self.tags = tags

def start_instance(self):
self.ec2_client.start_instances(InstanceIds=[self.instance_id])
print(f"{self.tags[0]['Value']}開機了")
return

def start_instance_update_ssh_config_too(self, target_host_in_ssh_config: str):
self.start_instance()
self._show_instance_public_dns()
update_result = self._update_ssh_config(target_host=target_host_in_ssh_config)
if update_result:
print(f"{self.public_dns_name} change to set {target_host_in_ssh_config} a new dns")
else:
print(f"Dont see {target_host_in_ssh_config} in ssh config")
return

def stop_instance(self):
self.ec2_client.stop_instances(InstanceIds=[self.instance_id])
if self.tags:

print(f"{self.tags[0]['Value']} is going to stop")
else:
print(f"{self.instance_id} is going to stop")
return

def restart_instance(self):
self.ec2_client.reboot_instances(InstanceIds=[self.instance_id])
if self.tags:
print(f"{self.tags[0]['Value']} is going to restart")
else:
print(f"{self.instance_id} is going to restart")
return
# 因為是內部使用所以在前面加 _ 這個符號
def _show_instance_public_dns(self):
# 等待实例状态变为 "running"
self.ec2_client.get_waiter('instance_running').wait(InstanceIds=[self.instance_id])

# 获取实例详细信息
response = self.ec2_client.describe_instances(InstanceIds=[self.instance_id])

# 提取公共 DNS 名称
self.public_dns_name = response['Reservations'][0]['Instances'][0]['PublicDnsName']
return

def _update_ssh_config(self, target_host: str):

config_file = '/Users/suyuying/.ssh/config' # SSH 配置文件路径

# 读取配置文件内容
with open(config_file, 'r') as file:
lines = file.readlines()

# 查找目标主机名的行索引,-1是為了一旦沒有鎖定到目標host,可以方便做if else處理
target_index = -1
# 尋找目標Host,並用字串處理確認是否符合
for i, line in enumerate(lines):
if line.strip().startswith('Host') and line.strip().split()[1] == target_host:
# 一但找到就把target index做修改
target_index = i
break

# 更新主机名
if target_index != -1:
# 以target_index為基準,處理他的下一行,也就是HostName那一行
lines[target_index + 1] = f' HostName {self.public_dns_name}\n'

# 写入更新后的内容
with open(config_file, 'w') as file:
file.writelines(lines)
print('SSH configuration updated successfully.')
return True
else:
print('Target host not found in SSH configuration.')
return False


# 使用组合来管理 EC2 实例
class EC2Cluster:
def __init__(self, instances: Iterable[EC2]):
self.instances = instances

def start_all(self):
for instance in self.instances:
instance.start_instance()
return

def start_all_update_ssh_config(self):
for instance in self.instances:
if instance.tags[0]['Value'] == 'ford-pmm-server':
target_host = "prometheus-server"
instance.start_instance_update_ssh_config_too(target_host)
elif instance.tags[0]['Value'] == 'ford-pmm-test':
target_host = "backend1"
instance.start_instance_update_ssh_config_too(target_host)
else:
print("not matched tags in cloud")
return

def stop_all(self):
for instance in self.instances:
instance.stop_instance()
return

#
# def datetime_encoder(obj):
# if isinstance(obj, datetime):
# return obj.isoformat()

# 取得ec2資訊,並製作物件
def get_instances(ec2_client_side):
response = ec2_client_side.describe_instances()

instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
instance_state = instance['State']['Name']
instance_public_dns_name = instance.get('PublicDnsName', '')
instance_tags = instance.get('Tags', [])

instance_info = {
'InstanceID': instance_id,
'InstanceType': instance_type,
'InstanceState': instance_state,
'PublicDNSName': instance_public_dns_name,
'InstanceTags': instance_tags
}
print(instance_info)
if instance_tags:
instance_info['Tags'] = instance_tags

ec2_instance = EC2(ec2_client_side,instance_id, instance_type, instance_state, instance_public_dns_name, instance_tags)

instances.append(ec2_instance)
return EC2Cluster(instances)


if __name__ == "__main__":

# 创建 ArgumentParser 对象
parser = argparse.ArgumentParser(description='EC2 Instances Control')

# 添加 start 选项
parser.add_argument('--start', action='store_true', help='Start EC2 instances and update config')

# 添加 stop 选项
parser.add_argument('--stop', action='store_true', help='Stop EC2 instances')

# 解析命令行参数
args = parser.parse_args()
ec2_client = boto3.client('ec2', region_name='us-west-1')
instances_cluster = get_instances(ec2_client_side=ec2_client)
# 根据选项执行相应操作
if args.start:
print('Starting EC2 instances...')
# 在这里添加启动 EC2 实例的逻辑
instances_cluster.start_all_update_ssh_config()
if args.stop:
print('Stopping EC2 instances...')
# 在这里添加停止 EC2 实例的逻辑
instances_cluster.stop_all()

· 9 min read
zaxro

應用情境介紹

如果你有一個套件,他是 I/O 密集的套件,例如 python 的 request,會對對方 server 請求,且這個時候會 block 住 thread,在這種情況下,你無法使用異步(因為同一 thread 會被 block 住),這時候就可以考慮用 multithread 做加速,也就是用並行執行方式提速,以下就是我自己在應用的其中一段 code,這樣用也要注意速率限制就是!

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(get_file_do_scan, file_tg=file, api_keys=copy.deepcopy(my_api_keys)) for file in
just_compress_files]
for future in concurrent.futures.as_completed(futures):
future_result = future.result()
if future_result == "dangerous" or future_result is None:
continue
print(future_result)
print("以上檔案是安全的")

thread pool vs process pool

這邊都是以內建的concurrent.futures套件為例!主要用在並行執行程式.

  • thread 是 os 系統裡進行運算的最小單位,一個 process 可以有多個 thread(看你設計,也可以只有單個),thread 間會共享記憶體空間.
  • process 是 os 系統做資源分配跟調度的基本單位,每個 process 會有獨立記憶體空間.

在 python 世界裡,thread pool 跟 process pool 是兩樣不同東西,以下講結論

  • thread pool 適用於 i/o 密集工作,例如 同時對 api server 做請求!
  • process poo 適用於運算密集的工作!

thread pool

thread pool 的部分,受限於官方 python 的 CPython 記憶體管理問題,需要透過 GIL(Global Interpreter Lock)去做到單 process 中的多 thread 管理,當一個 process 運行多 thread 時,在 python 世界裡其實一次只會運行一個 thread,這時候會對 process 做 lock,當該 thread 遇到 I/O 密集工作(例如:查詢網站)時,就會釋放 GIL,讓另一個 thread 可以執行.

在使用 thread pool 時要注意,Python 中的 thread 會共享同一進程的記憶體空間,因此它們能夠訪問並修改全局變數。如果多個 thread 同時修改同一個全局變數,可能會導致不可預期的結果。所以全局變數的修改就要注意,避免去影響其他 thread.例如以下程式碼:my_api_keys是一個 list,如果你執行過程中會刪除該 list 內的元素,為了要避免影響其他 thread 就建議改成用 deep.copy 方式,每個 thread 放獨立的my_api_keys.

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(get_file_do_scan, file_tg=file, api_keys=copy.deepcopy(my_api_keys)) for file in
just_compress_files]

process pool

process pool部分,使用它就會是應用多 cpu 的狀況,它會透過 process pool 利用多核 CPU 進行真正的並行計算(thread pool 有點假並行的感覺,因為它實際上會等另一個 thread),但 process pool 間通信成本高.

教學內容

以下都是講thread pool.

info

我當初是看這篇學習的

主要會用的就是submit()map()這兩種方法,使用submit()會配合as_completed去動態收集結果.

主要講一下,submit()的情況,當執行[executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]時,會產生 future 物件組成的列表,同時即刻開始並行執行程式,接著這個 list 執行完之後.可以用as_completed去取得程式執行的結果,像以下這樣

with ThreadPoolExecutor() as executor:
# 使用 list comprehension 創建一個 Future 對象的列表
# 每個 Future 對象代表一個即將完成的操作(在這裡,操作是 fetch_yahoo_homepage 函數的調用)
# executor.submit 函數的參數是要調用的函數和該函數的參數
# 当你使用executor.submit()提交任务给线程池时,任务会立即开始执行。executor.submit()会立即返回一个concurrent.futures.Future对象
futures = [executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]
for future in as_completed(futures):
# 使用 Future 對象的 result 方法來獲取操作的結果(在這裡,結果是 HTML 字串)
# 通过调用future.result()获取任务的执行结果
html = future.result()

map就有點像是把以上兩件事submitas_completed直接結合,他返回的是迭代器,可以迭代取得執行完畢結果,像這樣

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
# Executors 的相關方法,map(func, *iterables),執行可迭代物件 ex list tuple named tuple
# 其實就是把可迭代物件的元素一個個拆開喂給Executor,提升程式執行速度
for hostinfo in e.map(lambda x: get_certificate(x[0], x[1]), HOSTS):
if hostinfo is None:
continue
print_basic_info(hostinfo)

看起來submit()map()很像,使用submit允許你檢查任務狀態,而使用map則是能夠簡潔的並行執行!

以下是我學習的過程,主要是用 request 網站來做 demo. 基本會用到的 function 跟套件,跟變數:list-url_will_request=["https://www.google.com",'https://www.yahoo.com'," https://www.youtube.com","https://www.nba.com"]

import urllib.request
import ssl
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
ssl._create_default_https_context = ssl._create_unverified_context
url_will_request=["https://www.google.com",'https://www.yahoo.com'," https://www.youtube.com","https://www.nba.com"]

def fetch_yahoo_homepage(url):
try:
response = urllib.request.urlopen(url)
html = response.read().decode('utf-8')
return html
except urllib.error.URLError as e:
print(f'Error: {e}')
return None

單 thread 一般版本

#single_thead
start_time_single_thead=time.time()
for i in url_will_request:
html=fetch_yahoo_homepage(i)
print(html[:100])
end_time_single_thead=time.time()
all_time_single_thead=end_time_single_thead-start_time_single_thead

multi-thead use map

start_time=time.time()
# 使用示例ex
with ThreadPoolExecutor() as executor:
for html in ecutor.map(fetch_yahoo_homepage,url_will_request ):
print(html[:100])
end_time=time.time()
all_time=end_time-start_time`

multi-thead and use submit

start_time_multi_thread_submit=time.time()

# 使用 ThreadPoolExecutor 作為上下文管理器
# 當離開這個區塊時,將自動清理並關閉所有線程
with ThreadPoolExecutor() as executor:
# 使用 list comprehension 創建一個 Future 對象的列表
# 每個 Future 對象代表一個即將完成的操作(在這裡,操作是 fetch_yahoo_homepage 函數的調用)
# executor.submit 函數的參數是要調用的函數和該函數的參數
# 当你使用executor.submit()提交任务给线程池时,任务会立即开始执行。executor.submit()会立即返回一个concurrent.futures.Future对象
futures = [executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]

# 使用as_completed()函数来迭代这些Future对象,并在future中有程式完成时获取结果到future。
for future in as_completed(futures):
# 使用 Future 對象的 result 方法來獲取操作的結果(在這裡,結果是 HTML 字串)
# 通过调用future.result()获取任务的执行结果
html = future.result()

# 如果 HTML 不為空(也就是說,操作成功返回了 HTML),則打印 HTML 的前 100 個字符
if html:
print(html[:100])
end_time_multi_thread_submit=time.time()
all_time_multi_thread_submit=end_time_multi_thread_submit-start_time_multi_thread_submit

以上的 time 做時間比較

print(f"all_time_single_thead is {all_time_single_thead}")
print(f"all map time is {all_time}")
print(f"all_time_single_thead_submit is {all_time_multi_thread_submit}")
  • all_time_single_thead is 2.3154239654541016
  • all map time is 1.4191679954528809
  • all_time_single_thead_submit is 1.0428180694580078

可以看出用 multi_thread 運行還是比較快的!

小結

  • excutor.map(fetch_yahoo_homepage,url_will_request )當使用 map 方法時,第一個參數需要是可調用的對象(一個 function,或帶有__call__方法的物件),第二個參數則是可迭代物件(ex.string,set,list,str,dict)基本上只要實現__iter__就是.

  • executor.submit():一樣第一個參數需要是可調用的對象,第二個參數是可迭代物件

至於使用上可以參考上面的方法

· 7 min read
zaxro

裝飾器

寫程式很常看到高級技巧裝飾器,@xxxx 這種東西,因為之前都可以用一些方式避開不這樣寫,直到某次在跟別人串程式的時候,為了減少影響範圍,所以選用裝飾器去拿對方 return 的東西在處理一次,而對方也很簡單,只要把裝飾器放上去就可以直接用了,很方便!裝飾器主要應用情境是在不影響對方 function 下,給這個 function 新功能,裝飾器的彈性和可擴展性使其成為 Python 中強大的程式設計工具之一.

partial function

因為裝飾器有時候會配合 partial function,所以要了解一下.

partial 是 python 的偏函示,使用上感覺就是方便你帶入預設參數到一個已知 function,並透過偏函式去建立新的 function!使用 partial 函數可以非常方便地對現有函式進行定制和重用。那為何不預設帶入?總會有需要多個預設值的情況吧~

  • 偏函示範例一
from functools import partial

# 原始函式
def add(x, y):
return x + y

# 創建偏函式
add_five = partial(add, y=5)

# 使用偏函式
result = add_five(10)
print(result) # 輸出: 15
  • 偏函示範例二
from functools import partial

# 定義一個具有多個參數的函式
def multiply(x, y, z):
return x * y * z

# 使用偏函數將其中一個參數的預設值固定為 2
new_func = partial(multiply, y=2)

# 調用新函式
result = new_func(3, z=4)
print(result) # 輸出: 24

裝飾器正篇

from functools import partial, wraps
import vt

def partial_decorator1(func=None, *, apikey=api_key_global):
def actual_decorator(func, apikey):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
sha256_hash = result
client = vt.Client(apikey)
file = client.get_object(f"/files/{sha256_hash}")
print(file.last_analysis_stats)
client.close()
return result

return wrapper

if func:
return actual_decorator(func, apikey)
else:
return partial(actual_decorator, apikey=apikey)

# 使用新的装饰器
@partial_decorator1(apikey=api_key_global)
def my_function(file_path):
# 在这里执行你的逻辑并返回一个哈希值
return file_path

result = func(*args, **kwargs)這邊會拿到原始 function 帶入參數的一般 return!後面的程式碼是對該 return 作處理!

最後面的 if else 會比較難懂.

  • if func: 這部分處理的是當裝飾器被用於一個函數時(也就是被直接應用到函數上,例如 @partial_decorator1)的情況。在這種情況下,func 參數會被自動設置為被裝飾的函數。這時,actual_decorator(func, api_key) 會被調用,其結果(也就是裝飾過的函數)被返回。

  • else: 這部分處理的是當裝飾器被當作函數調用(例如 @partial_decorator1() 或 @partial_decorator1(api_key=some_key))的情況。在這種情況下,func 參數會默認為 None,因此 else 分支會被執行。 partial(actual_decorator, api_key=api_key) 創建了一個新的裝飾器,這個裝飾器會接受一個函數作為參數,並返回該函數的裝飾版本。新的裝飾器會在稍後的時間被調用,這時才會裝飾目標函數。

當裝飾器以 @partial_decorator1 的形式應用到函數上時,會走 if func: 分支;而當裝飾器以 @partial_decorator1() 或 @partial_decorator1(api_key=some_key) 的形式應用時,會走 else: 分支

裝飾器範例二

from functools import partial, wraps

def partial_decorator1(generator_func=None, *, api_key="api_key_global"):
def actual_decorator(generator_func, api_key):
@wraps(generator_func)
def wrapper(*args, **kwargs):
# 打印字串
print("Before generator execution")
# 並不會執行,只是產生生成器而已
result = generator_func(*args, **kwargs)
print(api_key)
# 打印字串
print("After generator execution")
return result

return wrapper

if generator_func is None:
return partial(actual_decorator, api_key=api_key)
else:
return actual_decorator(generator_func, api_key)


# @partial_decorator1(api_key="my_api_key"),這是代表裝飾器被當作函式調用情況,
# 因為調用時沒有帶generator_func,所以generator_func會是預設的None.會走if那邊,
# 用partial(actual_decorator, api_key=api_key),產生新裝飾器,這個裝飾器會接受一個函數作為參數,並返回該函數的裝飾版本。
@partial_decorator1(api_key="my_api_key")
def my_generator():
yield 1
yield 2
yield 3

# 使用装饰器修饰生成器函数,這邊是在修改帶入的既有餐數,api_key="my_api_key"
# 注意這邊的my_generator()是返回<generator object my_generator at 0x10041ce00>
# yield是實現generator的一種方法!
for value in my_generator():
print(value)

# 使用 partial_decorator1 创建装饰器,@partial_decorator1不帶參數,就會直接應用到function
# ,因此參數generator_func就會有帶東西,而走else.else做的事情是修飾並return新function
# actual_decorator(generator_func, api_key)會修飾function,return新function.
@partial_decorator1
def my_generator1():
yield 1
yield 2
yield 3

# 使用装饰器修饰生成器函数,這邊是如果什麼都不帶,就會用默認的api_key="api_key_global"
for value in my_generator1():
print(value)

小結

一般裝飾器不帶括號(ex.@partial_decorator1),代表該裝飾器會自動將 func 參數設定為套用的 func,這滿單純! 如果有帶括號(ex.@partial_decorator1(api_key=some_key)),會需要指定帶入的 function,就需要透過 partial 寫法去指定要帶入哪個 function 作為裝飾器,不過都用裝飾器了,當然會帶入要執行的包裝 function.

另外,裝飾器寫法很多,這邊寫的只是其中一種!

info

裝飾器主要應用情境

  • 記錄日誌:裝飾器可以捕獲函式的輸入和輸出,並將其記錄到日誌中,以便後續調試和追蹤。

  • 身份驗證和授權:裝飾器可以在執行函式之前檢查用戶的身份驗證和權限,以確保只有授權的用戶可以訪問該函式。

  • 快取:裝飾器可以將函式的輸出快取起來,以減少重複計算,提高效能。

  • 異步處理:裝飾器可以將函式的執行轉換為非同步處理,以提高系統的反應性和併發能力。

  • 計時和性能監控:裝飾器可以計時函式的執行時間,並收集性能指標,用於分析和優化代碼的效能。

· 7 min read
zaxro

這邊會記錄一些實用的 coding 方法

coding 部分

False 跟 None 使用時機

False 主要做條件判斷,None 則用於表示缺少值,或變量沒有被給值的請況. 舉例來說,今天 request 想要取得資料放入變數 stock_info, 如果判斷取出來資料不合預期,或者報錯,那就適合用 None. 那 False 就是拿來表達條件成不成立的情況,例如判斷股票是否上漲,可以這樣定義變數,is_price_up,True代表上漲,False代表下跌.

# 使用 None 表示缺少值或變量尚未被賦予任何值
name = None
if name is None:
print("名字尚未被賦值")

# None 作為函式的返回值
def divide(a, b):
if b == 0:
return None
return a / b

result = divide(10, 0)
if result is None:
print("無效的操作")

# 使用 False 進行條件判斷
flag = False
if flag:
print("這個條件不成立")
else:
print("這個條件成立")

請求 api 失敗後 retry

有時候對方 server 不知道啥問題,會出現報錯,過一下又好了,這邊用 while 配合數字開關,去執行 retry,不過要記得 try,except 只存在於當下文檔,不會跨文檔接 Error.

max_retries = 3
retries = 0
while retries<max_retries:
try:
request....
return sth
except Exception as err:
retries += 1
time.sleep(1)# 延遲一秒後再重新執行
if retries==max_retries:
telegram_bot_sendtext(f"sth wrong, go cheeck")
print(err)
return None

或者配合嘗試不同 key,同時做連線 retry

· 7 min read
zaxro

code

平常寫 code 時候,不會去想序列排列底層的事情,因緣際會下,就學一學吧,以下 code 是跟 AI 學習的,能用 python 這樣在同一個 function 內執行同一個 function,這個事情是真的沒想過,另外整個程式執行過程,他執行的順序,也是很大學習重點!

· 21 min read
zaxro

basic introduction

對資料進行分析,基本上過程大致如以下

  1. 資料收集
  2. 使用工具分析資料
  3. 可視化分析(作圖)
  4. 建模
  5. 解釋結果

網路上資料收集方式有很多,從簡便到麻煩依序大致如以下,如果遇到需要取資料時,可以條列式的篩選找到合適方法.

API < 開發者工具的xhr/fetch解析 < bs4解析html中css跟tag < selenium解析html跟點擊

基本上當然是先從簡單開始 try,假設今天已經取到所需的資料了,它可能是 csv,Json 等格式,接著分析資料步驟就會是pandas上場的時候.

分析資料:常見流程有 數據準備、選取、過濾、聚合、分組、排序、合併、填充缺失值,另外因為他的資料結構也有跟 python 的繪圖工具做整合,也可以搭配使用,常用的繪圖工具如以下.

  • Plotly 和 Bokeh 是基於 JavaScript 的交互式圖表,也支援其他語言 ex.Python,可以創建互動性更強的圖表,如滑塊、下拉選單等。
  • Altair 是基於 Vega-Lite 的 Python 套件,支持將數據轉換為交互式圖表,優點是語法較簡單。
  • Seaborn 則是一個統計圖表套件,專注於統計繪圖,方便使用者進行常見的數據分析和繪圖操作。
  • Matplotlib 是 Python 最早期的繪圖套件之一,功能強大,可以繪製各種圖表,包括條形圖、直方圖、散點圖等等,基本上 Seaborn 是建立在 Matplotlib 之上。
  • Pyecharts 則是中國的一個開源圖表庫,提供了各種各樣的圖表,能夠繪製地圖、關係圖、熱力圖等多種圖表。

數據準備

以下數據是改過後的數據,會作為後面 demo 過程範例,基本上你想得到的數據格式 pandas 都支援了,md 檔,csv,dict,json...

pandas 主要數據結構有

  • Series:一維數據結構,類似於帶有標籤的一維數組,可以容納不同類型的數據。

  • DataFrame:二維數據結構,類似於一個二維數組或一個關聯數據庫表,每列可以有不同的數據類型。

  • Panel:三維數據結構,類似於一個由 DataFrame 對象組成的字典,可以用來表示由多個 DataFrame 對象組成的數據集。

最常會使用到的就是 DataFrame.

· 3 min read
zaxro

常見 datetime 應用

日期介於幾天之間

範例一.日期介於昨日到七天前

資料ticket_detail['result']['close_time']他的字串值是2023-04-17 02:36:55這類,是否屆於昨日到七天前,是就 print yes

format=datetime.now() #<class 'datetime.datetime'>

today = datetime.now().date() #<class 'datetime.date'>

yesterday = today - timedelta(days=1) #<class 'datetime.date'>

seven_days_ago = today - timedelta(days=7) #<class 'datetime.date'>

date_format = "%Y-%m-%d %H:%M:%S" #把字串為2023-04-17 02:36:55依據格式換成datetime.datetime物件

# 使用datetime.strptime方法把字串依據格式換成 datetime.datetime物件,使用date方法變成datetime.date物件進行比較
if seven_days_ago<= datetime.strptime("2023-04-17 02:36:55",date_format).date() <= yesterday:
print("yes")
info

範例二:timedelta 物件運用,憑證是否在今天到 30 天之間到期

# 將到期日期字串轉換為 datetime 格式
vaild_date = datetime.strptime("2023-04-17 02:36:55", date_format)

# 取得當前時間
now_time = datetime.now() #2023-04-17 20:33:01.577635

# 計算到期日期與當前時間的時間差,注意這邊是timedelta物件囉!
vaild_days = vaild_date - now_time

# 印出時間差和時間差的資料型態
print(vaild_days) #-1 day, 6:11:15.206948
print(type(vaild_days)) #<class 'datetime.timedelta'>

# 判斷時間差是否在 當下到 30 天之間到期
if vaild_days > timedelta(days=0) and vaild_days < timedelta(days=30):
print(vaild_days)
print("ok")

tip

注意~

以上面為例,到期時間為"2023-04-17 02:36:55",當下時間為 2023-04-17 20:25:39.793060,這種情況下很明顯到期時間是小於當下時間的,相減結果會是-1 day, 6:03:53.422373.

由此可知,只要前時間小於後時間,剪出來的 day 都會是負數,也就是你的憑證到期了!

另外,如果是 datetime.date 之類物件減去 timedelta 物件,是不會變成 timedelta 物件!

vaild_date=datetime.strptime("2023-04-17 02:36:55",date_format).date()
print(type(vaild_date-timedelta(days=1))) # <class 'datetime.date'>

如果是 datetime.date 之類物件減去 datetime.date 物件,是會變成 timedelta 物件的!

vaild_date=datetime.strptime("2023-04-17 02:36:55",date_format).date()
print(type(datetime.now().date()-vaild_date)) #<class 'datetime.timedelta'>

但如果是 datetime.datetime 之類物件減去 datetime.date 物件會報錯 TypeError: unsupported operand type(s) for -: 'datetime.date' and 'datetime.datetime'

tip

datetime.datetime 同類別可以相減

datetime.date 同類別可以相減

datetime.datetime 之類減去 datetime.date 會報錯

datetime.datetime 跟 datetime.date 都可以減去 timedelta