本文仅供测试使用,后续可能对接应用平台。
#!/bin/python3
#######################################################
# This script is to send messages to Lijuan regularly #
# Date: 2020-2-17 #
# Author: cuijianzhe #
# Email: 598941324@qq.com #
#######################################################
import requests,json,sys
# Mobile number to look up, taken from the first command-line argument.
mobiles=sys.argv[1]
# Message text to send, taken from the second command-line argument.
messages=sys.argv[2]
def get_token():
    """Request a tenant access token from the Feishu open platform.

    Posts the app credentials to the ``tenant_access_token/internal``
    endpoint.

    Returns:
        The ``tenant_access_token`` value from the JSON reply, or ``None``
        when the request fails, the status is not 200, or the body cannot
        be read as a JSON object. A failure message is printed in each of
        those cases.
    """
    # NOTE(review): the credentials below are hard-coded placeholders;
    # consider loading real values from the environment.
    data = {"app_id":"cli_xxxxxxxxxxx9d","app_secret":"YJJxxxxxxxxxxxxxxxxxxxxxxxxxxxxYUi"}
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        # The timeout keeps an unreachable server from hanging the script.
        res = requests.post(url_token, json=data, headers=headers, timeout=10)
    except requests.RequestException:
        print('请求失败')
        return None
    if res.status_code != 200:
        # Report the failure instead of silently yielding None.
        print('请求失败')
        return None
    try:
        return json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object.
        print('请求失败')
        return None
# Headers shared by the API calls below; the bearer token is requested
# once, when this module-level statement runs.
headers_group = {}
headers_group["Authorization"] = "Bearer %s" % (get_token())
headers_group["Content-Type"] = "application/json"
def getuserid():
    """Look up the Feishu user id for the module-level ``mobiles`` number.

    Returns:
        The ``user_id`` of the first user listed for that mobile number.

    Raises:
        requests.HTTPError: If the lookup answers with an error status.
        KeyError, IndexError: If the reply holds no user for the number.
    """
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id"
    # Send the number through ``params`` so that characters such as a
    # leading "+" are URL-encoded rather than spliced raw into the query.
    res_data = requests.get(
        url=userurl,
        params={"mobiles": mobiles},
        headers=headers_group,
        timeout=10,
    )
    # Surface HTTP errors directly instead of as an opaque lookup error.
    res_data.raise_for_status()
    users = json.loads(res_data.text)['data']['mobile_users'][mobiles]
    return users[0]['user_id']
def send_messages(userID):
    """Post the module-level ``messages`` text to *userID* with an @-mention.

    The body is a Feishu ``text`` message whose content is the message
    text followed by an ``<at>`` tag naming the same user.
    """
    text = "%s <at user_id=\"%s\">test</at>" % (messages, userID)
    payload = {
        "user_id": userID,
        "msg_type": "text",
        "content": {"text": text},
    }
    requests.post(
        "https://open.feishu.cn/open-apis/message/v4/send/",
        json=payload,
        headers=headers_group,
    )
if __name__ == "__main__":
    # The access token is already requested when headers_group is built,
    # so no separate get_token() call is needed here.
    user_ID = getuserid()
    send_messages(user_ID)
import requests,json,linecache,random
# Path of the text file from which send_messages() takes a random line.
file = 'word'
def get_token():
    """Request a tenant access token from the Feishu open platform.

    Posts the app credentials to the ``tenant_access_token/internal``
    endpoint.

    Returns:
        The ``tenant_access_token`` value from the JSON reply, or ``None``
        when the request fails, the status is not 200, or the body cannot
        be read as a JSON object. A failure message is printed in each of
        those cases.
    """
    # NOTE(review): the credentials below are hard-coded placeholders;
    # consider loading real values from the environment.
    data = {"app_id":"cli_xxxxxxxxxxxxxxxd","app_secret":"YJJxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxYUi"}
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        # The timeout keeps an unreachable server from hanging the script.
        res = requests.post(url_token, json=data, headers=headers, timeout=10)
    except requests.RequestException:
        print('请求失败')
        return None
    if res.status_code != 200:
        # Report the failure instead of silently yielding None.
        print('请求失败')
        return None
    try:
        return json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object.
        print('请求失败')
        return None
# Headers shared by the API calls below; the bearer token is requested
# once, when this module-level statement runs.
headers_group = {}
headers_group["Authorization"] = "Bearer %s" % (get_token())
headers_group["Content-Type"] = "application/json"
def get_chatid():
    """Return the chat id of the first group in the chat list reply.

    Returns:
        The ``chat_id`` of the first entry under ``data.groups``, or
        ``None`` when the request fails, the status is not 200, or the
        reply contains no usable group entry. A failure message is printed
        in each of those cases.
    """
    url_group = "https://open.feishu.cn/open-apis/chat/v4/list?"
    try:
        # The timeout keeps an unreachable server from hanging the script.
        res_group = requests.get(url_group, headers=headers_group, timeout=10)
    except requests.RequestException:
        print('请求失败')
        return None
    if res_group.status_code != 200:
        # Report the failure instead of silently yielding None.
        print('请求失败')
        return None
    try:
        groups = json.loads(res_group.text).get('data').get('groups')
        return groups[0].get('chat_id')
    except (ValueError, AttributeError, TypeError, IndexError, KeyError):
        # Malformed body, missing "data"/"groups", or an empty group list.
        print('请求失败')
        return None
def getuserid():
    """Resolve the hard-coded test mobile number to a Feishu user id.

    Returns the ``user_id`` of the first user listed for that number.
    """
    mobiles = "18600796142"  # test number; can @ everyone
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?mobiles=%s" % mobiles
    res_data = requests.get(url=userurl, headers=headers_group)
    payload = json.loads(res_data.text)
    matches = payload['data']['mobile_users'][mobiles]
    return matches[0]['user_id']
def send_messages(userID,chatID):
    """Send a random line of the phrase file to a chat with an @-mention.

    Args:
        userID: Feishu user id placed in the payload and in the ``<at>`` tag.
        chatID: Feishu chat id placed in the payload.

    Raises:
        ValueError: If the phrase file contains no lines.
    """
    # Read the file once and pick from memory, rather than reading it to
    # count lines and then reading it again through linecache.
    with open(file, 'r', encoding='utf-8') as good:
        lines = good.readlines()
    if not lines:
        raise ValueError("phrase file %r is empty" % file)
    line = random.choice(lines)
    if not line.endswith('\n'):
        # linecache always handed back newline-terminated lines; keep that
        # so the message layout stays the same for an unterminated last line.
        line += '\n'
    word = line.replace(',', ' ')
    data1 = {
        "chat_id": chatID,
        "user_id": userID,
        "msg_type": "text",
        "content": {
            "text": "%s <at user_id=\"%s\">love</at>"%(word,userID)
        }
    }
    url_mess = "https://open.feishu.cn/open-apis/message/v4/send/"
    # The timeout keeps an unreachable server from hanging the script.
    requests.post(url_mess, json=data1, headers=headers_group, timeout=10)
if __name__ == "__main__":
    # The access token is already requested when headers_group is built,
    # so no separate get_token() call is needed here.
    chatid = get_chatid()
    user_ID = getuserid()
    send_messages(user_ID,chatid)
#!/bin/env python3
########################################################
# This script sends Feishu messages to Lijuan regularly #
# Date: 2020-2-24 #
# Author: cuijianzhe #
# Email: 598941324@qq.com #
########################################################
import requests
import json
import os
import random
import linecache
import logging
import datetime
# Log to a file; each record carries a timestamp, the level name, the
# source line number and the message.
logging.basicConfig(
    level=logging.DEBUG,
    filename='/scripts/feishu/log',
    format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)
def get_token():
    """Request a tenant access token from the Feishu open platform.

    Posts the app credentials to the ``tenant_access_token/internal``
    endpoint.

    Returns:
        The ``tenant_access_token`` value from the JSON reply, or ``None``
        when the request fails, the status is not 200, or the body cannot
        be read as a JSON object. A failure message is printed in each of
        those cases.
    """
    # NOTE(review): the credentials below are hard-coded placeholders;
    # consider loading real values from the environment.
    data = {"app_id":"cli_xxxxxxxxxxx","app_secret":"YJJ7xxxxxxxxxxxxxxxxxxxxYUi"}
    headers = {"Content-Type": "application/json"}
    url_token = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    try:
        # The timeout keeps an unreachable server from hanging the script.
        res = requests.post(url_token, json=data, headers=headers, timeout=10)
    except requests.RequestException:
        print('请求失败')
        return None
    if res.status_code != 200:
        # Report the failure instead of silently yielding None.
        print('请求失败')
        return None
    try:
        return json.loads(res.text).get('tenant_access_token')
    except (ValueError, AttributeError):
        # Body was not JSON, or not a JSON object.
        print('请求失败')
        return None
# Headers shared by the user lookup and message calls; the bearer token
# is requested once, when this module-level statement runs.
headers_group = {}
headers_group["Authorization"] = "Bearer %s" % (get_token())
headers_group["Content-Type"] = "application/json"
def getuserid(mobile):
    """Look up the Feishu user id registered for a mobile number.

    Args:
        mobile: Mobile number string to look up.

    Returns:
        The ``user_id`` of the first user listed for that mobile number.

    Raises:
        requests.HTTPError: If the lookup answers with an error status.
        KeyError, IndexError: If the reply holds no user for the number.
    """
    userurl = "https://open.feishu.cn/open-apis/user/v1/batch_get_id"
    # Send the number through ``params`` so that characters such as a
    # leading "+" are URL-encoded rather than spliced raw into the query.
    res_data = requests.get(
        url=userurl,
        params={"mobiles": mobile},
        headers=headers_group,
        timeout=10,
    )
    # Surface HTTP errors directly instead of as an opaque lookup error.
    res_data.raise_for_status()
    users = json.loads(res_data.text)['data']['mobile_users'][mobile]
    return users[0]['user_id']
def uploadimg():
    """Upload a random image from the image directory and return its key.

    The chosen file is removed from disk only after the upload has
    succeeded and the image key has been read from the reply.

    Returns:
        The ``image_key`` string from the upload reply.

    Raises:
        IndexError: If the image directory is empty.
        requests.HTTPError: If the upload answers with an error status.
    """
    image_dir = '/scripts/feishu/images'
    imgname = random.choice(os.listdir(image_dir))
    imgpath = os.path.join(image_dir, imgname)
    # Image upload endpoint; the reply carries the image key.
    with open(imgpath, 'rb') as p:
        image = p.read()
    imgurl = "https://open.feishu.cn/open-apis/image/v4/put/"
    # No JSON Content-Type here: the request is sent as a multipart form.
    headers = {"Authorization" : "Bearer %s"%get_token()}
    files = {
        'image':image
    }
    imgdata = {
        "image_type": "message"
    }
    resp = requests.post(url=imgurl, headers=headers, files=files,
                         data=imgdata, timeout=30)
    resp.raise_for_status()
    image_key = resp.json()['data']['image_key']
    # Delete only once the key is in hand, so a failed upload does not
    # destroy an image that was never delivered.
    os.remove(imgpath)
    return image_key
def sendmess(path,user_id,image_key=None):
    """Send a rich-text ("post") message with a random phrase and an image.

    A line is picked at random from the lines of *path* that contain the
    '、' separator; the text after the first separator is the phrase.

    Args:
        path: UTF-8 text file holding one candidate phrase per line.
        user_id: Feishu user id that receives and is @-mentioned in the post.
        image_key: Key of a previously uploaded image. When ``None`` the
            image row is left out of the post.

    Raises:
        ValueError: If *path* has no line containing the '、' separator.
    """
    # Read once and keep only lines that can be split; otherwise a blank
    # or malformed line would raise IndexError whenever it was drawn.
    with open(path, encoding='utf-8') as yuju:
        candidates = [line for line in yuju if '、' in line]
    if not candidates:
        raise ValueError("no usable phrase lines in %r" % path)
    qinghua = random.choice(candidates).split('、')[1].strip().replace(' ',' ')
    message_url = "https://open.feishu.cn/open-apis/message/v4/send/"
    # First row: the phrase followed by an @-mention of the recipient.
    rows = [
        [
            {
                "tag": "text",
                "un_escape": True,
                "text": "%s :"%qinghua
            },
            {
                "tag": "at",
                "user_id": user_id
            }
        ]
    ]
    if image_key is not None:
        # Add the image row only when there is a key to reference.
        rows.append([
            {
                "tag": "img",
                "image_key": image_key,
                "width": 1080,
                "height": 1080
            }
        ])
    # Rich-text message payload.
    data = {
        "user_id": user_id,
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "表情包来了",
                    "content": rows
                }
            }
        }
    }
    # The timeout keeps an unreachable server from hanging the script.
    requests.post(url=message_url, headers=headers_group, json=data, timeout=10)
if __name__ == '__main__':
    logger.info('Started..')
    # The access token is already requested when headers_group is built,
    # so no separate get_token() call is needed here.
    mobiles = ["18xxxxx42","17xxxxxxx3"]
    for iphone in mobiles:
        try:
            user_ID = getuserid(iphone)
            imgkey = uploadimg()
            sendmess('/scripts/feishu/wenben',user_ID,imgkey)
        except Exception:
            # Record the failure in the log file before letting it
            # propagate; the script still stops as it did before.
            logger.exception("Failed to send to %s", iphone)
            raise
    logger.info("Finished!\n")