123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- import base64
- import hashlib
- import hmac
- import json
- import urllib.parse
- from queue import Queue
- import requests
- from selenium import webdriver
- from bs4 import BeautifulSoup
- import time
- #driver = webdriver.Chrome() # Chrome浏览器
- from selenium.common import exceptions
- from selenium.webdriver import Keys, ActionChains
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support.wait import WebDriverWait
- from selenium.webdriver.support import expected_conditions as EC
- class amro():
- def __init__(self):
- try:
- self.driver = webdriver.Edge()
- self.actions = ActionChains(self.driver)
- except:
- self.driver = webdriver.Chrome()
- self.actions = ActionChains(self.driver)
- self.url = r"http://me.sichuanair.com/login.shtml"
- self.username = "022673"
- self.password = "yBQG71@8FC"
- self.activeMode=""
- self.q = Queue()
- self.q2 = Queue()
- def openWeb(self, url):
- self.driver.get(url)
- self.q.put("cz")
- #打开并登录
- def openAmro(self):
- self.driver.get(self.url)
- WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, 'singleSubmit')))
- self.timerSingal()
- loginbtn = self.driver.find_element('id', 'singleSubmit')
- time.sleep(0.5)
- self.actions.click(loginbtn)
- self.actions.perform()
- WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.NAME, "j_username"))).send_keys(self.username)
- time.sleep(0.5)
- self.driver.find_element('name', "j_password").send_keys(self.password)
- time.sleep(0.5)
- self.driver.find_element('name', "j_password").send_keys(Keys.ENTER)
- #进入工作指令WO,获取工作包
- def WO(self):
- self.timerSingal()
- self.driver.switch_to.parent_frame()
- time.sleep(0.5)
- nav_lables = WebDriverWait(self.driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, 'nav-label')))
- for i in nav_lables:
- print(i.text)
- title = i.text
- if title == '航线维修':
- self.actions.click(i)
- self.actions.perform()
- time.sleep(2)
- break
- subtitle = WebDriverWait(self.driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, 'J_menuItem')))
- # subtitle = driver.find_elements(By.CLASS_NAME,'J_menuItem')
- for i in subtitle:
- title = i.text
- print(title)
- if title == "工作指令(WO)管理":
- self.actions.click(i)
- self.actions.perform()
- time.sleep(1)
- break
- fr = WebDriverWait(self.driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
- print(len(fr))
- self.driver.switch_to.frame(1)
- jd = WebDriverWait(self.driver, 30).until(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.tdr:nth-child(8) .textbox-icon')))
- # jd = driver.find_element(By.CSS_SELECTOR, '.tdr:nth-child(8) .textbox-icon')
- self.actions.click(jd)
- self.actions.perform()
- time.sleep(1)
- tf = self.driver.find_element(By.ID, '_easyui_combobox_i1_1')
- self.actions.click(tf)
- self.actions.perform()
- time.sleep(1)
- search = self.driver.find_element(By.CSS_SELECTOR, 'td:nth-child(10) > .searchBtn > span')
- self.actions.click(search)
- self.actions.perform()
- WebDriverWait(self.driver, 30).until_not(EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- page = WebDriverWait(self.driver, 30).until(
- EC.presence_of_element_located((By.CSS_SELECTOR, '#tt > .panel td:nth-child(1) > input')))
- # page = driver.find_element(By.CSS_SELECTOR, '#tt > .panel td:nth-child(1) > input')
- page.clear()
- page.send_keys('999')
- page.send_keys(Keys.ENTER)
- time.sleep(1)
- WebDriverWait(self.driver, 30).until_not(EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- # input("page end")
- time.sleep(1)
- # time.sleep(10)
- content = self.driver.find_element(By.CSS_SELECTOR, '#tt > .panel .datagrid-view2 > .datagrid-body').get_attribute(
- "outerHTML")
- print(content)
- with open('work{}.txt'.format(str(int(time.time()))), 'wb') as f:
- f.write(content.encode('utf-8'))
- def openMaintitle(self, first):
- self.timerSingal()
- self.driver.switch_to.parent_frame()
- time.sleep(0.5)
- nav_lables = WebDriverWait(self.driver, 10).until(
- EC.presence_of_all_elements_located((By.CLASS_NAME, 'nav-label')))
- for i in nav_lables:
- print(i.text)
- title = i.text
- if title == first:
- self.actions.click(i)
- self.actions.perform()
- time.sleep(2)
- break
- def openSubTitle(self, second):
- self.timerSingal()
- self.driver.switch_to.parent_frame()
- subtitle = WebDriverWait(self.driver, 10).until(
- EC.presence_of_all_elements_located((By.CLASS_NAME, 'J_menuItem')))
- # subtitle = driver.find_elements(By.CLASS_NAME,'J_menuItem')
- for i in subtitle:
- title = i.text
- print(title)
- if title == second:
- self.actions.click(i)
- self.actions.perform()
- time.sleep(1)
- break
- def getActiveTitle(self):
- self.timerSingal()
- self.driver.switch_to.parent_frame()
- activeTitle = self.driver.find_element(By.CSS_SELECTOR, ".active > span")
- print(activeTitle.text)
- return activeTitle.text
- def changeToTitle(self, title):
- self.timerSingal()
- self.driver.switch_to.parent_frame()
- activeTitle = self.getActiveTitle()
- try:
- self.driver.find_element(By.CSS_SELECTOR, '.J_menuTab:nth-child(2) > span')
- except:
- try:
- self.driver.find_element(By.CSS_SELECTOR, '.J_menuTab:nth-child(3) > span')
- except:
- print('未找到 {}'.format(title))
- def getWOData(self):
- self.timerSingal()
- self.reStartWebDrive()
- self.isLogining()
- if self.activeMode=="工作指令(WO)管理":
- search = self.driver.find_element(By.CSS_SELECTOR, 'td:nth-child(10) > .searchBtn > span')
- self.actions.click(search)
- self.actions.perform()
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- else:
- fr = WebDriverWait(self.driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
- print(len(fr))
- self.driver.switch_to.frame(1)
- jd = WebDriverWait(self.driver, 30).until(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.tdr:nth-child(8) .textbox-icon')))
- # jd = driver.find_element(By.CSS_SELECTOR, '.tdr:nth-child(8) .textbox-icon')
- self.actions.click(jd)
- self.actions.perform()
- time.sleep(1)
- tf = self.driver.find_element(By.ID, '_easyui_combobox_i1_1')
- self.actions.click(tf)
- self.actions.perform()
- time.sleep(1)
- search = self.driver.find_element(By.CSS_SELECTOR, 'td:nth-child(10) > .searchBtn > span')
- self.actions.click(search)
- self.actions.perform()
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- page = WebDriverWait(self.driver, 30).until(
- EC.presence_of_element_located((By.CSS_SELECTOR, '#tt > .panel td:nth-child(1) > input')))
- # page = driver.find_element(By.CSS_SELECTOR, '#tt > .panel td:nth-child(1) > input')
- page.clear()
- page.send_keys('999')
- page.send_keys(Keys.ENTER)
- time.sleep(1)
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- # input("page end")
- time.sleep(1)
- # time.sleep(10)
- self.activeMode="工作指令(WO)管理"
- content = self.driver.find_element(By.CSS_SELECTOR,
- '#tt > .panel .datagrid-view2 > .datagrid-body').get_attribute(
- "outerHTML")
- print(content)
- with open('work{}.txt'.format(str(int(time.time()))), 'wb') as f:
- f.write(content.encode('utf-8'))
- soup = BeautifulSoup(content, 'html5lib')
- data_list = []
- try:
- for idx, tr in enumerate(soup.find_all('tr')):
- tds = tr.find_all('td')
- data_list.append({
- '状态': tds[3].contents[0].contents[0],
- '机号': tds[4].contents[0].contents[0],
- '时间': tds[8].contents[0].contents[0].replace('\n', '').replace(' ', ''),
- '内容': tds[10].contents[0].contents[0].replace('\n', '').replace(' ', '')
- })
- except IndexError as e:
- print(e)
- print(data_list)
- return data_list
- def getFlightData(self, *args):
- self.timerSingal()
- self.reStartWebDrive('航班动态')
- self.isLogining('航班动态')
- self.driver.switch_to.parent_frame()
- fr = WebDriverWait(self.driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
- self.driver.switch_to.frame(1)
- print(len(fr))
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- notview = self.driver.find_element(By.ID, 'notView')
- if notview.is_selected():
- print("取消勾选")
- self.actions.click(notview)
- self.actions.perform()
- time.sleep(1)
- searchbtn = self.driver.find_element(By.CSS_SELECTOR, '.searchBtn > span')
- self.actions.click(searchbtn)
- self.actions.perform()
- time.sleep(1)
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- if args !=():
- date = self.driver.find_element(By.CSS_SELECTOR, '.datebox > .textbox-text')
- date.clear()
- time.sleep(0.5)
- date.send_keys(args[0])
- time.sleep(0.5)
- self.actions.click(searchbtn)
- self.actions.perform()
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- flightDataContent = self.driver.find_element(By.CSS_SELECTOR,
- '.datagrid-view2 > .datagrid-body').get_attribute("outerHTML")
- print(flightDataContent)
- if args!=():
- with open('./static/flightdata/flight{}.txt'.format(args[0]),'wb') as f:
- f.write(flightDataContent.encode('utf-8'))
- else:
- with open('./static/flightdata/flight{}.txt'.format(time.strftime("%Y%m%d", time.localtime())),'wb') as f:
- f.write(flightDataContent.encode('utf-8'))
- soup = BeautifulSoup(flightDataContent, "html5lib")
- flight_list=[]
- for idx, tr in enumerate(soup.find_all('tr')):
- tds = tr.find_all('td')
- temp_list = []
- for i in tds:
- try:
- temp = i.contents[0].contents[0].contents[0]
- except:
- try:
- temp = i.contents[0].contents[0]
- except:
- temp = ""
- temp_list.append(temp)
- flight_list.append({
- "机号": temp_list[0],
- "机型": temp_list[1],
- "发动机": temp_list[2],
- "航班号": temp_list[3],
- "进港起飞": temp_list[4],
- "进港到达时间": temp_list[5],
- "航站": temp_list[6],
- "离港起飞时间": temp_list[7],
- "离港到达": temp_list[8],
- "进港机位": temp_list[9],
- "离港机位": temp_list[10],
- "保留": temp_list[11],
- "任务类型": temp_list[12],
- "随机": temp_list[13],
- "维修人员": temp_list[14],
- "放行人员": temp_list[15],
- "工作状态": temp_list[16],
- "工作状态时间": temp_list[17],
- "重要航班": temp_list[18],
- "备降/返航": temp_list[19],
- "延误": temp_list[20],
- "附加信息": temp_list[21],
- })
- print(flight_list)
- return flight_list
- # .datagrid-view2 > .datagrid-body 数据
- # .searchBtn > span 查询
- # .datagrid-mask-msg 查询中
- def searchFlight(self, *args):
- self.reStartWebDrive('航班动态')
- self.isLogining('航班动态')
- self.driver.switch_to.parent_frame()
- fr = WebDriverWait(self.driver, 30).until(EC.presence_of_all_elements_located((By.TAG_NAME, "iframe")))
- self.driver.switch_to.frame(1)
- print(len(fr))
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- notview = self.driver.find_element(By.ID, 'notView')
- if notview.is_selected():
- print("取消勾选")
- self.actions.click(notview)
- self.actions.perform()
- time.sleep(1)
- searchbtn = self.driver.find_element(By.CSS_SELECTOR, '.searchBtn > span')
- self.actions.click(searchbtn)
- self.actions.perform()
- time.sleep(1)
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- if args !=():
- date = self.driver.find_element(By.CSS_SELECTOR, '.datebox > .textbox-text')
- date.clear()
- time.sleep(0.5)
- date.send_keys(args[0])
- time.sleep(0.5)
- self.actions.click(searchbtn)
- self.actions.perform()
- WebDriverWait(self.driver, 30).until_not(
- EC.presence_of_element_located((By.CSS_SELECTOR, '.datagrid-mask-msg')))
- flightDataContent = self.driver.find_element(By.CSS_SELECTOR,
- '.datagrid-view2 > .datagrid-body').get_attribute("outerHTML")
- #print(flightDataContent)
- if args != ():
- with open('./static/flightdata/flight{}.txt'.format(args[0]), 'wb') as f:
- f.write(flightDataContent.encode('utf-8'))
- else:
- with open('./static/flightdata/flight{}.txt'.format(time.strftime("%Y%m%d", time.localtime())), 'wb') as f:
- f.write(flightDataContent.encode('utf-8'))
- soup = BeautifulSoup(flightDataContent, "html5lib")
- flight_list = []
- for idx, tr in enumerate(soup.find_all('tr')):
- tds = tr.find_all('td')
- temp_list = []
- for i in tds:
- try:
- temp = i.contents[0].contents[0].contents[0]
- except:
- try:
- temp = i.contents[0].contents[0]
- except:
- temp = ""
- temp_list.append(temp)
- flight_list.append({
- "机号": temp_list[0],
- "机型": temp_list[1],
- "发动机": temp_list[2],
- "航班号": temp_list[3],
- "进港起飞": temp_list[4],
- "进港到达时间": temp_list[5],
- "航站": temp_list[6],
- "离港起飞时间": temp_list[7],
- "离港到达": temp_list[8],
- "进港机位": temp_list[9],
- "离港机位": temp_list[10],
- "保留": temp_list[11],
- "任务类型": temp_list[12],
- "随机": temp_list[13],
- "维修人员": temp_list[14],
- "放行人员": temp_list[15],
- "工作状态": temp_list[16],
- "工作状态时间": temp_list[17],
- "重要航班": temp_list[18],
- "备降/返航": temp_list[19],
- "延误": temp_list[20],
- "附加信息": temp_list[21],
- })
- print(flight_list)
- return flight_list
- def isLogining(self, *args):
- self.timerSingal()
- try:
- notLogining = self.driver.find_element(By.CSS_SELECTOR, '.layui-layer-content')
- print(notLogining.text)
- btn = self.driver.find_element(By.CSS_SELECTOR, '.layui-layer-btn0')
- self.actions.click(btn)
- WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, 'singleSubmit')))
- loginbtn = self.driver.find_element('id', 'singleSubmit')
- time.sleep(0.5)
- self.actions.click(loginbtn)
- self.actions.perform()
- WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.NAME, "j_username"))).send_keys(
- self.username)
- time.sleep(0.5)
- self.driver.find_element('name', "j_password").send_keys(self.password)
- time.sleep(0.5)
- self.driver.find_element('name', "j_password").send_keys(Keys.ENTER)
- # if self.activeMode == "工作指令(WO)管理":
- # self.openMaintitle("航线维修")
- # self.openSubTitle("工作指令(WO)管理")
- # self.activeMode = ""
- self.openMaintitle("航线维修")
- if args!=():
- self.openSubTitle(args[0])
- else:
- self.openSubTitle("工作指令(WO)管理")
- except exceptions.NoSuchElementException as e:
- print("已登录")
- print(e)
- def quitWeb(self):
- self.driver.quit()
- self.driver = None
- def webDriveIsAvailable(self):
- object_existed = False
- if self.driver is not None:
- try:
- self.driver.execute_script('javascript:void(0);')
- object_existed = True
- except:
- # webdriver要求浏览器执行Javascript出现异常
- try:
- self.driver.quit()
- finally:
- self.driver = None
- finally:
- return object_existed
- def reStartWebDrive(self, *args):
- if not self.webDriveIsAvailable():
- try:
- self.driver = webdriver.Edge()
- self.actions = ActionChains(self.driver)
- except:
- self.driver = webdriver.Chrome()
- self.actions = ActionChains(self.driver)
- self.openAmro()
- self.openMaintitle("航线维修")
- if args!=():
- self.openSubTitle(args[0])
- else:
- self.openSubTitle("工作指令(WO)管理")
- self.activeMode = ""
- def timerSingal(self):
- print("发送指令---")
- if self.webDriveIsAvailable():
- print("发送持续指令")
- self.q.put("cz")
- else:
- print("发送重启指令")
- self.q2.put("cz2")
- self.reStartWebDrive()
- def directSendMsg(self, msg, atuser):
- timestamp = str(round(time.time() * 1000))
- secret = 'SEC185f68b4fbce62bb7cfc72ef3d84444e7b9781e602716af36c50bf52c8cee791'
- secret_enc = secret.encode('utf-8')
- string_to_sign = '{}\n{}'.format(timestamp, secret)
- string_to_sign_enc = string_to_sign.encode('utf-8')
- hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
- sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
- # print(timestamp)
- # print(sign)
- headers = {'Content-Type': 'application/json'} # 定义数据类型
- webhook = 'https://oapi.dingtalk.com/robot/send?access_token=56a4674577a0f034fda2ec99cb5e8c31d9e2cb53151cd019691ce62430e9da39×tamp=' + timestamp + "&sign=" + sign
- atTittle = ""
- for n in atuser:
- atTittle += "@{}".format(n)
- data = {
- "msgtype": "markdown",
- "markdown": {
- "title": "定时通知",
- "text": "{}{}".format(atTittle, msg)
- },
- "at": {
- "atMobiles": atuser,
- "isAtAll": False
- }
- }
- res = requests.post(webhook, data=json.dumps(data), headers=headers) # 发送post请求
- print(res.text)
- def searchJob(self):
- nmsg = ""
- data_list = self.getWOData()
- for i in data_list:
- nmsg += self.createMsg(i['机号'], i['状态'], i['时间'], i['内容'])
- self.directSendMsg(nmsg, ['13032201605'])
- def createMsg(self, bnum, state, times, cont):
- MSG = """
- #### 机号:{}
- ##### 状态:{}
- ##### 时间:{}
- ##### 内容:{}
- ###### ------------------------
- """.format(bnum, state, times, cont)
- return MSG
- if __name__ == '__main__':
- AMRO = amro()
- # AMRO.openWeb('https://www.baidu.com')
- # print(AMRO.webDriveIsAvailable())
- # AMRO.quitWeb()
- # print(AMRO.webDriveIsAvailable())
- AMRO.openAmro()
- AMRO.openMaintitle("航线维修")
- #AMRO.openSubTitle("工作指令(WO)管理")
- AMRO.openSubTitle("航班动态")
- AMRO.getFlightData()
- #AMRO.WO()
- #time.sleep(5)
- #AMRO.quitWeb()
|