本帖最後由 杜龍 於 2023-12-24 18:09 編輯
疫情果時我整左個program fetch 強檢大廈提示send message, 研究過whatsapp但要收錢, 最後用, telegram bot 做.- from tabula import read_pdf, convert_into
- from tabulate import tabulate
- from PyPDF2 import PdfFileReader
- from commonfunction import *
- from telethon.sync import TelegramClient
- from telethon.tl.types import InputPeerUser, InputPeerChannel
- from telethon import TelegramClient, sync, events
- from os.path import exists
- # get your api_id, api_hash, token
- # from telegram as described above
- api_id = 'xx'
- api_hash = 'xxx'
- message = "Working..."
- #BuildingPdfURL = 'https://www.chp.gov.hk/files/pdf/ctn.pdf'
- class ClsCTNPDF:
- def getDownloadPDFLocation(self):
- return self.pdfldownloadpath
- def __init__(self):
- self.pdfldownloadpath = f'/config/workspaces/WorkspacePython/PDFScrape/pdf/restriction-testing{gettime()}.pdf'
- self._building_pdf_url = Config().BuildingPdfURL
- self.scheduleuser = AppDBRepo().getAllChatIDandSchedule()
- logger.info('ini')
- def downloadpdf(self):
- if not exists(self.pdfldownloadpath):
- command = ["wget", "-O",
- "{}".format(self.pdfldownloadpath), "{}".format(self._building_pdf_url)]
- p = subprocess.Popen(command, stdout=subprocess.PIPE)
- out, err = p.communicate()
- logger.info("download out:{0}".format(out))
- if err:
- logger.error(err)
- return False
- return True
- def ConvertPDFtoJson(self):
- # reads table from pdf file
- # tables = read_pdf("abc.pdf",pages="all") #address of pdf file
- jsonpath = f'/config/workspaces/WorkspacePython/PDFScrape/pdf/ctn{gettime()}.json'
- convert_into(self.pdfldownloadpath, jsonpath,
- output_format="json", pages='all')
- jsonfile = open(jsonpath)
- self.data = json.load(jsonfile)
- jsonfile.close()
- return self.data
- def ExtractBuildingList(self, maindistrict, subdistrict):
- # if self.data is None:
- maindistrict = maindistrict.replace("區", "")
- subdistrict = subdistrict.replace("區", "")
- logger.info("filter district main district:{0}, sub district:{1}".format(
- maindistrict, subdistrict))
- self.downloadpdf()
- self.ConvertPDFtoJson()
- building_list = []
- for _d in self.data:
- for _i in _d['data']:
- addr = _i[1]['text']
- if len(addr) > 0 and (maindistrict in addr or (subdistrict in addr)):
- building_list.append(addr.replace('\r', ' '))
- self.building_list = building_list
- return self.building_list
- def getScheduleUserByChatID(self, chatid):
- return self.getTblScheduleByChatID(chatid)[0].tblUser
- def getTblScheduleByChatID(self, chatid):
- return list(filter(lambda s: s.tblUser.chat_id == chatid, self.scheduleuser))
- def sendTelegram(self, chatid):
- objuser = self.getScheduleUserByChatID(chatid)
- logger.info(
- f"Handling telegram msg for USER:{objuser.user_name}/chat id:({objuser.chat_id})")
- building_list = self.ExtractBuildingList(
- objuser.main_district, objuser.sub_district)
- if len(building_list) == 0:
- logger.info("===No building in main district:{0} or subdistrict: {1}, will not post message====".format(
- objuser.main_district, objuser.user_name))
- else:
- _greeting = f"{objuser.user_name.upper()} 您好!\r\n根據香港衞生防護中心最新的公告({getLocalTime().strftime('%Y-%B-%d %H:%M:%S')})\r\n{self._building_pdf_url}\r\n\r\n在您登記的地區 :\r\n{objuser.main_district}/{objuser.sub_district}\r\n\r\n以下是需要進行新冠病毒強檢的大廈名單 :\r\n"
- _strBuildings = _greeting + ("\r\n".join(building_list))
- logger.info(_strBuildings)
- dopost = True
- if dopost:
- SEND_URL = f'https://api.telegram.org/bot{Config().myToken}/sendMessage'
- requests.post(SEND_URL, json={
- 'chat_id': chatid, 'text': _strBuildings})
- def checkScheduleAndSendMsg(self):
- logger.info('======Schedule job started at %s=========',
- getLocalTime())
- chatids = set([int(s.tblUser.chat_id)
- for s in self.scheduleuser])
- logger.info('chatids:{0}'.format(chatids))
- for chatid in chatids:
- sch = self.getTblScheduleByChatID(chatid)
- objuser = sch[0].tblUser
- LstSchedule = set(int(s.tblSchedule.schedule.strftime("%H"))
- for s in sch)
- currenthour = getCurrentHour()
- logger.info('Schedules for user {0}({1}):{2}, current hour:{3}'.format(
- objuser.user_name, chatid, LstSchedule, currenthour))
- #logger.info("current hour: {0}".format(currenthour))
- if (currenthour in LstSchedule):
- # if (currenthour in LstSchedule) or True:
- logger.info(
- '===========Current hour matches schedule============')
- self.sendTelegram(chatid)
- else:
- logger.info("======Current hour not in user:{0}'s schedule=====".format(
- objuser.user_name))
- try:
- ClsCTNPDF().checkScheduleAndSendMsg()
- except Exception as e:
- logging.error('Error', exc_info=e)
- # your phone number
- phone = '+85212347897'
複製代碼 |