commit 5989d8800253837edeb794bcd50ad344bf3ef8ea
Author: Nathan Windisch
Date:   Mon Jul 22 01:58:04 2024 +0100

    Initial commit.

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f6a5bd6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# open-webui tool: reddit
+
+A tool for interacting with reddit.com, including subreddits' popular feeds and users' recent feeds.
\ No newline at end of file
diff --git a/src/reddit.py b/src/reddit.py
new file mode 100644
index 0000000..da65b78
--- /dev/null
+++ b/src/reddit.py
@@ -0,0 +1,210 @@
+"""
+title: Reddit Feeds
+author: @nathanwindisch
+author_url: https://github.com/nathanwindisch
+funding_url: https://patreon.com/NathanWindisch
+version: 0.0.1
+changelog:
+- 0.0.1 - Initial upload to openwebui community.
+"""
+
+
+import re
+import json
+import asyncio
+import requests
+from typing import Awaitable, Callable
+from pydantic import BaseModel, Field
+from requests.models import Response
+
+
+def parse_reddit_page(response: Response):
+    data = json.loads(response.content)
+    output = []
+    if "data" not in data: return output
+    if "children" not in data["data"]: return output
+    for item in data["data"]["children"]: output.append(item)
+    return output
+
+
+def parse_posts(data: list):
+    posts = []
+    for item in data:
+        if item["kind"] != "t3": continue  # skip non-post items
+        item = item["data"]
+        posts.append({
+            "id": item["name"],
+            "title": item["title"],
+            "description": item["selftext"],
+            "link": item["url"],
+
+            "author_username": item["author"],
+            "author_id": item["author_fullname"],
+            "subreddit_name": item["subreddit"],
+            "subreddit_id": item["subreddit_id"],
+            "subreddit_subscribers": item["subreddit_subscribers"],
+
+            "score": item["score"],
+            "upvotes": item["ups"],
+            "downvotes": item["downs"],
+            "upvote_ratio": item["upvote_ratio"],
+            "total_comments": item["num_comments"],
+            "total_crossposts": item["num_crossposts"],
+            "total_awards": item["total_awards_received"],
+            "domain": item["domain"],
+            "flair_text": item["link_flair_text"],
+            "media_embed": item["media_embed"],
+
+            "is_pinned": item["pinned"],
+            "is_self": item["is_self"],
+            "is_video": item["is_video"],
+            "is_media_only": item["media_only"],
+            "is_over_18": item["over_18"],
+            "is_edited": item["edited"],
+            "is_hidden": item["hidden"],
+            "is_archived": item["archived"],
+            "is_locked": item["locked"],
+            "is_quarantined": item["quarantine"],
+            "is_spoiler": item["spoiler"],
+            "is_stickied": item["stickied"],
+            "is_send_replies": item["send_replies"],
+
+            "published_at": item["created_utc"],
+        })
+    return posts
+
+
+def parse_comments(data: list):
+    comments = []
+    for item in data:
+        if item["kind"] != "t1": continue  # skip non-comment items
+        item = item["data"]
+        comments.append({
+            "id": item["name"],
+            "body": item["body"],
+            "link": item["permalink"],
+            "post_id": item["link_id"],
+            "post_title": item["link_title"],
+            "post_link": item["link_permalink"],
+
+            "author_username": item["author"],
+            "author_id": item["author_fullname"],
+            "subreddit_name": item["subreddit"],
+            "subreddit_id": item["subreddit_id"],
+
+            "score": item["score"],
+            "upvotes": item["ups"],
+            "downvotes": item["downs"],
+            "total_comments": item["num_comments"],
+            "total_awards": item["total_awards_received"],
+            "is_edited": item["edited"],
+            "is_archived": item["archived"],
+            "is_locked": item["locked"],
+            "is_quarantined": item["quarantine"],
+            "is_stickied": item["stickied"],
+            "is_send_replies": item["send_replies"],
+            "published_at": item["created_utc"],
+        })
+    return comments
+
+
+class Tools:
+    def __init__(self): pass
+
+    class UserValves(BaseModel):
+        USER_AGENT: str = Field(
+            default="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+            description="The user agent to use when making requests to Reddit."
+        )
+
+    async def get_subreddit_feed(
+        self,
+        subreddit: str,
+        __event_emitter__: Callable[[dict], Awaitable[None]],
+        __user__: dict = {},
+    ) -> str:
+        """
+        Get the latest posts from a subreddit, as an array of JSON objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
+        :param subreddit: The subreddit to get the latest posts from.
+        :return: A list of posts with the previously mentioned properties, or an error message.
+        """
+        headers = { "User-Agent": __user__["valves"].USER_AGENT }
+        await __event_emitter__({ "data": { "description": f"Starting retrieval for r/{subreddit}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
+
+        if subreddit == "":
+            await __event_emitter__({ "data": { "description": "Error: No subreddit provided.", "status": "complete", "done": True }, "type": "status" })
+            return "Error: No subreddit provided."
+        subreddit = subreddit.replace("/r/", "").replace("r/", "")
+
+        if not re.match(r"^[A-Za-z0-9_]{2,21}$", subreddit):
+            await __event_emitter__({ "data": { "description": f"Error: Invalid subreddit name '{subreddit}' (either too long or too short).", "status": "complete", "done": True }, "type": "status" })
+            return "Error: Invalid subreddit name."
+
+        try:
+            response = requests.get(f"https://reddit.com/r/{subreddit}.json", headers=headers)
+
+            if not response.ok:
+                await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve r/{subreddit}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
+                return f"Error: {response.status_code}"
+            else:
+                output = parse_posts(parse_reddit_page(response))
+                await __event_emitter__({ "data": { "description": f"Retrieved {len(output)} posts from r/{subreddit}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
+                return json.dumps(output)
+        except Exception as e:
+            await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from r/{subreddit}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
+            return f"Error: {e}"
+
+
+    async def get_user_feed(
+        self,
+        username: str,
+        __event_emitter__: Callable[[dict], Awaitable[None]],
+        __user__: dict = {},
+    ) -> str:
+        """
+        Get the latest posts and comments from a given user, as a JSON object with an array of 'post' objects with the following properties: 'id', 'title', 'description', 'link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'subreddit_subscribers', 'score', 'upvotes', 'downvotes', 'upvote_ratio', 'total_comments', 'total_crossposts', 'total_awards', 'domain', 'flair_text', 'media_embed', 'is_pinned', 'is_self', 'is_video', 'is_media_only', 'is_over_18', 'is_edited', 'is_hidden', 'is_archived', 'is_locked', 'is_quarantined', 'is_spoiler', 'is_stickied', 'is_send_replies', 'published_at'.
+        Additionally, the resultant object will also contain an array of 'comment' objects with the following properties: 'id', 'body', 'link', 'post_id', 'post_title', 'post_link', 'author_username', 'author_id', 'subreddit_name', 'subreddit_id', 'score', 'upvotes', 'downvotes', 'total_comments', 'total_awards', 'is_edited', 'is_archived', 'is_locked', 'is_quarantined', 'is_stickied', 'is_send_replies', 'published_at'.
+        :param username: The username to get the latest posts and comments from.
+        :return: An object with a list of posts and a list of comments (both with the previously mentioned properties), or an error message.
+        """
+        headers = { "User-Agent": __user__["valves"].USER_AGENT }
+        await __event_emitter__({ "data": { "description": f"Starting retrieval for u/{username}'s Reddit Feed...", "status": "in_progress", "done": False }, "type": "status" })
+
+        if username == "":
+            await __event_emitter__({ "data": { "description": "Error: No username provided.", "status": "complete", "done": True }, "type": "status" })
+            return "Error: No username provided."
+        username = username.replace("/u/", "").replace("u/", "")
+
+        if not re.match(r"^[A-Za-z0-9_]{3,20}$", username):
+            await __event_emitter__({ "data": { "description": f"Error: Invalid username '{username}' (either too long or too short).", "status": "complete", "done": True }, "type": "status" })
+            return "Error: Invalid username."
+
+        try:
+            response = requests.get(f"https://reddit.com/u/{username}.json", headers=headers)
+
+            if not response.ok:
+                await __event_emitter__({ "data": { "description": f"Error: Failed to retrieve u/{username}'s Reddit Feed: {response.status_code}.", "status": "complete", "done": True }, "type": "status" })
+                return f"Error: {response.status_code}"
+            else:
+                page = parse_reddit_page(response)  # user pages can have both posts and comments.
+                posts = parse_posts(page)
+                comments = parse_comments(page)
+                await __event_emitter__({ "data": { "description": f"Retrieved {len(posts)} posts and {len(comments)} comments from u/{username}'s Reddit Feed.", "status": "complete", "done": True }, "type": "status" })
+                return json.dumps({ "posts": posts, "comments": comments })
+        except Exception as e:
+            await __event_emitter__({ "data": { "description": f"Failed to retrieve any posts from u/{username}'s Reddit Feed: {e}.", "status": "complete", "done": True }, "type": "status" })
+            return f"Error: {e}"
+
+
+async def main():
+    __user__ = {
+        "valves": Tools.UserValves(
+            USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
+        )
+    }
+    async def mock_event_emitter(event: dict): print("Event Emitted:", event["data"])
+    await Tools().get_subreddit_feed("sysadmin", mock_event_emitter, __user__)
+    await Tools().get_user_feed("natfan", mock_event_emitter, __user__)
+
+
+if __name__ == "__main__": asyncio.run(main())
\ No newline at end of file
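Note on the listing shape: parse_reddit_page, parse_posts, and parse_comments all rely on the envelope returned by Reddit's public listing endpoints (https://reddit.com/r/<subreddit>.json and https://reddit.com/u/<username>.json): a "Listing" whose data.children entries each wrap a post (kind "t3") or a comment (kind "t1") in another kind/data pair. Below is a minimal sketch of that shape with made-up values, using a Mock as a stand-in for requests' Response and assuming parse_reddit_page is importable from src/reddit.py:

    # Illustrative only: a trimmed listing, just enough for
    # parse_reddit_page() to walk data -> children. All values are made up.
    import json
    from unittest.mock import Mock
    from src.reddit import parse_reddit_page

    fake_listing = {
        "kind": "Listing",
        "data": {
            "children": [
                {"kind": "t3", "data": {"name": "t3_example"}},  # a post
                {"kind": "t1", "data": {"name": "t1_example"}},  # a comment
            ],
        },
    }
    fake_response = Mock(content=json.dumps(fake_listing))
    # Prints both children, kind/data wrappers intact; filtering by kind
    # happens later, in parse_posts() and parse_comments().
    print(parse_reddit_page(fake_response))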