Skip to main content

· 5 min read
zaxro

osi建立目的是確保網路可以通信,牽涉到你的電腦跟中間的交換器,路由器,數據機等等.

  1. 物理層(Physical Layer):負責傳輸數據位元流,定義物理介質和傳輸率等。

  2. 數據鏈路層(Data Link Layer):負責在直接相連的兩個節點之間進行數據傳輸,提供了對物理層的抽象。

  3. 網絡層(Network Layer):處理路由和轉發數據包,確保數據能在不同網絡間傳輸。

  4. 傳輸層(Transport Layer):負責提供端到端的數據傳輸,確保數據的可靠性和流量控制。

  5. 會話層(Session Layer):負責建立、管理和結束通信會話。

  6. 表示層(Presentation Layer):處理數據的表示格式,確保不同系統之間的數據能夠正確解讀。

  7. 應用層(Application Layer):提供各種應用服務,包括文件傳輸、電子郵件、網頁瀏覽等。

ps: 常用的ping,是屬於icmp協議,一個輔助排查網路的協議,建立在網路層之上,但跟4,5,6,7層沒關係!

單看這七層一定還是不知道在幹嘛,這邊拿cloudflare的教學來改成以下範例.

info

請求端是由上層往下層,接收端的解讀是由下層往上層!

範例

  1. Cooper 先生在他的電腦上撰寫了一封電子郵件並點選「傳送」。此時資料開始由應用層向下傳輸。
  2. 在傳輸層,資料被分割成一個個的區段,每個區段包含源和目標的端口號。
  3. 在網路層,每個區段被包裝進一個封包,每個封包包含源IP地址(Cooper 先生的電腦)和目標IP地址(Palmer 女士的電腦)。此時,Cooper 先生的電腦會進行IP地址的比較,判斷目標IP是否在同一個網段。發現目標IP在不同網段,因此決定透過預設閘道(路由器)進行通信。
  4. 如果為第一次連線,Cooper 先生的電腦並不知道預設閘道(路由器)的MAC地址,因此透過 ARP 請求來獲取。此時會將包含路由器IP的ARP請求廣播到網路上(如果有接交換器,會透過交換器把ARP請求轉發到每個端口),路由器接收到此請求後會解析ARP請求並回應其MAC地址,電腦接收到ARP回應,並把該MAC地址加入快取,下次會先查快取資訊。
  5. 在資料連結層,每個封包被進一步包裝到一個數據幀中。每個數據幀包含源MAC地址(Cooper 先生的電腦)和目標MAC地址(已知的路由器MAC地址)。此外,還包含了一些其他控制資訊,如幀順序、錯誤檢查等。
  6. 最後,這些資料幀通過實體層被轉換成位元流,並通過網路傳送到路由器。
  7. 當路由器收到這些資料幀,它會查閱其路由表,確定將資料包發送到下一個網段的最佳路徑。然後將目標MAC地址更改為下一個目的地(另一個路由器或交換器)的MAC地址,將源MAC地址更改為自己的MAC地址,然後將資料幀發送出去。
  8. 這個過程會一直持續,直到資料到達 Palmer 女士的網段。在 Palmer 女士的網段中的交換器會將資料幀傳遞到具有目標MAC地址(即Palmer 女士的電腦)的設備。
  9. 最後,資料在 Palmer 女士的電腦上沿著OSI模型向上移動,從實體層到應用層,然後被她的電子郵件客戶端讀取和顯示。

· 15 min read
zaxro

prometheus

相較於Zabbix系統使用mysql之類的關聯式資料庫,prometheus使用是的TSDB時序資料庫,因其主要功能聚焦在看log跟分析數據,並不需要對不同表格做關聯.

採用tsdb的prometheus最最直觀的差別就是

  1. 使用 TSDB,它對系統資源的需求相對較低,這避免了 MySQL 等關聯式資料庫可能對系統資源的大量消耗
  2. 由於 TSDB 專為時間序列數據設計,它可以更有效地索引和查詢此類數據,使 Prometheus 的查詢速度比使用傳統關聯式資料庫的系統更快 在我自己的測試環境,用一台free tier的機器運行prometheus,也可以跑很順!

prometheus安裝

建立使用者

useradd --no-create-home --shell /bin/false prometheus

建立資料夾並授予使用者

mkdir -p /etc/prometheus /var/lib/prometheus
chown -R prometheus:prometheus /etc/prometheus /var/lib/prometheus

下載prometheus

wget https://github.com/prometheus/prometheus/releases/download/v2.44.0/prometheus-2.44.0.linux-amd64.tar.gz
tar xvfz prometheus-*.tar.gz

mv prometheus-2.44.0.linux-amd64 prometheuspackage
chown -R prometheus:prometheus prometheuspackage

· 3 min read
zaxro

Define the inventory

[filebeat]
10.199.14.[4:8] #change to your ips

playbook

file_beat_install.yml
---
- name: Install Filebeat
hosts: filebeat
become: yes
tasks:
- name: Import Elastic GPG key
rpm_key:
state: present
key: https://packages.elastic.co/GPG-KEY-elasticsearch

- name: Add Elastic repository
yum_repository:
name: elastic-8.x
description: Elastic repository for 8.x packages
baseurl: https://artifacts.elastic.co/packages/8.x/yum
gpgcheck: yes
gpgkey: https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled: yes

- name: Install filebeat
yum:
name: filebeat
state: present

- name: Enable filebeat service
systemd:
name: filebeat
enabled: yes
- name: Write content to /etc/filebeat/ca.crt
ansible.builtin.copy:
dest: /etc/filebeat/ca.crt
content: |
Bag Attributes
friendlyName: ca
localKeyID: "urlocal ID"
subject=/CN=Elasticsearch HTTP CA
issuer=/CN=Elasticsearch HTTP CA
-----BEGIN CERTIFICATE-----
your ca
-----END CERTIFICATE-----
owner: root
group: root
mode: '0644'

playbook command

check the changes,Make sure these chagnes is ok!

ansible-playbook -i hosts file_beat_install.yml --diff --check

Exucutate the command

ansible-playbook -i hosts file_beat_install.yml --diff

小結

以上是直接透過file_beat_install.yml帶入指定hosts,對機器做安裝filebeat.

改成用roles執行

如果是roles配合用site.yml作為接入口,site,yaml如下

/root/Ansible_DG/site.yaml
- hosts: filebeat
gather_facts: True
become: yes
become_user: root
become_method: su
become_exe: sudo su
roles:
- filebeat_install

roles資料夾底下整個結構會變這樣

filebeat_install/       # 角色名
tasks/ # 任务目录
main.yml # 任务定义文件

然後main.yml格式不能照前面的,要改成以下

/root/Ansible_DG/roles/filebeat_install/tasks/main.yml
---
- name: Install necessary packages
apt:
name:
- apt-transport-https
- wget
state: present

- name: Add Elastic GPG key
apt_key:
url: https://artifacts.elastic.co/GPG-KEY-elasticsearch
state: present

- name: Add Elastic repository
apt_repository:
repo: "deb https://artifacts.elastic.co/packages/8.x/apt stable main"
state: present
filename: elastic-8.x

- name: Update apt and install filebeat
apt:
name: filebeat
update_cache: yes
state: present

- name: Enable filebeat service
systemd:
name: filebeat
enabled: yes

- name: Write content to /etc/filebeat/ca.crt
ansible.builtin.copy:
dest: /etc/filebeat/ca.crt
content: |
Bag Attributes
friendlyName: ca
localKeyID: "urlocal ID"
subject=/CN=Elasticsearch HTTP CA
issuer=/CN=Elasticsearch HTTP CA
-----BEGIN CERTIFICATE-----
your ca
-----END CERTIFICATE-----
owner: root
group: root
mode: '0644'

並執行指令

ansible-playbook site.yaml -i hosts --limit filebeat
info

定義hosts有很多方式,在gcp部分因為GCP會自动為每个機器生成这样的内部DNS名稱,ex:[機器名稱].asia-east2-a.c.baseservice-he.internal.所以他在定義ansible那邊其實會比較易懂,可以直接用機器名稱解到IP.

· 13 min read
zaxro

資料夾結構

github位置

.
├── README.md
├── ansible.cfg
├── ansible.log
├── callback_plugins
│   ├── __pycache__
│   │   ├── profile_tasks.cpython-310.pyc
│   │   └── profile_tasks.cpython-39.pyc
│   ├── profile_tasks.py
│   └── profile_tasks.pyc
├── group_vars
│   └── CentOS7.yaml
├── host_vars
│   ├── Centos8.yaml
│   ├── localhost.yaml
│   └── prometheus_server
├── hosts
├── roles
│   ├── docker
│   │   ├── tasks
│   │   │   └── main.yaml
│   │   └── vars
│   │   └── main.yaml
│   ├── nginx
│   │   ├── tasks
│   │   │   └── main.yaml
│   │   └── vars
│   │   └── main.yaml
│   └── ubuntu_base
│   ├── files
│   │   ├── ansible.pub
│   │   └── kaka.pub
│   ├── handlers
│   │   └── main.yml
│   ├── tasks
│   │   └── main.yaml
│   ├── templates
│   │   └── sshd_config.j2
│   └── vars
│   └── main.yaml
├── site.yaml
└── test.yaml
  • 因為檔案太多了,讀的順序會是入口點site.yaml看是使用哪個role
  • roles資料夾底下看對應role,ex ubuntu_base,他底下也有一堆資料夾
  • ubuntu_base資料夾底下看tasks裡面的main.yaml,基本上該role會做的各指令就會在上面
    • tasks裡面會用到的變數要看vars資料夾 or host_vars or group_vars等資料夾,變數有很多層級會依據優先序去拿值!
    • files資料夾會放要複製過去的文件,ex.這邊放的是公鑰,也可以放更新的憑證等
    • templates資料夾也是放要複製到目標主機的文件,跟file資料夾差別是templates裡面放的文件可以透過挖變數把值放入,例如我要設定目標主機的sshd_config,做到吃變數以改ssh port這類操作
    • handlers資料夾裡面放的是當某條件達成後會執行的指令,常常用在設定檔有改動,才會重啟服務的操作.
  • roles結束後,會確認目標主機,這邊是放在hosts文件內(有很多放法,可以參考官方建議best pratices).

執行指令

ansible-playbook -i /path/to/your/inventory site.yml --limit 'yourInstancesGroup'

之後示範是先用這個指令,指定主機跟劇本!

ansible-playbook -i ./hosts site.yaml --limit 'Ubuntu22'

詳細各檔案內容

會以ubuntu為例,之後會寫判斷主機系統!

site.yml

第一個 play 針對 Ubuntu22 主機群組執行。gather_facts: True 表示在運行 play 之前搜集主機的事實。roles 中指定了 ubuntu_base 角色,其他意思差不多,預設情況下,Ansible 會在對應的roles資料夾的 tasks 資料夾中尋找並執行 main.yml 檔案。

site.yml
---
- hosts: Ubuntu22
gather_facts: True
roles:
- ubuntu_base

- hosts: Docker
gather_facts: False
roles:
- docker

- hosts: Nginx
gather_facts: True
roles:
- nginx

roles/ubuntu_base

這邊會用ubuntu_base資料夾做目標,底下tasks做的事情是對系統的基礎操作.ansible_distribution 是 Ansible 的系統事實 (system fact) 之一,它是 Ansible 在目標主機上執行時自動收集的系統相關資訊之一。

roles/ubuntu_base/tasks/main.yaml
# Disable SELinux
- name: diskable SELinux
selinux:
state: disabled
when: ansible_distribution == 'CentOS'


- name: Check if Service Exists
shell: "systemctl -a|grep {{ need_to_stopped_service}}|wc -l"
register: service_exists

- name: Stop service firewalld, if started
ansible.builtin.service:
name: "{{ need_to_stopped_service }}"
state: stopped
enabled: false
when: service_exists|int > 0

- name: Set a hostname
ansible.builtin.hostname:
name: "{{ hostname }}"
become: true

- name: install Base package
apt:
name: "{{ item }}"
state: latest
loop: "{{ base_packages }}"
become: true
# set the TimeZone
- name: Set timezone to Asia/Manila
community.general.timezone:
name: "{{ time_zone }}"
become: true


- name: Set sysctl file limits
#pam_limits: domain='*' limit_type=`item`.`limit_type` limit_item=`item`.`limit_item` value=`item`.`value`
become: true
pam_limits:
dest: "{{ item.dest }}"
domain: '*'
limit_type: "{{ item.limit_type }}"
limit_item: "{{ item.limit_item }}"
value: "{{ item.value }}"
with_items:
- { dest: '/etc/security/limits.conf',limit_type: 'soft',limit_item: 'nofile', value: '655350' }
- { dest: '/etc/security/limits.conf',limit_type: 'hard',limit_item: 'nofile', value: '655350'}
- { dest: '/etc/security/limits.conf',limit_type: 'soft',limit_item: 'nproc', value: '102400' }
- { dest: '/etc/security/limits.conf',limit_type: 'hard',limit_item: 'nproc', value: '102400' }
- { dest: '/etc/security/limits.conf',limit_type: 'soft',limit_item: 'sigpending', value: '255377' }
- { dest: '/etc/security/limits.conf',limit_type: 'hard',limit_item: 'sigpending', value: '255377' }
- { dest: '/etc/security/limits.d/90-nproc.conf', limit_type: 'soft',limit_item: 'nproc', value: '262144' }
- { dest: '/etc/security/limits.d/90-nproc.conf', limit_type: 'hard',limit_item: 'nproc', value: '262144' }
tags:
- setlimits

# add ops group
- name: Ensure group "ops" exists
become: true
ansible.builtin.group:
name: ops
state: present

# add ops user
- name: Add the user 'ops' with a bash shell, appending the group ops
become: true
ansible.builtin.user:
name: ops
shell: /bin/bash
groups: ops
append: yes

# add sudo previleges to the ops user
- name: Config /etc/sudoers
become: true
lineinfile: dest=/etc/sudoers state=present line='{{item}}' validate='visudo -cf %s'
with_items:
- "ops ALL=(ALL) NOPASSWD: ALL"
- "Defaults: ops !requiretty"

- name: Set up multiple authorized keys
become: true
authorized_key:
user: ops
state: present
key: '{{ item }}'
with_file: "{{ ssh_pub_key }}"


# config ssh config
- name: Update sshd configuration safely, avoid locking yourself out
become: true
ansible.builtin.template:
src: sshd_config.j2
dest: /etc/ssh/sshd_config
owner: root
group: root
mode: '0600'
validate: /usr/sbin/sshd -t -f %s
backup: yes
notify:
- (Handler) Restart SSHD Service
tags:
- ssd_config
  • 開頭的條件判斷,ansible_distribution 是用於條件判斷的一部分,以確定 SELinux 禁用任務僅在 CentOS 發行版上執行,在ubuntu不需要操作!

  • 中間的檔案限制, /etc/security/limits.conf 中的 nofile 限制是限制每個進程允許打開的檔案數量./etc/security/limits.conf 中的 nproc 限制是設定單個使用者可啟動的最大進程數量./etc/security/limits.conf 中的 sigpending 限制是設定單個使用者可以等待處理的信號(作業系統和程式互相溝通的機制)數量上限./etc/security/limits.d/90-nproc.conf 中的 nproc 限制也是限制是設定單個使用者可啟動的最大進程數量.會以90-nproc.conf優先,簡言之就是限制以下這幾件事.

    1. 開啟檔案數量
    2. 可以開啟的進程數
    3. 可等待處理的信號數
  • 文尾是告訴他怎麼複製sshd檔案到對面主機,而當一個任務包含了 notify 指令,那麼只有當這個任務的結果是「改變」(change)的時候,對應的 handler 才會被觸發。並且,無論有多少個任務觸發了同一個 handler,在整個 playbook 的運行過程中,那個 handler 都只會被運行一次,並且是在所有的任務都運行完成後。這種模式非常適合於管理那些只需要在配置改變時才需要運行的任務,比如服務的重啟。

roles/ubuntu_base/vars/main.yaml
base_packages:
# - epel-release
- vim
- git
- tree
- lrzsz
- lsof
- net-tools
# - openssl-devel
- wget
# - nc

time_zone: Asia/Manila

ssh_config:
port: 22
disalbe_root_login: False
disalbe_password_login: False


ssh_pub_key:
- files/test.pub

need_to_stopped_service: firewalld

定義挖空的變數,基本上變數順序很多,常用的優先序如下

  1. tasks/var
  2. host_vars
  3. group_vars

他有很多順序,詳細看官網這邊

roles/ubuntu_base/templates/sshd_config.j2
# $OpenBSD: sshd_config,v 1.100 2016/08/15 12:32:04 naddy Exp $

# This is the sshd server system-wide configuration file. See
# sshd_config(5) for more information.

# This sshd was compiled with PATH=/usr/local/bin:/usr/bin

# The strategy used for options in the default sshd_config shipped with
# OpenSSH is to specify options with their default value where
# possible, but leave them commented. Uncommented options override the
# default value.

# If you want to change the port on a SELinux system, you have to tell
# SELinux about this change.
# semanage port -a -t ssh_port_t -p tcp #PORTNUMBER
#
Port {{ ssh_config.port }}
#AddressFamily any
#ListenAddress 0.0.0.0
#ListenAddress ::

HostKey /etc/ssh/ssh_host_rsa_key
#HostKey /etc/ssh/ssh_host_dsa_key
HostKey /etc/ssh/ssh_host_ecdsa_key
HostKey /etc/ssh/ssh_host_ed25519_key

# Ciphers and keying
#RekeyLimit default none

# Logging
#SyslogFacility AUTH
SyslogFacility AUTHPRIV
#LogLevel INFO

# Authentication:

#LoginGraceTime 2m


{% if ssh_config.disalbe_root_login %}
PermitRootLogin no
{% else %}
PermitRootLogin yes
{% endif %}

#StrictModes yes
#MaxAuthTries 6
#MaxSessions 10

#PubkeyAuthentication yes

# The default is to check both .ssh/authorized_keys and .ssh/authorized_keys2
# but this is overridden so installations will only check .ssh/authorized_keys
AuthorizedKeysFile .ssh/authorized_keys

#AuthorizedPrincipalsFile none

#AuthorizedKeysCommand none
#AuthorizedKeysCommandUser nobody

# For this to work you will also need host keys in /etc/ssh/ssh_known_hosts
#HostbasedAuthentication no
# Change to yes if you don't trust ~/.ssh/known_hosts for
# HostbasedAuthentication
#IgnoreUserKnownHosts no
# Don't read the user's ~/.rhosts and ~/.shosts files
#IgnoreRhosts yes

# To disable tunneled clear text passwords, change to no here!

{% if ssh_config.disalbe_password_login %}
PasswordAuthentication no
{% else %}
PasswordAuthentication yes
{% endif %}




#PermitEmptyPasswords no

# Change to no to disable s/key passwords
#ChallengeResponseAuthentication yes
ChallengeResponseAuthentication no

# Kerberos options
#KerberosAuthentication no
#KerberosOrLocalPasswd yes
#KerberosTicketCleanup yes
#KerberosGetAFSToken no
#KerberosUseKuserok yes

# GSSAPI options
GSSAPIAuthentication yes
GSSAPICleanupCredentials no
#GSSAPIStrictAcceptorCheck yes
#GSSAPIKeyExchange no
#GSSAPIEnablek5users no

# Set this to 'yes' to enable PAM authentication, account processing,
# and session processing. If this is enabled, PAM authentication will
# be allowed through the ChallengeResponseAuthentication and
# PasswordAuthentication. Depending on your PAM configuration,
# PAM authentication via ChallengeResponseAuthentication may bypass
# the setting of "PermitRootLogin without-password".
# If you just want the PAM account and session checks to run without
# PAM authentication, then enable this but set PasswordAuthentication
# and ChallengeResponseAuthentication to 'no'.
# WARNING: 'UsePAM no' is not supported in Red Hat Enterprise Linux and may cause several
# problems.
UsePAM yes

#AllowAgentForwarding yes
#AllowTcpForwarding yes
#GatewayPorts no
X11Forwarding yes
#X11DisplayOffset 10
#X11UseLocalhost yes
#PermitTTY yes
#PrintMotd yes
#PrintLastLog yes
#TCPKeepAlive yes
#UseLogin no
#UsePrivilegeSeparation sandbox
#PermitUserEnvironment no
#Compression delayed
#ClientAliveInterval 0
#ClientAliveCountMax 3
#ShowPatchLevel no
#UseDNS yes
#PidFile /var/run/sshd.pid
#MaxStartups 10:30:100
#PermitTunnel no
#ChrootDirectory none
#VersionAddendum none

# no default banner path
#Banner none

# Accept locale-related environment variables
AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES
AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT
AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE
AcceptEnv XMODIFIERS

# override default of no subsystems
Subsystem sftp /usr/lib/openssh/sftp-server

# Example of overriding settings on a per-user basis
#Match User anoncvs
# X11Forwarding no
# AllowTcpForwarding no
# PermitTTY no
# ForceCommand cvs server

這邊就是sshd的config,會把變數填入補空!如果相同則不會change!

roles/ubuntu_base/handlers/main.yaml
- name: (Handler) Restart SSHD Service
become: true
service:
name: sshd
state: restarted
enabled: yes

handels底下的yaml會對應tasks裡面某個有設定notify的task,當狀態為change會觸發handler操作,主要用途是用於修改設定檔後的服務重啟,如果設定沒改變就當然不用重啟了.

hosts

hosts
[Ubuntu22]
prometheus_server ansible_host=10.0.0.112

[Ubuntu22:vars]
ansible_user=ubuntu


[CentOS7]
localhost
Centos8

[Docker]
localhost
Centos8

[Nginx]
localhost
Centos8

因為我這邊主要示範對ubnutu部署,所以來看[Ubuntu22],這邊定義主機寫法prometheus_server ansible_host=10.0.0.112是因為方邊辨認,直接用ip看不出來是server用途.要注意的是,如果想要對host直接定義變數值,就需要在host_vars底下做一個對應的prometheus_server文件,這個很重要![Ubuntu22:vars]ansible_user=ubuntu這個是定義ansible連進去的使用者,因為預設會連到root,但有些ssh config是限制root不能連入!

vars

host_vars/prometheus_server
---
test1: value1
ansible_ssh_port: 22
hostname: backend1

這就是前面說的,很重要!要針對特定主機設定變數,你的檔名一定要跟你定義在hosts的相同!ex.host_vars/prometheus_server就會對應到hosts文件內的那行prometheus_server ansible_host=10.0.0.112,如果沒有對到就會沒變數!

group_vars/CentOS7.yaml
---
version: CentOS7

group_vars也是要對應到[group]這樣,跟hosts_vars很像!

檢查指令

你設定完之後,除了直接對主機執行等他跑完看結果,還有其他方式做檢查嗎?

以下指令可以檢查各台主機有吃到的變數

ansible-inventory -i hosts --list

檢查預計改變狀況,同時如果文件有異動,顯示異動情況,帶--check跟--diff

 ansible-playbook -i ./hosts site.yaml --limit 'Ubuntu22' --diff --check

· 6 min read
zaxro

Object-Oriented Programming

物件導向編程(Object-Oriented Programming,OOP)是設計風格,會實作以下幾項

  • 物件:把程式組成一個物件,具有屬性跟方法
  • 封裝:把程式邏輯封裝,只提供公開介面
  • 繼承: 可以繼承其他物件的屬性跟方法
  • 多型: 物件可以根據上下文的不同以不同的方式呈現
  • 抽象化: 抽象類別跟介面結構,並由物件實現

以上東西文字看了會不太懂,所以我這邊用ec2做舉例. aws提供一個api,它提供了很多方法去他的server撈資料,今天我要撈某區的ec2,會先建立連接,然後查詢有哪些ec2,然後把資料匯出並配合他的資訊做管控. 以上過程可以用幾個function串起來,像是connect_api check_ec2_info start_ec2 stop_ec2 reboot_ec2 get_ec2_public_dns(這邊只是舉例,不是實際功能). 那用oop來設計就會把建立check_ec2_info 查到的ec2,封裝到一個定義有start stop reboot get_public_dns方法的物件,你操作就會變成ec2.start(),stop(),reboot()這樣. 我個人是覺得管理跟理解上更好.

實作oop

以下實作項目:

  • 抽象化:就是code開頭在用abc規劃架構
  • 多型: 因為目前只寫aws的,但是要用到gcp那些也都會使用父輩的VM class,畢竟每一台機器,無論是gcp,azure,aws都會需要 開機 關機 重開等功能.
  • 繼承: EC2會繼承至VM架構
  • 封裝: 要做到stop, start, restart並加入一些小東西,並把這些東西集合在各個公開介面.

另外補充,該程式碼功能主要是拿來再起動ec2後,用程式幫我修改ssh config,避免每次都要自己上去查詢並修改. 基本上把ssh config裡面的host拉出來做變數再帶入寫會更好,不過這邊偷懶就先這樣寫囉.

main.py
from abc import ABC, abstractmethod
import boto3
from typing import Iterable
import argparse

# 抽象化去規劃VM結構 會適用於所有vm
class VM(ABC):

@abstractmethod
def start_instance(self):
pass


@abstractmethod
def stop_instance(self):
pass

@abstractmethod
def restart_instance(self):
pass

@abstractmethod
def _show_instance_public_dns(self):
pass

def create_instance(cls, instance_id, instance_type):
pass

# 繼承至VM,for aws vm 示範
class EC2(VM):
def __init__(self, ec2_client, instance_id, instance_type, instance_state, public_dns_name, tags):
self.ec2_client = ec2_client
self.instance_id = instance_id
self.instance_type = instance_type
self.instance_state = instance_state
self.public_dns_name = public_dns_name
self.tags = tags

def start_instance(self):
self.ec2_client.start_instances(InstanceIds=[self.instance_id])
print(f"{self.tags[0]['Value']}開機了")
return

def start_instance_update_ssh_config_too(self, target_host_in_ssh_config: str):
self.start_instance()
self._show_instance_public_dns()
update_result = self._update_ssh_config(target_host=target_host_in_ssh_config)
if update_result:
print(f"{self.public_dns_name} change to set {target_host_in_ssh_config} a new dns")
else:
print(f"Dont see {target_host_in_ssh_config} in ssh config")
return

def stop_instance(self):
self.ec2_client.stop_instances(InstanceIds=[self.instance_id])
if self.tags:

print(f"{self.tags[0]['Value']} is going to stop")
else:
print(f"{self.instance_id} is going to stop")
return

def restart_instance(self):
self.ec2_client.reboot_instances(InstanceIds=[self.instance_id])
if self.tags:
print(f"{self.tags[0]['Value']} is going to restart")
else:
print(f"{self.instance_id} is going to restart")
return
# 因為是內部使用所以在前面加 _ 這個符號
def _show_instance_public_dns(self):
# 等待实例状态变为 "running"
self.ec2_client.get_waiter('instance_running').wait(InstanceIds=[self.instance_id])

# 获取实例详细信息
response = self.ec2_client.describe_instances(InstanceIds=[self.instance_id])

# 提取公共 DNS 名称
self.public_dns_name = response['Reservations'][0]['Instances'][0]['PublicDnsName']
return

def _update_ssh_config(self, target_host: str):

config_file = '/Users/suyuying/.ssh/config' # SSH 配置文件路径

# 读取配置文件内容
with open(config_file, 'r') as file:
lines = file.readlines()

# 查找目标主机名的行索引,-1是為了一旦沒有鎖定到目標host,可以方便做if else處理
target_index = -1
# 尋找目標Host,並用字串處理確認是否符合
for i, line in enumerate(lines):
if line.strip().startswith('Host') and line.strip().split()[1] == target_host:
# 一但找到就把target index做修改
target_index = i
break

# 更新主机名
if target_index != -1:
# 以target_index為基準,處理他的下一行,也就是HostName那一行
lines[target_index + 1] = f' HostName {self.public_dns_name}\n'

# 写入更新后的内容
with open(config_file, 'w') as file:
file.writelines(lines)
print('SSH configuration updated successfully.')
return True
else:
print('Target host not found in SSH configuration.')
return False


# 使用组合来管理 EC2 实例
class EC2Cluster:
def __init__(self, instances: Iterable[EC2]):
self.instances = instances

def start_all(self):
for instance in self.instances:
instance.start_instance()
return

def start_all_update_ssh_config(self):
for instance in self.instances:
if instance.tags[0]['Value'] == 'ford-pmm-server':
target_host = "prometheus-server"
instance.start_instance_update_ssh_config_too(target_host)
elif instance.tags[0]['Value'] == 'ford-pmm-test':
target_host = "backend1"
instance.start_instance_update_ssh_config_too(target_host)
else:
print("not matched tags in cloud")
return

def stop_all(self):
for instance in self.instances:
instance.stop_instance()
return

#
# def datetime_encoder(obj):
# if isinstance(obj, datetime):
# return obj.isoformat()

# 取得ec2資訊,並製作物件
def get_instances(ec2_client_side):
response = ec2_client_side.describe_instances()

instances = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
instance_state = instance['State']['Name']
instance_public_dns_name = instance.get('PublicDnsName', '')
instance_tags = instance.get('Tags', [])

instance_info = {
'InstanceID': instance_id,
'InstanceType': instance_type,
'InstanceState': instance_state,
'PublicDNSName': instance_public_dns_name,
'InstanceTags': instance_tags
}
print(instance_info)
if instance_tags:
instance_info['Tags'] = instance_tags

ec2_instance = EC2(ec2_client_side,instance_id, instance_type, instance_state, instance_public_dns_name, instance_tags)

instances.append(ec2_instance)
return EC2Cluster(instances)


if __name__ == "__main__":

# 创建 ArgumentParser 对象
parser = argparse.ArgumentParser(description='EC2 Instances Control')

# 添加 start 选项
parser.add_argument('--start', action='store_true', help='Start EC2 instances and update config')

# 添加 stop 选项
parser.add_argument('--stop', action='store_true', help='Stop EC2 instances')

# 解析命令行参数
args = parser.parse_args()
ec2_client = boto3.client('ec2', region_name='us-west-1')
instances_cluster = get_instances(ec2_client_side=ec2_client)
# 根据选项执行相应操作
if args.start:
print('Starting EC2 instances...')
# 在这里添加启动 EC2 实例的逻辑
instances_cluster.start_all_update_ssh_config()
if args.stop:
print('Stopping EC2 instances...')
# 在这里添加停止 EC2 实例的逻辑
instances_cluster.stop_all()

· 9 min read
zaxro

應用情境介紹

如果你有一個套件,他是 I/O 密集的套件,例如 python 的 request,會對對方 server 請求,且這個時候會 block 住 thread,在這種情況下,你無法使用異步(因為同一 thread 會被 block 住),這時候就可以考慮用 multithread 做加速,也就是用並行執行方式提速,以下就是我自己在應用的其中一段 code,這樣用也要注意速率限制就是!

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(get_file_do_scan, file_tg=file, api_keys=copy.deepcopy(my_api_keys)) for file in
just_compress_files]
for future in concurrent.futures.as_completed(futures):
future_result = future.result()
if future_result == "dangerous" or future_result is None:
continue
print(future_result)
print("以上檔案是安全的")

thread pool vs process pool

這邊都是以內建的concurrent.futures套件為例!主要用在並行執行程式.

  • thread 是 os 系統裡進行運算的最小單位,一個 process 可以有多個 thread(看你設計,也可以只有單個),thread 間會共享記憶體空間.
  • process 是 os 系統做資源分配跟調度的基本單位,每個 process 會有獨立記憶體空間.

在 python 世界裡,thread pool 跟 process pool 是兩樣不同東西,以下講結論

  • thread pool 適用於 i/o 密集工作,例如 同時對 api server 做請求!
  • process poo 適用於運算密集的工作!

thread pool

thread pool 的部分,受限於官方 python 的 CPython 記憶體管理問題,需要透過 GIL(Global Interpreter Lock)去做到單 process 中的多 thread 管理,當一個 process 運行多 thread 時,在 python 世界裡其實一次只會運行一個 thread,這時候會對 process 做 lock,當該 thread 遇到 I/O 密集工作(例如:查詢網站)時,就會釋放 GIL,讓另一個 thread 可以執行.

在使用 thread pool 時要注意,Python 中的 thread 會共享同一進程的記憶體空間,因此它們能夠訪問並修改全局變數。如果多個 thread 同時修改同一個全局變數,可能會導致不可預期的結果。所以全局變數的修改就要注意,避免去影響其他 thread.例如以下程式碼:my_api_keys是一個 list,如果你執行過程中會刪除該 list 內的元素,為了要避免影響其他 thread 就建議改成用 deep.copy 方式,每個 thread 放獨立的my_api_keys.

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(get_file_do_scan, file_tg=file, api_keys=copy.deepcopy(my_api_keys)) for file in
just_compress_files]

process pool

process pool部分,使用它就會是應用多 cpu 的狀況,它會透過 process pool 利用多核 CPU 進行真正的並行計算(thread pool 有點假並行的感覺,因為它實際上會等另一個 thread),但 process pool 間通信成本高.

教學內容

以下都是講thread pool.

info

我當初是看這篇學習的

主要會用的就是submit()map()這兩種方法,使用submit()會配合as_completed去動態收集結果.

主要講一下,submit()的情況,當執行[executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]時,會產生 future 物件組成的列表,同時即刻開始並行執行程式,接著這個 list 執行完之後.可以用as_completed去取得程式執行的結果,像以下這樣

with ThreadPoolExecutor() as executor:
# 使用 list comprehension 創建一個 Future 對象的列表
# 每個 Future 對象代表一個即將完成的操作(在這裡,操作是 fetch_yahoo_homepage 函數的調用)
# executor.submit 函數的參數是要調用的函數和該函數的參數
# 当你使用executor.submit()提交任务给线程池时,任务会立即开始执行。executor.submit()会立即返回一个concurrent.futures.Future对象
futures = [executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]
for future in as_completed(futures):
# 使用 Future 對象的 result 方法來獲取操作的結果(在這裡,結果是 HTML 字串)
# 通过调用future.result()获取任务的执行结果
html = future.result()

map就有點像是把以上兩件事submitas_completed直接結合,他返回的是迭代器,可以迭代取得執行完畢結果,像這樣

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
# Executors 的相關方法,map(func, *iterables),執行可迭代物件 ex list tuple named tuple
# 其實就是把可迭代物件的元素一個個拆開喂給Executor,提升程式執行速度
for hostinfo in e.map(lambda x: get_certificate(x[0], x[1]), HOSTS):
if hostinfo is None:
continue
print_basic_info(hostinfo)

看起來submit()map()很像,使用submit允許你檢查任務狀態,而使用map則是能夠簡潔的並行執行!

以下是我學習的過程,主要是用 request 網站來做 demo. 基本會用到的 function 跟套件,跟變數:list-url_will_request=["https://www.google.com",'https://www.yahoo.com'," https://www.youtube.com","https://www.nba.com"]

import urllib.request
import ssl
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
ssl._create_default_https_context = ssl._create_unverified_context
url_will_request=["https://www.google.com",'https://www.yahoo.com'," https://www.youtube.com","https://www.nba.com"]

def fetch_yahoo_homepage(url):
try:
response = urllib.request.urlopen(url)
html = response.read().decode('utf-8')
return html
except urllib.error.URLError as e:
print(f'Error: {e}')
return None

單 thread 一般版本

#single_thead
start_time_single_thead=time.time()
for i in url_will_request:
html=fetch_yahoo_homepage(i)
print(html[:100])
end_time_single_thead=time.time()
all_time_single_thead=end_time_single_thead-start_time_single_thead

multi-thead use map

start_time=time.time()
# 使用示例ex
with ThreadPoolExecutor() as executor:
for html in ecutor.map(fetch_yahoo_homepage,url_will_request ):
print(html[:100])
end_time=time.time()
all_time=end_time-start_time`

multi-thead and use submit

start_time_multi_thread_submit=time.time()

# 使用 ThreadPoolExecutor 作為上下文管理器
# 當離開這個區塊時,將自動清理並關閉所有線程
with ThreadPoolExecutor() as executor:
# 使用 list comprehension 創建一個 Future 對象的列表
# 每個 Future 對象代表一個即將完成的操作(在這裡,操作是 fetch_yahoo_homepage 函數的調用)
# executor.submit 函數的參數是要調用的函數和該函數的參數
# 当你使用executor.submit()提交任务给线程池时,任务会立即开始执行。executor.submit()会立即返回一个concurrent.futures.Future对象
futures = [executor.submit(fetch_yahoo_homepage, url) for url in url_will_request]

# 使用as_completed()函数来迭代这些Future对象,并在future中有程式完成时获取结果到future。
for future in as_completed(futures):
# 使用 Future 對象的 result 方法來獲取操作的結果(在這裡,結果是 HTML 字串)
# 通过调用future.result()获取任务的执行结果
html = future.result()

# 如果 HTML 不為空(也就是說,操作成功返回了 HTML),則打印 HTML 的前 100 個字符
if html:
print(html[:100])
end_time_multi_thread_submit=time.time()
all_time_multi_thread_submit=end_time_multi_thread_submit-start_time_multi_thread_submit

以上的 time 做時間比較

print(f"all_time_single_thead is {all_time_single_thead}")
print(f"all map time is {all_time}")
print(f"all_time_single_thead_submit is {all_time_multi_thread_submit}")
  • all_time_single_thead is 2.3154239654541016
  • all map time is 1.4191679954528809
  • all_time_single_thead_submit is 1.0428180694580078

可以看出用 multi_thread 運行還是比較快的!

小結

  • excutor.map(fetch_yahoo_homepage,url_will_request )當使用 map 方法時,第一個參數需要是可調用的對象(一個 function,或帶有__call__方法的物件),第二個參數則是可迭代物件(ex.string,set,list,str,dict)基本上只要實現__iter__就是.

  • executor.submit():一樣第一個參數需要是可調用的對象,第二個參數是可迭代物件

至於使用上可以參考上面的方法

· 6 min read
zaxro

安裝 kubeadm 遇到的問題

kubeadm 用於建立和管理叢集的基礎結構,而 kubectl 用於操作和管理叢集中的資源。這兩個工具在 Kubernetes 的部署和管理中扮演著不同的角色,安裝的官網介紹,

  1. 先安裝 docker(這邊是參考官網,也可以用其他種 container)
info

因為原本 centos 是內建 podman,但這邊為了符合 k8s 網站的教學所以使用 docker,我自己在安裝 docker 過程會遇到跟 podman 的衝突,像這樣

Docker CE Stable - x86_64                                                                                                                    133 kB/s | 3.5 kB     00:00
错误:
问题 1: problem with installed package podman-docker-3:4.3.1-2.module_el8.8.0+1254+78119b6e.noarch
- package docker-ce-3:24.0.2-1.el8.x86_64 conflicts with docker provided by podman-docker-3:4.3.1-2.module_el8.8.0+1254+78119b6e.noarch
- package docker-ce-3:24.0.2-1.el8.x86_64 conflicts with docker provided by podman-docker-3.1.0-0.13.module_el8.5.0+733+9bb5dffa.noarch
- package docker-ce-3:24.0.2-1.el8.x86_64 conflicts with docker provided by podman-docker-3.3.0-0.15.module_el8.5.0+870+f792de72.noarch
- package docker-ce-3:24.0.2-1.el8.x86_64 conflicts with docker provided by podman-docker-3.3.0-0.17.module_el8.5.0+874+6db8bee3.noarch
- package docker-ce-3:24.0.2-1.el8.x86_64 conflicts with docker provided by podman-docker-3.3.0-2.module_el8.5.0+877+1c30e0c9.noarch

解法是先移除 podman,並優先使用 docker 的 repo.安裝在 centos 可以看這篇

  1. 移除 podman
yum remove podman
  1. 擴展工具集
yum install -y yum-utils
  1. 添加軟體庫
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
  1. 修改權限,把/etc/yum.repos.d/docker-ce.repo加上 priority=10,這樣就不會跟/etc/yum.repos.d/CentOS-Stream-AppStream.repo衝突!
/etc/yum.repos.d/docker-ce.repo
[docker-ce-stable]
name=Docker CE Stable - $basearch
baseurl=https://download.docker.com/linux/centos/8/$basearch/stable
enabled=1
gpgcheck=1
gpgkey=https://download.docker.com/linux/centos/gpg
priority=10

5.安裝 docker

yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

6.起服務

systemctl start docker
  1. 安裝 kubelet kubeadm kubectl
cat <<EOF | sudo tee /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-\$basearch
enabled=1
gpgcheck=1
gpgkey=https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
exclude=kubelet kubeadm kubectl
EOF

# Set SELinux in permissive mode (effectively disabling it)
sudo setenforce 0
sudo sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config

sudo yum install -y kubelet kubeadm kubectl --disableexcludes=kubernetes

sudo systemctl enable --now kubelet

基本上這邊會發現 kubelet 服務報錯無法起來,接著是講解法!

  1. 使用kubeadm init依然會報錯,這個指令目的是建立 master node(也就是 control plane),包括 API Server、Controller Manager 和 Scheduler。當您運行 kubeadm init 時,它將生成初始化控制平面所需的配置文件、證書和其他資源。kubeadm init報錯如下
    [ERROR CRI]: container runtime is not running: output: time="2023-05-29T16:13:25+08:00" level=fatal msg="validate service connection: CRI v1 runtime API is not implemented for endpoint \"unix:///var/run/containerd/containerd.sock\": rpc error: code = Unimplemented desc = unknown service runtime.v1.RuntimeService"

以上報錯如何解決

這個問題是因為/etc/containerd/config.toml設定有問題的關係 ,解法有兩種

  • 最簡單解法把/etc/containerd/config.toml 刪掉(可以先備份),這會讓 k8s 使用預設資料
rm /etc/containerd/config.toml
  • 比較複雜解法就要了解 /etc/containerd/config.toml 如何設定,預設是 disabled_plugins
disabled_plugins = ["cri"]

要改成 enabled_plugins,表示啟用 CRI 插件,因為 k8s 預設使用 CRI(Container Runtime Interface)來管理跟運行容器,如果禁用 CRI 插件或没有指定有效的容器配置,Kubernetes 將會報錯!

enabled_plugins = ["cri"]
[plugins."io.containerd.grpc.v1.cri".containerd]
endpoint = "unix:///var/run/containerd/containerd.sock"
  1. allow user to use this cluster
mkdir -p $HOME/.kube
cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
chown $(id -u):$(id -g) $HOME/.kube/config
  1. Setup Your Pod Network
  • 建立檔案network.yml
network.yml在這!
network.yml
---
kind: Namespace
apiVersion: v1
metadata:
name: kube-flannel
labels:
k8s-app: flannel
pod-security.kubernetes.io/enforce: privileged
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
labels:
k8s-app: flannel
name: flannel
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- nodes/status
verbs:
- patch
- apiGroups:
- networking.k8s.io
resources:
- clustercidrs
verbs:
- list
- watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
labels:
k8s-app: flannel
name: flannel
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: flannel
subjects:
- kind: ServiceAccount
name: flannel
namespace: kube-flannel
---
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
k8s-app: flannel
name: flannel
namespace: kube-flannel
---
kind: ConfigMap
apiVersion: v1
metadata:
name: kube-flannel-cfg
namespace: kube-flannel
labels:
tier: node
k8s-app: flannel
app: flannel
data:
cni-conf.json: |
{
"name": "cbr0",
"cniVersion": "0.3.1",
"plugins": [
{
"type": "flannel",
"delegate": {
"hairpinMode": true,
"isDefaultGateway": true
}
},
{
"type": "portmap",
"capabilities": {
"portMappings": true
}
}
]
}
net-conf.json: |
{
"Network": "10.244.0.0/16",
"Backend": {
"Type": "vxlan"
}
}
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: kube-flannel-ds
namespace: kube-flannel
labels:
tier: node
app: flannel
k8s-app: flannel
spec:
selector:
matchLabels:
app: flannel
template:
metadata:
labels:
tier: node
app: flannel
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
hostNetwork: true
priorityClassName: system-node-critical
tolerations:
- operator: Exists
effect: NoSchedule
serviceAccountName: flannel
initContainers:
- name: install-cni-plugin
image: docker.io/flannel/flannel-cni-plugin:v1.1.2
#image: docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.2
command:
- cp
args:
- -f
- /flannel
- /opt/cni/bin/flannel
volumeMounts:
- name: cni-plugin
mountPath: /opt/cni/bin
- name: install-cni
image: docker.io/flannel/flannel:v0.22.0
#image: docker.io/rancher/mirrored-flannelcni-flannel:v0.22.0
command:
- cp
args:
- -f
- /etc/kube-flannel/cni-conf.json
- /etc/cni/net.d/10-flannel.conflist
volumeMounts:
- name: cni
mountPath: /etc/cni/net.d
- name: flannel-cfg
mountPath: /etc/kube-flannel/
containers:
- name: kube-flannel
image: docker.io/flannel/flannel:v0.22.0
#image: docker.io/rancher/mirrored-flannelcni-flannel:v0.22.0
command:
- /opt/bin/flanneld
args:
- --ip-masq
- --kube-subnet-mgr
resources:
requests:
cpu: "100m"
memory: "50Mi"
securityContext:
privileged: false
capabilities:
add: ["NET_ADMIN", "NET_RAW"]
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: EVENT_QUEUE_DEPTH
value: "5000"
volumeMounts:
- name: run
mountPath: /run/flannel
- name: flannel-cfg
mountPath: /etc/kube-flannel/
- name: xtables-lock
mountPath: /run/xtables.lock
volumes:
- name: run
hostPath:
path: /run/flannel
- name: cni-plugin
hostPath:
path: /opt/cni/bin
- name: cni
hostPath:
path: /etc/cni/net.d
- name: flannel-cfg
configMap:
name: kube-flannel-cfg
- name: xtables-lock
hostPath:
path: /run/xtables.lock
type: FileOrCreate
  • 套用設定檔
kubectl apply -f network.yml

會出現以下代表成功

namespace/kube-flannel created
clusterrole.rbac.authorization.k8s.io/flannel created
clusterrolebinding.rbac.authorization.k8s.io/flannel created
serviceaccount/flannel created
configmap/kube-flannel-cfg created
daemonset.apps/kube-flannel-ds created

檢查確認是否 control plane 是 ready.

kubectl get nodes #確認是否為ready

NAME STATUS ROLES AGE VERSION
73gogo Ready control-plane 3h9m v1.27.2

· 7 min read
zaxro

裝飾器

寫程式很常看到高級技巧裝飾器,@xxxx 這種東西,因為之前都可以用一些方式避開不這樣寫,直到某次在跟別人串程式的時候,為了減少影響範圍,所以選用裝飾器去拿對方 return 的東西在處理一次,而對方也很簡單,只要把裝飾器放上去就可以直接用了,很方便!裝飾器主要應用情境是在不影響對方 function 下,給這個 function 新功能,裝飾器的彈性和可擴展性使其成為 Python 中強大的程式設計工具之一.

partial function

因為裝飾器有時候會配合 partial function,所以要了解一下.

partial 是 python 的偏函示,使用上感覺就是方便你帶入預設參數到一個已知 function,並透過偏函式去建立新的 function!使用 partial 函數可以非常方便地對現有函式進行定制和重用。那為何不預設帶入?總會有需要多個預設值的情況吧~

  • 偏函示範例一
from functools import partial

# 原始函式
def add(x, y):
return x + y

# 創建偏函式
add_five = partial(add, y=5)

# 使用偏函式
result = add_five(10)
print(result) # 輸出: 15
  • 偏函示範例二
from functools import partial

# 定義一個具有多個參數的函式
def multiply(x, y, z):
return x * y * z

# 使用偏函數將其中一個參數的預設值固定為 2
new_func = partial(multiply, y=2)

# 調用新函式
result = new_func(3, z=4)
print(result) # 輸出: 24

裝飾器正篇

from functools import partial, wraps
import vt

def partial_decorator1(func=None, *, apikey=api_key_global):
def actual_decorator(func, apikey):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
sha256_hash = result
client = vt.Client(apikey)
file = client.get_object(f"/files/{sha256_hash}")
print(file.last_analysis_stats)
client.close()
return result

return wrapper

if func:
return actual_decorator(func, apikey)
else:
return partial(actual_decorator, apikey=apikey)

# 使用新的装饰器
@partial_decorator1(apikey=api_key_global)
def my_function(file_path):
# 在这里执行你的逻辑并返回一个哈希值
return file_path

result = func(*args, **kwargs)這邊會拿到原始 function 帶入參數的一般 return!後面的程式碼是對該 return 作處理!

最後面的 if else 會比較難懂.

  • if func: 這部分處理的是當裝飾器被用於一個函數時(也就是被直接應用到函數上,例如 @partial_decorator1)的情況。在這種情況下,func 參數會被自動設置為被裝飾的函數。這時,actual_decorator(func, api_key) 會被調用,其結果(也就是裝飾過的函數)被返回。

  • else: 這部分處理的是當裝飾器被當作函數調用(例如 @partial_decorator1() 或 @partial_decorator1(api_key=some_key))的情況。在這種情況下,func 參數會默認為 None,因此 else 分支會被執行。 partial(actual_decorator, api_key=api_key) 創建了一個新的裝飾器,這個裝飾器會接受一個函數作為參數,並返回該函數的裝飾版本。新的裝飾器會在稍後的時間被調用,這時才會裝飾目標函數。

當裝飾器以 @partial_decorator1 的形式應用到函數上時,會走 if func: 分支;而當裝飾器以 @partial_decorator1() 或 @partial_decorator1(api_key=some_key) 的形式應用時,會走 else: 分支

裝飾器範例二

from functools import partial, wraps

def partial_decorator1(generator_func=None, *, api_key="api_key_global"):
def actual_decorator(generator_func, api_key):
@wraps(generator_func)
def wrapper(*args, **kwargs):
# 打印字串
print("Before generator execution")
# 並不會執行,只是產生生成器而已
result = generator_func(*args, **kwargs)
print(api_key)
# 打印字串
print("After generator execution")
return result

return wrapper

if generator_func is None:
return partial(actual_decorator, api_key=api_key)
else:
return actual_decorator(generator_func, api_key)


# @partial_decorator1(api_key="my_api_key"),這是代表裝飾器被當作函式調用情況,
# 因為調用時沒有帶generator_func,所以generator_func會是預設的None.會走if那邊,
# 用partial(actual_decorator, api_key=api_key),產生新裝飾器,這個裝飾器會接受一個函數作為參數,並返回該函數的裝飾版本。
@partial_decorator1(api_key="my_api_key")
def my_generator():
yield 1
yield 2
yield 3

# 使用装饰器修饰生成器函数,這邊是在修改帶入的既有餐數,api_key="my_api_key"
# 注意這邊的my_generator()是返回<generator object my_generator at 0x10041ce00>
# yield是實現generator的一種方法!
for value in my_generator():
print(value)

# 使用 partial_decorator1 创建装饰器,@partial_decorator1不帶參數,就會直接應用到function
# ,因此參數generator_func就會有帶東西,而走else.else做的事情是修飾並return新function
# actual_decorator(generator_func, api_key)會修飾function,return新function.
@partial_decorator1
def my_generator1():
yield 1
yield 2
yield 3

# 使用装饰器修饰生成器函数,這邊是如果什麼都不帶,就會用默認的api_key="api_key_global"
for value in my_generator1():
print(value)

小結

一般裝飾器不帶括號(ex.@partial_decorator1),代表該裝飾器會自動將 func 參數設定為套用的 func,這滿單純! 如果有帶括號(ex.@partial_decorator1(api_key=some_key)),會需要指定帶入的 function,就需要透過 partial 寫法去指定要帶入哪個 function 作為裝飾器,不過都用裝飾器了,當然會帶入要執行的包裝 function.

另外,裝飾器寫法很多,這邊寫的只是其中一種!

info

裝飾器主要應用情境

  • 記錄日誌:裝飾器可以捕獲函式的輸入和輸出,並將其記錄到日誌中,以便後續調試和追蹤。

  • 身份驗證和授權:裝飾器可以在執行函式之前檢查用戶的身份驗證和權限,以確保只有授權的用戶可以訪問該函式。

  • 快取:裝飾器可以將函式的輸出快取起來,以減少重複計算,提高效能。

  • 異步處理:裝飾器可以將函式的執行轉換為非同步處理,以提高系統的反應性和併發能力。

  • 計時和性能監控:裝飾器可以計時函式的執行時間,並收集性能指標,用於分析和優化代碼的效能。

· 7 min read
zaxro

這邊會記錄一些實用的 coding 方法

coding 部分

False 跟 None 使用時機

False 主要做條件判斷,None 則用於表示缺少值,或變量沒有被給值的請況. 舉例來說,今天 request 想要取得資料放入變數 stock_info, 如果判斷取出來資料不合預期,或者報錯,那就適合用 None. 那 False 就是拿來表達條件成不成立的情況,例如判斷股票是否上漲,可以這樣定義變數,is_price_up,True代表上漲,False代表下跌.

# 使用 None 表示缺少值或變量尚未被賦予任何值
name = None
if name is None:
print("名字尚未被賦值")

# None 作為函式的返回值
def divide(a, b):
if b == 0:
return None
return a / b

result = divide(10, 0)
if result is None:
print("無效的操作")

# 使用 False 進行條件判斷
flag = False
if flag:
print("這個條件不成立")
else:
print("這個條件成立")

請求 api 失敗後 retry

有時候對方 server 不知道啥問題,會出現報錯,過一下又好了,這邊用 while 配合數字開關,去執行 retry,不過要記得 try,except 只存在於當下文檔,不會跨文檔接 Error.

max_retries = 3
retries = 0
while retries<max_retries:
try:
request....
return sth
except Exception as err:
retries += 1
time.sleep(1)# 延遲一秒後再重新執行
if retries==max_retries:
telegram_bot_sendtext(f"sth wrong, go cheeck")
print(err)
return None

或者配合嘗試不同 key,同時做連線 retry

· 5 min read
zaxro

以往常常用 root 連到機器操作,那最近在 lab 機踢到鐵板了,關於不要用 root 登入操作討論很多,這邊就不說原因了!

用 root 執行minikube start會遇到報錯

❌  Exiting due to DRV_AS_ROOT: The "docker" driver should not be used with root privileges.

這邊看到很明顯他不讓你用 root 執行該指令.good!!

創建 ec2-user 並給予權限

執行以下命令以創建新使用者:

adduser ec2-user

設定使用者密碼

passwd ec2-user

將使用者新增至 wheel 群組,以獲得 sudo 權限:

usermod -aG wheel ec2-user

在 CentOS 系統中,wheel 群組通常被配置為具有特殊權限的群組,允許其成員使用 sudo 命令獲得 root 權限。通常,系統管理員可以將自己或其他需要具有管理權限的使用者新增至 wheel 群組,以便執行需要 root 權限的任務。 具有 wheel 群組成員身份的使用者可以使用 sudo 命令執行以 root 權限運行的命令。在上述命令中,-aG 選項指示 usermod 命令將使用者新增至指定的群組中而不移除已有的群組設定。

確認 sudoers 文件中的設定允許 wheel 群組的成員執行 sudo 指令。執行以下命令以編輯 sudoers 文件:

visudo

在打開的文件中,尋找以下行:

## Allows people in group wheel to run all commands
# %wheel ALL=(ALL) ALL

取消註釋(刪除前面的 # 符號)並保存文件,這給在 wheel 群組內的使用者可以用 sudo 執行任何權限

將 ec2-user 加到 docker 群組

將 ec2-user 加到 docker 群組

usermod -aG docker ec2-user

執行以下命令來使群組更改生效:

newgrp docker

加入 'docker' 群組後,您可以在不使用 sudo 的情況下執行 Docker 命令

如何確認有加入群組

看/etc/group 會看到有新增使用者

vi /etc/group
ssh_keys:x:997:
sshd:x:74:
postdrop:x:90:
postfix:x:89:
docker:x:996:ec2-user
cgred:x:995:
mysql:x:27:
apache:x:48:
nginx:x:994:
zabbix:x:993:
tss:x:59:
stapusr:x:156:
stapsys:x:157:
stapdev:x:158:
saslauth:x:76:
mongod:x:992:
ec2-user:x:1000:

如何連線到 ec-user

  1. 確認有生成公私鑰,如無則執行以下指令生成
ssh-keygen -t rsa -b 4096

確認生成的公鑰位置在~/.ssh/id_rsa.pub

  1. 使用 ssh-copy-id 指令將公鑰添加到遠程 ec2-user 使用者的帳戶中。remote_host 記得換成 你的 ip(ex.我的 ip 是 192.168.185.72),記得你的私鑰(~/.ssh/id_rsa)基本上不會給別人
ssh-copy-id -i ~/.ssh/id_rsa.pub ec2-user@<remote_host>
  1. 測試連入
ssh 'ec2-user@192.168.185.72'
  1. 設定~/.ssh/config,像這樣,
~/.ssh/config
Host            agent-ec2user
HostName 192.168.185.72
Port 22
User ec2-user
IdentityFile ~/.ssh/id_rsa
  1. 設定 ssh 的通用配置
Host *
ServerAliveInterval 10
HostKeyAlgorithms +ssh-rsa
PubkeyAcceptedKeyTypes +ssh-rsa
  • Host *: 這是一個通配符,表示適用於所有的主機。在下面的設定中,這個通配符將應用於所有的主機設定。

  • ServerAliveInterval 10: 此選項指定了 SSH 客戶端和伺服器之間的連線活著檢查時間間隔。在此例中,設定為 10 秒,表示每 10 秒發送一次保持連線的訊號。這有助於保持連線的穩定性。

  • HostKeyAlgorithms +ssh-rsa: 此選項指定了客戶端支援的主機金鑰驗證演算法。+ssh-rsa 表示除了預設的驗證演算法外,還允許使用 ssh-rsa 演算法進行主機金鑰驗證。

  • PubkeyAcceptedKeyTypes +ssh-rsa: 此選項指定了客戶端支援的用於公鑰驗證的金鑰類型。+ssh-rsa 表示除了預設的金鑰類型外,還接受使用 ssh-rsa 金鑰進行公鑰驗證。